fix: follow active Telegram transcript after rotation

This commit is contained in:
2026-06-02 20:23:37 +02:00
parent f656cb02dc
commit 6abe4d36e8
2 changed files with 40 additions and 4 deletions

View File

@@ -13,13 +13,14 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List from typing import Any, Dict, Iterable, List, Optional
from uuid import NAMESPACE_URL, uuid5 from uuid import NAMESPACE_URL, uuid5
WORKSPACE = Path("/root/.openclaw/workspace") WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory" MEMORY_DIR = WORKSPACE / "memory"
SOURCES_PATH = MEMORY_DIR / "session_sources.json" SOURCES_PATH = MEMORY_DIR / "session_sources.json"
STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json" STATE_PATH = MEMORY_DIR / "session_db_ingest_state.json"
SESSIONS_INDEX = Path("/root/.openclaw/agents/main/sessions/sessions.json")
BRAIN_DIR = WORKSPACE / "second-brain" BRAIN_DIR = WORKSPACE / "second-brain"
DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite"))) DB_PATH = Path(os.environ.get("BRAIN_DB", str(BRAIN_DIR / "data" / "brain.sqlite")))
@@ -71,6 +72,19 @@ class Source:
transcript_path: Path transcript_path: Path
def _discover_current_transcript(label: str) -> Optional[Path]:
parts = label.rsplit(":", 1)
if len(parts) != 2 or not parts[1].isdigit():
return None
payload = _load_json(SESSIONS_INDEX, {})
direct = payload.get(f"agent:main:telegram:direct:{parts[1]}") if isinstance(payload, dict) else None
session_file = direct.get("sessionFile") if isinstance(direct, dict) else None
if not isinstance(session_file, str):
return None
path = Path(session_file)
return path if path.exists() else None
def _load_sources() -> List[Source]: def _load_sources() -> List[Source]:
payload = _load_json(SOURCES_PATH, {"sources": []}) payload = _load_json(SOURCES_PATH, {"sources": []})
sources: List[Source] = [] sources: List[Source] = []
@@ -81,7 +95,8 @@ def _load_sources() -> List[Source]:
p = item.get("path") p = item.get("path")
if not isinstance(p, str) or not p: if not isinstance(p, str) or not p:
continue continue
sources.append(Source(label=label, transcript_path=Path(p))) transcript_path = _discover_current_transcript(label) or Path(p)
sources.append(Source(label=label, transcript_path=transcript_path))
return sources return sources
@@ -157,4 +172,3 @@ def run() -> Dict[str, Any]:
if __name__ == "__main__": if __name__ == "__main__":
print(json.dumps(run(), ensure_ascii=False, indent=2)) print(json.dumps(run(), ensure_ascii=False, indent=2))

View File

@@ -14,6 +14,7 @@ from __future__ import annotations
import json import json
import os import os
import re
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -29,6 +30,8 @@ WORKSPACE = Path("/root/.openclaw/workspace")
MEMORY_DIR = WORKSPACE / "memory" MEMORY_DIR = WORKSPACE / "memory"
SOURCES_PATH = MEMORY_DIR / "session_sources.json" SOURCES_PATH = MEMORY_DIR / "session_sources.json"
STATE_PATH = MEMORY_DIR / "session_ingest_state.json" STATE_PATH = MEMORY_DIR / "session_ingest_state.json"
SESSIONS_DIR = Path("/root/.openclaw/agents/main/sessions")
SESSIONS_INDEX = SESSIONS_DIR / "sessions.json"
def _local_tz(): def _local_tz():
@@ -91,6 +94,24 @@ class Source:
transcript_path: Path transcript_path: Path
def _extract_chat_id(label: str) -> Optional[str]:
match = re.match(r"^[a-zA-Z0-9_-]+:(\d+)$", (label or "").strip())
return match.group(1) if match else None
def _discover_transcript_for_label(label: str) -> Optional[Path]:
chat_id = _extract_chat_id(label)
if not chat_id:
return None
index = _load_json(SESSIONS_INDEX, {})
direct = index.get(f"agent:main:telegram:direct:{chat_id}") if isinstance(index, dict) else None
session_file = direct.get("sessionFile") if isinstance(direct, dict) else None
if not isinstance(session_file, str):
return None
path = Path(session_file)
return path if path.exists() else None
def _load_sources() -> List[Source]: def _load_sources() -> List[Source]:
""" """
Sources file format: Sources file format:
@@ -109,7 +130,8 @@ def _load_sources() -> List[Source]:
p = item.get("path") p = item.get("path")
if not isinstance(p, str) or not p: if not isinstance(p, str) or not p:
continue continue
sources.append(Source(label=label, transcript_path=Path(p))) transcript_path = _discover_transcript_for_label(label) or Path(p)
sources.append(Source(label=label, transcript_path=transcript_path))
return sources return sources