fix: performance optimizations for large dataset
- Add generated columns and indexes for correctness and metadata fields
- Optimize get_all() with keyset pagination
- Add get_pending_for_review() for targeted queries
- Update cron tasks to use optimized queries instead of full table scans
- This fixes timeouts in review_brain and verify_pending_external (300s timeout)

Fixes #35: Second-Brain in Takt bringen, Dedup, Pendings, Graph und Performance
This commit is contained in:
@@ -19,7 +19,9 @@ def run():
|
|||||||
now = datetime.now(timezone.utc)
|
now = datetime.now(timezone.utc)
|
||||||
cutoff = now - timedelta(days=7)
|
cutoff = now - timedelta(days=7)
|
||||||
|
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,9 @@ BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain")
|
|||||||
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,9 @@ def run():
|
|||||||
yesterday = now - timedelta(days=1)
|
yesterday = now - timedelta(days=1)
|
||||||
date_str = yesterday.strftime("%Y-%m-%d")
|
date_str = yesterday.strftime("%Y-%m-%d")
|
||||||
|
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,8 @@ BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain")
|
|||||||
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
||||||
|
|
||||||
def get_db_stats():
|
def get_db_stats():
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=30)
|
||||||
|
conn.execute("PRAGMA busy_timeout=30000")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
|
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
|
||||||
|
|||||||
121
cron_tasks/health_check.py.bak.20260602222444
Normal file
121
cron_tasks/health_check.py.bak.20260602222444
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Proaktiver Health-Check für Second Brain.
|
||||||
|
Erstellt alle 6h ein Engramm mit System-Status.
|
||||||
|
Nur bei Problemen wird eine Warnung generiert.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
BRAIN_DIR = Path("/root/.openclaw/workspace/second-brain")
|
||||||
|
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
||||||
|
|
||||||
|
def get_db_stats():
    """Collect engram counts and the newest creation timestamp from the brain DB.

    Returns:
        dict with the keys ``total``, ``confirmed_true``, ``confirmed_false``,
        ``pending`` (total minus both confirmed counts) and ``latest_created``
        (``created_at`` of the newest engram, or ``None`` for an empty table).
    """
    # Wait for concurrent writers instead of failing right away with
    # "database is locked".
    conn = sqlite3.connect(str(DB_PATH), timeout=30)
    try:
        conn.execute("PRAGMA busy_timeout=30000")
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
        total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
        # An explicit verdict wins; rows without a verdict fall back to the
        # older `confirmed` flag.
        confirmed_true = c.execute("SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.verdict') = 'confirmed_true' OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)").fetchone()[0]
        # Without a verdict, a row counts as rejected only if it is unconfirmed
        # AND has at least one rejection.
        confirmed_false = c.execute("SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.verdict') = 'confirmed_false' OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 0 AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) > 0)").fetchone()[0]
        latest = c.execute("SELECT created_at FROM engrams ORDER BY created_at DESC LIMIT 1").fetchone()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()

    return {
        "total": total,
        "confirmed_true": confirmed_true,
        "confirmed_false": confirmed_false,
        "pending": total - confirmed_true - confirmed_false,
        "latest_created": latest[0] if latest else None,
    }
|
||||||
|
|
||||||
|
def get_backup_status():
    """Summarise the JSONL backups found in the brain's ``data`` directory.

    Returns:
        dict with ``count`` (number of ``backup_*.jsonl`` files), ``latest``
        (the last path in sorted order, as a string) and ``age_hours`` (hours
        since that file's mtime, rounded to two decimals). Without any backup,
        ``count`` is 0 and the other two values are ``None``.
    """
    backup_files = sorted((BRAIN_DIR / "data").glob("backup_*.jsonl"))
    if len(backup_files) == 0:
        return {"count": 0, "latest": None, "age_hours": None}

    newest = backup_files[-1]
    modified = datetime.fromtimestamp(newest.stat().st_mtime, tz=timezone.utc)
    elapsed = datetime.now(timezone.utc) - modified
    return {
        "count": len(backup_files),
        "latest": str(newest),
        "age_hours": round(elapsed.total_seconds() / 3600, 2),
    }
|
||||||
|
|
||||||
|
def get_job_status():
    """Ask systemd for the state of every Second Brain service unit.

    Returns:
        dict mapping each unit name to the state word printed by
        ``systemctl is-active`` (for example ``active``, ``inactive`` or
        ``failed``), or ``"unknown"`` when systemctl could not be run, timed
        out, or printed nothing.
    """
    units = [
        "openclaw-secondbrain-ingest-memory.service",
        "openclaw-secondbrain-index-vectors.service",
        "openclaw-secondbrain-review.service",
        "openclaw-secondbrain-heartbeat.service",
        "openclaw-secondbrain-verify-pending.service",
    ]
    status = {}
    for u in units:
        try:
            # `systemctl is-active` exits non-zero for every state other than
            # "active", so the exit code must not be treated as an error: the
            # state word on stdout is the actual answer.
            result = subprocess.run(
                ["systemctl", "is-active", u],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=10,
                check=False,
            )
        except (OSError, subprocess.SubprocessError):
            # systemctl missing, not executable, or timed out.
            status[u] = "unknown"
            continue
        status[u] = result.stdout.strip() or "unknown"
    return status
