From 5e4f21e6802349f337721d029e13208529ac0f44 Mon Sep 17 00:00:00 2001 From: Otto Date: Mon, 25 May 2026 00:53:56 +0200 Subject: [PATCH] feat(core): Engram, Store, Retriever, CLI - Grundsystem Second Brain - src/engram.py: Gedaechtniseinheit mit Confidence, Correctness, Links - src/store.py: SQLite FTS5 persistenter Speicher - src/retriever.py: Hybrid Suche + Reranking - src/cli.py: Kommandozeilen-Interface Issue: #1 --- .gitignore | 4 + docs/ARCHITECTURE.md | 169 +++++++++++++++++++++++++++++ src/__init__.py | 8 ++ src/cli.py | 172 +++++++++++++++++++++++++++++ src/engram.py | 230 +++++++++++++++++++++++++++++++++++++++ src/retriever.py | 55 ++++++++++ src/store.py | 253 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 891 insertions(+) create mode 100644 .gitignore create mode 100644 docs/ARCHITECTURE.md create mode 100644 src/__init__.py create mode 100644 src/cli.py create mode 100644 src/engram.py create mode 100644 src/retriever.py create mode 100644 src/store.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6a2547f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.pyc +.venv/ +data/ diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md new file mode 100644 index 0000000..abcb1de --- /dev/null +++ b/docs/ARCHITECTURE.md @@ -0,0 +1,169 @@ +# Second Brain - Architektur + +## Vision + +Ein zweites Gehirn für OpenClaw das: +- **Kurzzeitgedächtnis**: Aktuelle Sessions, Kontext, unverarbeitete Informationen +- **Langzeitgedächtnis**: Gesammeltes Wissen, bewertet, verknüpft, priorisiert +- **Bewertungssystem**: Jedes Faktum hat einen Vertrauenswert (0-1), korrektierbar +- **Proaktivität**: Agent wacht auf, prüft, handelt ohne expliziten Befehl +- **Selbstheilung**: Erkennt eigene Fehler, korrigiert, lernt daraus + +## Module + +### 1. Engram Store (Gedächtnis-Einheiten) +Jede Information wird als "Engramm" gespeichert: +``` +{ + id: uuid + content: string (Markdown) + vector: [float...] 
(Embedding) + metadata: { + source: "user|agent|web|file" + confidence: 0.0-1.0 + created: timestamp + modified: timestamp + access_count: int + last_accessed: timestamp + tags: [string...] + session_id: string|null + agent_id: string|null + }, + correctness: { + confirmed: bool + confirmations: int + rejections: int + last_reviewed: timestamp + review_history: [ + { by: "user|agent", action: "confirm|reject|modify", at: timestamp, note: string } + ] + }, + links: [uuid...] (verbundene Engramme) + hierarchy: { + parent: uuid|null + children: [uuid...] + depth: int + } +} +``` + +### 2. Vector Store (ChromaDB) +- Lokale SQLite-basierte Vektor-Datenbank +- Kein externer Service nötig +- Embedding über sentence-transformers (all-MiniLM-L6-v2) +- ~22MB Modell, CPU-only, 384 Dimensionen + +### 3. Neural Scorer +- Kleines Feed-Forward-Netz (PyTorch) +- Eingabe: Embedding + Metadaten (Alter, Zugriffshäufigkeit, Quelle) +- Ausgabe: Confidence-Score (0-1) +- Training: Reinforcement von User-Feedback (richtig/falsch) + +### 4. Retrieval Engine +- Hybrid: Semantische Suche (Vektor) + Keyword (BM25-ähnlich) +- Reranking nach Confidence, Aktualität, Relevanz +- Contextual Compression: Nur relevante Teile zurückgeben + +### 5. Proactivity Engine +- Cron-gesteuerte Hintergrundaufgaben +- Heartbeat-gesteuerte Prüfungen +- Trigger: Zeit, Events, Zustandsänderungen +- Entscheidet selbst: Was ist jetzt wichtig? + +### 6. Error Correction +- Erkennt fehlgeschlagene Tool-Calls +- Speichert Fehler mit Kontext +- Analysiert Muster: "Immer wenn X, dann scheitert Y" +- Auto-Fix: Alternative Strategien, Fallbacks + +### 7. Visualisierung +- Streamlit-Dashboard lokal +- Graph-Ansicht: Verknüpfte Engramme +- Timeline: Wann wurde was gelernt? 
+- Stats: Vertrauen, Korrektheit, Abdeckung + +## Tech Stack + +| Komponente | Technologie | Warum | +|------------|-------------|-------| +| Vektor-DB | ChromaDB (lokal) | Kein externer Service, SQLite-basiert | +| Embeddings | sentence-transformers | Klein, schnell, offline | +| Neural Scorer | PyTorch (custom) | Trainierbar, lokal, kein API-Key | +| Frontend | Streamlit | Schnell, Python-nativ, interaktiv | +| Daten-Layer | Python-Klassen + SQLite | Kontrollierbar, debuggbar | +| Prozesse | Cron (OpenClaw built-in) + Heartbeat | Kein externer Scheduler | + +## Datenfluss + +``` +User Input / Event + | + v +[Parser] -> Engramm erstellen + | + v +[Embedding] -> Vektor generieren + | + v +[Vector Store] -> Speichern + | + v +[Neural Scorer] -> Initial-Confidence + | + v +[Link Engine] -> Mit bestehenden verknüpfen + | + v +[Retrieval] <- Anfrage + | + v +[Rerank] -> Beste Ergebnisse + | + v +[Response] -> An User / Agent + | + v +[Feedback Loop] <- Richtig/Falsch? + | + v +[Learn] -> Scorer trainieren, Confidence anpassen +``` + +## Dateistruktur + +``` +second-brain/ +├── src/ +│ ├── __init__.py +│ ├── engram.py # Engramm-Modell +│ ├── store.py # ChromaDB-Wrapper +│ ├── embedder.py # Embedding-Engine +│ ├── scorer.py # Neural Confidence Scorer +│ ├── retriever.py # Hybrid Retrieval +│ ├── linker.py # Verknüpfungs-Engine +│ ├── proactivity.py # Proaktivitäts-Manager +│ ├── error_handler.py # Fehlererkennung & Korrektur +│ ├── trainer.py # RL-Training +│ └── config.py # Konfiguration +├── data/ +│ ├── chromadb/ # Vector DB Files +│ ├── engrams.jsonl # Backup aller Engramme +│ └── scorer_model.pt # Trainiertes Scorer-Netz +├── docs/ +│ ├── ARCHITECTURE.md +│ └── API.md +├── tests/ +│ └── test_core.py +├── scripts/ +│ └── init_db.py +└── app.py # Streamlit Dashboard +``` + +## Nächste Schritte + +1. Kern-Module implementieren (Store, Embedder, Engram) +2. Scorer mit Dummy-Daten trainieren +3. Retrieval-Engine mit Testdaten validieren +4. Dashboard bauen +5. 
def cmd_search(args):
    """Run a keyword search and print a short summary for every hit.

    Reads ``args.query`` (list of words), ``args.limit``,
    ``args.min_confidence`` and ``args.tag`` as set up by the ``search``
    sub-parser, and writes the results to stdout.
    """
    retriever = Retriever(get_store())
    hits = retriever.retrieve(
        " ".join(args.query),
        limit=args.limit,
        min_confidence=args.min_confidence,
        tag_filter=args.tag,
    )
    print(f"\n=== {len(hits)} Results ===")
    for hit in hits:
        engram = hit["engram"]
        confidence = engram.compute_confidence()
        # Traffic-light marker: above 0.7 trusted, above 0.4 uncertain, else weak.
        if confidence > 0.7:
            marker = "✅"
        elif confidence > 0.4:
            marker = "⚠️"
        else:
            marker = "❌"
        short_id = str(engram.id)[:8]
        ellipsis = "..." if len(engram.content) > 120 else ""
        tag_text = ", ".join(engram.metadata.get("tags", []))
        print(f"\n{marker} [{short_id}] Score: {confidence:.2f} ({hit['match_type']})")
        print(f" {engram.content[:120]}{ellipsis}")
        print(f" Tags: {tag_text} | Source: {engram.metadata.get('source')}")
        print(f" Access: {engram.metadata.get('access_count', 0)} | Reviews: +{engram.correctness.confirmations}/-{engram.correctness.rejections}")
