feat(core): Engram, Store, Retriever, CLI - Grundsystem Second Brain
- src/engram.py: Gedaechtniseinheit mit Confidence, Correctness, Links - src/store.py: SQLite FTS5 persistenter Speicher - src/retriever.py: Hybrid Suche + Reranking - src/cli.py: Kommandozeilen-Interface Issue: #1
This commit is contained in:
253
src/store.py
Normal file
253
src/store.py
Normal file
@@ -0,0 +1,253 @@
|
||||
"""
|
||||
SQLite-basierter Engramm-Store.
|
||||
Keine externen Abhängigkeiten außer sqlite3 (stdlib).
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
from uuid import UUID
|
||||
|
||||
from .engram import Engram
|
||||
|
||||
|
||||
class EngramStore:
|
||||
"""
|
||||
Persistenter Engramm-Speicher mit vollem Text-Index.
|
||||
|
||||
Erstelle Instanz:
|
||||
store = EngramStore("/pfad/zur/db.sqlite")
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str):
|
||||
self.db_path = Path(db_path)
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
self._init_schema()
|
||||
|
||||
def _init_schema(self) -> None:
|
||||
"""Erstellt Tabellen falls nicht vorhanden."""
|
||||
self._conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS engrams (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT NOT NULL,
|
||||
metadata_json TEXT NOT NULL,
|
||||
correctness_json TEXT NOT NULL,
|
||||
links_json TEXT NOT NULL,
|
||||
hierarchy_json TEXT NOT NULL,
|
||||
embedding_json TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
modified_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE VIRTUAL TABLE IF NOT EXISTS engrams_fts USING fts5(
|
||||
content,
|
||||
tags,
|
||||
source,
|
||||
content_rowid=rowid,
|
||||
tokenize='porter'
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS engrams_links (
|
||||
from_id TEXT NOT NULL,
|
||||
to_id TEXT NOT NULL,
|
||||
PRIMARY KEY (from_id, to_id)
|
||||
);
|
||||
""")
|
||||
self._conn.commit()
|
||||
|
||||
# ---- CRUD ----
|
||||
|
||||
def save(self, engram: Engram) -> Engram:
|
||||
"""Speichert oder aktualisiert ein Engramm."""
|
||||
now = _now()
|
||||
data = {
|
||||
"id": str(engram.id),
|
||||
"content": engram.content,
|
||||
"metadata_json": json.dumps(engram.metadata, ensure_ascii=False),
|
||||
"correctness_json": json.dumps(engram.correctness.to_dict(), ensure_ascii=False),
|
||||
"links_json": json.dumps([str(l) for l in engram.links], ensure_ascii=False),
|
||||
"hierarchy_json": json.dumps(engram.hierarchy, ensure_ascii=False),
|
||||
"embedding_json": json.dumps(engram.embedding, ensure_ascii=False) if engram.embedding else None,
|
||||
"created_at": engram.metadata.get("created", now),
|
||||
"modified_at": now,
|
||||
}
|
||||
self._conn.execute("""
|
||||
INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, embedding_json, created_at, modified_at)
|
||||
VALUES (:id, :content, :metadata_json, :correctness_json, :links_json, :hierarchy_json, :embedding_json, :created_at, :modified_at)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
content=excluded.content,
|
||||
metadata_json=excluded.metadata_json,
|
||||
correctness_json=excluded.correctness_json,
|
||||
links_json=excluded.links_json,
|
||||
hierarchy_json=excluded.hierarchy_json,
|
||||
embedding_json=excluded.embedding_json,
|
||||
modified_at=excluded.modified_at
|
||||
""", data)
|
||||
|
||||
# FTS-Index aktualisieren (DELETE + INSERT, kein UPSERT für virtuelle Tabellen)
|
||||
tags = " ".join(engram.metadata.get("tags", []))
|
||||
source = engram.metadata.get("source", "")
|
||||
rowid = self._conn.execute("SELECT rowid FROM engrams WHERE id=?", (str(engram.id),)).fetchone()
|
||||
if rowid:
|
||||
self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (rowid[0],))
|
||||
self._conn.execute("""
|
||||
INSERT INTO engrams_fts(rowid, content, tags, source)
|
||||
VALUES ((SELECT rowid FROM engrams WHERE id=:id), :content, :tags, :source)
|
||||
""", {"id": str(engram.id), "content": engram.content, "tags": tags, "source": source})
|
||||
|
||||
# Links speichern
|
||||
self._conn.execute("DELETE FROM engrams_links WHERE from_id=?", (str(engram.id),))
|
||||
for link in engram.links:
|
||||
self._conn.execute(
|
||||
"INSERT OR IGNORE INTO engrams_links (from_id, to_id) VALUES (?, ?)",
|
||||
(str(engram.id), str(link))
|
||||
)
|
||||
|
||||
self._conn.commit()
|
||||
return engram
|
||||
|
||||
def get(self, engram_id: str) -> Optional[Engram]:
|
||||
"""Lädt ein Engramm anhand seiner ID."""
|
||||
row = self._conn.execute(
|
||||
"SELECT * FROM engrams WHERE id=?", (engram_id,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return self._row_to_engram(row)
|
||||
|
||||
def get_all(self, limit: int = 1000, offset: int = 0) -> List[Engram]:
|
||||
"""Lädt alle Engramme (paginiert)."""
|
||||
rows = self._conn.execute(
|
||||
"SELECT * FROM engrams ORDER BY created_at DESC LIMIT ? OFFSET ?",
|
||||
(limit, offset)
|
||||
).fetchall()
|
||||
return [self._row_to_engram(r) for r in rows]
|
||||
|
||||
def delete(self, engram_id: str) -> bool:
|
||||
"""Löscht ein Engramm und alle Verknüpfungen."""
|
||||
rowid = self._conn.execute(
|
||||
"SELECT rowid FROM engrams WHERE id=?", (engram_id,)
|
||||
).fetchone()
|
||||
if not rowid:
|
||||
return False
|
||||
self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (rowid[0],))
|
||||
self._conn.execute("DELETE FROM engrams_links WHERE from_id=? OR to_id=?", (engram_id, engram_id))
|
||||
self._conn.execute("DELETE FROM engrams WHERE id=?", (engram_id,))
|
||||
self._conn.commit()
|
||||
return True
|
||||
|
||||
def count(self) -> int:
|
||||
"""Anzahl der gespeicherten Engramme."""
|
||||
row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
# ---- Search ----
|
||||
|
||||
def search_text(self, query: str, limit: int = 10) -> List[Engram]:
|
||||
"""Full-Text-Suche über Engramm-Inhalt via SQLite FTS5 (OR-Verknüpfung)."""
|
||||
# FTS5-Syntax: Wörter mit OR verbinden für bessere Ergebnisse
|
||||
words = [w.strip() for w in query.replace("'", "''").split() if w.strip()]
|
||||
safe_query = " OR ".join(words) if len(words) > 1 else (words[0] if words else "*")
|
||||
sql = """
|
||||
SELECT e.* FROM engrams e
|
||||
JOIN engrams_fts fts ON e.rowid = fts.rowid
|
||||
WHERE engrams_fts MATCH ?
|
||||
ORDER BY rank
|
||||
LIMIT ?
|
||||
"""
|
||||
rows = self._conn.execute(sql, (safe_query, limit)).fetchall()
|
||||
return [self._row_to_engram(r) for r in rows]
|
||||
|
||||
def search_tag(self, tag: str, limit: int = 50) -> List[Engram]:
|
||||
"""Suche nach Tag (JSON-contains)."""
|
||||
# Einfache Substring-Suche in JSON
|
||||
rows = self._conn.execute(
|
||||
"SELECT * FROM engrams WHERE metadata_json LIKE ? ORDER BY created_at DESC LIMIT ?",
|
||||
(f'%"{tag}"%', limit)
|
||||
).fetchall()
|
||||
return [self._row_to_engram(r) for r in rows]
|
||||
|
||||
def search_source(self, source: str, limit: int = 50) -> List[Engram]:
|
||||
"""Suche nach Quelle."""
|
||||
rows = self._conn.execute(
|
||||
"SELECT * FROM engrams WHERE metadata_json LIKE ? ORDER BY created_at DESC LIMIT ?",
|
||||
(f'%"source": "{source}"%', limit)
|
||||
).fetchall()
|
||||
return [self._row_to_engram(r) for r in rows]
|
||||
|
||||
# ---- Stats ----
|
||||
|
||||
def stats(self) -> Dict[str, Any]:
|
||||
"""Grundlegende Statistiken über den Store."""
|
||||
total = self.count()
|
||||
confirmed = self._conn.execute(
|
||||
"SELECT COUNT(*) FROM engrams WHERE correctness_json LIKE '%\"confirmed\": true%'"
|
||||
).fetchone()[0]
|
||||
sources = {}
|
||||
for row in self._conn.execute(
|
||||
"SELECT metadata_json FROM engrams"
|
||||
).fetchall():
|
||||
meta = json.loads(row["metadata_json"])
|
||||
src = meta.get("source", "unknown")
|
||||
sources[src] = sources.get(src, 0) + 1
|
||||
|
||||
return {
|
||||
"total_engrams": total,
|
||||
"confirmed": confirmed,
|
||||
"unconfirmed": total - confirmed,
|
||||
"sources": sources,
|
||||
"db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
|
||||
}
|
||||
|
||||
# ---- Backup / Export ----
|
||||
|
||||
def export_jsonl(self, path: str) -> int:
|
||||
"""Exportiert alle Engramme als JSONL (eine Zeile pro Engramm)."""
|
||||
count = 0
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
for row in self._conn.execute("SELECT * FROM engrams"):
|
||||
eg = self._row_to_engram(row)
|
||||
f.write(json.dumps(eg.to_dict(), ensure_ascii=False) + "\n")
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def import_jsonl(self, path: str) -> int:
|
||||
"""Importiert Engramme aus JSONL."""
|
||||
count = 0
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
eg = Engram.from_json(line)
|
||||
self.save(eg)
|
||||
count += 1
|
||||
return count
|
||||
|
||||
# ---- Helpers ----
|
||||
|
||||
def _row_to_engram(self, row: sqlite3.Row) -> Engram:
|
||||
d = {
|
||||
"id": row["id"],
|
||||
"content": row["content"],
|
||||
"metadata": json.loads(row["metadata_json"]),
|
||||
"correctness": json.loads(row["correctness_json"]),
|
||||
"links": json.loads(row["links_json"]),
|
||||
"hierarchy": json.loads(row["hierarchy_json"]),
|
||||
}
|
||||
emb = row["embedding_json"]
|
||||
if emb:
|
||||
d["embedding"] = json.loads(emb)
|
||||
return Engram.from_dict(d)
|
||||
|
||||
def close(self) -> None:
|
||||
self._conn.close()
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
from datetime import datetime, timezone
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
Reference in New Issue
Block a user