#!/usr/bin/env python3 """ Import Markdown files from workspace/memory/ into Second Brain DB. Reads daily notes (YYYY-MM-DD.md) and topic files (topic-*.md), splits into engrams by headers, and stores them with proper metadata. """ from __future__ import annotations import hashlib import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional # Add second-brain src to path BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain") sys.path.insert(0, str(BRAIN_DIR)) from src.store import EngramStore from src.engram import Engram, Grounding import sqlite3 WORKSPACE = Path("/root/.openclaw/workspace") MEMORY_DIR = WORKSPACE / "memory" STATE_PATH = MEMORY_DIR / "ingest_state.json" def _load_json(path: Path, default: Any) -> Any: try: if not path.exists(): return default return json.loads(path.read_text(encoding="utf-8")) except Exception: return default def _save_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") def _compute_hash(content: str) -> str: return hashlib.sha256(content.strip().encode("utf-8")).hexdigest()[:16] def _slugify(text: str) -> str: slug = re.sub(r"[^a-zA-Z0-9]+", "_", text).strip("_").lower() return slug[:50] if slug else "untitled" def _parse_frontmatter_and_body(md: str) -> tuple[Optional[Dict[str, Any]], str]: frontmatter = {} body = md if md.startswith("---"): parts = md.split("---", 2) if len(parts) >= 3: try: frontmatter = json.loads(parts[1]) body = parts[2].strip() except Exception: frontmatter = {} return frontmatter, body def _split_by_headers(md: str, filename: str) -> List[Dict[str, Any]]: """ Split markdown into sections by headers. For files starting with 'topic-' (context-buffer topics), H1 is treated as a section title. For daily notes (YYYY-MM-DD*.md), H1 is skipped (date header). """ is_topic = filename.startswith("topic-") lines = md.splitlines(keepends=True) current_title = None current_content = [] sections = [] for line in lines: if line.startswith("# "): if is_topic: title = line[2:].strip() if current_title is not None: sections.append({"title": current_title, "content": "".join(current_content).strip()}) current_title = title current_content = [] else: # Daily note: skip H1 (date header) current_title = None current_content = [] # Note: lines after H1 will be ignored until a H2 appears elif line.startswith("## "): title = line[3:].strip() if current_title is not None: sections.append({"title": current_title, "content": "".join(current_content).strip()}) current_title = title current_content = [] else: if current_title is not None: current_content.append(line) if current_title is not None: sections.append({"title": current_title, "content": "".join(current_content).strip()}) if not sections and md.strip(): return [{"title": None, "content": md.strip()}] return sections def _parse_date_from_filename(filename: str) -> Optional[datetime]: m = re.search(r"(\d{4}-\d{2}-\d{2})", filename) if m: try: return datetime.strptime(m.group(1), "%Y-%m-%d").replace(tzinfo=timezone.utc) except Exception: pass return None def _find_link_suggestions(store: EngramStore, new_id: str, new_tags: List[str]) -> List[Dict[str, Any]]: """Find existing engrams that share at least 2 tags with the new one. Returns a list of suggestion dicts: { "engram_id": ..., "common_tags": [...] } """ if not new_tags: return [] # Get all engrams (could be optimized with index) all_egs = store.get_all(limit=5000) # limit for performance suggestions = [] new_tag_set = set(new_tags) for eg in all_egs: if str(eg.id) == new_id: continue eg_tags = set(eg.metadata.get("tags", [])) common = new_tag_set & eg_tags if len(common) >= 2: suggestions.append({ "engram_id": str(eg.id), "common_tags": list(common), "preview": eg.content[:60], }) # Return top 5 sorted by number of common tags suggestions.sort(key=lambda s: len(s["common_tags"]), reverse=True) return suggestions[:5] def run() -> Dict[str, Any]: state = _load_json(STATE_PATH, {"processed": {}}) processed: Dict[str, str] = state.get("processed", {}) store = EngramStore(str(BRAIN_DIR / "data" / "brain.sqlite")) out = { "success": True, "time": datetime.now(timezone.utc).isoformat(), "files_seen": 0, "files_processed": 0, "sections_saved": 0, "duplicates": 0, "errors": [], "self_healed": 0, "link_suggestions": 0, } # Self-healing: if today's memory file is missing or empty, create a system check entry today = datetime.now(timezone.utc).strftime("%Y-%m-%d") today_md = MEMORY_DIR / f"{today}.md" if not today_md.exists() or today_md.stat().st_size == 0: try: system_content = f"# System Check\n\nAutomatischer Health-Check Eintrag – {today}\n\n- Uhrzeit: {datetime.now().strftime('%H:%M')}\n- Status: OK\n- Hinweis: Diese Datei wurde automatisch erstellt, um den Datenfluss sicherzustellen." today_md.write_text(system_content, encoding="utf-8") out["self_healed"] += 1 except Exception as e: out["errors"].append(f"Self-healing failed: {e}") for md_path in MEMORY_DIR.glob("*.md"): out["files_seen"] += 1 try: md = md_path.read_text(encoding="utf-8") current_hash = _compute_hash(md) last_hash = processed.get(str(md_path)) if current_hash == last_hash: continue frontmatter, body = _parse_frontmatter_and_body(md) sections = _split_by_headers(body, md_path.name) file_date = _parse_date_from_filename(md_path.name) file_source = frontmatter.get("source") or "memory" file_tags = frontmatter.get("tags", []) if isinstance(file_tags, str): file_tags = [file_tags] base_meta = { "source": file_source, "tags": file_tags, "filepath": str(md_path.relative_to(WORKSPACE)), } for idx, sec in enumerate(sections): title = sec["title"] or (frontmatter.get("title") if idx == 0 else None) or md_path.stem content = sec["content"] if not content.strip(): continue content_hash = _compute_hash(content) if content_hash in [h for h in processed.values() if h != last_hash]: out["duplicates"] += 1 continue tags = list(file_tags) if title: tags.append(_slugify(title)) meta = dict(base_meta) meta["title"] = title meta["section_index"] = idx eg = Engram.create( content=content, source=file_source, tags=tags, grounding=Grounding.ASSUMPTION, ) eg.metadata.update(meta) # Link-Vorschläge generieren (Punkt 1) suggestions = _find_link_suggestions(store, str(eg.id), tags) if suggestions: meta["link_suggestions"] = suggestions out["link_suggestions"] += len(suggestions) store.save(eg) out["sections_saved"] += 1 processed[str(md_path)] = current_hash out["files_processed"] += 1 except Exception as e: out["errors"].append(f"{md_path.name}: {e}") _save_json(STATE_PATH, {"processed": processed, "updated_at": out["time"]}) return out if __name__ == "__main__": res = run() print(json.dumps(res, ensure_ascii=False, indent=2))