#!/usr/bin/env python3 """ Importiert abgeschlossene Topics aus context-buffer/ als Engramme. Ein Topic gilt als abgeschlossen, wenn es den Status 'done' oder 'completed' hat. """ from __future__ import annotations import json import sqlite3 import subprocess import sys from datetime import datetime, timezone from pathlib import Path BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain") WORKSPACE = Path("/root/.openclaw/workspace") HANDLER = WORKSPACE / "context-buffer" / "handler.py" def run(): # Hole alle Topics mit status done/completed via handler try: result = subprocess.run( ["python3", str(HANDLER), "search", "--status", "done"], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: raise Exception(f"Handler error: {result.stderr}") topics = json.loads(result.stdout) except Exception as e: print(json.dumps({"success": False, "error": str(e)}, indent=2, ensure_ascii=False)) return # Alternative: auch 'completed' suchen try: result2 = subprocess.run( ["python3", str(HANDLER), "search", "--status", "completed"], capture_output=True, text=True, timeout=30 ) if result2.returncode == 0: topics_completed = json.loads(result2.stdout) topics.extend(topics_completed) except Exception: pass if not topics: print(json.dumps({"success": True, "imported": 0, "message": "No completed topics found"}, indent=2, ensure_ascii=False)) return # Import in Second Brain DB_PATH = BRAIN_DIR / "data" / "brain.sqlite" conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row c = conn.cursor() sys.path.insert(0, str(BRAIN_DIR)) from src.store import EngramStore from src.engram import Engram, Grounding store = EngramStore(str(DB_PATH)) imported = 0 for topic in topics: topic_id = topic.get("id") title = topic.get("title", "Untitled Topic") content = topic.get("content", "") if not content.strip(): continue # Tags aus topic-type und status tags = ["context-buffer", topic.get("status", "unknown")] if topic.get("type"): tags.append(topic["type"]) eg = Engram.create( content=content, source="context-buffer", tags=tags, grounding=Grounding.ASSUMPTION, ) eg.metadata.update({ "title": title, "context_buffer_id": topic_id, "imported_from": "context-buffer", "original_status": topic.get("status"), }) store.save(eg) imported += 1 conn.close() print(json.dumps({ "success": True, "time": datetime.now(timezone.utc).isoformat(), "topics_found": len(topics), "imported": imported, }, indent=2, ensure_ascii=False)) if __name__ == "__main__": import sys sys.path.insert(0, str(BRAIN_DIR)) run()