feat(complete): Phase 2-5 - Vektor-Embeddings, ChromaDB, Neural Scorer, Streamlit Dashboard, Graph-Visualisierung

This commit is contained in:
2026-05-25 09:43:04 +02:00
parent 08d21f8087
commit 59f4059cd8
6 changed files with 842 additions and 2 deletions

174
src/app_dashboard.py Normal file
View File

@@ -0,0 +1,174 @@
"""
app_dashboard.py - Streamlit-Dashboard für Second Brain.
Seiten: Übersicht, Engramme, Suche, Graph, Stats, Neural Scorer.
"""
import json
import sys
from pathlib import Path
import streamlit as st
# Put the project root (the parent of ``src/``) on sys.path so that the
# absolute ``from src.…`` imports below resolve when this file is run as a
# script; having only the script's own directory on the path is not enough.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from src.engram import Engram
from src.store import EngramStore
from src.chroma_store import ChromaStore
from src.retriever import Retriever
from src.neural_scorer import NeuralScorer
# Default database location: <project root>/data/brain.sqlite.
_DEFAULT_DB = Path(__file__).resolve().parent.parent / "data" / "brain.sqlite"
# An optional "db_path" secret overrides the default. ``st.secrets`` exists as
# an attribute even when no secrets.toml is present, but reading from it then
# raises a FileNotFoundError (or a subclass of it), so the lookup is guarded
# and falls back to the default instead of crashing the dashboard on startup.
try:
    _DB_PATH = str(st.secrets.get("db_path", _DEFAULT_DB) if hasattr(st, "secrets") else _DEFAULT_DB)
except FileNotFoundError:
    _DB_PATH = str(_DEFAULT_DB)
def _store():
    """Create an EngramStore for the configured database path."""
    engram_store = EngramStore(_DB_PATH)
    return engram_store
def _chroma():
    """Create a ChromaStore in the ``chroma`` directory next to the database file."""
    chroma_dir = Path(_DB_PATH).parent.joinpath("chroma")
    return ChromaStore(str(chroma_dir))
def _retriever():
    """Create a Retriever wired to a fresh engram store and vector store."""
    engram_store = _store()
    vector_store = _chroma()
    return Retriever(engram_store, vector_store)
def _scorer():
    """Create a NeuralScorer using its default model path."""
    scorer = NeuralScorer()
    return scorer
# --- Page setup and navigation -------------------------------------------
st.set_page_config(page_title="Second Brain Dashboard", layout="wide")
st.title("🧠 Second Brain Dashboard")
page = st.sidebar.radio("Seite", ["Übersicht", "Engramme", "Suche", "Graph", "Stats", "Neural Scorer"])

if page == "Übersicht":
    # Headline metrics plus the five most recently modified engrams.
    store = _store()
    engrams = store.get_all()
    confirmed = sum(1 for e in engrams if e.correctness.confirmed)
    unconfirmed = len(engrams) - confirmed
    # max(1, ...) avoids a ZeroDivisionError when the store is empty.
    avg_conf = sum(e.compute_confidence() for e in engrams) / max(1, len(engrams))
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("Total", len(engrams))
    c2.metric("Confirmed", confirmed)
    c3.metric("Pending", unconfirmed)
    c4.metric("Avg Confidence", f"{avg_conf:.2f}")
    st.subheader("Recent Engramme")
    recent = sorted(engrams, key=lambda e: e.metadata.get("modified", ""), reverse=True)[:5]
    for eg in recent:
        with st.expander(f"{eg.content[:80]}..."):
            st.write(f"Source: {eg.metadata.get('source')}")
            st.write(f"Confidence: {eg.compute_confidence():.2f}")
            # Previously both branches rendered an empty string, so the
            # status was invisible; use the same icons as the buttons.
            st.write(f"Confirmed: {'✅' if eg.correctness.confirmed else '❌'}")
            st.write("Tags:", ", ".join(eg.metadata.get("tags", [])))

elif page == "Engramme":
    # Filterable list of all engrams with confirm/reject feedback buttons.
    store = _store()
    st.subheader("Alle Engramme")
    tag_filter = st.text_input("Filter tags")
    source_filter = st.selectbox("Source", ["alle", "user", "agent", "web", "file", "system"])
    for eg in store.get_all():
        tags = eg.metadata.get("tags", [])
        src = eg.metadata.get("source", "")
        # The tag filter is an exact match against one of the engram's tags.
        if tag_filter and tag_filter not in tags:
            continue
        if source_filter != "alle" and source_filter != src:
            continue
        with st.expander(f"{eg.content[:100]}"):
            st.write("Confidence:", f"{eg.compute_confidence():.2f}")
            st.write("Tags:", ", ".join(tags))
            st.write("Source:", src)
            c1, c2 = st.columns(2)
            if c1.button("✅ Confirm", key=f"conf_{eg.id}"):
                eg.correctness.confirm("user")
                store.save(eg)
                st.success("Confirmed!")
            if c2.button("❌ Reject", key=f"rej_{eg.id}"):
                eg.correctness.reject("user")
                store.save(eg)
                st.warning("Rejected.")

