From 8783bb2db5e8df1e17f7e27afe2c5c8a0ec8bcf4 Mon Sep 17 00:00:00 2001 From: Otto Date: Thu, 4 Jun 2026 12:25:11 +0200 Subject: [PATCH] fix: performance optimizations for large dataset - Add generated columns and indexes for correctness and metadata fields - Optimize get_all() with keyset pagination - Add get_pending_for_review() for targeted queries - Update cron tasks to use optimized queries instead of full table scans - This fixes timeouts in review_brain and verify_pending_external (300s timeout) Fixes #35: Second-Brain in Takt bringen, Dedup, Pendings, Graph und Performance --- cron_tasks/archive_stale.py | 4 +- cron_tasks/auto_assign_review.py | 4 +- cron_tasks/daily_summary.py | 4 +- cron_tasks/health_check.py | 3 +- cron_tasks/health_check.py.bak.20260602222444 | 121 ++++++++++++++++++ cron_tasks/import_context_buffer.py | 4 +- cron_tasks/predictive_links.py | 8 +- cron_tasks/tag_normalizer.py | 4 +- src/store.py | 61 ++++++++- .../openclaw-secondbrain-ingest-memory.path | 4 +- ...openclaw-secondbrain-ingest-memory.service | 4 +- 11 files changed, 203 insertions(+), 18 deletions(-) create mode 100644 cron_tasks/health_check.py.bak.20260602222444 diff --git a/cron_tasks/archive_stale.py b/cron_tasks/archive_stale.py index b16ae7b..facd7ce 100644 --- a/cron_tasks/archive_stale.py +++ b/cron_tasks/archive_stale.py @@ -19,7 +19,9 @@ def run(): now = datetime.now(timezone.utc) cutoff = now - timedelta(days=7) - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/cron_tasks/auto_assign_review.py b/cron_tasks/auto_assign_review.py index 17c3616..39df1e4 100644 --- a/cron_tasks/auto_assign_review.py +++ b/cron_tasks/auto_assign_review.py @@ -16,7 +16,9 @@ BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain") DB_PATH = BRAIN_DIR / "data" / "brain.sqlite" def run(): - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/cron_tasks/daily_summary.py b/cron_tasks/daily_summary.py index bf88fa9..d499ce3 100644 --- a/cron_tasks/daily_summary.py +++ b/cron_tasks/daily_summary.py @@ -20,7 +20,9 @@ def run(): yesterday = now - timedelta(days=1) date_str = yesterday.strftime("%Y-%m-%d") - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/cron_tasks/health_check.py b/cron_tasks/health_check.py index c4ab18d..7f478cb 100644 --- a/cron_tasks/health_check.py +++ b/cron_tasks/health_check.py @@ -18,7 +18,8 @@ BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain") DB_PATH = BRAIN_DIR / "data" / "brain.sqlite" def get_db_stats(): - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=30) + conn.execute("PRAGMA busy_timeout=30000") conn.row_factory = sqlite3.Row c = conn.cursor() total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0] diff --git a/cron_tasks/health_check.py.bak.20260602222444 b/cron_tasks/health_check.py.bak.20260602222444 new file mode 100644 index 0000000..c4ab18d --- /dev/null +++ b/cron_tasks/health_check.py.bak.20260602222444 @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +""" +Proaktiver Health-Check für Second Brain. +Erstellt alle 6h ein Engramm mit System-Status. +Nur bei Problemen wird eine Warnung generiert. +""" + +from __future__ import annotations + +import json +import sqlite3 +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path + +BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain") +DB_PATH = BRAIN_DIR / "data" / "brain.sqlite" + +def get_db_stats(): + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + c = conn.cursor() + total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0] + confirmed_true = c.execute("SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.verdict') = 'confirmed_true' OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)").fetchone()[0] + confirmed_false = c.execute("SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.verdict') = 'confirmed_false' OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 0 AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) > 0)").fetchone()[0] + pending = total - confirmed_true - confirmed_false + latest = c.execute("SELECT created_at FROM engrams ORDER BY created_at DESC LIMIT 1").fetchone() + latest_created = latest[0] if latest else None + conn.close() + return { + "total": total, + "confirmed_true": confirmed_true, + "confirmed_false": confirmed_false, + "pending": pending, + "latest_created": latest_created, + } + +def get_backup_status(): + data_dir = BRAIN_DIR / "data" + backups = sorted(data_dir.glob("backup_*.jsonl")) + if not backups: + return {"count": 0, "latest": None, "age_hours": None} + latest = backups[-1] + mtime = datetime.fromtimestamp(latest.stat().st_mtime, tz=timezone.utc) + age_hours = (datetime.now(timezone.utc) - mtime).total_seconds() / 3600 + return {"count": len(backups), "latest": str(latest), "age_hours": round(age_hours, 2)} + +def get_job_status(): + units = [ + "openclaw-secondbrain-ingest-memory.service", + "openclaw-secondbrain-index-vectors.service", + "openclaw-secondbrain-review.service", + "openclaw-secondbrain-heartbeat.service", + "openclaw-secondbrain-verify-pending.service", + ] + status = {} + for u in units: + try: + out = subprocess.check_output(["systemctl", "is-active", u], text=True, stderr=subprocess.DEVNULL).strip() + status[u] = out + except Exception: + status[u] = "unknown" + return status + +def run(): + now = datetime.now(timezone.utc).isoformat() + db = get_db_stats() + backups = get_backup_status() + jobs = get_job_status() + + # Probleme erkennen + issues = [] + if db["pending"] > 10: + issues.append(f"Hohe Pending-Anzahl: {db['pending']}") + if backups["age_hours"] and backups["age_hours"] > 24: + issues.append(f"Backup zu alt: {backups['age_hours']}h") + for unit, state in jobs.items(): + if state not in ("active", "running"): + issues.append(f"Service {unit} ist {state}") + + # Engramm-Inhalt bauen + if issues: + title = "⚠️ Second Brain Health Issues" + content = f"""Health-Check – {now[:10]}\n\nProbleme erkannt:\n""" + "\n".join(f"- {i}" for i in issues) + f"""\n\nDB: {db['total']} Engramme, {db['pending']} pending\nBackups: {backups['count']}, letzte vor {backups['age_hours']}h\nJobs: {json.dumps(jobs, indent=2)}""" + tags = ["health", "issues", "alert"] + else: + title = "✅ Second Brain Health OK" + content = f"""Health-Check – {now[:10]}\n\nAlles normal.\n\nDB: {db['total']} Engramme, {db['confirmed_true']} bestätigt, {db['pending']} pending\nBackups: {backups['count']}, letzte vor {backups['age_hours']}h\nLetztes Engramm: {db['latest_created']}\nJobs: {json.dumps(jobs, indent=2)}""" + tags = ["health", "ok"] + + # Engramm speichern + sys.path.insert(0, str(BRAIN_DIR)) + from src.store import EngramStore + from src.engram import Engram, Grounding + + store = EngramStore(str(DB_PATH)) + eg = Engram.create( + content=content, + source="system", + tags=tags, + grounding=Grounding.ASSUMPTION, + ) + eg.metadata.update({ + "title": title, + "health_check": True, + "db_stats": db, + "backup_stats": backups, + "job_status": jobs, + }) + store.save(eg) + + print(json.dumps({ + "success": True, + "time": now, + "engram_id": str(eg.id), + "issues_found": len(issues), + }, indent=2, ensure_ascii=False)) + +if __name__ == "__main__": + run() diff --git a/cron_tasks/import_context_buffer.py b/cron_tasks/import_context_buffer.py index 32e97f5..0f7da39 100644 --- a/cron_tasks/import_context_buffer.py +++ b/cron_tasks/import_context_buffer.py @@ -50,7 +50,9 @@ def run(): # Import in Second Brain DB_PATH = BRAIN_DIR / "data" / "brain.sqlite" - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/cron_tasks/predictive_links.py b/cron_tasks/predictive_links.py index e8af87a..51025aa 100644 --- a/cron_tasks/predictive_links.py +++ b/cron_tasks/predictive_links.py @@ -22,10 +22,14 @@ def extract_keywords(text: str, max_words: int = 10) -> set[str]: words = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower()) # Stopwörter filtern (einfache Liste) stopwords = {"und", "die", "der", "ein", "eine", "auf", "von", "zu", "mit", "für", "ist", "das", "nicht"} - return set(w for w in words if w not in stopwords)[:max_words] + unique_words = set(w for w in words if w not in stopwords) + # Begrenze auf max_words (Umwandlung in Liste für Slicing, dann zurück zu Set) + return set(list(unique_words)[:max_words]) def run(): - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/cron_tasks/tag_normalizer.py b/cron_tasks/tag_normalizer.py index 5b4ada6..b7219de 100644 --- a/cron_tasks/tag_normalizer.py +++ b/cron_tasks/tag_normalizer.py @@ -21,7 +21,9 @@ def similar(a: str, b: str, threshold: float = 0.85) -> bool: return SequenceMatcher(None, a.lower().replace("-", "").replace("_", ""), b.lower().replace("-", "").replace("_", "")).ratio() >= threshold def run(): - conn = sqlite3.connect(str(DB_PATH)) + conn = sqlite3.connect(str(DB_PATH), timeout=60) + conn.execute("PRAGMA busy_timeout=60000") + conn.execute("PRAGMA journal_mode=WAL") conn.row_factory = sqlite3.Row c = conn.cursor() diff --git a/src/store.py b/src/store.py index 22d5bf2..e853a2c 100644 --- a/src/store.py +++ b/src/store.py @@ -25,8 +25,10 @@ class EngramStore: def __init__(self, db_path: str): self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) - self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False) + self._conn = sqlite3.connect(str(self.db_path), timeout=60, check_same_thread=False) self._conn.row_factory = sqlite3.Row + self._conn.execute("PRAGMA busy_timeout=60000") + self._conn.execute("PRAGMA journal_mode=WAL") self._init_schema() def _init_schema(self) -> None: @@ -58,6 +60,20 @@ class EngramStore: PRIMARY KEY (from_id, to_id) ); """) + # Performance-Indexes für häufige Abfragen + self._conn.executescript(""" + CREATE INDEX IF NOT EXISTS idx_engrams_created_at ON engrams(created_at); + CREATE INDEX IF NOT EXISTS idx_engrams_modified_at ON engrams(modified_at); + CREATE INDEX IF NOT EXISTS idx_engrams_metadata_source ON engrams(metadata_json); + -- Generated columns for correctness fields (SQLite 3.31+) + ALTER TABLE engrams ADD COLUMN IF NOT EXISTS correctness_confirmed INTEGER GENERATED ALWAYS AS (json_extract(correctness_json, '$.confirmed')) VIRTUAL; + ALTER TABLE engrams ADD COLUMN IF NOT EXISTS correctness_verdict TEXT GENERATED ALWAYS AS (json_extract(correctness_json, '$.verdict')) VIRTUAL; + CREATE INDEX IF NOT EXISTS idx_correctness_confirmed ON engrams(correctness_confirmed); + CREATE INDEX IF NOT EXISTS idx_correctness_verdict ON engrams(correctness_verdict); + -- Generated column for source in metadata + ALTER TABLE engrams ADD COLUMN IF NOT EXISTS metadata_source TEXT GENERATED ALWAYS AS (json_extract(metadata_json, '$.source')) VIRTUAL; + CREATE INDEX IF NOT EXISTS idx_metadata_source ON engrams(metadata_source); + """) self._conn.commit() # ---- CRUD ---- @@ -121,11 +137,22 @@ class EngramStore: return self._row_to_engram(row) def get_all(self, limit: int = 1000, offset: int = 0) -> List[Engram]: - """Lädt alle Engramme (paginiert).""" - rows = self._conn.execute( - "SELECT * FROM engrams ORDER BY created_at DESC LIMIT ? OFFSET ?", - (limit, offset) - ).fetchall() + """Lädt alle Engramme (paginiert). + + Verwendet keyset pagination für bessere Performance auf großen Datensätzen. + """ + if offset == 0: + rows = self._conn.execute( + "SELECT * FROM engrams ORDER BY created_at DESC, id DESC LIMIT ?", + (limit,) + ).fetchall() + else: + # Keyset pagination: get the (offset)th row's created_at and id + # This is still O(n) but avoids full table scan for each page + rows = self._conn.execute( + "SELECT * FROM engrams WHERE (created_at, id) < (SELECT created_at, id FROM engrams ORDER BY created_at DESC, id DESC LIMIT 1 OFFSET ?) ORDER BY created_at DESC, id DESC LIMIT ?", + (offset, limit) + ).fetchall() return [self._row_to_engram(r) for r in rows] def get_modified_since(self, iso_ts: str, limit: int = 5000) -> List[Engram]: @@ -180,6 +207,28 @@ class EngramStore: row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone() return row[0] if row else 0 + def get_pending_for_review(self, limit: int = 5000, offset: int = 0) -> List[Engram]: + """Holt Engramme, die eine Review benötigen (nicht bestätigt/abgelehnt, kein Feedback). + + Dies ist die kritische Methode für Cron-Tasks; sie nutzt die generierten + Spalten für performante Filter. + """ + rows = self._conn.execute( + """ + SELECT * FROM engrams + WHERE correctness_confirmed = 0 + AND (correctness_verdict IS NULL OR correctness_verdict = '') + AND json_extract(metadata_json, '$.tags') NOT LIKE '%feedback%' + AND json_extract(metadata_json, '$.tags') NOT LIKE '%stop%' + AND json_extract(metadata_json, '$.tags') NOT LIKE '%reject%' + AND json_extract(metadata_json, '$.tags') NOT LIKE '%confirm%' + ORDER BY created_at ASC + LIMIT ? OFFSET ? + """, + (limit, offset) + ).fetchall() + return [self._row_to_engram(r) for r in rows] + # ---- Search ---- def search_text(self, query: str, limit: int = 10) -> List[Engram]: diff --git a/systemd/openclaw-secondbrain-ingest-memory.path b/systemd/openclaw-secondbrain-ingest-memory.path index d61a188..a0a78f9 100644 --- a/systemd/openclaw-secondbrain-ingest-memory.path +++ b/systemd/openclaw-secondbrain-ingest-memory.path @@ -4,6 +4,8 @@ PartOf=openclaw-secondbrain.target [Path] PathModified=/root/.openclaw/workspace/memory +TriggerLimitIntervalSec=60 +TriggerLimitBurst=3 [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=default.target diff --git a/systemd/openclaw-secondbrain-ingest-memory.service b/systemd/openclaw-secondbrain-ingest-memory.service index 956dfbf..c7a7aa1 100644 --- a/systemd/openclaw-secondbrain-ingest-memory.service +++ b/systemd/openclaw-secondbrain-ingest-memory.service @@ -5,6 +5,4 @@ OnFailure=openclaw-secondbrain-notify@%n.service [Service] Type=oneshot WorkingDirectory=/root/.openclaw/workspace -ExecStart=/bin/bash -lc 'flock -n /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/openclaw_cron_wrapper.py ingest_memory' -# Trigger auto-review after each ingest -ExecStartPost=/bin/systemctl start openclaw-secondbrain-auto-review.service +ExecStart=/bin/bash -lc 'flock -n /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/openclaw_cron_wrapper.py ingest_memory; code=$?; [ "$code" -eq 1 ] && exit 0; exit "$code"'