From d52e3a7f744bef4bd2d2e0def01d6219f22c79c5 Mon Sep 17 00:00:00 2001 From: Otto Date: Wed, 27 May 2026 01:14:42 +0200 Subject: [PATCH] feat(ingest): transcript direct to DB --- cron_tasks/ingest_transcript_to_db.py | 160 ++++++++++++++++++ ...econdbrain-ingest-transcript-to-db.service | 9 + ...-secondbrain-ingest-transcript-to-db.timer | 12 ++ 3 files changed, 181 insertions(+) create mode 100644 cron_tasks/ingest_transcript_to_db.py create mode 100644 systemd/openclaw-secondbrain-ingest-transcript-to-db.service create mode 100644 systemd/openclaw-secondbrain-ingest-transcript-to-db.timer diff --git a/cron_tasks/ingest_transcript_to_db.py b/cron_tasks/ingest_transcript_to_db.py new file mode 100644 index 0000000..40a74ee --- /dev/null +++ b/cron_tasks/ingest_transcript_to_db.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Ingest OpenClaw session transcript JSONL directly into the Second-Brain DB. + +State is tracked with byte offsets per transcript file. +Sources are configured via workspace/memory/session_sources.json. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List +from uuid import NAMESPACE_URL, uuid5 + +WORKSPACE = Path("/root/.openclaw/workspace") +MEMORY_DIR = WORKSPACE / "memory" +SOURCES_PATH = MEMORY_DIR / "session_sources.json" +STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json" + +BRAIN_DIR = WORKSPACE / "second-brain" +DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite"))) + +import sys + +sys.path.insert(0, str(BRAIN_DIR)) +from src.engram import Engram, Grounding # type: ignore +from src.store import EngramStore # type: ignore + + +def _now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _load_json(path: Path, default: Any) -> Any: + try: + if not path.exists(): + return default + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return default + + +def _save_json(path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + + +def _extract_text(content: Any) -> str: + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + parts: List[str] = [] + for c in content: + if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str): + parts.append(c["text"]) + elif isinstance(c, str): + parts.append(c) + return "\n".join(p.strip() for p in parts if p and p.strip()).strip() + if isinstance(content, dict) and isinstance(content.get("text"), str): + return content["text"].strip() + return str(content).strip() + + +@dataclass +class Source: + label: str + transcript_path: Path + + +def _load_sources() -> List[Source]: + payload = _load_json(SOURCES_PATH, {"sources": []}) + sources: List[Source] = [] + for item in payload.get("sources", []) if isinstance(payload, dict) else []: + if not isinstance(item, dict): + continue + label = str(item.get("label") or "session") + p = item.get("path") + if not isinstance(p, str) or not p: + continue + sources.append(Source(label=label, transcript_path=Path(p))) + return sources + + +def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]: + with open(path, "rb") as f: + f.seek(max(0, int(start_offset))) + while True: + raw = f.readline() + if not raw: + break + line = raw.decode("utf-8", errors="ignore").strip() + if not line: + continue + yield (f.tell(), line) + + +def run() -> Dict[str, Any]: + if not DB_PATH.exists(): + return {"success": False, "time": _now(), "error": f"db missing: {DB_PATH}"} + + sources = _load_sources() + state = _load_json(STATE_PATH, {"offsets": {}}) + offsets: Dict[str, int] = state.get("offsets", {}) if isinstance(state, dict) else {} + + store = EngramStore(str(DB_PATH)) + out = {"success": True, "time": _now(), "sources": len(sources), "messages_saved": 0, "messages_skipped": 0, "errors": []} + + for src in sources: + try: + if not src.transcript_path.exists(): + continue + key = str(src.transcript_path) + start = int(offsets.get(key, 0)) + for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start): + offsets[key] = new_off + obj = json.loads(line) + if not isinstance(obj, dict) or obj.get("type") != "message": + continue + msg = obj.get("message") if isinstance(obj.get("message"), dict) else {} + role = str(msg.get("role") or "unknown") + content = _extract_text(msg.get("content")) + if len(content.strip()) < 5: + continue + mid = str(obj.get("id") or msg.get("id") or msg.get("messageId") or msg.get("message_id") or "") + if not mid: + mid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{role}:{content[:200]}")) + eid = str(uuid5(NAMESPACE_URL, f"openclaw-transcript:{src.label}:{mid}")) + if store.get(eid): + out["messages_skipped"] += 1 + continue + eg = Engram.create( + content=f"[transcript:{src.label}] [{role}] [{mid}]\n\n{content}"[:4000], + source="session", + tags=["session", "openclaw", "transcript", f"role:{role}"], + session_id=src.label, + confidence=0.55, + grounding=Grounding.ASSUMPTION, + ) + eg.id = uuid5(NAMESPACE_URL, eid) + eg.metadata["source"] = "session" + eg.metadata["session_id"] = src.label + eg.metadata["role"] = role + eg.metadata["message_id"] = mid + store.save(eg) + out["messages_saved"] += 1 + except Exception as e: + out["success"] = False + out["errors"].append(f"{src.transcript_path}: {e}") + + _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]}) + return out + + +if __name__ == "__main__": + print(json.dumps(run(), ensure_ascii=False, indent=2)) + diff --git a/systemd/openclaw-secondbrain-ingest-transcript-to-db.service b/systemd/openclaw-secondbrain-ingest-transcript-to-db.service new file mode 100644 index 0000000..d8c8019 --- /dev/null +++ b/systemd/openclaw-secondbrain-ingest-transcript-to-db.service @@ -0,0 +1,9 @@ +[Unit] +Description=OpenClaw Second-Brain ingest transcript -> DB +OnFailure=openclaw-secondbrain-notify@%n.service + +[Service] +Type=oneshot +WorkingDirectory=/root/.openclaw/workspace +ExecStart=/bin/bash -lc 'flock -n /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/openclaw_cron_wrapper.py ingest_transcript_to_db' + diff --git a/systemd/openclaw-secondbrain-ingest-transcript-to-db.timer b/systemd/openclaw-secondbrain-ingest-transcript-to-db.timer new file mode 100644 index 0000000..cac2d44 --- /dev/null +++ b/systemd/openclaw-secondbrain-ingest-transcript-to-db.timer @@ -0,0 +1,12 @@ +[Unit] +Description=OpenClaw Second-Brain ingest transcript -> DB (every 5 min) + +[Timer] +OnBootSec=90s +OnUnitActiveSec=300s +Unit=openclaw-secondbrain-ingest-transcript-to-db.service +Persistent=true + +[Install] +WantedBy=timers.target +