diff --git a/cron_tasks/ingest_transcript_to_db.py b/cron_tasks/ingest_transcript_to_db.py index 40a74ee..7978097 100644 --- a/cron_tasks/ingest_transcript_to_db.py +++ b/cron_tasks/ingest_transcript_to_db.py @@ -13,13 +13,14 @@ import os from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Optional from uuid import NAMESPACE_URL, uuid5 WORKSPACE = Path("/root/.openclaw/workspace") MEMORY_DIR = WORKSPACE / "memory" SOURCES_PATH = MEMORY_DIR / "session_sources.json" STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json" +SESSIONS_INDEX = Path("/root/.openclaw/agents/main/sessions/sessions.json") BRAIN_DIR = WORKSPACE / "second-brain" DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite"))) @@ -71,6 +72,19 @@ class Source: transcript_path: Path +def _discover_current_transcript(label: str) -> Optional[Path]: + parts = label.rsplit(":", 1) + if len(parts) != 2 or not parts[1].isdigit(): + return None + payload = _load_json(SESSIONS_INDEX, {}) + direct = payload.get(f"agent:main:telegram:direct:{parts[1]}") if isinstance(payload, dict) else None + session_file = direct.get("sessionFile") if isinstance(direct, dict) else None + if not isinstance(session_file, str): + return None + path = Path(session_file) + return path if path.exists() else None + + def _load_sources() -> List[Source]: payload = _load_json(SOURCES_PATH, {"sources": []}) sources: List[Source] = [] @@ -81,7 +95,8 @@ def _load_sources() -> List[Source]: p = item.get("path") if not isinstance(p, str) or not p: continue - sources.append(Source(label=label, transcript_path=Path(p))) + transcript_path = _discover_current_transcript(label) or Path(p) + sources.append(Source(label=label, transcript_path=transcript_path)) return sources @@ -157,4 +172,3 @@ def run() -> Dict[str, Any]: if __name__ == "__main__": print(json.dumps(run(), ensure_ascii=False, indent=2)) - diff --git a/cron_tasks/ingest_transcript_to_memory.py b/cron_tasks/ingest_transcript_to_memory.py index a629f75..2ef232b 100644 --- a/cron_tasks/ingest_transcript_to_memory.py +++ b/cron_tasks/ingest_transcript_to_memory.py @@ -14,6 +14,7 @@ from __future__ import annotations import json import os +import re from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -29,6 +30,8 @@ WORKSPACE = Path("/root/.openclaw/workspace") MEMORY_DIR = WORKSPACE / "memory" SOURCES_PATH = MEMORY_DIR / "session_sources.json" STATE_PATH = MEMORY_DIR / "session_ingest_state.json" +SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions") +SESSIONS_INDEX = SESSIONS_DIR / "sessions.json" def _local_tz(): @@ -91,6 +94,24 @@ class Source: transcript_path: Path +def _extract_chat_id(label: str) -> Optional[str]: + match = re.match(r"^[a-zA-Z0-9_-]+:(\d+)$", (label or "").strip()) + return match.group(1) if match else None + + +def _discover_transcript_for_label(label: str) -> Optional[Path]: + chat_id = _extract_chat_id(label) + if not chat_id: + return None + index = _load_json(SESSIONS_INDEX, {}) + direct = index.get(f"agent:main:telegram:direct:{chat_id}") if isinstance(index, dict) else None + session_file = direct.get("sessionFile") if isinstance(direct, dict) else None + if not isinstance(session_file, str): + return None + path = Path(session_file) + return path if path.exists() else None + + def _load_sources() -> List[Source]: """ Sources file format: @@ -109,7 +130,8 @@ def _load_sources() -> List[Source]: p = item.get("path") if not isinstance(p, str) or not p: continue - sources.append(Source(label=label, transcript_path=Path(p))) + transcript_path = _discover_transcript_for_label(label) or Path(p) + sources.append(Source(label=label, transcript_path=transcript_path)) return sources