diff --git a/cron_tasks/ingest_transcript_to_memory.py b/cron_tasks/ingest_transcript_to_memory.py new file mode 100644 index 0000000..a629f75 --- /dev/null +++ b/cron_tasks/ingest_transcript_to_memory.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +Tail OpenClaw session transcript JSONL files and append new messages into +workspace/memory/YYYY-MM-DD.md so the existing ingest_memory pipeline can pick +them up. + +Why: when chat_autosave hooks are missed/aborted, the "memory/*.md -> DB" ingest +doesn't see the latest conversation. This bridges transcript -> memory. + +Safety: read-only access to transcript files; state stored in workspace/memory/. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional + +try: + from zoneinfo import ZoneInfo # py3.9+ +except Exception: # pragma: no cover + ZoneInfo = None # type: ignore + + +WORKSPACE = Path("/root/.openclaw/workspace") +MEMORY_DIR = WORKSPACE / "memory" +SOURCES_PATH = MEMORY_DIR / "session_sources.json" +STATE_PATH = MEMORY_DIR / "session_ingest_state.json" + + +def _local_tz(): + tz = os.environ.get("TZ") or "Europe/Berlin" + if ZoneInfo: + try: + return ZoneInfo(tz) + except Exception: + return timezone.utc + return timezone.utc + + +def _load_json(path: Path, default: Any) -> Any: + try: + if not path.exists(): + return default + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return default + + +def _save_json(path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + + +def _iso_to_dt(ts: str) -> Optional[datetime]: + try: + if ts.endswith("Z"): + ts = ts[:-1] + "+00:00" + return datetime.fromisoformat(ts) + except Exception: + return None + + +def _ms_to_dt(ms: int) -> datetime: + return datetime.fromtimestamp(ms / 1000, tz=timezone.utc) + + +def _extract_text(content: Any) -> str: + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + parts: List[str] = [] + for c in content: + if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str): + parts.append(c["text"]) + elif isinstance(c, str): + parts.append(c) + return "\n".join(p.strip() for p in parts if p and p.strip()).strip() + if isinstance(content, dict): + if isinstance(content.get("text"), str): + return content["text"].strip() + return str(content).strip() + + +@dataclass +class Source: + label: str + transcript_path: Path + + +def _load_sources() -> List[Source]: + """ + Sources file format: + { + "sources": [ + { "label": "telegram:263887248", "path": "/root/.openclaw/agents/main/sessions/.jsonl" } + ] + } + """ + payload = _load_json(SOURCES_PATH, {"sources": []}) + sources: List[Source] = [] + for item in payload.get("sources", []) if isinstance(payload, dict) else []: + if not isinstance(item, dict): + continue + label = str(item.get("label") or "session") + p = item.get("path") + if not isinstance(p, str) or not p: + continue + sources.append(Source(label=label, transcript_path=Path(p))) + return sources + + +def _memory_path_for(dt: datetime) -> Path: + tz = _local_tz() + local = dt.astimezone(tz) + MEMORY_DIR.mkdir(parents=True, exist_ok=True) + return MEMORY_DIR / f"{local.date().isoformat()}.md" + + +def _append_memory(dt: datetime, label: str, role: str, text: str) -> None: + if not text.strip(): + return + tz = _local_tz() + local = dt.astimezone(tz) + mem = _memory_path_for(dt) + if not mem.exists(): + mem.write_text(f"# {local.date().isoformat()}\n\n", encoding="utf-8") + + header = f"## {local.strftime('%H:%M:%S')} - {label} ({role})" + body = text.strip() + mem.write_text(mem.read_text(encoding="utf-8") + f"{header}\n\n{body}\n\n", encoding="utf-8") + + +def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]: + with open(path, "rb") as f: + f.seek(max(0, int(start_offset))) + while True: + raw = f.readline() + if not raw: + break + try: + line = raw.decode("utf-8", errors="ignore").strip() + except Exception: + line = "" + if not line: + continue + yield (f.tell(), line) + + +def run() -> Dict[str, Any]: + sources = _load_sources() + state = _load_json(STATE_PATH, {"offsets": {}}) + offsets: Dict[str, int] = state.get("offsets", {}) if isinstance(state, dict) else {} + + out = { + "success": True, + "time": datetime.now(timezone.utc).isoformat(), + "sources": len(sources), + "messages_appended": 0, + "errors": [], + } + + for src in sources: + try: + if not src.transcript_path.exists(): + continue + key = str(src.transcript_path) + start = int(offsets.get(key, 0)) + for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start): + try: + obj = json.loads(line) + except Exception: + offsets[key] = new_off + continue + + if not isinstance(obj, dict) or obj.get("type") != "message": + offsets[key] = new_off + continue + + msg = obj.get("message") if isinstance(obj.get("message"), dict) else {} + role = str(msg.get("role") or "unknown") + content = msg.get("content") + text = _extract_text(content) + + dt = None + if isinstance(msg.get("timestamp"), (int, float)): + dt = _ms_to_dt(int(msg["timestamp"])) + elif isinstance(obj.get("timestamp"), str): + dt = _iso_to_dt(obj["timestamp"]) + if dt is None: + dt = datetime.now(timezone.utc) + + if len(text.strip()) < 5: + offsets[key] = new_off + continue + + _append_memory(dt, src.label, role, text) + out["messages_appended"] += 1 + offsets[key] = new_off + except Exception as e: + out["success"] = False + out["errors"].append(f"{src.transcript_path}: {e}") + + _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]}) + return out + + +if __name__ == "__main__": + res = run() + print(json.dumps(res, ensure_ascii=False, indent=2)) diff --git a/systemd/openclaw-secondbrain-ingest-transcript-to-memory.service b/systemd/openclaw-secondbrain-ingest-transcript-to-memory.service new file mode 100644 index 0000000..8cae52d --- /dev/null +++ b/systemd/openclaw-secondbrain-ingest-transcript-to-memory.service @@ -0,0 +1,8 @@ +[Unit] +Description=OpenClaw Second-Brain ingest transcript -> memory/*.md +OnFailure=openclaw-secondbrain-notify@%n.service + +[Service] +Type=oneshot +WorkingDirectory=/root/.openclaw/workspace +ExecStart=/bin/bash -lc 'flock -n /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/second-brain/cron_tasks/ingest_transcript_to_memory.py' diff --git a/systemd/openclaw-secondbrain-ingest-transcript-to-memory.timer b/systemd/openclaw-secondbrain-ingest-transcript-to-memory.timer new file mode 100644 index 0000000..2aae3a9 --- /dev/null +++ b/systemd/openclaw-secondbrain-ingest-transcript-to-memory.timer @@ -0,0 +1,11 @@ +[Unit] +Description=OpenClaw Second-Brain ingest transcript -> memory (every 1 min) + +[Timer] +OnBootSec=30s +OnUnitActiveSec=60s +Unit=openclaw-secondbrain-ingest-transcript-to-memory.service + +[Install] +WantedBy=timers.target +