#!/usr/bin/env python3
"""
Tail OpenClaw session transcript JSONL files and append new messages into
workspace/memory/YYYY-MM-DD.md so the existing ingest_memory pipeline can
pick them up.

Why: when chat_autosave hooks are missed or aborted, the
"memory/*.md -> DB" ingest never sees the latest conversation. This script
bridges transcript -> memory.

Safety: transcript files are only ever read; per-transcript read offsets are
stored in workspace/memory/session_ingest_state.json.
"""

from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

try:
    from zoneinfo import ZoneInfo  # py3.9+
except Exception:  # pragma: no cover
    ZoneInfo = None  # type: ignore

WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory"
SOURCES_PATH = MEMORY_DIR / "session_sources.json"
STATE_PATH = MEMORY_DIR / "session_ingest_state.json"
SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions")
SESSIONS_INDEX = SESSIONS_DIR / "sessions.json"

# Messages whose extracted text (after stripping) is shorter than this are
# not written to memory.
MIN_TEXT_CHARS = 5


def _local_tz():
    """Return the time zone used for memory file names and entry headers.

    Uses ``$TZ`` (default ``Europe/Berlin``); falls back to UTC when zoneinfo
    is unavailable or the zone name cannot be resolved.
    """
    tz = os.environ.get("TZ") or "Europe/Berlin"
    if ZoneInfo:
        try:
            return ZoneInfo(tz)
        except Exception:
            return timezone.utc
    return timezone.utc


def _load_json(path: Path, default: Any) -> Any:
    """Best-effort JSON read: return ``default`` if ``path`` is missing or unreadable."""
    try:
        if not path.exists():
            return default
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return default


def _save_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as pretty-printed JSON, atomically.

    The data is written to a sibling temp file that then replaces ``path``,
    so a crash mid-write never leaves a truncated file behind. This matters
    for the state file: a corrupt state would be read back as empty offsets
    and every transcript would be re-ingested from the start, duplicating
    memory entries.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
        os.replace(tmp, path)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp.unlink(missing_ok=True)


