4 Commits

8 changed files with 646 additions and 177 deletions

1
.streamlit/secrets.toml Normal file
View File

@@ -0,0 +1 @@
[default]

View File

@@ -1,88 +1,14 @@
# 🧠 Second Brain # Second Brain
Zweites Gehirn für OpenClaw - Langzeit- und Kurzzeitgedächtnis mit Bewertung, Proaktivität und Selbstheilung. An embeddable, offline-first memory system for AI agents with correctness tracking, neural scoring, and semantic retrieval.
## Features ## What's New (Phase 2-5)
- **Engramme** - Gedächtniseinheiten mit Confidence, Korrektheit, Verknüpfungen - **Sentence-Transformer Embeddings** (`src/embedder.py`) — Cached, offline, 384-Dim
- **SQLite + FTS5** - Lokaler Speicher ohne externe Abhängigkeiten - **ChromaDB Vector Store** (`src/chroma_store.py`) — Semantic similarity search
- **Hybrid-Retrieval** - Keyword-Suche + Reranking (später + Embeddings) - **Neural Confidence Scorer** (`src/neural_scorer.py`) — PyTorch RL net, trains on confirm/reject feedback
- **Correctness-Tracking** - Richtig/Falsch-Feedback mit Lern-Loop - **Hybrid Retrieval** (`src/retriever.py`) — Keyword + Semantic + Neural fusion
- **Proaktivität** - Heartbeat + Cron für selbständige Checks - **Streamlit Dashboard** (`src/app_dashboard.py`) — Search, confirm/reject, neural training UI
- **Fehlerheilung** - Fehler als Engramme, Mustererkennung, Auto-Fix - **Graph Visualization** (`src/graph_view.py`) — Interactive Cytoscape.js graph with confidence colors
- **Dashboard** - HTML-Visualisierung, kein Framework nötig
- **OpenClaw-Bridge** - Direkte Integration in Agent-Sessions
## Schnellstart ## Architecture
```bash
cd /root/.openclaw/workspace/second-brain
# Engramm hinzufügen
python3 -m src.cli add "Das ist wichtig" --tag wichtig --source user
# Suchen
python3 -m src.cli search "wichtig"
# Feedback geben
python3 -m src.cli confirm <id>
python3 -m src.cli reject <id>
# Dashboard öffnen
python3 -m src.dashboard
# Stats
python3 -m src.cli stats
# Backup
python3 -m src.openclaw_bridge backup
# Tests
python3 -m tests.test_core
```
## Architektur
```
┌─────────────────┐ ┌──────────────┐ ┌────────────────┐
│ OpenClaw │────▶│ Bridge │────▶│ Engram Store │
│ Agent │ │ (Session) │ │ (SQLite) │
└─────────────────┘ └──────────────┘ └────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────────┐
│ Heartbeat │ │ Retriever │
│ (Cron/Check) │ │ (FTS + RR) │
└─────────────────┘ └──────────────┘
┌──────────────┐
│ Dashboard │
│ (HTML) │
└──────────────┘
```
## Module
| Datei | Zweck |
|-------|-------|
| `src/engram.py` | Engramm-Modell, Confidence, Correctness |
| `src/store.py` | SQLite-CRUD, FTS5-Index, Backup/Export |
| `src/retriever.py` | Suche, Reranking, Verknüpfungen |
| `src/cli.py` | Kommandozeilen-Interface |
| `src/openclaw_bridge.py` | OpenClaw-Integration, Heartbeat, Fehler-Handling |
| `src/dashboard.py` | HTML-Dashboard-Generator |
## CI/CD
- **Repo**: http://192.168.6.31:3000/Otto/second-brain
- **Issues**: 8 offen (Features, Bugs)
- **Cron**: Täglich 2 Uhr Backup
## Nächste Schritte (Phase 2)
1. Vektor-Embeddings via sentence-transformers
2. ChromaDB-Store als Alternative zu SQLite
3. PyTorch Neural Scorer
4. Streamlit-Dashboard
5. Graph-Visualisierung (cytoscape.js)

View File

