feat(ingest): tail session transcript into memory
This commit is contained in:
213
cron_tasks/ingest_transcript_to_memory.py
Normal file
213
cron_tasks/ingest_transcript_to_memory.py
Normal file
@@ -0,0 +1,213 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tail OpenClaw session transcript JSONL files and append new messages into
|
||||
workspace/memory/YYYY-MM-DD.md so the existing ingest_memory pipeline can pick
|
||||
them up.
|
||||
|
||||
Why: when chat_autosave hooks are missed/aborted, the "memory/*.md -> DB" ingest
|
||||
doesn't see the latest conversation. This bridges transcript -> memory.
|
||||
|
||||
Safety: read-only access to transcript files; state stored in workspace/memory/.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional
|
||||
|
||||
try:
|
||||
from zoneinfo import ZoneInfo # py3.9+
|
||||
except Exception: # pragma: no cover
|
||||
ZoneInfo = None # type: ignore
|
||||
|
||||
|
||||
# Root directory of the OpenClaw workspace this job writes into.
WORKSPACE = Path("/root/.openclaw/workspace")
# Daily memory notes (YYYY-MM-DD.md) and this job's own JSON files live here.
MEMORY_DIR = WORKSPACE.joinpath("memory")
# Configuration: the list of transcript files to tail.
SOURCES_PATH = MEMORY_DIR.joinpath("session_sources.json")
# State: per-transcript byte offsets persisted between runs.
STATE_PATH = MEMORY_DIR.joinpath("session_ingest_state.json")
|
||||
|
||||
|
||||
def _local_tz():
    """Return the tzinfo used to convert message times to local time.

    The zone name is taken from the ``TZ`` environment variable and defaults
    to ``Europe/Berlin`` when that variable is unset or empty. UTC is returned
    when ``zoneinfo`` could not be imported or the named zone cannot be
    loaded.
    """
    if not ZoneInfo:
        # zoneinfo is unavailable on this interpreter.
        return timezone.utc

    zone_name = os.environ.get("TZ") or "Europe/Berlin"
    try:
        return ZoneInfo(zone_name)
    except Exception:
        # Unknown or malformed zone name, or no tz database installed.
        return timezone.utc
|
||||
|
||||
|
||||
def _load_json(path: Path, default: Any) -> Any:
    """Read and parse a UTF-8 JSON file, falling back to ``default``.

    This is a best-effort loader: a missing file, an unreadable file and
    invalid JSON all yield ``default`` instead of raising.

    Args:
        path: File to read.
        default: Value returned when the file cannot be loaded.

    Returns:
        The decoded JSON value, or ``default``.
    """
    try:
        raw = path.read_text(encoding="utf-8") if path.exists() else None
        return default if raw is None else json.loads(raw)
    except Exception:
        return default
|
||||
|
||||
|
||||
def _save_json(path: Path, payload: Any) -> None:
    """Atomically write ``payload`` to ``path`` as pretty-printed UTF-8 JSON.

    The data is first written to a temporary file in the same directory and
    then moved over ``path`` with ``os.replace``. A crash in the middle of the
    write therefore never leaves a truncated file behind; readers see either
    the previous content or the complete new content.

    Args:
        path: Destination file; missing parent directories are created.
        payload: JSON-serialisable value.

    Raises:
        TypeError: If ``payload`` is not JSON-serialisable (the existing file
            is left untouched).
        OSError: If the directory or file cannot be written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the disk so a bad payload cannot clobber
    # the previous file.
    text = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"

    # Same directory => same filesystem, which os.replace needs to be atomic.
    tmp_path = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(text, encoding="utf-8")
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp name is gone; after a failure
        # this removes the leftover.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
|
||||
|
||||
|
||||
def _iso_to_dt(ts: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string, returning ``None`` on failure.

    A trailing ``Z`` is rewritten to ``+00:00`` before the string is handed
    to ``datetime.fromisoformat``. Strings without a UTC offset produce a
    naive datetime.

    Args:
        ts: Timestamp text such as ``2024-01-02T03:04:05Z``.

    Returns:
        The parsed datetime, or ``None`` if ``ts`` cannot be parsed.
    """
    try:
        normalized = f"{ts[:-1]}+00:00" if ts.endswith("Z") else ts
        return datetime.fromisoformat(normalized)
    except Exception:
        return None
|
||||
|
||||
|
||||
def _ms_to_dt(ms: int) -> datetime:
    """Convert a Unix epoch time in milliseconds to an aware UTC datetime."""
    seconds = ms / 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc)
