Files
second-brain/src/store.py
Otto 6b0cf5889f fix(store): escape FTS5 special characters in search_text()
- FTS5 crashes on dots (IP addresses) and hyphens (dates)
- Add regex to strip non-alphanumeric chars before FTS5 MATCH
- Fixes: fts5 syntax error near '.' and no such column: 05

Files changed: src/store.py
2026-05-27 17:54:51 +02:00

275 lines
10 KiB
Python

"""
SQLite-basierter Engramm-Store.
Keine externen Abhängigkeiten außer sqlite3 (stdlib).
"""
import json
import sqlite3
import os
import re
from pathlib import Path
from typing import List, Optional, Dict, Any
from uuid import UUID
from .engram import Engram
class EngramStore:
    """
    Persistent engram store backed by SQLite with an FTS5 full-text index.

    Create an instance:
        store = EngramStore("/path/to/db.sqlite")

    Three tables are maintained:
      * ``engrams``       - one row per engram, structured fields stored as JSON
      * ``engrams_fts``   - FTS5 index over content, tags and source
      * ``engrams_links`` - directed edges ``from_id -> to_id``
    """

    # Runs of Unicode word characters; used to turn free-form user input into
    # search terms that cannot be misread as FTS5 query syntax.
    _WORD_RE = re.compile(r"\w+")

    def __init__(self, db_path: str):
        """Open (or create) the database at ``db_path`` and ensure the schema exists.

        Missing parent directories of ``db_path`` are created.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._init_schema()

    def _init_schema(self) -> None:
        """Create the tables if they do not exist yet."""
        # NOTE: FTS rows are keyed by the implicit rowid of `engrams`. That
        # table has no INTEGER PRIMARY KEY, and SQLite documents that VACUUM
        # may renumber rowids of such tables - rebuild the FTS index after a
        # VACUUM.
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS engrams (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                metadata_json TEXT NOT NULL,
                correctness_json TEXT NOT NULL,
                links_json TEXT NOT NULL,
                hierarchy_json TEXT NOT NULL,
                embedding_json TEXT,
                created_at TEXT NOT NULL,
                modified_at TEXT NOT NULL
            );
            CREATE VIRTUAL TABLE IF NOT EXISTS engrams_fts USING fts5(
                content,
                tags,
                source,
                content_rowid=rowid,
                tokenize='porter'
            );
            CREATE TABLE IF NOT EXISTS engrams_links (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                PRIMARY KEY (from_id, to_id)
            );
        """)
        self._conn.commit()

    # ---- CRUD ----

    def save(self, engram: "Engram") -> "Engram":
        """Insert or update an engram together with its FTS entry and links.

        All statements run in one transaction: either everything is committed
        or, if any statement raises, everything is rolled back and the
        exception propagates.

        Returns the same ``engram`` object that was passed in.
        """
        now = _now()
        engram_id = str(engram.id)
        metadata = engram.metadata
        record = {
            "id": engram_id,
            "content": engram.content,
            "metadata_json": json.dumps(metadata, ensure_ascii=False),
            "correctness_json": json.dumps(engram.correctness.to_dict(), ensure_ascii=False),
            "links_json": json.dumps([str(link) for link in engram.links], ensure_ascii=False),
            "hierarchy_json": json.dumps(engram.hierarchy, ensure_ascii=False),
            "embedding_json": (
                json.dumps(engram.embedding, ensure_ascii=False) if engram.embedding else None
            ),
            "created_at": metadata.get("created", now),
            "modified_at": now,
        }
        tags = " ".join(str(tag) for tag in (metadata.get("tags") or []))
        source = metadata.get("source", "")

        # `with connection` commits on success and rolls back on error, so a
        # failure half-way cannot leave the row, the index and the links out
        # of sync.
        with self._conn:
            # created_at is deliberately not part of the UPDATE branch: the
            # original creation time survives later saves.
            self._conn.execute("""
                INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, embedding_json, created_at, modified_at)
                VALUES (:id, :content, :metadata_json, :correctness_json, :links_json, :hierarchy_json, :embedding_json, :created_at, :modified_at)
                ON CONFLICT(id) DO UPDATE SET
                    content=excluded.content,
                    metadata_json=excluded.metadata_json,
                    correctness_json=excluded.correctness_json,
                    links_json=excluded.links_json,
                    hierarchy_json=excluded.hierarchy_json,
                    embedding_json=excluded.embedding_json,
                    modified_at=excluded.modified_at
            """, record)

            # Refresh the FTS entry (DELETE + INSERT; virtual tables have no UPSERT).
            rowid = self._conn.execute(
                "SELECT rowid FROM engrams WHERE id=?", (engram_id,)
            ).fetchone()[0]
            self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (rowid,))
            self._conn.execute(
                "INSERT INTO engrams_fts(rowid, content, tags, source) VALUES (?, ?, ?, ?)",
                (rowid, engram.content, tags, source),
            )

            # Replace the outgoing links of this engram.
            self._conn.execute("DELETE FROM engrams_links WHERE from_id=?", (engram_id,))
            self._conn.executemany(
                "INSERT OR IGNORE INTO engrams_links (from_id, to_id) VALUES (?, ?)",
                [(engram_id, str(link)) for link in engram.links],
            )
        return engram

    def get(self, engram_id: str) -> Optional["Engram"]:
        """Load a single engram by id; return ``None`` if it does not exist."""
        row = self._conn.execute(
            "SELECT * FROM engrams WHERE id=?", (engram_id,)
        ).fetchone()
        if row is None:
            return None
        return self._row_to_engram(row)

    def get_all(self, limit: int = 1000, offset: int = 0) -> List["Engram"]:
        """Load engrams page by page, newest ``created_at`` first."""
        rows = self._conn.execute(
            "SELECT * FROM engrams ORDER BY created_at DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [self._row_to_engram(row) for row in rows]

    def get_modified_since(self, iso_ts: str, limit: int = 5000) -> List["Engram"]:
        """Return engrams whose ``modified_at`` is after ``iso_ts``, oldest first.

        The comparison is a plain string comparison on the stored timestamp
        text, so ``iso_ts`` must use the same format as the stored values.
        """
        rows = self._conn.execute(
            "SELECT * FROM engrams WHERE modified_at > ? ORDER BY modified_at ASC LIMIT ?",
            (iso_ts, limit),
        ).fetchall()
        return [self._row_to_engram(row) for row in rows]

    def delete(self, engram_id: str) -> bool:
        """Delete an engram, its FTS entry and every link from or to it.

        Returns ``True`` if the engram existed, ``False`` otherwise. The three
        deletes run in one transaction.
        """
        row = self._conn.execute(
            "SELECT rowid FROM engrams WHERE id=?", (engram_id,)
        ).fetchone()
        if row is None:
            return False
        with self._conn:
            self._conn.execute("DELETE FROM engrams_fts WHERE rowid=?", (row[0],))
            self._conn.execute(
                "DELETE FROM engrams_links WHERE from_id=? OR to_id=?",
                (engram_id, engram_id),
            )
            self._conn.execute("DELETE FROM engrams WHERE id=?", (engram_id,))
        return True

    def count(self) -> int:
        """Return the number of stored engrams."""
        row = self._conn.execute("SELECT COUNT(*) FROM engrams").fetchone()
        return row[0] if row else 0

    # ---- Search ----

    @classmethod
    def _fts_query(cls, query: str) -> str:
        """Translate free-form user input into a safe FTS5 MATCH expression.

        Every whitespace-separated chunk becomes one double-quoted phrase made
        of its word-character runs, and the phrases are joined with ``OR``:

            "192.168.1.1 backup"  ->  '"192 168 1 1" OR "backup"'

        Quoting keeps FTS5 from reading user text as query syntax (``AND``,
        ``NOT``, ``NEAR``, ``*``, ``-``, ``:`` ...). Splitting on ``\\w+`` keeps
        non-ASCII letters such as umlauts. Returns ``""`` when the input
        contains no searchable term.
        """
        phrases = []
        for chunk in query.split():
            terms = cls._WORD_RE.findall(chunk)
            if terms:
                # \w never matches a double quote, so no escaping is needed.
                phrases.append('"' + " ".join(terms) + '"')
        return " OR ".join(phrases)

    def search_text(self, query: str, limit: int = 10) -> List["Engram"]:
        """Full-text search over the FTS5 index, terms combined with OR.

        Results are ordered by FTS5 ``rank`` (best match first). A query
        without any searchable term returns an empty list.
        """
        match_expr = self._fts_query(query)
        if not match_expr:
            # FTS5 has no "match everything" expression; an empty or bare '*'
            # pattern is a syntax error, so short-circuit instead.
            return []
        sql = """
            SELECT e.* FROM engrams e
            JOIN engrams_fts fts ON e.rowid = fts.rowid
            WHERE engrams_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """
        rows = self._conn.execute(sql, (match_expr, limit)).fetchall()
        return [self._row_to_engram(row) for row in rows]

    @staticmethod
    def _escape_like(text: str) -> str:
        """Escape ``\\``, ``%`` and ``_`` for a LIKE pattern using ``ESCAPE '\\'``."""
        return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    def search_tag(self, tag: str, limit: int = 50) -> List["Engram"]:
        """Find engrams whose metadata JSON contains ``tag`` as a string value.

        This is a substring search on the serialized metadata, not a
        structured query: it matches any JSON string (or key) equal to
        ``tag``, not only members of ``tags``. LIKE wildcards in ``tag`` are
        matched literally.
        """
        # json.dumps yields the tag exactly as it is serialized in
        # metadata_json, including the surrounding quotes and any escapes.
        needle = json.dumps(str(tag), ensure_ascii=False)
        rows = self._conn.execute(
            "SELECT * FROM engrams WHERE metadata_json LIKE ? ESCAPE '\\' "
            "ORDER BY created_at DESC LIMIT ?",
            (f"%{self._escape_like(needle)}%", limit),
        ).fetchall()
        return [self._row_to_engram(row) for row in rows]

    def search_source(self, source: str, limit: int = 50) -> List["Engram"]:
        """Find engrams whose metadata JSON contains ``"source": "<source>"``.

        Relies on the default ``json.dumps`` separators used when saving.
        LIKE wildcards in ``source`` are matched literally.
        """
        needle = '"source": ' + json.dumps(str(source), ensure_ascii=False)
        rows = self._conn.execute(
            "SELECT * FROM engrams WHERE metadata_json LIKE ? ESCAPE '\\' "
            "ORDER BY created_at DESC LIMIT ?",
            (f"%{self._escape_like(needle)}%", limit),
        ).fetchall()
        return [self._row_to_engram(row) for row in rows]

    # ---- Stats ----

    def stats(self) -> Dict[str, Any]:
        """Return basic statistics about the store.

        Keys: ``total_engrams``, ``confirmed``, ``unconfirmed``, ``sources``
        (mapping source -> count, ``"unknown"`` when missing) and
        ``db_size_bytes`` (0 if the database file does not exist).
        """
        total = self.count()
        confirmed = self._conn.execute(
            "SELECT COUNT(*) FROM engrams WHERE correctness_json LIKE '%\"confirmed\": true%'"
        ).fetchone()[0]
        sources: Dict[str, int] = {}
        for row in self._conn.execute("SELECT metadata_json FROM engrams").fetchall():
            src = json.loads(row["metadata_json"]).get("source", "unknown")
            sources[src] = sources.get(src, 0) + 1
        return {
            "total_engrams": total,
            "confirmed": confirmed,
            "unconfirmed": total - confirmed,
            "sources": sources,
            "db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
        }

    # ---- Backup / Export ----

    def export_jsonl(self, path: str) -> int:
        """Write every engram to ``path`` as JSONL (one JSON object per line).

        Returns the number of engrams written.
        """
        count = 0
        with open(path, "w", encoding="utf-8") as f:
            for row in self._conn.execute("SELECT * FROM engrams"):
                engram = self._row_to_engram(row)
                f.write(json.dumps(engram.to_dict(), ensure_ascii=False) + "\n")
                count += 1
        return count

    def import_jsonl(self, path: str) -> int:
        """Read engrams from a JSONL file and save each one; blank lines are skipped.

        Returns the number of engrams imported.
        """
        count = 0
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                self.save(Engram.from_json(line))
                count += 1
        return count

    # ---- Helpers ----

    def _row_to_engram(self, row: sqlite3.Row) -> "Engram":
        """Rebuild an ``Engram`` from a row of the ``engrams`` table."""
        data = {
            "id": row["id"],
            "content": row["content"],
            "metadata": json.loads(row["metadata_json"]),
            "correctness": json.loads(row["correctness_json"]),
            "links": json.loads(row["links_json"]),
            "hierarchy": json.loads(row["hierarchy_json"]),
        }
        # Keep Engram metadata timestamps aligned with DB columns so downstream
        # consumers (e.g. vector indexing watermarks) can rely on them.
        # Best-effort: metadata that is not a JSON object is left untouched.
        if isinstance(data["metadata"], dict):
            data["metadata"]["created"] = row["created_at"]
            data["metadata"]["modified"] = row["modified_at"]
        embedding_json = row["embedding_json"]
        if embedding_json:
            data["embedding"] = json.loads(embedding_json)
        return Engram.from_dict(data)

    def close(self) -> None:
        """Close the underlying database connection."""
        self._conn.close()
def _now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    import datetime as _dt

    current = _dt.datetime.now(tz=_dt.timezone.utc)
    return current.isoformat()