@@ -1,174 +1,210 @@
""" """
app_dashboard.py - Streamlit-Dashboard für Second Brain. app_dashboard.py - Streamlit-Dashboard für Second Brain.
Seiten: Übersicht, Engramme, Suche, Graph, Stats. Seiten: Übersicht, Engramme, Suche, Graph, Heal-Log, Neural Scorer.
""" """
import json import json
import sys import sys
import os
from pathlib import Path from pathlib import Path
import streamlit as st import streamlit as st
sys.path.insert(0, str(Path(__file__).resolve().parent)) _root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_root))
from src.engram import Engram from src.engram import Engram
from src.store import EngramStore from src.store import EngramStore
from src.chroma_store import ChromaStore from src.chroma_store import ChromaStore
from src.retriever import Retriever from src.retriever import Retriever
from src.neural_scorer import NeuralScorer from src.neural_scorer import NeuralScorer
from src.graph_view import generate_graph_html
from src.loop_detector import LoopDetector
from src.error_healer import ErrorHealer
_DEFAULT_DB = Path(__file__).resolve().parent.parent / "data" / "brain.sqlite" _DEFAULT_DB = _root / "data" / "brain.sqlite"
_DB_PATH = str(st.secrets.get("db_path", _DEFAULT_DB) if hasattr(st, "secrets") else _DEFAULT_DB)
@st.cache_resource
def _store(): def _store():
return EngramStore(_DB_PATH) return EngramStore(str(_DEFAULT_DB))
@st.cache_resource
def _chroma(): def _chroma():
p = Path(_DB_PATH).parent / "chroma" p = Path(str(_DEFAULT_DB)).parent / "chroma"
return ChromaStore(str(p)) return ChromaStore(str(p))
_retriever_cache = None
def _retriever(): def _retriever():
return Retriever(_store(), _chroma()) global _retriever_cache
if _retriever_cache is None:
_retriever_cache = Retriever(_store(), _chroma())
return _retriever_cache
@st.cache_resource
def _scorer(): def _scorer():
return NeuralScorer() return NeuralScorer()
st.set_page_config(page_title="Second Brain Dashboard", layout="wide") @st.cache_resource
st.title("🧠 Second Brain Dashboard") def _healer():
return ErrorHealer(_store())
page = st.sidebar.radio("Seite", ["Übersicht", "Engramme", "Suche", "Graph", "Stats", "Neural Scorer"])
st.set_page_config(page_title="Second Brain Dashboard", layout="wide")
st.title("🧠 2.Brain v0.3.1")
page = st.sidebar.radio("Seite", ["Übersicht", "Engramme", "Suche", "Graph", "Heal-Log", "Neural Scorer"])
if page == "Übersicht": if page == "Übersicht":
store = _store() store = _store()
engrams = store.get_all() engrams = store.get_all(limit=10000)
confirmed = sum(1 for e in engrams if e.correctness.confirmed) confirmed = sum(1 for e in engrams if e.correctness.confirmed)
unconfirmed = len(engrams) - confirmed unconfirmed = len(engrams) - confirmed
avg_conf = sum(e.compute_confidence() for e in engrams) / max(1, len(engrams)) avg_conf = sum(e.compute_confidence() for e in engrams) / max(1, len(engrams))
errors = [e for e in engrams if "error" in e.metadata.get("tags", [])]
c1, c2, c3, c4 = st.columns(4) c1, c2, c3, c4, c5 = st.columns(5)
c1.metric("Total", len(engrams)) c1.metric("Total", len(engrams))
c2.metric("Confirmed", confirmed) c2.metric("Confirmed", confirmed)
c3.metric("Pending", unconfirmed) c3.metric("Pending", unconfirmed)
c4.metric("Avg Confidence", f"{avg_conf:.2f}") c4.metric("Avg Confidence", f"{avg_conf:.2f}")
c5.metric("Errors", len(errors))
st.subheader("Recent Engramme") st.subheader("Recent Engramme")
for eg in sorted(engrams, key=lambda e: e.metadata.get("modified", ""), reverse=True)[:5]: for eg in sorted(engrams, key=lambda e: e.metadata.get("modified", ""), reverse=True)[:5]:
with st.expander(f"{eg.content[:80]}..."): valid = eg.validate_grounding()
marker = "" if valid["valid"] else "⚠️"
with st.expander(f"{marker} {eg.content[:80]}..."):
st.write(f"ID: `{eg.id}`")
st.write(f"Source: {eg.metadata.get('source')}") st.write(f"Source: {eg.metadata.get('source')}")
st.write(f"Confidence: {eg.compute_confidence():.2f}") st.write(f"Confidence: {eg.compute_confidence():.2f}")
st.write(f"Confirmed: {'' if eg.correctness.confirmed else ''}") st.write(f"Confirmed: {'' if eg.correctness.confirmed else ''}")
st.write("Tags:", ", ".join(eg.metadata.get("tags", []))) st.write("Tags:", ", ".join(eg.metadata.get("tags", [])))
if not valid["valid"]:
st.warning(f"Grounding: {valid['issue']}")
if st.button("Auto-Fix", key=f"af_{eg.id}"):
eg.auto_fix_grounding()
store.save(eg)
st.experimental_rerun()
elif page == "Engramme": elif page == "Engramme":
store = _store() store = _store()
st.subheader("Alle Engramme") st.subheader("Alle Engramme (max 1000)")
tag_filter = st.text_input("Filter tags") tag_filter = st.text_input("Filter tags")
source_filter = st.selectbox("Source", ["alle", "user", "agent", "web", "file", "system"]) source_filter = st.selectbox("Source", ["alle", "user", "agent", "web", "file", "system"])
for eg in store.get_all(): for eg in store.get_all(limit=1000):
tags = eg.metadata.get("tags", []) tags = eg.metadata.get("tags", [])
src = eg.metadata.get("source", "") src = eg.metadata.get("source", "")
if tag_filter and tag_filter not in tags: if tag_filter and tag_filter not in tags:
continue continue
if source_filter != "alle" and source_filter != src: if source_filter != "alle" and source_filter != src:
continue continue
with st.expander(f"{eg.content[:100]}"): col1, col2 = st.columns([4, 1])
st.write("Confidence:", f"{eg.compute_confidence():.2f}") with col1:
st.write("Tags:", ", ".join(tags)) conf = eg.compute_confidence()
st.write("Source:", src) marker = "" if conf > 0.7 else "⚠️"
c1, c2 = st.columns(2) st.markdown(f"{marker} **{eg.content[:100]}**")
if c1.button("Confirm", key=f"conf_{eg.id}"): st.caption(f"Conf: {conf:.2f} | Tags: {', '.join(tags)} | Source: {src}")
with col2:
if st.button("✅ Confirm", key=f"conf_{eg.id}"):
eg.correctness.confirm("user") eg.correctness.confirm("user")
store.save(eg) store.save(eg)
st.success("Confirmed!") st.success("Confirmed")
if c2.button("❌ Reject", key=f"rej_{eg.id}"): if st.button("❌ Reject", key=f"rej_{eg.id}"):
eg.correctness.reject("user") eg.correctness.reject("user")
store.save(eg) store.save(eg)
st.warning("Rejected.") st.warning("Rejected")
st.divider()
elif page == "Suche": elif page == "Suche":
st.subheader("Semantic + Keyword Suche") st.subheader("Hybrid Search (Semantic + Keyword)")
query = st.text_input("Query") query = st.text_input("Query", placeholder="Suchbegriff eingeben...")
mode = st.radio("Modus", ["Hybrid", "Keyword", "Semantic"]) mode = st.radio("Modus", ["Hybrid", "Keyword", "Semantic"], horizontal=True)
if st.button("Suchen") and query: if st.button("Suchen") and query:
ret = _retriever() ret = _retriever()
if mode == "Hybrid": results = ret.hybrid_retrieve(query, limit=10) if mode == "Hybrid" else \
results = ret.hybrid_retrieve(query, limit=10) ret.semantic_retrieve(query, limit=10) if mode == "Semantic" else \
elif mode == "Semantic": ret.retrieve(query, limit=10)
results = ret.semantic_retrieve(query, limit=10) if not results:
else: st.info("Keine Ergebnisse gefunden.")
results = ret.retrieve(query, limit=10)
for r in results: for r in results:
eg = r["engram"] eg = r["engram"]
with st.container(): with st.container():
st.markdown(f"**{eg.content[:200]}...**") st.markdown(f"**{eg.content[:200]}...**")
st.write(f"Score: {r['score']:.3f} | Match: {r['match_type']} | Conf: {eg.compute_confidence():.2f}") st.write(f"Score: `{r['score']:.3f}` | Match: `{r['match_type']}` | Conf: `{eg.compute_confidence():.2f}`")
c1, c2 = st.columns(2) c1, c2 = st.columns(2)
if c1.button("✅ Confirm", key=f"sc_{eg.id}"): if c1.button("✅ Confirm", key=f"sc_{eg.id}"):
eg.correctness.confirm("user") eg.correctness.confirm("user")
store = _store() _store().save(eg)
store.save(eg)
st.success("Confirmed") st.success("Confirmed")
if c2.button("❌ Reject", key=f"sr_{eg.id}"): if c2.button("❌ Reject", key=f"sr_{eg.id}"):
eg.correctness.reject("user") eg.correctness.reject("user")
store = _store() _store().save(eg)
store.save(eg)
st.warning("Rejected") st.warning("Rejected")
elif page == "Graph": elif page == "Graph":
st.subheader("Graph-Visualisierung") st.subheader("Graph-Visualisierung")
graph_html_path = Path(_DB_PATH).parent / "graph_view.html" graph_html_path = Path(str(_DEFAULT_DB)).parent / "graph_view.html"
if st.button("Graph neu generieren"):
with st.spinner("Generiere Graph..."):
path = generate_graph_html(_store(), str(graph_html_path))
st.success(f"Graph generiert: {path}")
if graph_html_path.exists(): if graph_html_path.exists():
with open(graph_html_path, "r", encoding="utf-8") as f: with open(graph_html_path, "r", encoding="utf-8") as f:
html = f.read() html = f.read()
# iframe st.components.v1.html(html, height=800)
st.components.v1.html(html, height=800, scrolling=True)
else: else:
st.info("Graph nicht generiert. Führe `python -m src.cli graph` aus.") st.info("Graph noch nicht generiert. Klicke oben.")
if st.button("Graph generieren"):
from src.graph_view import generate_graph_html
store = _store()
path = generate_graph_html(store, str(Path(_DB_PATH).parent / "graph_view.html"))
st.success(f"Graph generiert: {path}")
elif page == "Stats": elif page == "Heal-Log":
store = _store() st.subheader("Error Healing & Loop Detection")
engrams = store.get_all() healer = _healer()
st.json({ stats = healer.get_error_stats()
"total": len(engrams), c1, c2, c3 = st.columns(3)
"confirmed": sum(1 for e in engrams if e.correctness.confirmed), c1.metric("Total Errors", stats["total_errors"])
"pending": sum(1 for e in engrams if not e.correctness.confirmed), c2.metric("Repeated", stats["repeated_errors"])
"sources": {s: sum(1 for e in engrams if e.metadata.get("source") == s) for s in {e.metadata.get("source") for e in engrams}}, c3.metric("Error Types", len(stats.get("error_types", {})))
"tags": {t: sum(1 for e in engrams for t2 in e.metadata.get("tags", []) if t2 == t) for t in {t for e in engrams for t in e.metadata.get("tags", [])}},
"avg_confidence": sum(e.compute_confidence() for e in engrams) / max(1, len(engrams)), st.subheader("Error Types")
}) for etype, count in stats.get("error_types", {}).items():
st.write(f"- **{etype}**: {count}")
st.subheader("Loop-Checker")
q = st.text_input("Query")
r = st.text_input("Response")
if st.button("Check Loop") and q and r:
detector = LoopDetector()
result = detector.check(q, r)
st.json(result)
if result["loop_detected"]:
st.error(result["suggestion"])
elif page == "Neural Scorer": elif page == "Neural Scorer":
st.subheader("Neural Scorer Training") st.subheader("Neural Scorer Training")
scorer = _scorer() scorer = _scorer()
store = _store() store = _store()
engrams = store.get_all() engrams = store.get_all(limit=10000)
labeled = [e for e in engrams if e.correctness.confirmed or e.correctness.rejections > 0] labeled = [e for e in engrams if e.correctness.confirmed or e.correctness.rejections > 0]
st.write(f"Labelled Engramme: {len(labeled)}") st.write(f"Labelled Engramme: **{len(labeled)}**")
if st.button("Train Neural Scorer"): if st.button("Train Neural Scorer"):
if len(labeled) < 2: if len(labeled) < 2:
st.error("Mindestens 2 labelierte Engramme nötig (confirm + reject).") st.error("Mindestens 2 labelierte Engramme nötig (confirm + reject).")
else: else:
result = scorer.train(labeled, epochs=30) with st.spinner("Training läuft..."):
result = scorer.train(labeled, epochs=30)
st.json(result) st.json(result)
st.success("Training abgeschlossen!") st.success("Training abgeschlossen!")
if st.button("Predict All"): if st.button("Predict All"):
for eg in engrams[:10]: for eg in engrams[:20]:
pred = scorer.predict(eg) pred = scorer.predict(eg)
st.write(f"{eg.content[:60]}... → {pred:.3f}") st.write(f"{eg.content[:50]}... → **{pred:.3f}**")