elif page == "Suche":
    st.subheader("Semantic + Keyword Suche")
    query = st.text_input("Query")
    mode = st.radio("Modus", ["Hybrid", "Keyword", "Semantic"])
    # st.button() is True only during the single rerun caused by its click.
    # Clicking Confirm/Reject below triggers another rerun in which "Suchen"
    # is False again, so the submitted search is remembered in session state;
    # otherwise the result buttons would disappear before they could fire.
    if st.button("Suchen") and query:
        st.session_state["active_search"] = {"query": query, "mode": mode}
    active = st.session_state.get("active_search")
    if active:
        ret = _retriever()
        if active["mode"] == "Hybrid":
            results = ret.hybrid_retrieve(active["query"], limit=10)
        elif active["mode"] == "Semantic":
            results = ret.semantic_retrieve(active["query"], limit=10)
        else:
            results = ret.retrieve(active["query"], limit=10)
        for r in results:
            eg = r["engram"]
            with st.container():
                st.markdown(f"**{eg.content[:200]}...**")
                st.write(f"Score: {r['score']:.3f} | Match: {r['match_type']} | Conf: {eg.compute_confidence():.2f}")
                c1, c2 = st.columns(2)
                if c1.button("✅ Confirm", key=f"sc_{eg.id}"):
                    eg.correctness.confirm("user")
                    store = _store()
                    store.save(eg)
                    st.success("Confirmed")
                if c2.button("❌ Reject", key=f"sr_{eg.id}"):
                    eg.correctness.reject("user")
                    store = _store()
                    store.save(eg)
                    st.warning("Rejected")

elif page == "Graph":
    # Embed the pre-generated graph HTML if present; offer to (re)generate it.
    st.subheader("Graph-Visualisierung")
    graph_html_path = Path(_DB_PATH).parent / "graph_view.html"
    if graph_html_path.exists():
        with open(graph_html_path, "r", encoding="utf-8") as f:
            html = f.read()
        # Rendered inside an iframe by the components API.
        st.components.v1.html(html, height=800, scrolling=True)
    else:
        st.info("Graph nicht generiert. Führe `python -m src.cli graph` aus.")
    if st.button("Graph generieren"):
        from src.graph_view import generate_graph_html
        store = _store()
        path = generate_graph_html(store, str(Path(_DB_PATH).parent / "graph_view.html"))
        st.success(f"Graph generiert: {path}")

elif page == "Stats":
    # Aggregate counts as raw JSON. Counter tallies sources and tags in one
    # pass each instead of rescanning every engram per distinct value.
    from collections import Counter

    store = _store()
    engrams = store.get_all()
    source_counts = Counter(e.metadata.get("source") for e in engrams)
    tag_counts = Counter(t for e in engrams for t in e.metadata.get("tags", []))
    st.json({
        "total": len(engrams),
        "confirmed": sum(1 for e in engrams if e.correctness.confirmed),
        "pending": sum(1 for e in engrams if not e.correctness.confirmed),
        "sources": dict(source_counts),
        "tags": dict(tag_counts),
        "avg_confidence": sum(e.compute_confidence() for e in engrams) / max(1, len(engrams)),
    })

elif page == "Neural Scorer":
    st.subheader("Neural Scorer Training")
    scorer = _scorer()
    store = _store()
    engrams = store.get_all()
    # Only engrams with explicit feedback count as labelled training data.
    labeled = [e for e in engrams if e.correctness.confirmed or e.correctness.rejections > 0]
    st.write(f"Labelled Engramme: {len(labeled)}")
    if st.button("Train Neural Scorer"):
        if len(labeled) < 2:
            st.error("Mindestens 2 labelierte Engramme nötig (confirm + reject).")
        else:
            result = scorer.train(labeled, epochs=30)
            st.json(result)
            st.success("Training abgeschlossen!")
    if st.button("Predict All"):
        # Only the first ten engrams are scored to keep the page responsive.
        for eg in engrams[:10]:
            pred = scorer.predict(eg)
            # predict() returns None when no embedding is available; applying
            # a float format spec to None would raise a TypeError.
            if pred is None:
                st.write(f"{eg.content[:60]}... → n/a")
            else:
                st.write(f"{eg.content[:60]}... → {pred:.3f}")

119
src/chroma_store.py Normal file
View File