|
||||
|
||||
|
||||
def _extract_text(content: Any) -> str:
    """Flatten a message ``content`` value into stripped plain text.

    Handled shapes:

    * ``None`` -> ``""`` (missing content is not rendered as ``"None"``).
    * ``str`` -> the stripped string.
    * ``list`` -> every plain string item and every ``{"type": "text",
      "text": <str>}`` item, each stripped, blank ones dropped, joined with
      newlines. Other items are ignored.
    * ``dict`` with a string ``"text"`` value -> that text, stripped.
    * anything else -> ``str(content)``, stripped.

    Args:
        content: The raw ``content`` value of a transcript message.

    Returns:
        The extracted text; an empty string when there is none.
    """
    if content is None:
        return ""

    if isinstance(content, str):
        return content.strip()

    if isinstance(content, list):
        pieces: List[str] = []
        for item in content:
            if isinstance(item, str):
                pieces.append(item)
            elif (
                isinstance(item, dict)
                and item.get("type") == "text"
                and isinstance(item.get("text"), str)
            ):
                pieces.append(item["text"])
        stripped = (piece.strip() for piece in pieces)
        return "\n".join(piece for piece in stripped if piece)

    if isinstance(content, dict) and isinstance(content.get("text"), str):
        return content["text"].strip()

    # Last resort: keep some textual representation of unexpected shapes.
    return str(content).strip()
|
||||
|
||||
|
||||
@dataclass
class Source:
    """One transcript file to tail, together with its display label."""

    # Tag written into each memory heading for messages from this transcript.
    label: str
    # Location of the session transcript (JSONL) file.
    transcript_path: Path
|
||||
|
||||
|
||||
def _load_sources() -> List[Source]:
    """Load the transcript sources configured in ``SOURCES_PATH``.

    Sources file format:
    {
      "sources": [
        { "label": "telegram:263887248", "path": "/root/.openclaw/agents/main/sessions/<id>.jsonl" }
      ]
    }

    Entries that are not objects, or whose ``path`` is missing, empty or not
    a string, are skipped. A missing or empty ``label`` becomes ``"session"``.
    A missing or unreadable file, or a ``sources`` value that is not a list
    (for example ``null``), yields an empty list instead of raising.

    Returns:
        The configured sources, in file order.
    """
    payload = _load_json(SOURCES_PATH, {"sources": []})
    entries = payload.get("sources") if isinstance(payload, dict) else None
    if not isinstance(entries, list):
        # Iterating e.g. None would raise TypeError and abort the whole run.
        return []

    sources: List[Source] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        raw_path = entry.get("path")
        if not isinstance(raw_path, str) or not raw_path:
            continue
        label = str(entry.get("label") or "session")
        sources.append(Source(label=label, transcript_path=Path(raw_path)))
    return sources
|
||||
|
||||
|
||||
def _memory_path_for(dt: datetime) -> Path:
    """Return the daily memory file (``YYYY-MM-DD.md``) that ``dt`` belongs to.

    The date is taken after converting ``dt`` to the zone returned by
    ``_local_tz()``. ``MEMORY_DIR`` is created if it does not exist yet; the
    file itself is not created here.
    """
    day = dt.astimezone(_local_tz()).date()
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"{day.isoformat()}.md"
    return MEMORY_DIR / filename
|
||||
|
||||
|
||||
def _append_memory(dt: datetime, label: str, role: str, text: str) -> None:
    """Append one message as a Markdown section to its daily memory file.

    The section has the form::

        ## HH:MM:SS - <label> (<role>)

        <text>

    with the time expressed in the zone returned by ``_local_tz()``. When the
    day file does not exist yet it is started with a ``# YYYY-MM-DD`` title.
    Blank ``text`` is ignored.

    The file is opened in append mode, so existing content is never re-read
    or rewritten. Each message costs one small write, and an interrupted run
    cannot truncate what is already in the file.

    Args:
        dt: Message time; converted to local time for the heading and to
            pick the day file.
        label: Source label shown in the heading.
        role: Message role shown in the heading.
        text: Message body.
    """
    body = text.strip()
    if not body:
        return

    local = dt.astimezone(_local_tz())
    mem = _memory_path_for(dt)

    entry = f"## {local.strftime('%H:%M:%S')} - {label} ({role})\n\n{body}\n\n"
    if not mem.exists():
        # First entry of the day: start the file with its date title.
        entry = f"# {local.date().isoformat()}\n\n" + entry

    with mem.open("a", encoding="utf-8") as fh:
        fh.write(entry)