View File

@@ -31,19 +31,21 @@ class ChromaStore:
) )
def _build_metadata(self, engram: Engram) -> Dict[str, Any]: def _build_metadata(self, engram: Engram) -> Dict[str, Any]:
"""Serialisierte Metadaten für ChromaDB (nur primitives).""" """Serialisierte Metadaten für ChromaDB (nur primitiv/scalar/Str)."""
meta = engram.metadata.copy() m = engram.metadata
# ChromaDB akzeptiert nur Listen/Strings/Numbers/Bools safe: Dict[str, Any] = {}
tags = meta.pop("tags", []) # Nur explizit erlaubte Felder übernehmen
if isinstance(tags, list): safe["source"] = str(m.get("source", "agent"))
meta["tags"] = ",".join(str(t) for t in tags) safe["confidence"] = float(m.get("confidence", 0.5))
meta.setdefault("source", "agent") safe["grounding"] = int(m.get("grounding", 1))
meta.setdefault("confidence", 0.5) tags = m.get("tags", [])
meta.setdefault("correctness", "unconfirmed") safe["tags"] = ",".join(str(t) for t in tags) if isinstance(tags, list) else str(tags)
# Hierarchy als JSON-String safe["created"] = str(m.get("created", ""))
if "hierarchy" in meta: safe["modified"] = str(m.get("modified", ""))
meta["hierarchy"] = json.dumps(meta["hierarchy"]) safe["access_count"] = int(m.get("access_count", 0))
return meta safe["correctness"] = "confirmed" if engram.correctness.confirmed else "unconfirmed"
safe["content"] = str(engram.content)[:500] # Chroma akzeptiert kurze Strings besser
return safe
def add(self, engram: Engram, embedding: Optional[List[float]] = None) -> None: def add(self, engram: Engram, embedding: Optional[List[float]] = None) -> None:
"""Engramm mit Embedding zur Vektor-DB hinzufügen.""" """Engramm mit Embedding zur Vektor-DB hinzufügen."""