@@ -0,0 +1,119 @@
"""
chroma_store.py - ChromaDB Vektor-Speicher für semantische Suche.
Erweitert den SQLite-Store um Vektor-Ähnlichkeit.
"""
import json
from pathlib import Path
from typing import List, Optional, Dict, Any
from uuid import UUID
import chromadb
from chromadb.config import Settings
from .engram import Engram
from .embedder import encode
class ChromaStore:
    """
    ChromaDB-backed vector store.

    Stores one vector per engram, keyed by the engram id, together with a
    flattened copy of the engram's metadata.
    """

    def __init__(self, path: str = "data/chroma"):
        """Open (or create) the persistent collection ``engrams`` under *path*."""
        self.path = Path(path)
        self.path.mkdir(parents=True, exist_ok=True)
        self.client = chromadb.PersistentClient(path=str(self.path))
        self.collection = self.client.get_or_create_collection(
            name="engrams",
            metadata={"hnsw:space": "cosine"},
        )

    @staticmethod
    def _resolve_embedding(engram: Engram, embedding: Optional[List[float]]) -> Optional[List[float]]:
        """Pick the explicit embedding, else the engram's own, else compute one.

        Candidates are tested with ``is not None`` / ``len`` rather than
        truthiness, because truth-testing an array-like embedding is ambiguous
        and an empty list must not be stored as a vector.
        Returns None when no embedding is available and encoding fails.
        """
        for candidate in (embedding, engram.embedding):
            if candidate is not None and len(candidate) > 0:
                return candidate
        return encode(engram.content)

    def _build_metadata(self, engram: Engram) -> Dict[str, Any]:
        """Return the engram metadata flattened to primitive values.

        Tags become a comma-separated string, ``hierarchy`` is stored as a JSON
        string, None values are dropped and any other non-primitive value is
        JSON-encoded, so that only str/int/float/bool values are handed to
        ChromaDB.
        """
        meta: Dict[str, Any] = {}
        for key, value in engram.metadata.items():
            if key == "hierarchy":
                # Always JSON-encoded, whatever its type.
                meta[key] = json.dumps(value)
            elif value is None:
                continue
            elif key == "tags" and isinstance(value, (list, tuple, set)):
                meta[key] = ",".join(str(t) for t in value)
            elif isinstance(value, (str, int, float, bool)):
                meta[key] = value
            else:
                # Deliberate fallback: stringify anything JSON cannot encode
                # (for example datetimes) rather than failing the whole write.
                meta[key] = json.dumps(value, ensure_ascii=False, default=str)
        meta.setdefault("tags", "")
        meta.setdefault("source", "agent")
        meta.setdefault("confidence", 0.5)
        meta.setdefault("correctness", "unconfirmed")
        return meta

    def _stored_metadata(self, engram: Engram) -> Dict[str, Any]:
        """Metadata as written to the collection, including a content snippet."""
        meta = self._build_metadata(engram)
        # Only the first 1000 characters of the content are stored.
        meta["content"] = engram.content[:1000]
        return meta

    def add(self, engram: Engram, embedding: Optional[List[float]] = None) -> None:
        """Add an engram and its embedding to the vector DB.

        Does nothing when no embedding is available.
        """
        emb = self._resolve_embedding(engram, embedding)
        if emb is None:
            return
        self.collection.add(
            ids=[str(engram.id)],
            embeddings=[emb],
            metadatas=[self._stored_metadata(engram)],
        )

    def update(self, engram: Engram, embedding: Optional[List[float]] = None) -> None:
        """Update the stored vector and metadata of an engram.

        Writes the same metadata shape as ``add`` (including the content
        snippet) so the stored snippet does not go stale after an edit.
        Does nothing when no embedding is available.
        """
        emb = self._resolve_embedding(engram, embedding)
        if emb is None:
            return
        self.collection.update(
            ids=[str(engram.id)],
            embeddings=[emb],
            metadatas=[self._stored_metadata(engram)],
        )

    def delete(self, eid: str) -> None:
        """Remove the engram with id *eid* from the vector DB."""
        self.collection.delete(ids=[eid])

    def query(self, text: str, top_k: int = 5, filters: Optional[Dict] = None) -> List[Dict[str, Any]]:
        """Semantic search for *text*.

        Returns up to *top_k* dicts with the keys ``id``, ``distance`` and
        ``metadata``; an empty list when the text cannot be encoded, the
        collection is empty or *top_k* is not positive.
        """
        if top_k <= 0:
            return []
        emb = encode(text)
        if emb is None:
            return []
        available = self.collection.count()
        if available == 0:
            return []
        results = self.collection.query(
            query_embeddings=[emb],
            # Never ask for more neighbours than the collection holds.
            n_results=min(top_k, available),
            # An empty filter dict is passed as "no filter".
            where=filters or None,
            include=["metadatas", "distances"],
        )
        # Results are nested per query embedding; only one query was sent.
        ids = results["ids"][0]
        distances = results["distances"][0]
        metadatas = results["metadatas"][0]
        return [
            {"id": rid, "distance": dist, "metadata": meta}
            for rid, dist, meta in zip(ids, distances, metadatas)
        ]

    def get_by_id(self, eid: str) -> Optional[Dict[str, Any]]:
        """Fetch a single entry by id.

        Returns a dict with ``id``, ``embedding`` and ``metadata``, or None
        when the id is unknown or the lookup fails (the error is printed).
        """
        try:
            r = self.collection.get(ids=[eid], include=["embeddings", "metadatas"])
            if not r or not r["ids"]:
                return None
            embeddings = r.get("embeddings")
            metadatas = r.get("metadatas")
            return {
                "id": r["ids"][0],
                "embedding": embeddings[0] if embeddings is not None else None,
                "metadata": (metadatas[0] or {}) if metadatas is not None else {},
            }
        except Exception as e:
            print(f"[chroma_store] get_by_id failed: {e}")
        return None

    def count(self) -> int:
        """Return the number of entries in the collection."""
        return self.collection.count()