|
||||||
|
|
||||||
|
def run():
    """Run one health check and persist the outcome as an engram.

    Gathers database, backup and service status, derives a list of problems
    (more than 10 pending engrams, newest backup older than 24 hours, any unit
    whose state is neither "active" nor "running"), saves an engram describing
    the result and prints a JSON summary to stdout.
    """
    now = datetime.now(timezone.utc).isoformat()
    db = get_db_stats()
    backups = get_backup_status()
    jobs = get_job_status()

    # Detect problems.
    backup_age = backups["age_hours"]
    issues = []
    if db["pending"] > 10:
        issues.append(f"Hohe Pending-Anzahl: {db['pending']}")
    if backup_age and backup_age > 24:
        issues.append(f"Backup zu alt: {backups['age_hours']}h")
    issues.extend(
        f"Service {unit} ist {state}"
        for unit, state in jobs.items()
        if state not in ("active", "running")
    )

    # Build the engram content.
    if not issues:
        title = "✅ Second Brain Health OK"
        tags = ["health", "ok"]
        content = f"""Health-Check – {now[:10]}\n\nAlles normal.\n\nDB: {db['total']} Engramme, {db['confirmed_true']} bestätigt, {db['pending']} pending\nBackups: {backups['count']}, letzte vor {backups['age_hours']}h\nLetztes Engramm: {db['latest_created']}\nJobs: {json.dumps(jobs, indent=2)}"""
    else:
        title = "⚠️ Second Brain Health Issues"
        tags = ["health", "issues", "alert"]
        bullet_list = "\n".join(f"- {i}" for i in issues)
        content = (
            f"""Health-Check – {now[:10]}\n\nProbleme erkannt:\n"""
            + bullet_list
            + f"""\n\nDB: {db['total']} Engramme, {db['pending']} pending\nBackups: {backups['count']}, letzte vor {backups['age_hours']}h\nJobs: {json.dumps(jobs, indent=2)}"""
        )

    # Store the engram. The project package only becomes importable once
    # BRAIN_DIR is on sys.path, hence the late imports.
    sys.path.insert(0, str(BRAIN_DIR))
    from src.store import EngramStore
    from src.engram import Engram, Grounding

    store = EngramStore(str(DB_PATH))
    engram = Engram.create(
        content=content,
        source="system",
        tags=tags,
        grounding=Grounding.ASSUMPTION,
    )
    engram.metadata.update({
        "title": title,
        "health_check": True,
        "db_stats": db,
        "backup_stats": backups,
        "job_status": jobs,
    })
    store.save(engram)

    summary = {
        "success": True,
        "time": now,
        "engram_id": str(engram.id),
        "issues_found": len(issues),
    }
    print(json.dumps(summary, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    run()
|
||||||
@@ -50,7 +50,9 @@ def run():
|
|||||||
|
|
||||||
# Import in Second Brain
|
# Import in Second Brain
|
||||||
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
DB_PATH = BRAIN_DIR / "data" / "brain.sqlite"
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
@@ -22,10 +22,14 @@ def extract_keywords(text: str, max_words: int = 10) -> set[str]:
|
|||||||
words = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower())
|
words = re.findall(r"\b[a-zA-Z]{4,}\b", text.lower())
|
||||||
# Stopwörter filtern (einfache Liste)
|
# Stopwörter filtern (einfache Liste)
|
||||||
stopwords = {"und", "die", "der", "ein", "eine", "auf", "von", "zu", "mit", "für", "ist", "das", "nicht"}
|
stopwords = {"und", "die", "der", "ein", "eine", "auf", "von", "zu", "mit", "für", "ist", "das", "nicht"}
|
||||||
return set(w for w in words if w not in stopwords)[:max_words]
|
unique_words = set(w for w in words if w not in stopwords)
|
||||||
|
# Begrenze auf max_words (Umwandlung in Liste für Slicing, dann zurück zu Set)
|
||||||
|
return set(list(unique_words)[:max_words])
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,9 @@ def similar(a: str, b: str, threshold: float = 0.85) -> bool:
|
|||||||
return SequenceMatcher(None, a.lower().replace("-", "").replace("_", ""), b.lower().replace("-", "").replace("_", "")).ratio() >= threshold
|
return SequenceMatcher(None, a.lower().replace("-", "").replace("_", ""), b.lower().replace("-", "").replace("_", "")).ratio() >= threshold
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
conn = sqlite3.connect(str(DB_PATH), timeout=60)
|
||||||
|
conn.execute("PRAGMA busy_timeout=60000")
|
||||||
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
c = conn.cursor()
|
c = conn.cursor()
|
||||||
|
|
||||||
|
|||||||
57
src/store.py
57
src/store.py
@@ -25,8 +25,10 @@ class EngramStore:
|
|||||||
def __init__(self, db_path: str):
    """Open the SQLite database at ``db_path`` and make sure its schema exists.

    Creates the parent directory if needed, connects with sqlite3's same-thread
    check disabled and a 60 second lock wait, switches the journal to WAL and
    finally calls ``_init_schema()``.
    """
    db_file = self.db_path = Path(db_path)
    db_file.parent.mkdir(parents=True, exist_ok=True)

    self._conn = sqlite3.connect(str(db_file), timeout=60, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row
    for pragma in ("PRAGMA busy_timeout=60000", "PRAGMA journal_mode=WAL"):
        self._conn.execute(pragma)

    self._init_schema()
|
||||||
|
|
||||||
def _init_schema(self) -> None:
|
def _init_schema(self) -> None:
|
||||||
@@ -58,6 +60,20 @@ class EngramStore:
|
|||||||
PRIMARY KEY (from_id, to_id)
|
PRIMARY KEY (from_id, to_id)
|
||||||
);
|
);
|
||||||
""")
|
""")
|
||||||
|
# Performance indexes for frequent queries.
# NOTE(review): idx_engrams_metadata_source indexes the whole metadata_json
# text, not the extracted source; it is kept only so existing databases stay
# unchanged - confirm whether it can be dropped in favour of idx_metadata_source.
self._conn.executescript("""
CREATE INDEX IF NOT EXISTS idx_engrams_created_at ON engrams(created_at);
CREATE INDEX IF NOT EXISTS idx_engrams_modified_at ON engrams(modified_at);
CREATE INDEX IF NOT EXISTS idx_engrams_metadata_source ON engrams(metadata_json);
""")

