Files
second-brain/cron_tasks/ingest_memory.py

250 lines
8.4 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Import Markdown files from workspace/memory/ into Second Brain DB.
Reads daily notes (YYYY-MM-DD.md) and topic files (topic-*.md), splits into
engrams by headers, and stores them with proper metadata.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add second-brain src to path
BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain")
sys.path.insert(0, str(BRAIN_DIR))
from src.store import EngramStore
from src.engram import Engram, Grounding
import sqlite3
WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory"
STATE_PATH = MEMORY_DIR / "ingest_state.json"
def _load_json(path: Path, default: Any) -> Any:
try:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def _save_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _compute_hash(content: str) -> str:
return hashlib.sha256(content.strip().encode("utf-8")).hexdigest()[:16]
def _slugify(text: str) -> str:
slug = re.sub(r"[^a-zA-Z0-9]+", "_", text).strip("_").lower()
return slug[:50] if slug else "untitled"
def _parse_frontmatter_and_body(md: str) -> tuple[Optional[Dict[str, Any]], str]:
frontmatter = {}
body = md
if md.startswith("---"):
parts = md.split("---", 2)
if len(parts) >= 3:
try:
frontmatter = json.loads(parts[1])
body = parts[2].strip()
except Exception:
frontmatter = {}
return frontmatter, body
def _split_by_headers(md: str, filename: str) -> List[Dict[str, Any]]:
"""
Split markdown into sections by headers.
For files starting with 'topic-' (context-buffer topics), H1 is treated as a section title.
For daily notes (YYYY-MM-DD*.md), H1 is skipped (date header).
"""
is_topic = filename.startswith("topic-")
lines = md.splitlines(keepends=True)
current_title = None
current_content = []
sections = []
for line in lines:
if line.startswith("# "):
if is_topic:
title = line[2:].strip()
if current_title is not None:
sections.append({"title": current_title, "content": "".join(current_content).strip()})
current_title = title
current_content = []
else:
# Daily note: skip H1 (date header)
current_title = None
current_content = []
# Note: lines after H1 will be ignored until a H2 appears
elif line.startswith("## "):
title = line[3:].strip()
if current_title is not None:
sections.append({"title": current_title, "content": "".join(current_content).strip()})
current_title = title
current_content = []
else:
if current_title is not None:
current_content.append(line)
if current_title is not None:
sections.append({"title": current_title, "content": "".join(current_content).strip()})
if not sections and md.strip():
return [{"title": None, "content": md.strip()}]
return sections
def _parse_date_from_filename(filename: str) -> Optional[datetime]:
m = re.search(r"(\d{4}-\d{2}-\d{2})", filename)
if m:
try:
return datetime.strptime(m.group(1), "%Y-%m-%d").replace(tzinfo=timezone.utc)
except Exception:
pass
return None
def _find_link_suggestions(store: EngramStore, new_id: str, new_tags: List[str]) -> List[Dict[str, Any]]:
"""Find existing engrams that share at least 2 tags with the new one.
Returns a list of suggestion dicts: { "engram_id": ..., "common_tags": [...] }
"""
if not new_tags:
return []
# Get all engrams (could be optimized with index)
all_egs = store.get_all(limit=5000) # limit for performance
suggestions = []
new_tag_set = set(new_tags)
for eg in all_egs:
if str(eg.id) == new_id:
continue
eg_tags = set(eg.metadata.get("tags", []))
common = new_tag_set & eg_tags
if len(common) >= 2:
suggestions.append({
"engram_id": str(eg.id),
"common_tags": list(common),
"preview": eg.content[:60],
})
# Return top 5 sorted by number of common tags
suggestions.sort(key=lambda s: len(s["common_tags"]), reverse=True)
return suggestions[:5]
def run() -> Dict[str, Any]:
state = _load_json(STATE_PATH, {"processed": {}})
processed: Dict[str, str] = state.get("processed", {})
store = EngramStore(str(BRAIN_DIR / "data" / "brain.sqlite"))
out = {
"success": True,
"time": datetime.now(timezone.utc).isoformat(),
"files_seen": 0,
"files_processed": 0,
"sections_saved": 0,
"duplicates": 0,
"errors": [],
"self_healed": 0,
"link_suggestions": 0,
}
# Self-healing: if today's memory file is missing or empty, create a system check entry
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
today_md = MEMORY_DIR / f"{today}.md"
if not today_md.exists() or today_md.stat().st_size == 0:
try:
system_content = f"# System Check\n\nAutomatischer Health-Check Eintrag {today}\n\n- Uhrzeit: {datetime.now().strftime('%H:%M')}\n- Status: OK\n- Hinweis: Diese Datei wurde automatisch erstellt, um den Datenfluss sicherzustellen."
today_md.write_text(system_content, encoding="utf-8")
out["self_healed"] += 1
except Exception as e:
out["errors"].append(f"Self-healing failed: {e}")
for md_path in MEMORY_DIR.glob("*.md"):
out["files_seen"] += 1
try:
md = md_path.read_text(encoding="utf-8")
current_hash = _compute_hash(md)
last_hash = processed.get(str(md_path))
if current_hash == last_hash:
continue
frontmatter, body = _parse_frontmatter_and_body(md)
sections = _split_by_headers(body, md_path.name)
file_date = _parse_date_from_filename(md_path.name)
file_source = frontmatter.get("source") or "memory"
file_tags = frontmatter.get("tags", [])
if isinstance(file_tags, str):
file_tags = [file_tags]
base_meta = {
"source": file_source,
"tags": file_tags,
"filepath": str(md_path.relative_to(WORKSPACE)),
}
for idx, sec in enumerate(sections):
title = sec["title"] or (frontmatter.get("title") if idx == 0 else None) or md_path.stem
content = sec["content"]
if not content.strip():
continue
content_hash = _compute_hash(content)
if content_hash in [h for h in processed.values() if h != last_hash]:
out["duplicates"] += 1
continue
tags = list(file_tags)
if title:
tags.append(_slugify(title))
meta = dict(base_meta)
meta["title"] = title
meta["section_index"] = idx
eg = Engram.create(
content=content,
source=file_source,
tags=tags,
grounding=Grounding.ASSUMPTION,
)
eg.metadata.update(meta)
# Link-Vorschläge generieren (Punkt 1)
suggestions = _find_link_suggestions(store, str(eg.id), tags)
if suggestions:
meta["link_suggestions"] = suggestions
out["link_suggestions"] += len(suggestions)
store.save(eg)
out["sections_saved"] += 1
processed[str(md_path)] = current_hash
out["files_processed"] += 1
except Exception as e:
out["errors"].append(f"{md_path.name}: {e}")
_save_json(STATE_PATH, {"processed": processed, "updated_at": out["time"]})
return out
if __name__ == "__main__":
res = run()
print(json.dumps(res, ensure_ascii=False, indent=2))