def cmd_stats(args):
    """Print the store statistics (totals, per-source counts, file size)."""
    summary = Retriever(get_store()).stats()
    print("\n=== Second Brain Stats ===")
    print(f" Total Engrams: {summary['total_engrams']}")
    print(f" Confirmed: {summary['confirmed']}")
    print(f" Unconfirmed: {summary['unconfirmed']}")
    print(f" Sources:")
    per_source = summary.get("sources", {})
    for name in per_source:
        print(f" {name}: {per_source[name]}")
    # The store reports bytes; show kibibytes with one decimal.
    print(f" DB Size: {summary['db_size_bytes'] / 1024:.1f} KB")
"""
Engram - memory unit for the Second Brain.
Pure Python, no external dependencies.
"""

import json
import hashlib
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import IntEnum
from typing import Optional, List, Dict, Any
from uuid import uuid4, UUID


class Grounding(IntEnum):
    """Provenance/reliability of a piece of information (higher = stronger)."""
    UNKNOWN = 0
    ASSUMPTION = 1
    INFERRED = 2
    SOURCED = 3
    VERIFIED = 4


@dataclass
class ReviewEntry:
    """One entry in an engram's review history."""
    by: str       # "user" or an agent id
    action: str   # "confirm", "reject", "modify"
    at: str       # ISO-8601 timestamp
    note: str = ""

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this entry."""
        return {"by": self.by, "action": self.action, "at": self.at, "note": self.note}

    @classmethod
    def from_dict(cls, d: dict) -> "ReviewEntry":
        """Rebuild an entry from ``to_dict`` output (``note`` is optional)."""
        return cls(d["by"], d["action"], d["at"], d.get("note", ""))


@dataclass
class Correctness:
    """Tracks confirmations and rejections of an engram over time."""
    confirmed: bool = False
    confirmations: int = 0
    rejections: int = 0
    last_reviewed: Optional[str] = None
    review_history: List[ReviewEntry] = field(default_factory=list)

    def confirm(self, by: str, note: str = "") -> None:
        """Record a confirmation by *by* and mark the engram as confirmed."""
        self.confirmations += 1
        self.confirmed = True
        self.last_reviewed = _now()
        self.review_history.append(ReviewEntry(by, "confirm", self.last_reviewed, note))

    def reject(self, by: str, note: str = "") -> None:
        """Record a rejection by *by* and clear the confirmed flag."""
        self.rejections += 1
        self.confirmed = False
        self.last_reviewed = _now()
        self.review_history.append(ReviewEntry(by, "reject", self.last_reviewed, note))

    def score(self) -> float:
        """Return confirmations / total reviews, or 0.5 when never reviewed."""
        total = self.confirmations + self.rejections
        if total == 0:
            return 0.5  # undetermined
        return self.confirmations / total

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict including the full review history."""
        return {
            "confirmed": self.confirmed,
            "confirmations": self.confirmations,
            "rejections": self.rejections,
            "last_reviewed": self.last_reviewed,
            "review_history": [r.to_dict() for r in self.review_history],
        }

    @classmethod
    def from_dict(cls, d: dict) -> "Correctness":
        """Rebuild from ``to_dict`` output; missing keys fall back to defaults."""
        return cls(
            confirmed=d.get("confirmed", False),
            confirmations=d.get("confirmations", 0),
            rejections=d.get("rejections", 0),
            last_reviewed=d.get("last_reviewed"),
            review_history=[ReviewEntry.from_dict(r) for r in d.get("review_history", [])],
        )