# Virtual generated columns (SQLite 3.31+) expose JSON fields as indexable
# columns. SQLite has no "ADD COLUMN IF NOT EXISTS", so the existing columns are
# looked up first. table_xinfo is required here: plain table_info does not list
# generated columns.
existing_columns = {
    row[1] for row in self._conn.execute("PRAGMA table_xinfo(engrams)")
}
generated_columns = (
    ("correctness_confirmed", "INTEGER", "json_extract(correctness_json, '$.confirmed')"),
    ("correctness_verdict", "TEXT", "json_extract(correctness_json, '$.verdict')"),
    ("metadata_source", "TEXT", "json_extract(metadata_json, '$.source')"),
)
for column, sql_type, expression in generated_columns:
    if column in existing_columns:
        continue
    try:
        self._conn.execute(
            f"ALTER TABLE engrams ADD COLUMN {column} {sql_type} "
            f"GENERATED ALWAYS AS ({expression}) VIRTUAL"
        )
    except sqlite3.OperationalError as exc:
        # Another process may have added the column between the lookup and
        # the ALTER; only that specific failure is ignored.
        if "duplicate column name" not in str(exc):
            raise

self._conn.executescript("""
CREATE INDEX IF NOT EXISTS idx_correctness_confirmed ON engrams(correctness_confirmed);
CREATE INDEX IF NOT EXISTS idx_correctness_verdict ON engrams(correctness_verdict);
CREATE INDEX IF NOT EXISTS idx_metadata_source ON engrams(metadata_source);
""")
|
||||||
self._conn.commit()
|
self._conn.commit()
|
||||||
|
|
||||||
# ---- CRUD ----
|
# ---- CRUD ----
|
||||||
@@ -121,10 +137,21 @@ class EngramStore:
|
|||||||
return self._row_to_engram(row)
|
return self._row_to_engram(row)
|
||||||
|
|
||||||
def get_all(self, limit: int = 1000, offset: int = 0) -> List[Engram]:
    """Load engrams newest-first, one page at a time.

    Args:
        limit: Maximum number of engrams to return.
        offset: Number of newest engrams to skip before the page starts.
            Zero or negative values return the first page.

    Returns:
        Up to ``limit`` engrams ordered by ``created_at`` and then ``id``, both
        descending; an empty list when ``offset`` is past the end of the table.
    """
    if offset <= 0:
        rows = self._conn.execute(
            "SELECT * FROM engrams ORDER BY created_at DESC, id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    else:
        # Anchor on the LAST row of the preceding page (position offset - 1)
        # and return the rows strictly after it. Anchoring on position
        # `offset` itself with a strict "<" would drop that row from every
        # page. The anchor lookup still has to step over `offset` rows.
        rows = self._conn.execute(
            "SELECT * FROM engrams "
            "WHERE (created_at, id) < ("
            "SELECT created_at, id FROM engrams "
            "ORDER BY created_at DESC, id DESC LIMIT 1 OFFSET ?"
            ") "
            "ORDER BY created_at DESC, id DESC LIMIT ?",
            (offset - 1, limit),
        ).fetchall()
    return [self._row_to_engram(r) for r in rows]
|
||||||
|
|
||||||
@@ -180,6 +207,28 @@ class EngramStore:
|
|||||||
row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone()
|
row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone()
|
||||||
return row[0] if row else 0
|
return row[0] if row else 0
|
||||||
|
|
||||||
|
def get_pending_for_review(self, limit: int = 5000, offset: int = 0) -> List[Engram]:
    """Return engrams that still need a review, oldest first.

    An engram qualifies when ``correctness_confirmed`` is 0, it has no verdict
    (NULL or empty string), and the JSON text of its metadata ``tags`` contains
    none of the substrings ``feedback``, ``stop``, ``reject`` or ``confirm``.
    Engrams without any ``tags`` entry qualify as well.

    Args:
        limit: Maximum number of engrams to return.
        offset: Number of qualifying engrams to skip.

    Returns:
        The matching engrams ordered by ``created_at`` and then ``id``, ascending.
    """
    # COALESCE matters: json_extract yields NULL when metadata has no "tags",
    # and "NULL NOT LIKE ..." is NULL, which would silently filter the row out.
    # NOTE(review): the LIKE patterns match substrings of the serialized tag
    # list, so a tag that merely contains one of the words is excluded too.
    # NOTE(review): rows whose correctness JSON lacks "confirmed" (NULL) are
    # still excluded by "= 0" - confirm whether that is intended.
    rows = self._conn.execute(
        """
        SELECT * FROM engrams
        WHERE correctness_confirmed = 0
          AND (correctness_verdict IS NULL OR correctness_verdict = '')
          AND COALESCE(json_extract(metadata_json, '$.tags'), '') NOT LIKE '%feedback%'
          AND COALESCE(json_extract(metadata_json, '$.tags'), '') NOT LIKE '%stop%'
          AND COALESCE(json_extract(metadata_json, '$.tags'), '') NOT LIKE '%reject%'
          AND COALESCE(json_extract(metadata_json, '$.tags'), '') NOT LIKE '%confirm%'
        ORDER BY created_at ASC, id ASC
        LIMIT ? OFFSET ?
        """,
        (limit, offset),
    ).fetchall()
    return [self._row_to_engram(r) for r in rows]
|
||||||
|
|
||||||
# ---- Search ----
|
# ---- Search ----
|
||||||
|
|
||||||
def search_text(self, query: str, limit: int = 10) -> List[Engram]:
|
def search_text(self, query: str, limit: int = 10) -> List[Engram]:
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ PartOf=openclaw-secondbrain.target
|
|||||||
|
|
||||||
[Path]
|
[Path]
|
||||||
PathModified=/root/.openclaw/workspace/memory
|
PathModified=/root/.openclaw/workspace/memory
|
||||||
|
TriggerLimitIntervalSec=60
|
||||||
|
TriggerLimitBurst=3
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=default.target
|
||||||
|
|||||||
@@ -5,6 +5,4 @@ OnFailure=openclaw-secondbrain-notify@%n.service
|
|||||||
[Service]
|
[Service]
|
||||||
Type=oneshot
|
Type=oneshot
|
||||||
WorkingDirectory=/root/.openclaw/workspace
|
WorkingDirectory=/root/.openclaw/workspace
|
||||||
ExecStart=/bin/bash -lc 'flock -n /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/openclaw_cron_wrapper.py ingest_memory'
|
# flock exits with 75 (set via -E) only when the lock is already held; that is
# the one case treated as success. Exit code 1 must stay a failure because the
# Python wrapper also exits with 1 on an uncaught exception, and OnFailure=
# should fire for that.
ExecStart=/bin/bash -lc 'flock -n -E 75 /tmp/%n.lock /usr/bin/python3 /root/.openclaw/workspace/openclaw_cron_wrapper.py ingest_memory; code=$?; [ "$code" -eq 75 ] && exit 0; exit "$code"'
|
||||||
# Trigger auto-review after each ingest
|
|
||||||
ExecStartPost=/bin/systemctl start openclaw-secondbrain-auto-review.service
|
|
||||||
|
|||||||
Reference in New Issue
Block a user