Files
second-brain/cron_tasks/ingest_transcript_to_memory.py

236 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
Tail OpenClaw session transcript JSONL files and append new messages into
workspace/memory/YYYY-MM-DD.md so the existing ingest_memory pipeline can pick
them up.
Why: when chat_autosave hooks are missed/aborted, the "memory/*.md -> DB" ingest
doesn't see the latest conversation. This bridges transcript -> memory.
Safety: read-only access to transcript files; state stored in workspace/memory/.
"""
from __future__ import annotations
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
try:
from zoneinfo import ZoneInfo # py3.9+
except Exception: # pragma: no cover
ZoneInfo = None # type: ignore
# Workspace root; the daily memory files and this script's own bookkeeping
# files live under its "memory" subdirectory.
WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE.joinpath("memory")
# JSON list of transcript sources to tail (see _load_sources for the format).
SOURCES_PATH = MEMORY_DIR.joinpath("session_sources.json")
# Per-transcript byte offsets persisted between runs.
STATE_PATH = MEMORY_DIR.joinpath("session_ingest_state.json")
# Session transcripts and the index that maps session keys to transcript files.
SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions")
SESSIONS_INDEX = SESSIONS_DIR.joinpath("sessions.json")
def _local_tz():
tz = os.environ.get("TZ") or "Europe/Berlin"
if ZoneInfo:
try:
return ZoneInfo(tz)
except Exception:
return timezone.utc
return timezone.utc
def _load_json(path: Path, default: Any) -> Any:
try:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def _save_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _iso_to_dt(ts: str) -> Optional[datetime]:
try:
if ts.endswith("Z"):
ts = ts[:-1] + "+00:00"
return datetime.fromisoformat(ts)
except Exception:
return None
def _ms_to_dt(ms: int) -> datetime:
return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
def _extract_text(content: Any) -> str:
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: List[str] = []
for c in content:
if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str):
parts.append(c["text"])
elif isinstance(c, str):
parts.append(c)
return "\n".join(p.strip() for p in parts if p and p.strip()).strip()
if isinstance(content, dict):
if isinstance(content.get("text"), str):
return content["text"].strip()
return str(content).strip()
@dataclass
class Source:
    """One transcript to tail, together with the label written into memory headers."""

    # Label of the source, e.g. "telegram:263887248".
    label: str
    # Location of the session transcript JSONL file.
    transcript_path: Path
def _extract_chat_id(label: str) -> Optional[str]:
match = re.match(r"^[a-zA-Z0-9_-]+:(\d+)$", (label or "").strip())
return match.group(1) if match else None
def _discover_transcript_for_label(label: str) -> Optional[Path]:
    """Look up the current transcript file for ``label`` in the sessions index.

    The chat id taken from the label selects the
    ``agent:main:telegram:direct:<chat_id>`` entry of ``SESSIONS_INDEX``; that
    entry's ``sessionFile`` is returned as a ``Path`` if it exists on disk.

    Returns ``None`` when the label has no chat id, the index or entry is
    missing or malformed, or the referenced file does not exist.
    """
    chat_id = _extract_chat_id(label)
    if not chat_id:
        return None

    index = _load_json(SESSIONS_INDEX, {})
    if not isinstance(index, dict):
        return None

    entry = index.get(f"agent:main:telegram:direct:{chat_id}")
    if not isinstance(entry, dict):
        return None

    session_file = entry.get("sessionFile")
    if not isinstance(session_file, str):
        return None

    candidate = Path(session_file)
    if candidate.exists():
        return candidate
    return None
def _load_sources() -> List[Source]:
    """
    Build the list of transcripts to tail from ``SOURCES_PATH``.

    Sources file format:
    {
    "sources": [
    { "label": "telegram:263887248", "path": "/root/.openclaw/agents/main/sessions/<id>.jsonl" }
    ]
    }

    Entries that are not objects, or whose "path" is not a non-empty string,
    are skipped. A missing or empty "label" becomes "session". The transcript
    discovered through the sessions index for the label takes precedence; the
    configured "path" is the fallback.
    """
    payload = _load_json(SOURCES_PATH, {"sources": []})
    entries = payload.get("sources", []) if isinstance(payload, dict) else []

    found: List[Source] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        configured = entry.get("path")
        if not (isinstance(configured, str) and configured):
            continue
        label = str(entry.get("label") or "session")
        transcript = _discover_transcript_for_label(label) or Path(configured)
        found.append(Source(label=label, transcript_path=transcript))
    return found
def _memory_path_for(dt: datetime) -> Path:
    """Return the daily memory file (``MEMORY_DIR/YYYY-MM-DD.md``) for ``dt``.

    The date is taken after converting ``dt`` to the zone from ``_local_tz()``.
    ``MEMORY_DIR`` is created if it does not exist yet; the file itself is not.
    """
    local_day = dt.astimezone(_local_tz()).date()
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    return MEMORY_DIR / f"{local_day.isoformat()}.md"
def _append_memory(dt: datetime, label: str, role: str, text: str) -> None:
    """Append one message as a section to the daily memory file for ``dt``.

    Nothing is written when ``text`` is blank. A file that does not exist yet
    is started with a ``# YYYY-MM-DD`` title. Each message is then added as::

        ## HH:MM:SS - <label> (<role>)

        <stripped text>

    Times and dates are rendered in the zone returned by ``_local_tz()``.
    The file is opened in append mode, so existing content is never re-read
    or rewritten; the cost per message does not grow with the file, and an
    interrupted write cannot truncate earlier entries.
    """
    body = text.strip()
    if not body:
        return
    local = dt.astimezone(_local_tz())
    mem = _memory_path_for(dt)
    is_new = not mem.exists()
    header = f"## {local.strftime('%H:%M:%S')} - {label} ({role})"
    with mem.open("a", encoding="utf-8") as fh:
        if is_new:
            fh.write(f"# {local.date().isoformat()}\n\n")
        fh.write(f"{header}\n\n{body}\n\n")
def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]:
with open(path, "rb") as f:
f.seek(max(0, int(start_offset)))
while True:
raw = f.readline()
if not raw:
break
try:
line = raw.decode("utf-8", errors="ignore").strip()
except Exception:
line = ""
if not line:
continue
yield (f.tell(), line)
def _parse_transcript_line(line: str) -> Optional[tuple[datetime, str, str]]:
    """Decode one transcript JSONL line into ``(timestamp, role, text)``.

    Returns ``None`` for lines that should be skipped: invalid JSON, records
    whose ``type`` is not ``"message"``, and messages whose extracted text is
    shorter than 5 characters.

    The timestamp comes from a numeric ``message.timestamp`` (epoch
    milliseconds) or, failing that, a string top-level ``timestamp`` (ISO
    8601). If neither is present or usable, the current UTC time is used.
    """
    try:
        obj = json.loads(line)
    except Exception:
        return None
    if not isinstance(obj, dict) or obj.get("type") != "message":
        return None

    msg = obj.get("message") if isinstance(obj.get("message"), dict) else {}
    text = _extract_text(msg.get("content"))
    if len(text.strip()) < 5:
        return None
    role = str(msg.get("role") or "unknown")

    dt = None
    if isinstance(msg.get("timestamp"), (int, float)):
        try:
            dt = _ms_to_dt(int(msg["timestamp"]))
        except (OverflowError, OSError, ValueError):
            # NaN/inf or out-of-range values must not abort the whole source
            # and leave its offset stuck on this line on every run.
            dt = None
    elif isinstance(obj.get("timestamp"), str):
        dt = _iso_to_dt(obj["timestamp"])
    if dt is None:
        dt = datetime.now(timezone.utc)
    return dt, role, text


def run() -> Dict[str, Any]:
    """Append new transcript messages from every configured source to memory.

    For each source, reading resumes at the byte offset stored in
    ``STATE_PATH`` (keyed by transcript path). Every usable message line is
    appended to the daily memory file and the offset is advanced past each
    processed line. The updated offsets are saved once at the end.

    A failure while processing one source is recorded in the result and does
    not stop the remaining sources; lines already processed for that source
    keep their advanced offset.

    Returns:
        A dict with ``success`` (False if any source raised), ``time`` (UTC
        ISO timestamp of the run), ``sources`` (number of configured sources),
        ``messages_appended`` and ``errors`` (one string per failed source).
    """
    sources = _load_sources()
    state = _load_json(STATE_PATH, {"offsets": {}})
    stored = state.get("offsets") if isinstance(state, dict) else None
    # A damaged state file may hold a non-dict here (e.g. null); start fresh
    # rather than failing on every source.
    offsets: Dict[str, int] = stored if isinstance(stored, dict) else {}
    out: Dict[str, Any] = {
        "success": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "sources": len(sources),
        "messages_appended": 0,
        "errors": [],
    }
    for src in sources:
        try:
            if not src.transcript_path.exists():
                continue
            key = str(src.transcript_path)
            start = int(offsets.get(key, 0))
            if start > src.transcript_path.stat().st_size:
                # The transcript is shorter than what was already consumed, so
                # it was truncated or replaced: read it from the beginning.
                start = 0
            for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start):
                entry = _parse_transcript_line(line)
                if entry is not None:
                    dt, role, text = entry
                    _append_memory(dt, src.label, role, text)
                    out["messages_appended"] += 1
                # Advance only after the line is fully handled, so a failed
                # append is retried on the next run.
                offsets[key] = new_off
        except Exception as e:
            out["success"] = False
            out["errors"].append(f"{src.transcript_path}: {e}")
    _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]})
    return out
if __name__ == "__main__":
    # Script entry point: run one ingest pass and print its summary as JSON.
    print(json.dumps(run(), ensure_ascii=False, indent=2))