@dataclass
class Engram:
    """
    A single memory unit (engram).

    Every fact, observation or error is stored as an engram. It carries its
    own confidence inputs (``metadata``) and its review history
    (``correctness``).
    """
    id: UUID
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    correctness: Correctness = field(default_factory=Correctness)
    links: List[UUID] = field(default_factory=list)
    hierarchy: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None  # computed on demand

    @classmethod
    def create(
        cls,
        content: str,
        source: str = "agent",
        confidence: float = 0.5,
        tags: Optional[List[str]] = None,
        session_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        grounding: Grounding = Grounding.ASSUMPTION,
        parent: Optional[UUID] = None,
    ) -> "Engram":
        """Factory: build a new engram with a fresh UUID and default metadata.

        ``created``, ``modified`` and ``last_accessed`` are all set to the
        current UTC time; ``hash`` is a short SHA-256 prefix of *content*.
        """
        now = _now()
        return cls(
            id=uuid4(),
            content=content,
            metadata={
                "source": source,
                "confidence": confidence,
                "created": now,
                "modified": now,
                "access_count": 0,
                "last_accessed": now,
                "tags": tags or [],
                "session_id": session_id,
                "agent_id": agent_id,
                "grounding": grounding.value,
                "hash": _hash(content),
            },
            correctness=Correctness(),
            links=[],
            hierarchy={"parent": str(parent) if parent else None, "children": [], "depth": 0},
        )

    def touch(self) -> None:
        """Register an access: bump ``access_count`` and ``last_accessed``."""
        self.metadata["access_count"] = self.metadata.get("access_count", 0) + 1
        self.metadata["last_accessed"] = _now()

    def add_link(self, other: "Engram") -> None:
        """Link this engram and *other* in both directions (no duplicates).

        Linking an engram to itself is ignored: a self-reference carries no
        information and would make an engram show up as its own neighbour.
        """
        if other.id == self.id:
            return
        if other.id not in self.links:
            self.links.append(other.id)
        if self.id not in other.links:
            other.links.append(self.id)

    def set_parent(self, parent: "Engram") -> None:
        """Make *parent* the parent of this engram and update both sides."""
        self.hierarchy["parent"] = str(parent.id)
        self.hierarchy["depth"] = parent.hierarchy.get("depth", 0) + 1
        children = parent.hierarchy.setdefault("children", [])
        if str(self.id) not in children:
            children.append(str(self.id))

    def compute_confidence(self) -> float:
        """
        Combine several factors into one confidence value in [0, 1].

        Phase-1 heuristic (no neural network): stored base confidence (30%),
        review score (30%), access frequency (up to 0.1, saturating at 10
        accesses), recency (up to 0.1, decaying linearly to 0 after 30 days)
        and grounding level (up to 0.2).
        """
        base = self.metadata.get("confidence", 0.5)
        correctness_score = self.correctness.score()
        # Frequently accessed engrams are treated as more important.
        access = min(self.metadata.get("access_count", 0) / 10, 1.0) * 0.1
        # Newer information is treated as more relevant.
        age_days = _age_days(self.metadata.get("created", _now()))
        recency = max(0, 1.0 - (age_days / 30)) * 0.1
        grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2

        combined = (
            base * 0.3
            + correctness_score * 0.3
            + access
            + recency
            + grounding_boost
        )
        return min(max(combined, 0.0), 1.0)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict (UUIDs rendered as strings)."""
        return {
            "id": str(self.id),
            "content": self.content,
            "metadata": self.metadata,
            "correctness": self.correctness.to_dict(),
            "links": [str(link) for link in self.links],
            "hierarchy": self.hierarchy,
            "embedding": self.embedding,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "Engram":
        """Rebuild an engram from ``to_dict`` output.

        Raises ``KeyError`` when ``id`` or ``content`` is missing and
        ``ValueError`` when an id is not a valid UUID string.
        """
        return cls(
            id=UUID(d["id"]),
            content=d["content"],
            metadata=d.get("metadata", {}),
            correctness=Correctness.from_dict(d.get("correctness", {})),
            links=[UUID(link) for link in d.get("links", [])],
            hierarchy=d.get("hierarchy", {}),
            embedding=d.get("embedding"),
        )

    def to_json(self) -> str:
        """Serialise to pretty-printed JSON (non-ASCII kept verbatim)."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    @classmethod
    def from_json(cls, s: str) -> "Engram":
        """Parse the JSON produced by ``to_json``."""
        return cls.from_dict(json.loads(s))