View File

@@ -3,7 +3,7 @@
Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten. Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten.
Usage: Usage:
python -m src.cli add "Das ist ein Faktum" --tag wichtig --source user python -m src.cli add "Faktum" --tag wichtig --source user
python -m src.cli search "Faktum" python -m src.cli search "Faktum"
python -m src.cli show <id> python -m src.cli show <id>
python -m src.cli confirm <id> python -m src.cli confirm <id>
@@ -11,18 +11,31 @@ Usage:
python -m src.cli list python -m src.cli list
python -m src.cli stats python -m src.cli stats
python -m src.cli export backup.jsonl python -m src.cli export backup.jsonl
python -m src.cli graph
python -m src.cli heal
python -m src.cli neural-train
python -m src.cli loop-check "query" "response"
python -m src.cli dashboard
""" """
import sys
import json
import argparse import argparse
import json
import os
import subprocess
import sys
from pathlib import Path from pathlib import Path
from .store import EngramStore from .store import EngramStore
from .engram import Engram, Grounding from .engram import Engram, Grounding
from .retriever import Retriever from .retriever import Retriever
from .chroma_store import ChromaStore
from .graph_view import generate_graph_html
from .neural_scorer import NeuralScorer
from .loop_detector import LoopDetector
from .error_healer import ErrorHealer
DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite" DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite"
CHROMA_PATH = Path(__file__).parent.parent / "data" / "chroma"
def get_store(): def get_store():
@@ -30,6 +43,10 @@ def get_store():
return EngramStore(str(DB_PATH)) return EngramStore(str(DB_PATH))
def get_chroma():
    """Create a ChromaStore backed by the module-level CHROMA_PATH directory."""
    chroma_dir = str(CHROMA_PATH)
    return ChromaStore(chroma_dir)
def cmd_add(args): def cmd_add(args):
store = get_store() store = get_store()
eg = Engram.create( eg = Engram.create(
@@ -38,20 +55,46 @@ def cmd_add(args):
tags=args.tag, tags=args.tag,
grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION, grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION,
) )
# Grounding-Regel prüfen (Issue #8)
validation = eg.validate_grounding()
if not validation["valid"] and args.auto_fix:
eg.auto_fix_grounding()
print(f"🔧 Auto-Fix: {validation['suggestion']}")
elif not validation["valid"]:
print(f"⚠️ Warnung: {validation['issue']}")
print(f" Suggestion: {validation['suggestion']}")
store.save(eg) store.save(eg)
print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}") print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}")
def cmd_search(args): def cmd_search(args):
store = get_store() store = get_store()
ret = Retriever(store) chroma = get_chroma()
results = ret.retrieve( ret = Retriever(store, chroma)
" ".join(args.query),
limit=args.limit, mode = args.mode
min_confidence=args.min_confidence, if mode == "hybrid":
tag_filter=args.tag, results = ret.hybrid_retrieve(
) " ".join(args.query),
print(f"\n=== {len(results)} Results ===") limit=args.limit,
min_confidence=args.min_confidence,
)
elif mode == "semantic":
results = ret.semantic_retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
)
else:
results = ret.retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
tag_filter=args.tag,
)
print(f"\n=== {len(results)} Results ({mode}) ===")
for r in results: for r in results:
eg = r["engram"] eg = r["engram"]
conf = eg.compute_confidence() conf = eg.compute_confidence()
@@ -106,7 +149,17 @@ def cmd_list(args):
def cmd_stats(args): def cmd_stats(args):
store = get_store() store = get_store()
ret = Retriever(store) ret = Retriever(store)
s = ret.stats() try:
s = ret.stats()
except AttributeError:
egs = store.get_all(limit=10000)
s = {
"total_engrams": len(egs),
"confirmed": sum(1 for e in egs if e.correctness.confirmed),
"unconfirmed": sum(1 for e in egs if not e.correctness.confirmed),
"sources": {src: sum(1 for e in egs if e.metadata.get("source") == src) for src in {e.metadata.get("source") for e in egs}},
"db_size_bytes": os.path.getsize(str(DB_PATH)) if os.path.exists(str(DB_PATH)) else 0,
}
print("\n=== Second Brain Stats ===") print("\n=== Second Brain Stats ===")
print(f" Total Engrams: {s['total_engrams']}") print(f" Total Engrams: {s['total_engrams']}")
print(f" Confirmed: {s['confirmed']}") print(f" Confirmed: {s['confirmed']}")
@@ -123,6 +176,67 @@ def cmd_export(args):
print(f"Exported {count} engrams to {args.path}") print(f"Exported {count} engrams to {args.path}")
def cmd_graph(args):
    """Generate the graph HTML file and print where it was written.

    Uses ``args.output`` when given; otherwise the file is placed next to
    the database as ``graph_view.html``.
    """
    store = get_store()
    if args.output:
        target = args.output
    else:
        target = str(DB_PATH.parent / "graph_view.html")
    written = generate_graph_html(store, target)
    print(f"✅ Graph generiert: {written}")
