Files
second-brain/cron_tasks/ingest_transcript_to_db.py

161 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
Ingest OpenClaw session transcript JSONL directly into the Second-Brain DB.
State is tracked with byte offsets per transcript file.
Sources are configured via workspace/memory/session_sources.json.
"""
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List
from uuid import NAMESPACE_URL, uuid5
# Root of the OpenClaw workspace; the default locations below are built from it.
WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory"
# JSON file listing the transcripts to ingest (read by _load_sources).
SOURCES_PATH = MEMORY_DIR / "session_sources.json"
# JSON file in which run() persists the per-transcript byte offsets.
STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json"
# Directory put on sys.path below so that the ``src`` package can be imported.
BRAIN_DIR = WORKSPACE / "second-brain"
# Database file; the BRAIN_DB environment variable overrides the default location.
DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite")))
import sys
sys.path.insert(0, str(BRAIN_DIR))
from src.engram import Engram, Grounding # type: ignore
from src.store import EngramStore # type: ignore
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _load_json(path: Path, default: Any) -> Any:
try:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def _save_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _extract_text(content: Any) -> str:
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: List[str] = []
for c in content:
if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str):
parts.append(c["text"])
elif isinstance(c, str):
parts.append(c)
return "\n".join(p.strip() for p in parts if p and p.strip()).strip()
if isinstance(content, dict) and isinstance(content.get("text"), str):
return content["text"].strip()
return str(content).strip()
@dataclass
class Source:
    """One configured transcript to ingest.

    Attributes:
        label: Name of the source; ``run`` stores it as the session id and
            mixes it into the ids it derives.
        transcript_path: Location of the transcript JSONL file.
    """

    label: str
    transcript_path: Path
def _load_sources() -> List[Source]:
    """Read the configured transcript sources from ``SOURCES_PATH``.

    The file is expected to hold ``{"sources": [{"label": ..., "path": ...}]}``.
    Entries that are not dicts or lack a non-empty string ``path`` are
    skipped; a missing or empty ``label`` becomes ``"session"``. A missing,
    unreadable or malformed file, or a ``sources`` value that is not a
    list (for example ``null``), yields an empty list instead of raising.
    """
    payload = _load_json(SOURCES_PATH, {"sources": []})
    entries = payload.get("sources") if isinstance(payload, dict) else None
    if not isinstance(entries, list):
        # "sources": null (or a scalar) used to raise TypeError in the loop.
        return []
    sources: List[Source] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        raw_path = entry.get("path")
        if not isinstance(raw_path, str) or not raw_path:
            continue
        label = str(entry.get("label") or "session")
        sources.append(Source(label=label, transcript_path=Path(raw_path)))
    return sources
def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]:
with open(path, "rb") as f:
f.seek(max(0, int(start_offset)))
while True:
raw = f.readline()
if not raw:
break
line = raw.decode("utf-8", errors="ignore").strip()
if not line:
continue
yield (f.tell(), line)
def _ingest_record(store: Any, src: Source, obj: Any) -> str:
    """Save one parsed transcript record as an engram unless already stored.

    Returns ``"ignored"`` when *obj* is not a ``"message"`` record or its
    extracted text is shorter than five characters, ``"skipped"`` when an
    engram with the derived id already exists in *store*, and ``"saved"``
    after a successful save. Exceptions from *store* or ``Engram.create``
    propagate to the caller.
    """
    if not isinstance(obj, dict) or obj.get("type") != "message":
        return "ignored"
    msg = obj.get("message") if isinstance(obj.get("message"), dict) else {}
    role = str(msg.get("role") or "unknown")
    content = _extract_text(msg.get("content"))
    if len(content.strip()) < 5:
        return "ignored"

    mid = str(obj.get("id") or msg.get("id") or msg.get("messageId") or msg.get("message_id") or "")
    if not mid:
        # No id in the record: derive a stable one from label, role and text.
        mid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{role}:{content[:200]}"))
    eid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{mid}"))
    # The engram is stored under uuid5(NAMESPACE_URL, eid), so the duplicate
    # check must look up that same id; looking up ``eid`` can never match.
    # NOTE(review): assumes EngramStore.get keys on the string form of the
    # id (it was already called with a str here) -- confirm against the store.
    engram_id = uuid5(NAMESPACE_URL, eid)
    if store.get(str(engram_id)):
        return "skipped"

    eg = Engram.create(
        content=f"[transcript:{src.label}] [{role}] [{mid}]\n\n{content}"[:4000],
        source="session",
        tags=["session", "openclaw", "transcript", f"role:{role}"],
        session_id=src.label,
        confidence=0.55,
        grounding=Grounding.ASSUMPTION,
    )
    eg.id = engram_id
    eg.metadata["source"] = "session"
    eg.metadata["session_id"] = src.label
    eg.metadata["role"] = role
    eg.metadata["message_id"] = mid
    store.save(eg)
    return "saved"


def run() -> Dict[str, Any]:
    """Ingest new transcript lines from every configured source into the DB.

    For each source the transcript is read from the byte offset recorded in
    ``STATE_PATH``; every ``"message"`` record with at least five characters
    of text is saved as an engram under a deterministic id, so records that
    are read again are counted as skipped instead of being stored twice.

    Offset handling:

    * The offset of a line is committed only after the line has been handled,
      so a failure while storing a message leaves it to be retried next run.
    * A line that is not valid JSON is reported in ``errors`` and skipped;
      the remaining lines of that source are still processed.
    * If the recorded offset lies beyond the end of the file (the file was
      truncated or replaced), reading restarts from the beginning.

    Returns:
        A summary dict with ``success``, ``time``, ``sources``,
        ``messages_saved``, ``messages_skipped`` and ``errors``. When the
        database file is missing, a short dict with ``success``, ``time``
        and ``error`` is returned and nothing else is touched.
    """
    if not DB_PATH.exists():
        return {"success": False, "time": _now(), "error": f"db missing: {DB_PATH}"}

    sources = _load_sources()
    state = _load_json(STATE_PATH, {"offsets": {}})
    offsets = state.get("offsets") if isinstance(state, dict) else None
    if not isinstance(offsets, dict):
        # Unusable state: start over; the duplicate check prevents re-saving.
        offsets = {}

    store = EngramStore(str(DB_PATH))
    out: Dict[str, Any] = {"success": True, "time": _now(), "sources": len(sources), "messages_saved": 0, "messages_skipped": 0, "errors": []}

    for src in sources:
        try:
            if not src.transcript_path.exists():
                continue
            key = str(src.transcript_path)
            start = int(offsets.get(key, 0))
            if start > src.transcript_path.stat().st_size:
                # File shrank below the recorded offset: re-read from the top.
                start = 0
                offsets[key] = 0
            for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start):
                try:
                    obj = json.loads(line)
                except ValueError as e:
                    # Malformed line: report it and move past it, otherwise
                    # every later run would stop at the same place.
                    out["success"] = False
                    out["errors"].append(f"{src.transcript_path}: {e}")
                    offsets[key] = new_off
                    continue
                outcome = _ingest_record(store, src, obj)
                # Commit the offset only once the line is fully handled.
                offsets[key] = new_off
                if outcome == "saved":
                    out["messages_saved"] += 1
                elif outcome == "skipped":
                    out["messages_skipped"] += 1
        except Exception as e:
            out["success"] = False
            out["errors"].append(f"{src.transcript_path}: {e}")

    _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]})
    return out
if __name__ == "__main__":
    # Script entry point: run one ingest pass and print its summary as JSON.
    summary = run()
    print(json.dumps(summary, ensure_ascii=False, indent=2))