# --- Helpers ---

def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _hash(content: str) -> str:
    """Return the first 16 hex digits of the SHA-256 of *content* (UTF-8)."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]


def _age_days(iso_str: str) -> float:
    """Return the age of an ISO-8601 timestamp in days (never negative).

    Timestamps without a UTC offset are interpreted as UTC; subtracting a
    naive datetime from an aware one would otherwise raise ``TypeError``.
    Unparseable or non-string input yields 0.0 (best effort), and timestamps
    in the future are clamped to 0.0 so the recency bonus cannot exceed its
    intended weight.
    """
    try:
        moment = datetime.fromisoformat(iso_str)
    except (TypeError, ValueError):
        return 0.0
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    age = (datetime.now(timezone.utc) - moment).total_seconds() / 86400
    return max(age, 0.0)
class Retriever:
    """Keyword retrieval over an engram store, reranked by confidence.

    Phase 1: candidates come from the store's full-text search and are
    ordered by ``Engram.compute_confidence()``. Embedding search and score
    fusion are planned for phase 2.
    """

    def __init__(self, store: "EngramStore"):
        self.store = store

    def retrieve(
        self,
        query: str,
        limit: int = 5,
        min_confidence: float = 0.0,
        source_filter: str = None,
        tag_filter: str = None,
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* hits for *query*, best confidence first.

        Each hit is ``{"engram": Engram, "score": float, "match_type":
        "keyword"}``. The store is asked for ``limit * 3`` candidates so the
        filters still leave enough results. Candidates below
        *min_confidence*, with a different ``source`` than *source_filter*,
        or lacking *tag_filter* in their tags are dropped.

        Only the engrams actually returned are marked as accessed and
        persisted; candidates that are cut off by *limit* must not have
        their access counter inflated.
        """
        ranked = []
        for eg in self.store.search_text(query, limit=limit * 3):
            conf = eg.compute_confidence()
            if conf < min_confidence:
                continue
            if source_filter and eg.metadata.get("source") != source_filter:
                continue
            if tag_filter and tag_filter not in eg.metadata.get("tags", []):
                continue
            ranked.append({"engram": eg, "score": conf, "match_type": "keyword"})
        ranked.sort(key=lambda r: r["score"], reverse=True)
        top = ranked[:limit]
        for hit in top:
            # The score was taken before touching, so ranking is not
            # influenced by this very access.
            hit["engram"].touch()
            self.store.save(hit["engram"])
        return top

    def related(self, engram_id: str, limit: int = 5) -> List["Engram"]:
        """Return engrams linked from *engram_id*, best confidence first.

        Unknown ids and dangling links are skipped silently.
        """
        eg = self.store.get(engram_id)
        if not eg:
            return []
        candidates = (self.store.get(str(lid)) for lid in eg.links)
        linked = [item for item in candidates if item]
        return sorted(linked, key=lambda e: e.compute_confidence(), reverse=True)[:limit]

    def recent(self, limit: int = 10) -> List["Engram"]:
        """Return the first *limit* engrams in the store's ``get_all`` order."""
        return self.store.get_all(limit=limit)

    def stats(self) -> Dict[str, Any]:
        """Return the store's statistics unchanged."""
        return self.store.stats()
