#!/usr/bin/env python3 """ Ingest OpenClaw session transcript JSONL directly into the Second-Brain DB. State is tracked with byte offsets per transcript file. Sources are configured via workspace/memory/session_sources.json. """ from __future__ import annotations import json import os from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List from uuid import NAMESPACE_URL, uuid5 WORKSPACE = Path("/root/.openclaw/workspace") MEMORY_DIR = WORKSPACE / "memory" SOURCES_PATH = MEMORY_DIR / "session_sources.json" STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json" BRAIN_DIR = WORKSPACE / "second-brain" DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite"))) import sys sys.path.insert(0, str(BRAIN_DIR)) from src.engram import Engram, Grounding # type: ignore from src.store import EngramStore # type: ignore def _now() -> str: return datetime.now(timezone.utc).isoformat() def _load_json(path: Path, default: Any) -> Any: try: if not path.exists(): return default return json.loads(path.read_text(encoding="utf-8")) except Exception: return default def _save_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") def _extract_text(content: Any) -> str: if isinstance(content, str): return content.strip() if isinstance(content, list): parts: List[str] = [] for c in content: if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str): parts.append(c["text"]) elif isinstance(c, str): parts.append(c) return "\n".join(p.strip() for p in parts if p and p.strip()).strip() if isinstance(content, dict) and isinstance(content.get("text"), str): return content["text"].strip() return str(content).strip() @dataclass class Source: label: str transcript_path: Path def _load_sources() -> List[Source]: payload = _load_json(SOURCES_PATH, {"sources": []}) sources: List[Source] = [] for item in payload.get("sources", []) if isinstance(payload, dict) else []: if not isinstance(item, dict): continue label = str(item.get("label") or "session") p = item.get("path") if not isinstance(p, str) or not p: continue sources.append(Source(label=label, transcript_path=Path(p))) return sources def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]: with open(path, "rb") as f: f.seek(max(0, int(start_offset))) while True: raw = f.readline() if not raw: break line = raw.decode("utf-8", errors="ignore").strip() if not line: continue yield (f.tell(), line) def run() -> Dict[str, Any]: if not DB_PATH.exists(): return {"success": False, "time": _now(), "error": f"db missing: {DB_PATH}"} sources = _load_sources() state = _load_json(STATE_PATH, {"offsets": {}}) offsets: Dict[str, int] = state.get("offsets", {}) if isinstance(state, dict) else {} store = EngramStore(str(DB_PATH)) out = {"success": True, "time": _now(), "sources": len(sources), "messages_saved": 0, "messages_skipped": 0, "errors": []} for src in sources: try: if not src.transcript_path.exists(): continue key = str(src.transcript_path) start = int(offsets.get(key, 0)) for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start): offsets[key] = new_off obj = json.loads(line) if not isinstance(obj, dict) or obj.get("type") != "message": continue msg = obj.get("message") if isinstance(obj.get("message"), dict) else {} role = str(msg.get("role") or "unknown") content = _extract_text(msg.get("content")) if len(content.strip()) < 5: continue mid = str(obj.get("id") or msg.get("id") or msg.get("messageId") or msg.get("message_id") or "") if not mid: mid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{role}:{content[:200]}")) eid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{mid}")) if store.get(eid): out["messages_skipped"] += 1 continue eg = Engram.create( content=f"[transcript:{src.label}] [{role}] [{mid}]\n\n{content}"[:4000], source="session", tags=["session", "openclaw", "transcript", f"role:{role}"], session_id=src.label, confidence=0.55, grounding=Grounding.ASSUMPTION, ) eg.id = uuid5(NAMESPACE_URL, eid) eg.metadata["source"] = "session" eg.metadata["session_id"] = src.label eg.metadata["role"] = role eg.metadata["message_id"] = mid store.save(eg) out["messages_saved"] += 1 except Exception as e: out["success"] = False out["errors"].append(f"{src.transcript_path}: {e}") _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]}) return out if __name__ == "__main__": print(json.dumps(run(), ensure_ascii=False, indent=2))