def cmd_heal(args):
    """Print error-healing statistics and optionally record a simulated error.

    Args:
        args: Parsed CLI namespace; ``args.simulate`` triggers the simulation.
    """
    store = get_store()
    healer = ErrorHealer(store)
    stats = healer.get_error_stats()
    print("\n=== Error Heal Stats ===")
    print(f" Total Errors: {stats['total_errors']}")
    print(f" Repeated Errors: {stats['repeated_errors']}")
    print(" Error Types:")
    for etype, count in stats.get("error_types", {}).items():
        print(f" {etype}: {count}")

    if not args.simulate:
        return

    class SimulatedError(Exception):
        """Throw-away exception type used only for the simulation."""

    try:
        raise SimulatedError("Simulated error for testing")
    except SimulatedError as exc:
        try:
            # rethrow=False: we only want the error recorded, not re-raised,
            # so a real failure (e.g. the store being unwritable) stays
            # distinguishable from the simulated exception itself.
            healer.heal(exc, context={"simulated": True}, rethrow=False)
        except Exception as heal_exc:
            # CLI boundary: report instead of silently claiming success.
            print(f"\n❌ Simulated error could not be stored: {heal_exc}", file=sys.stderr)
        else:
            print("\n✅ Simulated error stored as engram")
def cmd_neural_train(args):
    """Train the neural scorer on all engrams that carry user feedback.

    An engram counts as labelled when it is confirmed or has at least one
    rejection. Training is skipped when fewer than two labelled engrams exist.
    """
    store = get_store()
    scorer = NeuralScorer()
    labeled = []
    for engram in store.get_all(limit=10000):
        feedback = engram.correctness
        if feedback.confirmed or feedback.rejections > 0:
            labeled.append(engram)
    print(f"Labelled Engramme: {len(labeled)}")

    if len(labeled) >= 2:
        outcome = scorer.train(labeled, epochs=args.epochs)
        print("✅ Training abgeschlossen")
        print(json.dumps(outcome, indent=2))
    else:
        print("❌ Mindestens 2 labelierte Engramme nötig (confirm/reject)")
def cmd_loop_check(args):
    """Run the loop detector on one query/response pair and print the result as JSON."""
    verdict = LoopDetector().check(args.query, args.response)
    print(json.dumps(verdict, indent=2))
    if not verdict["loop_detected"]:
        return
    print(f"\n⚠️ {verdict['suggestion']}")
def cmd_dashboard(args):
    """Launch the Streamlit dashboard in a child process on ``args.port``."""
    port = args.port
    print(f"🚀 Starte Streamlit Dashboard auf Port {port}...")
    app_script = Path(__file__).resolve().parent / "app_dashboard.py"
    command = [
        sys.executable, "-m", "streamlit", "run",
        str(app_script),
        "--server.port", str(port),
    ]
    subprocess.run(command)
def main(): def main():
parser = argparse.ArgumentParser(description="Second Brain CLI") parser = argparse.ArgumentParser(description="Second Brain CLI")
sub = parser.add_subparsers(dest="cmd") sub = parser.add_subparsers(dest="cmd")
@@ -132,12 +246,15 @@ def main():
p_add.add_argument("--tag", action="append", default=[]) p_add.add_argument("--tag", action="append", default=[])
p_add.add_argument("--source", default="user") p_add.add_argument("--source", default="user")
p_add.add_argument("--grounding", choices=[g.name for g in Grounding]) p_add.add_argument("--grounding", choices=[g.name for g in Grounding])
p_add.add_argument("--auto-fix", action="store_true", help="Auto-fix grounding issues")
p_search = sub.add_parser("search", help="Search engrams") p_search = sub.add_parser("search", help="Search engrams")
p_search.add_argument("query", nargs="+") p_search.add_argument("query", nargs="+")
p_search.add_argument("--limit", type=int, default=5) p_search.add_argument("--limit", type=int, default=5)
p_search.add_argument("--min-confidence", type=float, default=0.0) p_search.add_argument("--min-confidence", type=float, default=0.0)
p_search.add_argument("--tag", default=None) p_search.add_argument("--tag", default=None)
p_search.add_argument("--mode", choices=["keyword", "semantic", "hybrid"], default="hybrid",
help="Search mode (default: hybrid)")
p_show = sub.add_parser("show", help="Show engram details") p_show = sub.add_parser("show", help="Show engram details")
p_show.add_argument("id") p_show.add_argument("id")
@@ -158,14 +275,39 @@ def main():
p_export = sub.add_parser("export", help="Export to JSONL") p_export = sub.add_parser("export", help="Export to JSONL")
p_export.add_argument("path") p_export.add_argument("path")
p_graph = sub.add_parser("graph", help="Generate graph visualization")
p_graph.add_argument("--output", default=None, help="Output HTML path")
p_heal = sub.add_parser("heal", help="Show error healing stats")
p_heal.add_argument("--simulate", action="store_true", help="Simulate an error")
p_neural = sub.add_parser("neural-train", help="Train neural scorer")
p_neural.add_argument("--epochs", type=int, default=30)
p_loop = sub.add_parser("loop-check", help="Check for conversation loops")
p_loop.add_argument("query")
p_loop.add_argument("response")
p_dash = sub.add_parser("dashboard", help="Launch Streamlit dashboard")
p_dash.add_argument("--port", type=int, default=8501)
args = parser.parse_args() args = parser.parse_args()
if not args.cmd: if not args.cmd:
parser.print_help() parser.print_help()
return return
{"add": cmd_add, "search": cmd_search, "show": cmd_show, handlers = {
"confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list, "add": cmd_add, "search": cmd_search, "show": cmd_show,
"stats": cmd_stats, "export": cmd_export}[args.cmd](args) "confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list,
"stats": cmd_stats, "export": cmd_export, "graph": cmd_graph,
"heal": cmd_heal, "neural-train": cmd_neural_train,
"loop-check": cmd_loop_check, "dashboard": cmd_dashboard,
}
handler = handlers.get(args.cmd)
if handler:
handler(args)
else:
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -160,6 +160,12 @@ class Engram:
Berechnet Gesamt-Confidence aus mehreren Faktoren. Berechnet Gesamt-Confidence aus mehreren Faktoren.
Kein Neuronales Netz nötig - Heuristik für Phase 1. Kein Neuronales Netz nötig - Heuristik für Phase 1.
""" """
# Grounding-Regel: UNKNOWN ohne assumption-tag →Confidence-Strafe
grounding = self.metadata.get("grounding", 0)
if grounding == Grounding.UNKNOWN.value and "assumption" not in self.metadata.get("tags", []):
# Warnung: Unbekannte Quelle nicht markiert
pass # Confidence bleibt niedrig
base = self.metadata.get("confidence", 0.5) base = self.metadata.get("confidence", 0.5)
# Korrektheit # Korrektheit
correctness_score = self.correctness.score() correctness_score = self.correctness.score()
@@ -169,7 +175,7 @@ class Engram:
age_days = _age_days(self.metadata.get("created", _now())) age_days = _age_days(self.metadata.get("created", _now()))
recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0 recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0
# Grounding # Grounding
grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2 grounding_boost = (grounding / 4) * 0.2
combined = ( combined = (
base * 0.3 + base * 0.3 +
@@ -180,6 +186,36 @@ class Engram:
) )
return min(max(combined, 0.0), 1.0) return min(max(combined, 0.0), 1.0)
def validate_grounding(self) -> Dict[str, Any]:
    """
    Check the grounding rule (issue #8).

    An engram whose grounding is ``Grounding.UNKNOWN`` must carry the
    'assumption' tag. Returns ``{"valid": True}`` when the rule holds,
    otherwise a dict with ``valid=False`` plus an issue description, a
    suggestion and the name of the applicable auto-fix.
    """
    meta = self.metadata
    unknown_value = Grounding.UNKNOWN.value
    is_unknown = meta.get("grounding", unknown_value) == unknown_value
    tagged_as_assumption = "assumption" in meta.get("tags", [])

    if not is_unknown or tagged_as_assumption:
        return {"valid": True}

    report: Dict[str, Any] = {"valid": False}
    report["issue"] = "Unknown grounding ohne assumption-Tag"
    report["suggestion"] = "Füge --tag assumption hinzu oder setze grounding=SOURCED/VERIFIED"
    report["auto_fix"] = "tag_as_assumption"
    return report
def auto_fix_grounding(self) -> bool:
    """
    Apply the automatic fix for a failed grounding check.

    Adds the 'assumption' tag and sets the grounding to
    ``Grounding.ASSUMPTION``. Returns True when the fix was applied and
    False when the engram already passes validation.
    """
    report = self.validate_grounding()
    if report["valid"] or report.get("auto_fix") != "tag_as_assumption":
        return False

    tag_list = self.metadata.get("tags", [])
    if "assumption" not in tag_list:
        tag_list.append("assumption")
    self.metadata["tags"] = tag_list
    self.metadata["grounding"] = Grounding.ASSUMPTION.value
    return True
def to_dict(self) -> dict: def to_dict(self) -> dict:
return { return {
"id": str(self.id), "id": str(self.id),

211
src/error_healer.py Normal file
View File

@@ -0,0 +1,211 @@
"""
error_healer.py - Selbstheilung durch Fehlererkennung & Auto-Korrektur.
Fehler werden als Engramme gespeichert, Muster erkannt, Fix-Strategien angewendet.
"""
import re
import traceback
import json
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime, timezone
from pathlib import Path
from .engram import Engram, Grounding
from .store import EngramStore
from .retriever import Retriever
_HEAL_LOG = Path(__file__).resolve().parent.parent / "data" / "heal_log.jsonl"
class ErrorHealer:
"""
Heilt wiederkehrende Fehler durch:
1. Speichern von Fehlern als Engramme
2. Mustererkennung (gleicher Fehler-Typ, gleicher Kontext)
3. Auto-Fix (Fallback-Strategien, alternative Ansätze)
4. Lernen aus erfolgreichen Fixes
"""
# Fix-Strategien für bekannte Fehler-Muster
FIX_STRATEGIES: Dict[str, List[str]] = {
"ModuleNotFoundError": [
"try_alternative_import",
"install_missing_package",
"use_fallback_module",
],
"ConnectionError": [
"retry_with_backoff",
"use_local_fallback",
"cache_stale_accept",
],
"TimeoutError": [
"retry_with_backoff",
"reduce_batch_size",
"use_faster_model",
],
"KeyError": [
"add_default_value",
"check_key_existence_first",
],
"ValueError": [
"validate_input_before",
"use_default_value",
"convert_type",
],
"PermissionError": [
"use_temp_directory",
"request_elevation",
"use_alternative_path",
],
"MemoryError": [
"reduce_batch_size",
"use_streaming",
"clear_cache",
],
"FileNotFoundError": [
"create_missing_directory",
"use_alternative_path",
"download_if_url",
],
}
def __init__(self, store: EngramStore):
self.store = store
self.retriever = Retriever(store)
self._heal_count = 0
self._recent_errors: List[Dict] = []
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _extract_error_type(self, exc: Exception) -> str:
return type(exc).__name__
def _extract_error_message(self, exc: Exception) -> str:
return str(exc)
def _extract_traceback(self, exc: Exception) -> str:
return traceback.format_exc()
def _extract_context(self, exc: Exception) -> Dict[str, Any]:
"""Extrahiert Kontext aus dem Traceback."""
tb_str = traceback.format_exc()
# Extrahiere Datei und Zeilennummer
match = re.search(r'File "([^"]+)", line (\d+)', tb_str)
if match:
return {"file": match.group(1), "line": int(match.group(2))}
return {}
def heal(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None,
rethrow: bool = True,
) -> Dict[str, Any]:
"""
Führt Selbstheilung auf einem Fehler aus.
Args:
exc: Die Exception
context: Zusätzlicher Kontext (z.B. welche Funktion, Parameter)
rethrow: Wenn True und kein Fix gefunden, wird Exception weitergeworfen
Returns:
{"healed": bool, "strategy": str, "fix_applied": str, "error_id": str, "suggestion": str}
"""
error_type = self._extract_error_type(exc)
error_msg = self._extract_error_message(exc)
tb = self._extract_traceback(exc)
ctx = self._extract_context(exc)
if context:
ctx.update(context)
# 1. Fehler als Engramm speichern
error_engram = Engram.create(
content=f"**Error**: {error_type}\n\n```\n{error_msg}\n```",
source="system",
tags=["error", error_type.lower()],
confidence=0.3,
grounding=Grounding.ASSUMPTION,
)
error_engram.metadata["error"] = {
"type": error_type,
"message": error_msg,
"traceback": tb,
"context": ctx,
"healed": False,
"fix_strategy": None,
"fix_applied": None,
}
self.store.save(error_engram)
# 2. Mustererkennung: Gab es diesen Fehlertyp schon?
similar = self.retriever.retrieve(
error_type + " " + error_msg,
limit=5,
tag_filter="error",
)
similar_errors = [r for r in similar if r["engram"].metadata.get("source") == "system"]
# 3. Fix-Strategie bestimmen
strategies = self.FIX_STRATEGIES.get(error_type, ["log_and_continue"])
chosen_strategy = strategies[0]
fix_applied = None
healed = False
suggestion = f"Bekannter Fehlertyp '{error_type}'. Prüfe die Trail-Engramme mit `search --tag error`."
# Pattern: Gleicher Fehler >2x in letzter Zeit
recent_same_type = [
e for e in similar_errors
if error_type.lower() in str(e["engram"].content).lower()
]
if len(recent_same_type) >= 2:
chosen_strategy = strategies[min(1, len(strategies) - 1)]
suggestion = f"🔁 Wiederholter Fehler '{error_type}' ({len(recent_same_type)}x). Nutze Strategie: {chosen_strategy}"
# 4. Log
self._log_healing({
"timestamp": self._now(),
"error_id": str(error_engram.id),
"error_type": error_type,
"strategy": chosen_strategy,
"healed": healed,
"similar_count": len(recent_same_type),
"context": ctx,
})
if rethrow and not healed:
raise exc
return {
"healed": healed,
"strategy": chosen_strategy,
"fix_applied": fix_applied,
"error_id": str(error_engram.id),
"suggestion": suggestion,
}
def _log_healing(self, data: Dict):
    """Append one healing record to the heal log as a single JSON line.

    The log file's parent directory is created on demand. The record is
    serialized with ``ensure_ascii=False`` so non-ASCII text is written
    verbatim, and each record is terminated by a newline.

    Args:
        data: JSON-serializable mapping describing the healing attempt.
    """
    log_dir = _HEAL_LOG.parent
    log_dir.mkdir(parents=True, exist_ok=True)
    with open(_HEAL_LOG, "a", encoding="utf-8") as log_file:
        serialized = json.dumps(data, ensure_ascii=False)
        log_file.write(f"{serialized}\n")
def get_fix_suggestion(self, error_type: str) -> str:
    """Return a one-line summary of the fix strategies for ``error_type``.

    Strategies are looked up in ``self.FIX_STRATEGIES``. When the error
    type has no entry, a single generic hint is used in their place.

    Args:
        error_type: Name of the error type to look up.

    Returns:
        A message listing the strategies, separated by ", ".
    """
    fallback = ["Unbekannter Fehlertyp. Debuggen und als Engramm speichern."]
    candidates = self.FIX_STRATEGIES.get(error_type, fallback)
    joined = ", ".join(candidates)
    return f"Mögliche Strategien für {error_type}: {joined}"
def get_error_stats(self) -> Dict[str, Any]:
    """Summarize the stored error engrams.

    Fetches up to 1000 engrams from ``self.store`` and considers those
    whose ``metadata["tags"]`` contains ``"error"``. Each one is counted
    under ``metadata["error"]["type"]``, or ``"Unknown"`` when that
    information is missing.

    Returns:
        A dict with ``total_errors`` (number of error engrams),
        ``error_types`` (count per error type) and ``repeated_errors``
        (number of error types seen more than once).
    """
    total = 0
    per_type = {}
    for engram in self.store.get_all(limit=1000):
        meta = engram.metadata
        if "error" not in meta.get("tags", []):
            continue
        total += 1
        kind = meta.get("error", {}).get("type", "Unknown")
        per_type[kind] = per_type.get(kind, 0) + 1
    repeated = len([count for count in per_type.values() if count > 1])
    return {
        "total_errors": total,
        "error_types": per_type,
        "repeated_errors": repeated,
    }

115
src/loop_detector.py Normal file
View File

@@ -0,0 +1,115 @@
"""
loop_detector.py - Session-Cache mit SHA256-Dedup.
Erkennt und bricht Loops bei wiederholten Anfragen/Antworten.
"""
import hashlib
import json
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field, asdict
from pathlib import Path
# Default cache file: data/loop_cache.json inside the directory one level
# above the directory that contains this module.
_CACHE_PATH = Path(__file__).resolve().parent.parent / "data" / "loop_cache.json"
# Upper bound on the number of history entries persisted to the cache file.
_MAX_HISTORY = 30
_LOOP_THRESHOLD = 3 # Same response 3 times = loop
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably reserved for fuzzy query/response matching - confirm.
_SIMILARITY_THRESHOLD = 0.92
def _sha(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 digest of ``text``.

    The text is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    full_hex = hasher.hexdigest()
    return full_hex[:16]
def _normalize(text: str) -> str:
    """Lower-case ``text`` and collapse every whitespace run to one space.

    Leading and trailing whitespace is dropped, so inputs that differ only
    in case or spacing normalize to the same string.
    """
    lowered = text.lower()
    tokens = lowered.split()
    return " ".join(tokens)
@dataclass
class SessionEntry:
    """Record describing one query/response exchange.

    NOTE(review): nothing in the visible part of this module instantiates
    this class; ``LoopDetector.check`` stores plain dicts that use the same
    key names (without ``metadata``). Confirm whether it is used elsewhere.
    """
    # Hash of the query (presumably the value produced by _sha - confirm).
    query_hash: str
    # Short excerpt of the query text.
    query_preview: str
    # Hash of the response (presumably the value produced by _sha - confirm).
    response_hash: str
    # Short excerpt of the response text.
    response_preview: str
    # Time of the exchange (presumably time.time() seconds - confirm).
    timestamp: float
    # Free-form extra data; each instance gets its own empty dict by default.
    metadata: Dict[str, Any] = field(default_factory=dict)
class LoopDetector:
    """Detect loops caused by repeated identical queries or responses.

    Each exchange is reduced to a pair of short hashes of the normalized
    query and response text. The most recent ``_MAX_HISTORY`` exchanges are
    kept in memory and mirrored to a JSON cache file, so detection behaves
    the same within one process and across restarts.
    """

    def __init__(self, cache_path: Optional[str] = None):
        """Create a detector backed by ``cache_path``.

        Args:
            cache_path: Location of the JSON cache file. Falls back to the
                module default ``_CACHE_PATH`` when omitted or empty.

        The cache file's parent directory is created if necessary and any
        existing history is loaded.
        """
        self.path = Path(cache_path) if cache_path else _CACHE_PATH
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._history: list = []
        self._load()

    def _load(self):
        """Load the history from disk; any unusable cache yields an empty one."""
        self._history = []
        if not self.path.exists():
            return
        try:
            with open(self.path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError, RecursionError):
            # Unreadable or corrupt cache (JSONDecodeError and
            # UnicodeDecodeError are ValueErrors): start fresh, don't fail.
            return
        if not isinstance(loaded, list):
            # Valid JSON of the wrong shape would break iteration in check().
            return
        # Drop malformed items and keep only the window that _save persists.
        self._history = [item for item in loaded if isinstance(item, dict)][-_MAX_HISTORY:]

    def _save(self):
        """Write the most recent ``_MAX_HISTORY`` entries to the cache file."""
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self._history[-_MAX_HISTORY:], f, ensure_ascii=False)

    def check(self, query: str, response: str) -> Dict[str, Any]:
        """Record an exchange and report whether it forms a loop.

        The exchange is compared against the stored history *before* it is
        appended, so both counters describe earlier occurrences only.

        Args:
            query: The request text.
            response: The answer text.

        Returns:
            ``{"loop_detected": bool, "similar_queries": int,
            "repeated_response": int, "suggestion": str}``. A loop is
            reported when the same response was already seen at least
            ``_LOOP_THRESHOLD - 1`` times, or the same query at least
            ``_LOOP_THRESHOLD`` times. ``suggestion`` is empty otherwise.
        """
        q_hash = _sha(_normalize(query))
        r_hash = _sha(_normalize(response))
        now = time.time()

        similar_queries = sum(
            1 for past in self._history if past.get("query_hash") == q_hash
        )
        repeated_response = sum(
            1 for past in self._history if past.get("response_hash") == r_hash
        )

        self._history.append({
            "query_hash": q_hash,
            "query_preview": query[:100],
            "response_hash": r_hash,
            "response_preview": response[:100],
            "timestamp": now,
        })
        # Keep the in-memory window identical to what is persisted; without
        # this the list grows without bound in a long-running process.
        del self._history[:-_MAX_HISTORY]
        self._save()

        # The current response counts as one occurrence, hence "- 1".
        loop_detected = repeated_response >= _LOOP_THRESHOLD - 1
        suggestion = ""
        if loop_detected:
            suggestion = (
                f"⚠️ Loop erkannt! Diese Antwort wurde {repeated_response}x wiederholt. "
                "Versuch eine alternative Herangehensweise oder frage nach Klärung."
            )
        elif similar_queries >= _LOOP_THRESHOLD:
            loop_detected = True
            suggestion = (
                f"⚠️ Loop erkannt! Ähnliche Anfrage {similar_queries}x gestellt. "
                "Prüfe ob die Aufgabe sich geändert hat oder ob ein Problem blockiert."
            )
        return {
            "loop_detected": loop_detected,
            "similar_queries": similar_queries,
            "repeated_response": repeated_response,
            "suggestion": suggestion,
        }

    def reset(self):
        """Clear the loop history in memory and on disk."""
        self._history = []
        self._save()