class EngramStore:
    """
    Persistent engram store backed by SQLite with an FTS5 full-text index.

    Create an instance with::

        store = EngramStore("/path/to/db.sqlite")

    The store can also be used as a context manager, which closes the
    connection on exit.
    """

    def __init__(self, db_path: str):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._init_schema()

    def __enter__(self) -> "EngramStore":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _init_schema(self) -> None:
        """Create the tables and the FTS index if they do not exist yet."""
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS engrams (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                metadata_json TEXT NOT NULL,
                correctness_json TEXT NOT NULL,
                links_json TEXT NOT NULL,
                hierarchy_json TEXT NOT NULL,
                embedding_json TEXT,
                created_at TEXT NOT NULL,
                modified_at TEXT NOT NULL
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS engrams_fts USING fts5(
                content,
                tags,
                source,
                content_rowid=rowid,
                tokenize='porter'
            );

            CREATE TABLE IF NOT EXISTS engrams_links (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                PRIMARY KEY (from_id, to_id)
            );
        """)
        self._conn.commit()

    # ---- CRUD ----

    def save(self, engram: "Engram") -> "Engram":
        """Insert or update *engram*, its FTS row and its outgoing links.

        All statements run in one transaction: either everything is
        committed or, if any statement fails, everything is rolled back and
        the exception propagates. Returns the engram that was passed in.
        """
        now = _now()
        engram_id = str(engram.id)
        metadata = engram.metadata
        record = {
            "id": engram_id,
            "content": engram.content,
            "metadata_json": json.dumps(metadata, ensure_ascii=False),
            "correctness_json": json.dumps(engram.correctness.to_dict(), ensure_ascii=False),
            "links_json": json.dumps([str(link) for link in engram.links], ensure_ascii=False),
            "hierarchy_json": json.dumps(engram.hierarchy, ensure_ascii=False),
            "embedding_json": json.dumps(engram.embedding, ensure_ascii=False) if engram.embedding else None,
            "created_at": metadata.get("created", now),
            "modified_at": now,
        }
        tags = " ".join(metadata.get("tags") or [])
        source = metadata.get("source") or ""

        # The connection context manager commits on success and rolls back
        # on error, so a failure cannot leave a half-written engram behind.
        with self._conn:
            self._conn.execute("""
                INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, embedding_json, created_at, modified_at)
                VALUES (:id, :content, :metadata_json, :correctness_json, :links_json, :hierarchy_json, :embedding_json, :created_at, :modified_at)
                ON CONFLICT(id) DO UPDATE SET
                    content=excluded.content,
                    metadata_json=excluded.metadata_json,
                    correctness_json=excluded.correctness_json,
                    links_json=excluded.links_json,
                    hierarchy_json=excluded.hierarchy_json,
                    embedding_json=excluded.embedding_json,
                    modified_at=excluded.modified_at
            """, record)

            # Refresh the FTS row (DELETE + INSERT; virtual tables have no UPSERT).
            rowid = self._conn.execute(
                "SELECT rowid FROM engrams WHERE id=?", (engram_id,)
            ).fetchone()[0]
            self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (rowid,))
            self._conn.execute(
                "INSERT INTO engrams_fts(rowid, content, tags, source) VALUES (?, ?, ?, ?)",
                (rowid, engram.content, tags, source),
            )

            # Replace the outgoing links of this engram.
            self._conn.execute("DELETE FROM engrams_links WHERE from_id=?", (engram_id,))
            self._conn.executemany(
                "INSERT OR IGNORE INTO engrams_links (from_id, to_id) VALUES (?, ?)",
                [(engram_id, str(link)) for link in engram.links],
            )
        return engram

    def get(self, engram_id: str) -> Optional["Engram"]:
        """Load one engram by id (string or UUID); ``None`` when unknown."""
        row = self._conn.execute(
            "SELECT * FROM engrams WHERE id=?", (str(engram_id),)
        ).fetchone()
        if not row:
            return None
        return self._row_to_engram(row)

    def get_all(self, limit: int = 1000, offset: int = 0) -> List["Engram"]:
        """Load engrams ordered by ``created_at`` descending (paginated)."""
        rows = self._conn.execute(
            "SELECT * FROM engrams ORDER BY created_at DESC LIMIT ? OFFSET ?",
            (limit, offset)
        ).fetchall()
        return [self._row_to_engram(r) for r in rows]

    def delete(self, engram_id: str) -> bool:
        """Delete an engram, its FTS row and all links from or to it.

        Returns ``False`` when the id is unknown, ``True`` otherwise. The
        three deletes run in a single transaction.
        """
        engram_id = str(engram_id)
        row = self._conn.execute(
            "SELECT rowid FROM engrams WHERE id=?", (engram_id,)
        ).fetchone()
        if not row:
            return False
        with self._conn:
            self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (row[0],))
            self._conn.execute(
                "DELETE FROM engrams_links WHERE from_id=? OR to_id=?",
                (engram_id, engram_id),
            )
            self._conn.execute("DELETE FROM engrams WHERE id=?", (engram_id,))
        return True

    def count(self) -> int:
        """Return the number of stored engrams."""
        row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone()
        return row[0] if row else 0

    # ---- Search ----

    def search_text(self, query: str, limit: int = 10) -> List["Engram"]:
        """Full-text search via FTS5; the query words are OR-combined.

        Every whitespace-separated word is passed as a quoted FTS5 string
        (embedded double quotes doubled), so punctuation and the keywords
        AND/OR/NOT in user input are matched as text instead of raising a
        query syntax error. An empty or whitespace-only query returns ``[]``.
        """
        terms = ['"' + word.replace('"', '""') + '"' for word in query.split()]
        if not terms:
            return []
        sql = """
            SELECT e.* FROM engrams e
            JOIN engrams_fts fts ON e.rowid = fts.rowid
            WHERE engrams_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """
        rows = self._conn.execute(sql, (" OR ".join(terms), limit)).fetchall()
        return [self._row_to_engram(r) for r in rows]

    def search_tag(self, tag: str, limit: int = 50) -> List["Engram"]:
        """Return engrams whose ``tags`` list contains exactly *tag*."""
        needle = json.dumps(tag, ensure_ascii=False)
        return self._search_metadata(
            needle, lambda meta: tag in (meta.get("tags") or []), limit
        )

    def search_source(self, source: str, limit: int = 50) -> List["Engram"]:
        """Return engrams whose ``source`` equals *source*."""
        needle = '"source": ' + json.dumps(source, ensure_ascii=False)
        return self._search_metadata(
            needle, lambda meta: meta.get("source") == source, limit
        )

    def _search_metadata(self, needle: str, keep, limit: int) -> List["Engram"]:
        """Pre-filter rows by a literal substring of ``metadata_json``, then
        keep only those whose decoded metadata satisfies *keep*.

        The substring match alone is not exact (a tag value can also appear
        as a source or inside another field), hence the second check on the
        decoded metadata. LIKE wildcards in *needle* are escaped. Newest
        engrams come first; a negative *limit* means "no limit", as in SQLite.
        """
        escaped = needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        cursor = self._conn.execute(
            "SELECT * FROM engrams WHERE metadata_json LIKE ? ESCAPE '\\' "
            "ORDER BY created_at DESC",
            (f"%{escaped}%",),
        )
        found = []
        for row in cursor:
            if 0 <= limit <= len(found):
                break
            engram = self._row_to_engram(row)
            if keep(engram.metadata):
                found.append(engram)
        return found

    # ---- Stats ----

    def stats(self) -> Dict[str, Any]:
        """Return basic statistics: totals, per-source counts, file size.

        The counts are derived from the decoded JSON columns rather than
        from a text match, so they do not depend on JSON spacing.
        """
        total = self.count()
        confirmed = 0
        sources = {}
        for row in self._conn.execute(
            "SELECT metadata_json, correctness_json FROM engrams"
        ):
            if json.loads(row["correctness_json"]).get("confirmed") is True:
                confirmed += 1
            src = json.loads(row["metadata_json"]).get("source", "unknown")
            sources[src] = sources.get(src, 0) + 1

        return {
            "total_engrams": total,
            "confirmed": confirmed,
            "unconfirmed": total - confirmed,
            "sources": sources,
            "db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
        }

    # ---- Backup / Export ----

    def export_jsonl(self, path: str) -> int:
        """Write all engrams to *path* as JSONL; return how many were written."""
        count = 0
        with open(path, "w", encoding="utf-8") as f:
            for row in self._conn.execute("SELECT * FROM engrams"):
                eg = self._row_to_engram(row)
                f.write(json.dumps(eg.to_dict(), ensure_ascii=False) + "\n")
                count += 1
        return count

    def import_jsonl(self, path: str) -> int:
        """Save every engram found in the JSONL file *path*; return the count.

        Blank lines are skipped. Existing ids are overwritten by ``save``.
        """
        count = 0
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                self.save(Engram.from_json(line))
                count += 1
        return count

    # ---- Helpers ----

    def _row_to_engram(self, row: sqlite3.Row) -> "Engram":
        """Decode the JSON columns of an ``engrams`` row into an Engram."""
        d = {
            "id": row["id"],
            "content": row["content"],
            "metadata": json.loads(row["metadata_json"]),
            "correctness": json.loads(row["correctness_json"]),
            "links": json.loads(row["links_json"]),
            "hierarchy": json.loads(row["hierarchy_json"]),
        }
        emb = row["embedding_json"]
        if emb:
            d["embedding"] = json.loads(emb)
        return Engram.from_dict(d)

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self._conn.close()
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string (with offset)."""
    import datetime as _dt

    current = _dt.datetime.now(_dt.timezone.utc)
    return current.isoformat()