116
src/embedder.py Normal file
View File

@@ -0,0 +1,116 @@
"""
embedder.py - Sentence-Transformer Embedding-Modul.
Offline-fähig, cached auf Disk.
"""
import json
import hashlib
import os
from pathlib import Path
from typing import List, Optional
import numpy as np
from sentence_transformers import SentenceTransformer
# Model identifier handed to SentenceTransformer() in _get_model().
_MODEL_NAME = "all-MiniLM-L6-v2"
# Embedding width that consumers of this module assume
# (presumably the output size of _MODEL_NAME — confirm if the model changes).
_EMBED_DIM = 384
# On-disk embedding cache: <parent of this file's directory>/data/embedding_cache.
_CACHE_DIR = Path(__file__).resolve().parent.parent / "data" / "embedding_cache"
# Lazily created model instance; populated on first use by _get_model().
__model: Optional[SentenceTransformer] = None
def _get_model() -> SentenceTransformer:
    """Return the module-wide SentenceTransformer, constructing it on first use."""
    global __model
    if __model is not None:
        return __model
    __model = SentenceTransformer(_MODEL_NAME)
    return __model
def _text_hash(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def _cache_path(h: str) -> Path:
    """Return the JSON cache file path for key *h*, creating the cache directory if needed."""
    cache_dir = _CACHE_DIR
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir.joinpath(f"{h}.json")
def encode(text: str, cache: bool = True, normalize: bool = True) -> Optional[List[float]]:
    """Embed a single text.

    Args:
        text: The text to embed.
        cache: Read from / write to the on-disk cache when True.
        normalize: Scale the vector to unit length when True.

    Returns:
        The embedding as a list of floats, or None when encoding fails
        (for example because the model is not available).
    """
    try:
        # Normalised and raw vectors differ, so they must not share one cache
        # entry. The normalised (default) variant keeps the plain hash so
        # cache files written earlier remain valid.
        digest = _text_hash(text)
        cp = _cache_path(digest if normalize else f"{digest}_raw")
        if cache and cp.exists():
            try:
                with open(cp, "r", encoding="utf-8") as f:
                    return json.load(f)["embedding"]
            except (OSError, ValueError, KeyError, TypeError):
                # Unreadable or truncated cache entry: fall through, recompute
                # and overwrite it instead of failing for this text forever.
                pass
        model = _get_model()
        vec = model.encode(text, convert_to_numpy=True)
        if normalize:
            norm = np.linalg.norm(vec)
            if norm > 0:
                vec = vec / norm
        vec_list = vec.tolist()
        if cache:
            with open(cp, "w", encoding="utf-8") as f:
                json.dump({"text": text, "embedding": vec_list}, f, ensure_ascii=False)
        return vec_list
    except Exception as e:
        # Best-effort API: callers check for None rather than catching.
        print(f"[embedder] Encoding failed: {e}")
        return None
def encode_batch(texts: List[str], cache: bool = True, normalize: bool = True) -> List[Optional[List[float]]]:
    """Embed several texts, encoding all cache misses in one model call.

    Args:
        texts: The texts to embed.
        cache: Read from / write to the on-disk cache when True.
        normalize: Scale each vector to unit length when True.

    Returns:
        One embedding per input text, in input order. If encoding fails the
        result is a list of None with the same length as *texts*.
    """
    try:
        results: List[Optional[List[float]]] = []
        pending: List[int] = []  # indices into texts that still need encoding
        paths: List[Path] = []
        for i, text in enumerate(texts):
            # Normalised and raw vectors use separate cache entries; the
            # normalised (default) variant keeps the plain hash so existing
            # cache files stay valid.
            digest = _text_hash(text)
            cp = _cache_path(digest if normalize else f"{digest}_raw")
            paths.append(cp)
            cached = None
            if cache and cp.exists():
                try:
                    with open(cp, "r", encoding="utf-8") as f:
                        cached = json.load(f)["embedding"]
                except (OSError, ValueError, KeyError, TypeError):
                    # Corrupt cache entry: treat as a miss and re-encode it.
                    cached = None
            results.append(cached)
            if cached is None:
                pending.append(i)
        if pending:
            model = _get_model()
            vecs = model.encode([texts[i] for i in pending], convert_to_numpy=True)
            if normalize:
                norms = np.linalg.norm(vecs, axis=1, keepdims=True)
                norms[norms == 0] = 1  # leave all-zero vectors untouched
                vecs = vecs / norms
            for i, vec in zip(pending, vecs):
                vec_list = vec.tolist()
                results[i] = vec_list
                if cache:
                    with open(paths[i], "w", encoding="utf-8") as f:
                        json.dump({"text": texts[i], "embedding": vec_list}, f, ensure_ascii=False)
        return results
    except Exception as e:
        # Best-effort API: callers check for None entries rather than catching.
        print(f"[embedder] Batch encoding failed: {e}")
        return [None] * len(texts)
def similar(query: str, candidates: List[str], top_k: int = 5) -> List[tuple]:
    """Return the *top_k* candidates most similar to *query*.

    Similarity is the dot product of the embeddings returned by ``encode`` /
    ``encode_batch`` with their default arguments.

    Returns:
        ``(candidate, score)`` tuples sorted by descending score. Candidates
        that could not be encoded are left out; the result is empty when the
        query itself cannot be encoded.
    """
    q_emb = encode(query)
    if q_emb is None:
        # np.array(None) would build an object array and make np.dot raise.
        return []
    q_vec = np.array(q_emb)
    scores = [
        (i, float(np.dot(q_vec, np.array(c_vec))))
        for i, c_vec in enumerate(encode_batch(candidates))
        if c_vec is not None
    ]
    scores.sort(key=lambda pair: pair[1], reverse=True)
    return [(candidates[i], s) for i, s in scores[:top_k]]

184
src/graph_view.py Normal file
View File

@@ -0,0 +1,184 @@
"""
graph_view.py - Generiert interaktive Graph-Visualisierung (Cytoscape.js).
"""
import json
from pathlib import Path
from typing import Optional
from .store import EngramStore
# Page template for the Cytoscape.js graph view. It is filled via str.format():
# ``{elements_json}`` is the only placeholder, every literal brace is doubled.
# All node data is passed through esc() before it is written to innerHTML,
# because engram content, sources and tags are free text and must not be
# interpreted as markup.
_HTML_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Second Brain Graph</title>
<script src="https://unpkg.com/cytoscape@3.26.0/dist/cytoscape.min.js"></script>
<style>
body {{ margin:0; padding:0; background:#1a1a2e; color:#eee; font-family: sans-serif; }}
#cy {{ width: 100vw; height: 100vh; }}
#info {{ position: absolute; top: 10px; left: 10px; background: rgba(0,0,0,0.8); padding: 15px; border-radius: 8px; max-width: 300px; }}
#controls {{ position: absolute; bottom: 10px; left: 10px; }}
.btn {{ background: #e94560; border: none; color: white; padding: 8px 16px; border-radius: 4px; cursor: pointer; margin-right: 5px; }}
.filter {{ background: #0f3460; border: none; color: white; padding: 6px 12px; border-radius: 4px; margin-right: 5px; cursor: pointer; }}
</style>
</head>
<body>
<div id="info">
<h3>🧠 Second Brain Graph</h3>
<p>Knoten: Engramme (Farbe = Confidence)</p>
<p>Grün=hoch, Gelb=mittel, Rot=niedrig</p>
<p>Links: Verknüpfungen</p>
<p><strong>Klicke</strong> für Details</p>
</div>
<div id="controls">
<button class="btn" onclick="cy.fit()">Fit</button>
<button class="filter" onclick="filterHigh()">Nur High-Conf</button>
<button class="filter" onclick="filterConfirmed()">Nur Confirmed</button>
<button class="filter" onclick="showAll()">Alle</button>
</div>
<div id="cy"></div>
<script>
var cy = cytoscape({{
  container: document.getElementById('cy'),
  elements: {elements_json},
  style: [
    {{ selector: 'node', style: {{
      'background-color': 'data(color)',
      'width': 'data(size)',
      'height': 'data(size)',
      'label': 'data(label)',
      'color': '#fff',
      'font-size': '10px',
      'text-outline-color': '#000',
      'text-outline-width': 1,
      'border-width': 2,
      'border-color': '#333'
    }} }},
    {{ selector: 'edge', style: {{
      'width': 2,
      'line-color': '#555',
      'target-arrow-color': '#555',
      'target-arrow-shape': 'triangle',
      'curve-style': 'bezier'
    }} }},
    {{ selector: '.highlighted', style: {{
      'border-color': '#e94560',
      'border-width': 4
    }} }}
  ],
  layout: {{
    name: 'cose',
    idealEdgeLength: 100,
    nodeOverlap: 20,
    refresh: 20,
    fit: true,
    padding: 30,
    randomize: false,
    componentSpacing: 100,
    nodeRepulsion: 400000,
    edgeElasticity: 100,
    nestingFactor: 5,
    gravity: 80,
    numIter: 1000,
    initialTemp: 200,
    coolingFactor: 0.95,
    minTemp: 1.0
  }}
}});
// Escape text before it is inserted as HTML.
function esc(value){{
  return String(value).replace(/[&<>"']/g, function(ch){{
    return {{'&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;'}}[ch];
  }});
}}
cy.on('tap', 'node', function(evt){{
  var node = evt.target;
  var info = document.getElementById('info');
  info.innerHTML = '<h3>#' + esc(node.id()) + '</h3>'
    + '<p><strong>' + esc(node.data('title')) + '</strong></p>'
    + '<p>Confidence: ' + Number(node.data('confidence')).toFixed(2) + '</p>'
    + '<p>Confirmed: ' + (node.data('confirmed') ? '✅' : '❌') + '</p>'
    + '<p>Source: ' + esc(node.data('source')) + '</p>'
    + '<p>Tags: ' + esc(node.data('tags')) + '</p>';
}});
function filterHigh(){{
  cy.elements().hide();
  cy.nodes().filter(function(n){{ return n.data('confidence') >= 0.7; }}).show();
  cy.edges().filter(function(e){{ return e.source().visible() && e.target().visible(); }}).show();
}}
function filterConfirmed(){{
  cy.elements().hide();
  cy.nodes().filter(function(n){{ return n.data('confirmed'); }}).show();
  cy.edges().filter(function(e){{ return e.source().visible() && e.target().visible(); }}).show();
}}
function showAll(){{
  cy.elements().show();
}}
</script>
</body>
</html>
"""
def _confidence_color(conf: float) -> str:
    """Map a confidence value to a hex colour: green from 0.8, yellow from 0.5, red below."""
    bands = (
        (0.8, "#27ae60"),  # Green
        (0.5, "#f39c12"),  # Yellow
    )
    for lower_bound, colour in bands:
        if conf >= lower_bound:
            return colour
    return "#e74c3c"  # Red
def _node_size(access_count: int) -> float:
    """Return the node diameter: 20 plus 5 per access, clamped to the range 20..60."""
    grown = 20 + access_count * 5
    capped = min(60, grown)
    return max(20, capped)
def generate_graph_html(store: EngramStore, output_path: str) -> str:
    """Generate the interactive HTML graph visualisation.

    Every engram in *store* becomes a node (colour from its confidence, size
    from its access count); every link whose target is also in the store
    becomes a directed edge.

    Args:
        store: Source of the engrams (``get_all()`` is called once).
        output_path: File to write; missing parent directories are created.

    Returns:
        The path of the written file as a string.
    """
    engrams = store.get_all()
    # Collect all ids up front. Checking against ids seen "so far" would drop
    # every link that points to an engram appearing later in the list.
    node_ids = {str(eg.id) for eg in engrams}
    nodes = []
    edges = []
    seen_edges = set()
    for eg in engrams:
        eid = str(eg.id)
        conf = eg.compute_confidence()
        nodes.append({
            "data": {
                "id": eid,
                "label": eg.content[:40] + ("..." if len(eg.content) > 40 else ""),
                "title": eg.content,
                "color": _confidence_color(conf),
                "size": _node_size(eg.metadata.get("access_count", 0)),
                "confidence": conf,
                "confirmed": eg.correctness.confirmed,
                "source": eg.metadata.get("source", "?"),
                "tags": ", ".join(str(t) for t in eg.metadata.get("tags", [])),
            }
        })
        for lid in eg.links:
            lid_s = str(lid)
            edge_id = f"{eid}_{lid_s}"
            # Skip dangling links and repeated links (element ids must be unique).
            if lid_s not in node_ids or edge_id in seen_edges:
                continue
            seen_edges.add(edge_id)
            edges.append({
                "data": {
                    "id": edge_id,
                    "source": eid,
                    "target": lid_s,
                }
            })
    elements = {"nodes": nodes, "edges": edges}
    # The JSON is embedded inside a <script> element. "<" can only occur
    # inside JSON strings, where \u003c is an equivalent escape, so content
    # such as "</script>" cannot terminate the script block.
    elements_json = json.dumps(elements, ensure_ascii=False).replace("<", "\\u003c")
    html = _HTML_TEMPLATE.format(elements_json=elements_json)
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    with open(out, "w", encoding="utf-8") as f:
        f.write(html)
    return str(out)

169
src/neural_scorer.py Normal file
View File

@@ -0,0 +1,169 @@
"""
neural_scorer.py - PyTorch Neural Confidence Net.
Trainiert sich selbst via confirm/reject Feedback.
"""
import json
import random
from pathlib import Path
from typing import List, Optional
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from .engram import Engram
from .embedder import encode, _EMBED_DIM
# Directory for persisted artefacts: <parent of this file's directory>/data.
_DATA_DIR = Path(__file__).resolve().parent.parent / "data"
# Default file for the saved model state dict.
_MODEL_PATH = _DATA_DIR / "neural_scorer.pt"
# JSON-lines training log; one record is appended per training run.
_TRAIN_LOG = _DATA_DIR / "neural_train_log.jsonl"
# Embedding dimension + metadata features: length_norm, age_hours, access_count, source_encoded
_INPUT_DIM = _EMBED_DIM + 4
# Widths of the three hidden layers of ConfidenceNet.
_HIDDEN = 128
_HIDDEN2 = 64
_HIDDEN3 = 32
class ConfidenceNet(nn.Module):
    """Feed-forward network with three hidden layers and a sigmoid output."""

    def __init__(self, input_dim: int = _INPUT_DIM):
        super().__init__()
        # Layers are created in network order and kept in a single
        # nn.Sequential stored as ``self.net``.
        layers = [
            nn.Linear(input_dim, _HIDDEN),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(_HIDDEN, _HIDDEN2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(_HIDDEN2, _HIDDEN3),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(_HIDDEN3, 1),
            nn.Sigmoid(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run *x* through the layer stack and return the sigmoid output."""
        scores = self.net(x)
        return scores
class NeuralScorer:
    """
    Trainable confidence scorer.

    Workflow: predict -> confirm/reject feedback -> train -> better predictions.
    The model runs on the CPU; its weights are loaded from and saved to
    ``model_path``.
    """

    def __init__(self, model_path: Optional[str] = None):
        """Create the network and load saved weights if *model_path* exists.

        Args:
            model_path: Location of the state dict; defaults to ``_MODEL_PATH``.
        """
        self.device = torch.device("cpu")
        self.model = ConfidenceNet().to(self.device)
        self.model_path = Path(model_path) if model_path else _MODEL_PATH
        self._load()

    def _load(self):
        """Load weights from ``self.model_path``; a failure is printed, not raised."""
        if not self.model_path.exists():
            return
        try:
            state = torch.load(self.model_path, map_location=self.device, weights_only=True)
            self.model.load_state_dict(state)
            self.model.eval()
            print(f"[neural_scorer] Model loaded from {self.model_path}")
        except Exception as e:
            print(f"[neural_scorer] Could not load model: {e}")

    def _save(self):
        """Write the current state dict to ``self.model_path``."""
        self.model_path.parent.mkdir(parents=True, exist_ok=True)
        torch.save(self.model.state_dict(), self.model_path)

    def _encode_meta(self, engram: Engram, now_sec: Optional[float] = None) -> List[float]:
        """Return the four metadata features of *engram*.

        Args:
            engram: The engram whose metadata and content are read.
            now_sec: Reference time as a Unix timestamp; defaults to now.

        Returns:
            ``[content_len, age_hours, access_count, source_enc]`` where
            content_len is the content length / 1000 capped at 1.0, age_hours
            is 0.0 when ``created`` is missing or unparsable, and source_enc
            is a fixed weight per source (0.5 for unknown sources).
        """
        import time
        from datetime import datetime

        m = engram.metadata
        now = time.time() if now_sec is None else now_sec
        created = m.get("created", "")
        age_hours = 0.0
        if created:
            try:
                age_hours = (now - datetime.fromisoformat(created).timestamp()) / 3600.0
            except (TypeError, ValueError, OverflowError, OSError):
                age_hours = 0.0
        access_count = float(m.get("access_count") or 0)
        source = m.get("source", "agent")
        source_map = {"user": 1.0, "agent": 0.8, "web": 0.6, "file": 0.7, "system": 0.9}
        source_enc = source_map.get(source, 0.5)
        content_len = min(len(engram.content) / 1000.0, 1.0)
        return [content_len, age_hours, access_count, source_enc]

    def _feature_vector(self, engram: Engram, now_sec: Optional[float] = None) -> Optional[List[float]]:
        """Return embedding + metadata features, or None if no usable embedding exists.

        The stored embedding is checked with ``is None`` / ``len`` and copied
        into a plain list: truth-testing an array is ambiguous, and ``+`` on
        an array would add element-wise instead of concatenating.
        """
        emb = engram.embedding
        if emb is None or len(emb) == 0:
            emb = encode(engram.content)
        if emb is None:
            return None
        emb_list = [float(v) for v in emb]
        if len(emb_list) != _EMBED_DIM:
            # A vector of another width cannot be fed to the network.
            print(f"[neural_scorer] Skipping embedding of size {len(emb_list)} (expected {_EMBED_DIM})")
            return None
        return emb_list + self._encode_meta(engram, now_sec)

    def predict(self, engram: Engram) -> Optional[float]:
        """Return a confidence score between 0.0 and 1.0, or None without a usable embedding."""
        self.model.eval()
        vec = self._feature_vector(engram)
        if vec is None:
            return None
        x = torch.tensor([vec], dtype=torch.float32, device=self.device)
        with torch.no_grad():
            out = self.model(x)
        return float(out.item())

    def _build_training_data(self, engrams: List[Engram]) -> tuple:
        """Build ``(X, Y)`` feature/label lists from the feedback on *engrams*.

        Confirmed engrams are labelled 1.0 and rejected ones 0.0. Otherwise the
        correctness score is the label, and a score of exactly 0.5 is skipped.
        Engrams without a usable embedding are skipped as well.
        """
        import time

        now = time.time()
        X: List[List[float]] = []
        Y: List[float] = []
        for eg in engrams:
            vec = self._feature_vector(eg, now)
            if vec is None:
                continue
            if eg.correctness.confirmed:
                label = 1.0
            elif eg.correctness.rejections > 0:
                label = 0.0
            else:
                label = eg.correctness.score()
                if label == 0.5:
                    continue
                # BCELoss requires targets within [0, 1].
                label = min(1.0, max(0.0, float(label)))
            X.append(vec)
            Y.append(label)
        return X, Y

    def train(self, engrams: List[Engram], epochs: int = 20, batch_size: int = 16) -> dict:
        """Train on confirm/reject feedback and persist the model.

        Args:
            engrams: Candidate training examples; unlabelled ones are skipped.
            epochs: Number of passes over the training data.
            batch_size: Mini-batch size (capped at the number of samples).

        Returns:
            A log dict with ``epochs``, ``losses`` (mean loss per epoch),
            ``samples`` and ``path``; or ``{"error": ...}`` when there is no
            labelled data. The log dict is also appended to ``_TRAIN_LOG``.
        """
        X, Y = self._build_training_data(engrams)
        if not X:
            return {"error": "No labeled training data available"}
        self.model.train()
        dataset = TensorDataset(
            torch.tensor(X, dtype=torch.float32),
            torch.tensor(Y, dtype=torch.float32).unsqueeze(1),
        )
        dataloader = DataLoader(dataset, batch_size=min(batch_size, len(X)), shuffle=True)
        optimizer = optim.Adam(self.model.parameters(), lr=0.001, weight_decay=1e-5)
        criterion = nn.BCELoss()
        losses = []
        for _ in range(epochs):
            epoch_loss = 0.0
            for xb, yb in dataloader:
                optimizer.zero_grad()
                pred = self.model(xb)
                loss = criterion(pred, yb)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=5.0)
                optimizer.step()
                epoch_loss += loss.item()
            losses.append(epoch_loss / max(1, len(dataloader)))
        self._save()
        self.model.eval()
        log = {"epochs": epochs, "losses": losses, "samples": len(Y), "path": str(self.model_path)}
        _TRAIN_LOG.parent.mkdir(parents=True, exist_ok=True)
        with open(_TRAIN_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(log) + "\n")
        return log

View File

@@ -4,14 +4,17 @@ Phase 1: FTS-Keyword + Confidence-Reranking.
Phase 2: + Embedding + Fusion.
"""
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from .engram import Engram
from .store import EngramStore
from .chroma_store import ChromaStore
from .embedder import encode
class Retriever:
def __init__(self, store: EngramStore):
def __init__(self, store: EngramStore, chroma: Optional[ChromaStore] = None):
self.store = store
self.chroma = chroma
def retrieve(
self,
@@ -37,6 +40,81 @@ class Retriever:
results.sort(key=lambda r: r["score"], reverse=True)
return results[:limit]
def semantic_retrieve(
    self,
    query: str,
    limit: int = 5,
    min_confidence: float = 0.0,
) -> List[Dict[str, Any]]:
    """Semantic search via ChromaDB.

    Fetches up to ``limit * 3`` vector hits, resolves each against the engram
    store and drops hits that are missing there or whose confidence is below
    *min_confidence*.

    Returns:
        Up to *limit* dicts with ``engram``, ``score`` (1 - distance) and
        ``match_type`` ("semantic"), sorted by descending score. Empty when
        no vector store is configured.
    """
    if self.chroma is None:
        return []
    results = []
    for hit in self.chroma.query(query, top_k=limit * 3):
        eg = self.store.get(hit["id"])
        if not eg:
            # Present in the vector store but not in the engram store.
            continue
        if eg.compute_confidence() < min_confidence:
            continue
        score = 1.0 - hit.get("distance", 0)
        results.append({"engram": eg, "score": score, "match_type": "semantic"})
    results.sort(key=lambda r: r["score"], reverse=True)
    return results[:limit]
def hybrid_retrieve(
    self,
    query: str,
    limit: int = 5,
    min_confidence: float = 0.0,
    keyword_weight: float = 0.4,
    semantic_weight: float = 0.6,
) -> List[Dict[str, Any]]:
    """
    Fuse keyword and semantic hits into one ranking.

    Each engram's score is the weighted sum of its keyword and semantic
    scores (0.0 where it was not found) plus 0.1 times its confidence,
    capped at 1.0. ``match_type`` is "hybrid" when both searches found the
    engram, otherwise "semantic" or "keyword".
    """
    pool_size = limit * 3
    keyword_hits = {
        str(hit["engram"].id): hit
        for hit in self.retrieve(query, limit=pool_size, min_confidence=min_confidence)
    }
    semantic_hits = {
        str(hit["engram"].id): hit
        for hit in self.semantic_retrieve(query, limit=pool_size, min_confidence=min_confidence)
    }
    candidate_ids = set(keyword_hits.keys()) | set(semantic_hits.keys())
    fused: List[Dict[str, Any]] = []
    for eid in candidate_ids:
        kw_hit = keyword_hits.get(eid)
        sem_hit = semantic_hits.get(eid)
        kw_part = kw_hit["score"] if kw_hit else 0.0
        sem_part = sem_hit["score"] if sem_hit else 0.0
        # Weighted fusion of both scores.
        blended = keyword_weight * kw_part + semantic_weight * sem_part
        # Confidence bonus, taken from whichever hit carries the engram.
        eg = kw_hit["engram"] if kw_hit else sem_hit["engram"]
        bonus = eg.compute_confidence() * 0.1
        total = min(1.0, blended + bonus)
        if kw_hit and sem_hit:
            kind = "hybrid"
        elif sem_hit:
            kind = "semantic"
        else:
            kind = "keyword"
        fused.append({"engram": eg, "score": total, "match_type": kind})
    ranked = sorted(fused, key=lambda entry: entry["score"], reverse=True)
    return ranked[:limit]
def related(self, engram_id: str, limit: int = 5) -> List[Engram]:
eg = self.store.get(engram_id)
if not eg: