#!/usr/bin/env python3
"""
Tail OpenClaw session transcript JSONL files and append new messages into
workspace/memory/YYYY-MM-DD.md so the existing ingest_memory pipeline can
pick them up.

Why: when chat_autosave hooks are missed/aborted, the "memory/*.md -> DB"
ingest doesn't see the latest conversation. This bridges transcript -> memory.

Safety: transcript files are only ever read. The list of transcripts to tail
(session_sources.json) and the per-transcript byte offsets already consumed
(session_ingest_state.json) both live in workspace/memory/.
"""

from __future__ import annotations

import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

try:
    from zoneinfo import ZoneInfo  # py3.9+
except Exception:  # pragma: no cover
    ZoneInfo = None  # type: ignore

WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory"
SOURCES_PATH = MEMORY_DIR / "session_sources.json"
STATE_PATH = MEMORY_DIR / "session_ingest_state.json"

# Messages whose stripped text is shorter than this many characters are
# skipped instead of being appended to memory.
MIN_TEXT_LEN = 5


def _local_tz():
    """Return the timezone used to bucket messages into daily memory files.

    Uses $TZ when set, otherwise Europe/Berlin. Falls back to UTC when
    zoneinfo is unavailable or the zone name cannot be resolved.
    """
    name = os.environ.get("TZ") or "Europe/Berlin"
    if ZoneInfo is None:
        return timezone.utc
    try:
        return ZoneInfo(name)
    except Exception:
        return timezone.utc


def _load_json(path: Path, default: Any) -> Any:
    """Parse the JSON file at *path*.

    Returns *default* when the file is missing, unreadable, or not valid
    UTF-8 JSON.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        return default


def _save_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented JSON, atomically.

    The data goes to a sibling ``.tmp`` file which is then moved over *path*
    with ``os.replace``. A crash mid-write therefore never leaves a truncated
    file behind. A truncated file would otherwise be read back by
    ``_load_json`` as the default, i.e. empty offsets, and every transcript
    would be re-ingested from the start.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    os.replace(tmp, path)


def _iso_to_dt(ts: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp (a trailing ``Z`` is accepted).

    Returns None when *ts* cannot be parsed. Timestamps without an offset are
    assumed to be UTC, so every datetime this module produces is
    timezone-aware.
    """
    try:
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        dt = datetime.fromisoformat(ts)
    except (AttributeError, TypeError, ValueError):
        return None
    # NOTE(review): assumes offset-less transcript timestamps are UTC — confirm.
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)


def _ms_to_dt(ms: int) -> datetime:
    """Convert epoch milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def _extract_text(content: Any) -> str:
    """Flatten a message ``content`` value into plain, stripped text.

    Handles the following shapes:

    - A string is returned stripped.
    - A list keeps its ``{"type": "text", "text": ...}`` items and its plain
      strings, joined by newlines; other items are dropped.
    - A dict returns its ``text`` field when that is a string.
    - ``None`` returns ``""``.
    - Anything else falls back to ``str(content)``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        parts: List[str] = []
        for c in content:
            if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str):
                parts.append(c["text"])
            elif isinstance(c, str):
                parts.append(c)
        return "\n".join(p.strip() for p in parts if p and p.strip()).strip()
    if isinstance(content, dict) and isinstance(content.get("text"), str):
        return content["text"].strip()
    return str(content).strip()


@dataclass
class Source:
    """One transcript file to tail."""

    label: str  # written into each memory entry header, e.g. "telegram:263887248"
    transcript_path: Path  # JSONL transcript to read


def _load_sources() -> List[Source]:
    """Read the configured transcripts from SOURCES_PATH.

    Sources file format::

        {
          "sources": [
            {"label": "telegram:263887248",
             "path": "/root/.openclaw/agents/main/sessions/<session-id>.jsonl"}
          ]
        }

    Entries that are not objects, or that lack a non-empty string ``path``,
    are ignored. A missing ``label`` defaults to ``"session"``.
    """
    payload = _load_json(SOURCES_PATH, {"sources": []})
    items = payload.get("sources", []) if isinstance(payload, dict) else []
    if not isinstance(items, list):
        items = []
    sources: List[Source] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        p = item.get("path")
        if not isinstance(p, str) or not p:
            continue
        sources.append(Source(label=str(item.get("label") or "session"), transcript_path=Path(p)))
    return sources


def _memory_path_for(dt: datetime) -> Path:
    """Return memory/<local date>.md for *dt*, creating MEMORY_DIR if needed."""
    local = dt.astimezone(_local_tz())
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    return MEMORY_DIR / f"{local.date().isoformat()}.md"


def _append_memory(dt: datetime, label: str, role: str, text: str) -> None:
    """Append one ``## HH:MM:SS - label (role)`` entry to the day's memory file.

    A ``# YYYY-MM-DD`` title is written first when the file does not exist
    yet. Blank *text* is ignored. The file is opened in append mode rather
    than being read back and rewritten, so cost does not grow with file size.
    """
    body = text.strip()
    if not body:
        return
    local = dt.astimezone(_local_tz())
    mem = _memory_path_for(dt)
    entry = f"## {local.strftime('%H:%M:%S')} - {label} ({role})\n\n{body}\n\n"
    if not mem.exists():
        entry = f"# {local.date().isoformat()}\n\n" + entry
    with open(mem, "a", encoding="utf-8") as fh:
        fh.write(entry)


def _is_json(text: str) -> bool:
    """Return True if *text* parses as a JSON document."""
    try:
        json.loads(text)
    except (ValueError, RecursionError):
        return False
    return True


def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]:
    """Yield ``(byte_offset_after_line, line)`` for each non-blank line.

    Reading starts at byte *start_offset*. The file's final line, if it lacks
    a newline, is only yielded when it already parses as JSON. Otherwise it
    is most likely a record still being written. In that case iteration stops
    before it, so the caller's offset stays at its start and the whole line
    is read on a later run.
    """
    with open(path, "rb") as f:
        f.seek(max(0, int(start_offset)))
        for raw in iter(f.readline, b""):
            line = raw.decode("utf-8", errors="ignore").strip()
            if not raw.endswith(b"\n") and not _is_json(line):
                break
            if line:
                yield f.tell(), line


def _message_time(obj: Dict[str, Any], msg: Dict[str, Any]) -> datetime:
    """Best-effort timestamp for a transcript record.

    Tries the following in order:

    1. The message's numeric epoch-milliseconds ``timestamp``.
    2. The record's ISO-8601 ``timestamp``.
    3. The current UTC time.
    """
    ms = msg.get("timestamp")
    if isinstance(ms, (int, float)) and not isinstance(ms, bool):
        try:
            return _ms_to_dt(int(ms))
        except (OverflowError, OSError, ValueError):
            pass  # out-of-range / NaN: fall through rather than stall the source
    iso = obj.get("timestamp")
    if isinstance(iso, str):
        parsed = _iso_to_dt(iso)
        if parsed is not None:
            return parsed
    return datetime.now(timezone.utc)


def _message_from_line(line: str) -> Optional[tuple[datetime, str, str]]:
    """Turn one JSONL line into ``(timestamp, role, text)``, or None to skip it.

    The following are skipped:

    - malformed JSON;
    - records whose ``type`` is not ``"message"``;
    - messages whose text is shorter than MIN_TEXT_LEN characters.
    """
    try:
        obj = json.loads(line)
    except (ValueError, RecursionError):
        return None
    if not isinstance(obj, dict) or obj.get("type") != "message":
        return None
    msg = obj.get("message")
    if not isinstance(msg, dict):
        msg = {}
    text = _extract_text(msg.get("content"))
    if len(text.strip()) < MIN_TEXT_LEN:
        return None
    role = str(msg.get("role") or "unknown")
    return _message_time(obj, msg), role, text


def _ingest_source(src: Source, offsets: Dict[str, Any], out: Dict[str, Any]) -> None:
    """Append new messages from one transcript to memory.

    ``offsets[str(path)]`` and ``out["messages_appended"]`` are updated in
    place as each line is consumed. An exception part-way through therefore
    still keeps the progress already made.
    """
    path = src.transcript_path
    if not path.exists():
        return
    key = str(path)
    try:
        start = max(0, int(offsets.get(key, 0)))
    except (TypeError, ValueError):
        start = 0  # corrupt state entry: re-read rather than fail forever
    if start > path.stat().st_size:
        # The file shrank, so it was truncated or replaced. Start over instead
        # of seeking past EOF and never reading it again.
        start = 0
        offsets[key] = 0
    for new_off, line in _iter_new_lines(path, start_offset=start):
        entry = _message_from_line(line)
        if entry is not None:
            dt, role, text = entry
            _append_memory(dt, src.label, role, text)
            out["messages_appended"] += 1
        # Advance only after the line is handled. If _append_memory raises,
        # the line is retried next run.
        offsets[key] = new_off


def run() -> Dict[str, Any]:
    """Ingest new messages from every configured transcript into memory files.

    Returns a summary dict with these keys:

    - ``success``: False if any source failed.
    - ``time``: run start, UTC ISO-8601.
    - ``sources``: number configured.
    - ``messages_appended``: count of entries written.
    - ``errors``: one ``"<path>: <error>"`` string per failing source.

    Per-source errors are recorded, not raised. Offsets are written to
    STATE_PATH even if the run is interrupted.
    """
    sources = _load_sources()
    state = _load_json(STATE_PATH, {"offsets": {}})
    raw_offsets = state.get("offsets") if isinstance(state, dict) else None
    offsets: Dict[str, Any] = dict(raw_offsets) if isinstance(raw_offsets, dict) else {}

    out: Dict[str, Any] = {
        "success": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "sources": len(sources),
        "messages_appended": 0,
        "errors": [],
    }

    try:
        for src in sources:
            try:
                _ingest_source(src, offsets, out)
            except Exception as e:
                out["success"] = False
                out["errors"].append(f"{src.transcript_path}: {e}")
    finally:
        # Persist progress even on KeyboardInterrupt etc.; otherwise lines
        # already appended would be appended again next run.
        _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]})
    return out


if __name__ == "__main__":
    res = run()
    print(json.dumps(res, ensure_ascii=False, indent=2))