def _iso_to_dt(ts: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp (a trailing ``Z`` is accepted) into an aware datetime.

    NOTE: timestamps without an offset are assumed to be UTC, so the later
    conversion to the local zone does not depend on the host's clock zone.
    Returns None if ``ts`` cannot be parsed.
    """
    try:
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        parsed = datetime.fromisoformat(ts)
    except (AttributeError, TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _ms_to_dt(ms: int) -> datetime:
    """Convert epoch milliseconds to an aware UTC datetime.

    Raises OverflowError/OSError/ValueError for out-of-range values.
    """
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def _extract_text(content: Any) -> str:
    """Flatten message ``content`` into stripped plain text.

    - ``str``: stripped as-is.
    - ``list``: ``{"type": "text", "text": ...}`` items and bare strings are
      stripped and joined with newlines; all other items are dropped.
    - ``dict``: its ``"text"`` value when that is a string.
    - anything else (including a dict without string text): ``str(content)``.
    """
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        parts: List[str] = []
        for c in content:
            if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str):
                parts.append(c["text"])
            elif isinstance(c, str):
                parts.append(c)
        return "\n".join(p.strip() for p in parts if p and p.strip()).strip()
    if isinstance(content, dict) and isinstance(content.get("text"), str):
        return content["text"].strip()
    return str(content).strip()


@dataclass
class Source:
    """A transcript file to tail, plus the label used in memory entry headers."""

    label: str
    transcript_path: Path


def _extract_chat_id(label: str) -> Optional[str]:
    """Return the numeric id from a ``<channel>:<digits>`` label, else None."""
    match = re.match(r"^[a-zA-Z0-9_-]+:(\d+)$", (label or "").strip())
    return match.group(1) if match else None


def _discover_transcript_for_label(label: str) -> Optional[Path]:
    """Resolve a label to its current session file via ``sessions.json``.

    Looks up the ``agent:main:telegram:direct:<chat_id>`` entry and returns its
    ``sessionFile`` if that file exists; otherwise None.
    """
    chat_id = _extract_chat_id(label)
    if not chat_id:
        return None
    index = _load_json(SESSIONS_INDEX, {})
    direct = index.get(f"agent:main:telegram:direct:{chat_id}") if isinstance(index, dict) else None
    session_file = direct.get("sessionFile") if isinstance(direct, dict) else None
    if not isinstance(session_file, str):
        return None
    path = Path(session_file)
    return path if path.exists() else None


def _load_sources() -> List[Source]:
    """Read the configured transcript sources from ``SOURCES_PATH``.

    Expected format::

        {
          "sources": [
            {"label": "telegram:263887248",
             "path": "/root/.openclaw/agents/main/sessions/<session-id>.jsonl"}
          ]
        }

    Entries without a non-empty string ``path`` are skipped. If the label
    resolves to an existing session file in ``sessions.json``, that file is
    used instead of ``path``; otherwise ``path`` is used as given.
    """
    payload = _load_json(SOURCES_PATH, {"sources": []})
    sources: List[Source] = []
    for item in payload.get("sources", []) if isinstance(payload, dict) else []:
        if not isinstance(item, dict):
            continue
        label = str(item.get("label") or "session")
        p = item.get("path")
        if not isinstance(p, str) or not p:
            continue
        transcript_path = _discover_transcript_for_label(label) or Path(p)
        sources.append(Source(label=label, transcript_path=transcript_path))
    return sources


def _memory_path_for(dt: datetime) -> Path:
    """Return ``MEMORY_DIR/<local date>.md`` for ``dt``, creating the directory if needed."""
    local = dt.astimezone(_local_tz())
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    return MEMORY_DIR / f"{local.date().isoformat()}.md"


def _append_memory(dt: datetime, label: str, role: str, text: str) -> None:
    """Append one message as a ``## HH:MM:SS - label (role)`` section to the day's memory file.

    A new day file starts with a ``# YYYY-MM-DD`` title. Blank ``text`` is ignored.
    """
    body = text.strip()
    if not body:
        return
    local = dt.astimezone(_local_tz())
    mem = _memory_path_for(dt)
    is_new = not mem.exists()
    header = f"## {local.strftime('%H:%M:%S')} - {label} ({role})"
    # Append instead of read+rewrite: O(1) per message rather than O(file size).
    with mem.open("a", encoding="utf-8") as fh:
        if is_new:
            fh.write(f"# {local.date().isoformat()}\n\n")
        fh.write(f"{header}\n\n{body}\n\n")


def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]:
    """Yield ``(offset_after_line, stripped_line)`` for complete lines after ``start_offset``.

    - Blank lines are skipped (their bytes are covered by the next yielded offset).
    - A trailing line without ``\\n`` is treated as still being written and is
      not yielded. It will be read in full on a later call. (Assumes the
      writer terminates every record with a newline.)
    - If ``start_offset`` lies beyond the end of the file (truncated or
      replaced transcript), reading restarts from offset 0.
    """
    with open(path, "rb") as f:
        offset = max(0, int(start_offset))
        if offset > os.fstat(f.fileno()).st_size:
            offset = 0
        f.seek(offset)
        while raw := f.readline():
            if not raw.endswith(b"\n"):
                break
            line = raw.decode("utf-8", errors="ignore").strip()
            if line:
                yield (f.tell(), line)


def _message_dt(obj: Dict[str, Any], msg: Dict[str, Any]) -> datetime:
    """Pick the message time.

    Tries, in order: ``msg["timestamp"]`` as epoch ms, then ``obj["timestamp"]``
    as ISO-8601, then now (UTC). Unparseable or out-of-range values fall
    through instead of raising.
    """
    ms = msg.get("timestamp")
    if isinstance(ms, (int, float)) and not isinstance(ms, bool):
        try:
            return _ms_to_dt(int(ms))
        except (OverflowError, OSError, ValueError):
            pass
    iso = obj.get("timestamp")
    if isinstance(iso, str):
        parsed = _iso_to_dt(iso)
        if parsed is not None:
            return parsed
    return datetime.now(timezone.utc)


def _parse_entry(line: str) -> Optional[tuple[datetime, str, str]]:
    """Turn one transcript line into ``(time, role, text)``, or None to skip it.

    Skips lines that are not JSON objects, are not ``"type": "message"``, or
    whose text is shorter than ``MIN_TEXT_CHARS``. Never raises for bad line
    content, so a single malformed record cannot stall a transcript.
    """
    try:
        obj = json.loads(line)
    except (ValueError, RecursionError):
        return None
    if not isinstance(obj, dict) or obj.get("type") != "message":
        return None
    msg = obj.get("message")
    if not isinstance(msg, dict):
        msg = {}
    text = _extract_text(msg.get("content"))
    if len(text.strip()) < MIN_TEXT_CHARS:
        return None
    role = str(msg.get("role") or "unknown")
    return _message_dt(obj, msg), role, text


def run() -> Dict[str, Any]:
    """Append every new transcript message from all configured sources to memory.

    The offset for each transcript advances only after a line has been fully
    handled, so a failed append is retried on the next run. Offsets are saved
    even when some sources fail. Returns a JSON-serialisable summary.
    """
    sources = _load_sources()
    state = _load_json(STATE_PATH, {"offsets": {}})
    offsets = state.get("offsets") if isinstance(state, dict) else None
    if not isinstance(offsets, dict):
        offsets = {}

    errors: List[str] = []
    out: Dict[str, Any] = {
        "success": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "sources": len(sources),
        "messages_appended": 0,
        "errors": errors,
    }

    for src in sources:
        try:
            if not src.transcript_path.exists():
                continue
            key = str(src.transcript_path)
            start = int(offsets.get(key, 0))
            for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start):
                entry = _parse_entry(line)
                if entry is not None:
                    dt, role, text = entry
                    _append_memory(dt, src.label, role, text)
                    out["messages_appended"] += 1
                offsets[key] = new_off
        except Exception as e:
            out["success"] = False
            errors.append(f"{src.transcript_path}: {e}")

    _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]})
    return out


if __name__ == "__main__":
    res = run()
    print(json.dumps(res, ensure_ascii=False, indent=2))