|
||||
|
||||
|
||||
def _iter_new_lines(path: Path, *, start_offset: int) -> Iterable[tuple[int, str]]:
    """Yield the complete, non-blank lines of ``path`` after ``start_offset``.

    Each item is ``(end_offset, line)``: ``end_offset`` is the byte position
    just past the line (the value to persist as the next ``start_offset``)
    and ``line`` is the stripped text, decoded as UTF-8 with undecodable
    bytes dropped.

    Tailing rules:

    * A final line without a terminating newline is treated as still being
      written. It is not yielded, so the caller's offset stays in front of it
      and the whole line is picked up on a later call.
    * If ``start_offset`` lies beyond the end of the file (the file was
      truncated or replaced), reading restarts from the beginning instead of
      silently yielding nothing forever.
    * Negative offsets are clamped to 0.

    Args:
        path: File to read (opened read-only, binary).
        start_offset: Byte offset to resume from.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(path, "rb") as f:
        size = f.seek(0, os.SEEK_END)
        offset = max(0, int(start_offset))
        if offset > size:
            # The saved offset no longer refers to this file's content.
            offset = 0
        f.seek(offset)

        while True:
            raw = f.readline()
            if not raw or not raw.endswith(b"\n"):
                # End of file, or a partially written trailing line.
                break
            line = raw.decode("utf-8", errors="ignore").strip()
            if line:
                yield (f.tell(), line)
|
||||
|
||||
|
||||
def _parse_transcript_line(line: str) -> Optional[tuple[datetime, str, str]]:
    """Turn one transcript JSONL line into ``(time, role, text)``, or ``None`` to skip it.

    A line is skipped when it is not valid JSON, is not an object with
    ``"type": "message"``, or its extracted text is shorter than 5 characters.

    The time is taken from the numeric ``message.timestamp`` (epoch
    milliseconds) when it is usable, otherwise from the ISO string in the
    record's top-level ``timestamp``, otherwise the current UTC time. An
    out-of-range or non-finite numeric timestamp falls through to the next
    option instead of raising.
    """
    try:
        obj = json.loads(line)
    except (ValueError, RecursionError):
        return None
    if not isinstance(obj, dict) or obj.get("type") != "message":
        return None

    msg = obj.get("message")
    if not isinstance(msg, dict):
        msg = {}

    text = _extract_text(msg.get("content"))
    if len(text.strip()) < 5:
        return None
    role = str(msg.get("role") or "unknown")

    dt: Optional[datetime] = None
    raw_ts = msg.get("timestamp")
    if isinstance(raw_ts, (int, float)):
        try:
            dt = _ms_to_dt(int(raw_ts))
        except (OverflowError, OSError, ValueError):
            # NaN/inf or a value outside the platform's timestamp range.
            dt = None
    if dt is None and isinstance(obj.get("timestamp"), str):
        dt = _iso_to_dt(obj["timestamp"])
    if dt is None:
        dt = datetime.now(timezone.utc)

    return dt, role, text


def run() -> Dict[str, Any]:
    """Append new transcript messages from every configured source to memory.

    For each source, reading resumes at the byte offset stored in
    ``STATE_PATH`` under the transcript's path. Every new message line is
    appended to the matching daily memory file, and the offset is advanced
    only after the line has been fully handled, so a failed append is retried
    on the next run. A failure in one source is recorded and does not stop
    the remaining sources. The offsets are written back at the end.

    Returns:
        A summary dict with the keys ``success`` (``False`` if any source
        raised), ``time`` (UTC ISO timestamp of this run), ``sources``
        (number of configured sources), ``messages_appended`` and ``errors``
        (one ``"<path>: <exception>"`` string per failed source).
    """
    sources = _load_sources()
    state = _load_json(STATE_PATH, {"offsets": {}})
    offsets = state.get("offsets") if isinstance(state, dict) else None
    if not isinstance(offsets, dict):
        # A malformed state file is treated like a missing one.
        offsets = {}

    out: Dict[str, Any] = {
        "success": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "sources": len(sources),
        "messages_appended": 0,
        "errors": [],
    }

    for src in sources:
        try:
            if not src.transcript_path.exists():
                continue
            key = str(src.transcript_path)
            start = int(offsets.get(key, 0))
            for new_off, line in _iter_new_lines(src.transcript_path, start_offset=start):
                parsed = _parse_transcript_line(line)
                if parsed is not None:
                    dt, role, text = parsed
                    _append_memory(dt, src.label, role, text)
                    out["messages_appended"] += 1
                # Reached only when the line was skipped or appended without error.
                offsets[key] = new_off
        except Exception as e:
            out["success"] = False
            out["errors"].append(f"{src.transcript_path}: {e}")

    _save_json(STATE_PATH, {"offsets": offsets, "updated_at": out["time"]})
    return out
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: run one ingest pass and print its summary as JSON.
    print(json.dumps(run(), ensure_ascii=False, indent=2))
|
||||
Reference in New Issue
Block a user