3 Commits

7 changed files with 636 additions and 93 deletions

1
.streamlit/secrets.toml Normal file
View File

@@ -0,0 +1 @@
[default]

View File

@@ -1,174 +1,210 @@
"""
app_dashboard.py - Streamlit-Dashboard für Second Brain.
Seiten: Übersicht, Engramme, Suche, Graph, Stats.
Seiten: Übersicht, Engramme, Suche, Graph, Heal-Log, Neural Scorer.
"""
import json
import sys
import os
from pathlib import Path
import streamlit as st
sys.path.insert(0, str(Path(__file__).resolve().parent))
_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_root))
from src.engram import Engram
from src.store import EngramStore
from src.chroma_store import ChromaStore
from src.retriever import Retriever
from src.neural_scorer import NeuralScorer
from src.graph_view import generate_graph_html
from src.loop_detector import LoopDetector
from src.error_healer import ErrorHealer
_DEFAULT_DB = Path(__file__).resolve().parent.parent / "data" / "brain.sqlite"
_DB_PATH = str(st.secrets.get("db_path", _DEFAULT_DB) if hasattr(st, "secrets") else _DEFAULT_DB)
_DEFAULT_DB = _root / "data" / "brain.sqlite"
@st.cache_resource
def _store():
return EngramStore(_DB_PATH)
return EngramStore(str(_DEFAULT_DB))
@st.cache_resource
def _chroma():
p = Path(_DB_PATH).parent / "chroma"
p = Path(str(_DEFAULT_DB)).parent / "chroma"
return ChromaStore(str(p))
_retriever_cache = None
def _retriever():
return Retriever(_store(), _chroma())
global _retriever_cache
if _retriever_cache is None:
_retriever_cache = Retriever(_store(), _chroma())
return _retriever_cache
@st.cache_resource
def _scorer():
return NeuralScorer()
st.set_page_config(page_title="Second Brain Dashboard", layout="wide")
st.title("🧠 Second Brain Dashboard")
@st.cache_resource
def _healer():
return ErrorHealer(_store())
page = st.sidebar.radio("Seite", ["Übersicht", "Engramme", "Suche", "Graph", "Stats", "Neural Scorer"])
st.set_page_config(page_title="Second Brain Dashboard", layout="wide")
st.title("🧠 2.Brain v0.3.1")
page = st.sidebar.radio("Seite", ["Übersicht", "Engramme", "Suche", "Graph", "Heal-Log", "Neural Scorer"])
if page == "Übersicht":
store = _store()
engrams = store.get_all()
engrams = store.get_all(limit=10000)
confirmed = sum(1 for e in engrams if e.correctness.confirmed)
unconfirmed = len(engrams) - confirmed
avg_conf = sum(e.compute_confidence() for e in engrams) / max(1, len(engrams))
errors = [e for e in engrams if "error" in e.metadata.get("tags", [])]
c1, c2, c3, c4 = st.columns(4)
c1, c2, c3, c4, c5 = st.columns(5)
c1.metric("Total", len(engrams))
c2.metric("Confirmed", confirmed)
c3.metric("Pending", unconfirmed)
c4.metric("Avg Confidence", f"{avg_conf:.2f}")
c5.metric("Errors", len(errors))
st.subheader("Recent Engramme")
for eg in sorted(engrams, key=lambda e: e.metadata.get("modified", ""), reverse=True)[:5]:
with st.expander(f"{eg.content[:80]}..."):
valid = eg.validate_grounding()
marker = "" if valid["valid"] else "⚠️"
with st.expander(f"{marker} {eg.content[:80]}..."):
st.write(f"ID: `{eg.id}`")
st.write(f"Source: {eg.metadata.get('source')}")
st.write(f"Confidence: {eg.compute_confidence():.2f}")
st.write(f"Confirmed: {'' if eg.correctness.confirmed else ''}")
st.write("Tags:", ", ".join(eg.metadata.get("tags", [])))
if not valid["valid"]:
st.warning(f"Grounding: {valid['issue']}")
if st.button("Auto-Fix", key=f"af_{eg.id}"):
eg.auto_fix_grounding()
store.save(eg)
st.experimental_rerun()
elif page == "Engramme":
store = _store()
st.subheader("Alle Engramme")
st.subheader("Alle Engramme (max 1000)")
tag_filter = st.text_input("Filter tags")
source_filter = st.selectbox("Source", ["alle", "user", "agent", "web", "file", "system"])
for eg in store.get_all():
for eg in store.get_all(limit=1000):
tags = eg.metadata.get("tags", [])
src = eg.metadata.get("source", "")
if tag_filter and tag_filter not in tags:
continue
if source_filter != "alle" and source_filter != src:
continue
with st.expander(f"{eg.content[:100]}"):
st.write("Confidence:", f"{eg.compute_confidence():.2f}")
st.write("Tags:", ", ".join(tags))
st.write("Source:", src)
c1, c2 = st.columns(2)
if c1.button("Confirm", key=f"conf_{eg.id}"):
col1, col2 = st.columns([4, 1])
with col1:
conf = eg.compute_confidence()
marker = "" if conf > 0.7 else "⚠️"
st.markdown(f"{marker} **{eg.content[:100]}**")
st.caption(f"Conf: {conf:.2f} | Tags: {', '.join(tags)} | Source: {src}")
with col2:
if st.button("✅ Confirm", key=f"conf_{eg.id}"):
eg.correctness.confirm("user")
store.save(eg)
st.success("Confirmed!")
if c2.button("❌ Reject", key=f"rej_{eg.id}"):
st.success("Confirmed")
if st.button("❌ Reject", key=f"rej_{eg.id}"):
eg.correctness.reject("user")
store.save(eg)
st.warning("Rejected.")
st.warning("Rejected")
st.divider()
elif page == "Suche":
st.subheader("Semantic + Keyword Suche")
query = st.text_input("Query")
mode = st.radio("Modus", ["Hybrid", "Keyword", "Semantic"])
st.subheader("Hybrid Search (Semantic + Keyword)")
query = st.text_input("Query", placeholder="Suchbegriff eingeben...")
mode = st.radio("Modus", ["Hybrid", "Keyword", "Semantic"], horizontal=True)
if st.button("Suchen") and query:
ret = _retriever()
if mode == "Hybrid":
results = ret.hybrid_retrieve(query, limit=10)
elif mode == "Semantic":
results = ret.semantic_retrieve(query, limit=10)
else:
results = ret.retrieve(query, limit=10)
results = ret.hybrid_retrieve(query, limit=10) if mode == "Hybrid" else \
ret.semantic_retrieve(query, limit=10) if mode == "Semantic" else \
ret.retrieve(query, limit=10)
if not results:
st.info("Keine Ergebnisse gefunden.")
for r in results:
eg = r["engram"]
with st.container():
st.markdown(f"**{eg.content[:200]}...**")
st.write(f"Score: {r['score']:.3f} | Match: {r['match_type']} | Conf: {eg.compute_confidence():.2f}")
st.write(f"Score: `{r['score']:.3f}` | Match: `{r['match_type']}` | Conf: `{eg.compute_confidence():.2f}`")
c1, c2 = st.columns(2)
if c1.button("✅ Confirm", key=f"sc_{eg.id}"):
eg.correctness.confirm("user")
store = _store()
store.save(eg)
_store().save(eg)
st.success("Confirmed")
if c2.button("❌ Reject", key=f"sr_{eg.id}"):
eg.correctness.reject("user")
store = _store()
store.save(eg)
_store().save(eg)
st.warning("Rejected")
elif page == "Graph":
st.subheader("Graph-Visualisierung")
graph_html_path = Path(_DB_PATH).parent / "graph_view.html"
graph_html_path = Path(str(_DEFAULT_DB)).parent / "graph_view.html"
if st.button("Graph neu generieren"):
with st.spinner("Generiere Graph..."):
path = generate_graph_html(_store(), str(graph_html_path))
st.success(f"Graph generiert: {path}")
if graph_html_path.exists():
with open(graph_html_path, "r", encoding="utf-8") as f:
html = f.read()
# iframe
st.components.v1.html(html, height=800, scrolling=True)
st.components.v1.html(html, height=800)
else:
st.info("Graph nicht generiert. Führe `python -m src.cli graph` aus.")
if st.button("Graph generieren"):
from src.graph_view import generate_graph_html
store = _store()
path = generate_graph_html(store, str(Path(_DB_PATH).parent / "graph_view.html"))
st.success(f"Graph generiert: {path}")
st.info("Graph noch nicht generiert. Klicke oben.")
elif page == "Stats":
store = _store()
engrams = store.get_all()
st.json({
"total": len(engrams),
"confirmed": sum(1 for e in engrams if e.correctness.confirmed),
"pending": sum(1 for e in engrams if not e.correctness.confirmed),
"sources": {s: sum(1 for e in engrams if e.metadata.get("source") == s) for s in {e.metadata.get("source") for e in engrams}},
"tags": {t: sum(1 for e in engrams for t2 in e.metadata.get("tags", []) if t2 == t) for t in {t for e in engrams for t in e.metadata.get("tags", [])}},
"avg_confidence": sum(e.compute_confidence() for e in engrams) / max(1, len(engrams)),
})
elif page == "Heal-Log":
st.subheader("Error Healing & Loop Detection")
healer = _healer()
stats = healer.get_error_stats()
c1, c2, c3 = st.columns(3)
c1.metric("Total Errors", stats["total_errors"])
c2.metric("Repeated", stats["repeated_errors"])
c3.metric("Error Types", len(stats.get("error_types", {})))
st.subheader("Error Types")
for etype, count in stats.get("error_types", {}).items():
st.write(f"- **{etype}**: {count}")
st.subheader("Loop-Checker")
q = st.text_input("Query")
r = st.text_input("Response")
if st.button("Check Loop") and q and r:
detector = LoopDetector()
result = detector.check(q, r)
st.json(result)
if result["loop_detected"]:
st.error(result["suggestion"])
elif page == "Neural Scorer":
st.subheader("Neural Scorer Training")
scorer = _scorer()
store = _store()
engrams = store.get_all()
engrams = store.get_all(limit=10000)
labeled = [e for e in engrams if e.correctness.confirmed or e.correctness.rejections > 0]
st.write(f"Labelled Engramme: {len(labeled)}")
st.write(f"Labelled Engramme: **{len(labeled)}**")
if st.button("Train Neural Scorer"):
if len(labeled) < 2:
st.error("Mindestens 2 labelierte Engramme nötig (confirm + reject).")
else:
with st.spinner("Training läuft..."):
result = scorer.train(labeled, epochs=30)
st.json(result)
st.success("Training abgeschlossen!")
if st.button("Predict All"):
for eg in engrams[:10]:
for eg in engrams[:20]:
pred = scorer.predict(eg)
st.write(f"{eg.content[:60]}... → {pred:.3f}")
st.write(f"{eg.content[:50]}... → **{pred:.3f}**")

View File

@@ -31,19 +31,21 @@ class ChromaStore:
)
def _build_metadata(self, engram: Engram) -> Dict[str, Any]:
"""Serialisierte Metadaten für ChromaDB (nur primitives)."""
meta = engram.metadata.copy()
# ChromaDB akzeptiert nur Listen/Strings/Numbers/Bools
tags = meta.pop("tags", [])
if isinstance(tags, list):
meta["tags"] = ",".join(str(t) for t in tags)
meta.setdefault("source", "agent")
meta.setdefault("confidence", 0.5)
meta.setdefault("correctness", "unconfirmed")
# Hierarchy als JSON-String
if "hierarchy" in meta:
meta["hierarchy"] = json.dumps(meta["hierarchy"])
return meta
"""Serialisierte Metadaten für ChromaDB (nur primitiv/scalar/Str)."""
m = engram.metadata
safe: Dict[str, Any] = {}
# Nur explizit erlaubte Felder übernehmen
safe["source"] = str(m.get("source", "agent"))
safe["confidence"] = float(m.get("confidence", 0.5))
safe["grounding"] = int(m.get("grounding", 1))
tags = m.get("tags", [])
safe["tags"] = ",".join(str(t) for t in tags) if isinstance(tags, list) else str(tags)
safe["created"] = str(m.get("created", ""))
safe["modified"] = str(m.get("modified", ""))
safe["access_count"] = int(m.get("access_count", 0))
safe["correctness"] = "confirmed" if engram.correctness.confirmed else "unconfirmed"
safe["content"] = str(engram.content)[:500] # Chroma akzeptiert kurze Strings besser
return safe
def add(self, engram: Engram, embedding: Optional[List[float]] = None) -> None:
"""Engramm mit Embedding zur Vektor-DB hinzufügen."""

View File

@@ -3,7 +3,7 @@
Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten.
Usage:
python -m src.cli add "Das ist ein Faktum" --tag wichtig --source user
python -m src.cli add "Faktum" --tag wichtig --source user
python -m src.cli search "Faktum"
python -m src.cli show <id>
python -m src.cli confirm <id>
@@ -11,18 +11,31 @@ Usage:
python -m src.cli list
python -m src.cli stats
python -m src.cli export backup.jsonl
python -m src.cli graph
python -m src.cli heal
python -m src.cli neural-train
python -m src.cli loop-check "query" "response"
python -m src.cli dashboard
"""
import sys
import json
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from .store import EngramStore
from .engram import Engram, Grounding
from .retriever import Retriever
from .chroma_store import ChromaStore
from .graph_view import generate_graph_html
from .neural_scorer import NeuralScorer
from .loop_detector import LoopDetector
from .error_healer import ErrorHealer
DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite"
CHROMA_PATH = Path(__file__).parent.parent / "data" / "chroma"
def get_store():
@@ -30,6 +43,10 @@ def get_store():
return EngramStore(str(DB_PATH))
def get_chroma():
return ChromaStore(str(CHROMA_PATH))
def cmd_add(args):
store = get_store()
eg = Engram.create(
@@ -38,20 +55,46 @@ def cmd_add(args):
tags=args.tag,
grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION,
)
# Grounding-Regel prüfen (Issue #8)
validation = eg.validate_grounding()
if not validation["valid"] and args.auto_fix:
eg.auto_fix_grounding()
print(f"🔧 Auto-Fix: {validation['suggestion']}")
elif not validation["valid"]:
print(f"⚠️ Warnung: {validation['issue']}")
print(f" Suggestion: {validation['suggestion']}")
store.save(eg)
print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}")
def cmd_search(args):
store = get_store()
ret = Retriever(store)
chroma = get_chroma()
ret = Retriever(store, chroma)
mode = args.mode
if mode == "hybrid":
results = ret.hybrid_retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
)
elif mode == "semantic":
results = ret.semantic_retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
)
else:
results = ret.retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
tag_filter=args.tag,
)
print(f"\n=== {len(results)} Results ===")
print(f"\n=== {len(results)} Results ({mode}) ===")
for r in results:
eg = r["engram"]
conf = eg.compute_confidence()
@@ -106,7 +149,17 @@ def cmd_list(args):
def cmd_stats(args):
store = get_store()
ret = Retriever(store)
try:
s = ret.stats()
except AttributeError:
egs = store.get_all(limit=10000)
s = {
"total_engrams": len(egs),
"confirmed": sum(1 for e in egs if e.correctness.confirmed),
"unconfirmed": sum(1 for e in egs if not e.correctness.confirmed),
"sources": {src: sum(1 for e in egs if e.metadata.get("source") == src) for src in {e.metadata.get("source") for e in egs}},
"db_size_bytes": os.path.getsize(str(DB_PATH)) if os.path.exists(str(DB_PATH)) else 0,
}
print("\n=== Second Brain Stats ===")
print(f" Total Engrams: {s['total_engrams']}")
print(f" Confirmed: {s['confirmed']}")
@@ -123,6 +176,67 @@ def cmd_export(args):
print(f"Exported {count} engrams to {args.path}")
def cmd_graph(args):
store = get_store()
path = args.output or str(DB_PATH.parent / "graph_view.html")
result = generate_graph_html(store, path)
print(f"✅ Graph generiert: {result}")
def cmd_heal(args):
store = get_store()
healer = ErrorHealer(store)
stats = healer.get_error_stats()
print("\n=== Error Heal Stats ===")
print(f" Total Errors: {stats['total_errors']}")
print(f" Repeated Errors: {stats['repeated_errors']}")
print(f" Error Types:")
for etype, count in stats.get("error_types", {}).items():
print(f" {etype}: {count}")
if args.simulate:
# Simuliere einen Fehler
class SimulatedError(Exception):
pass
try:
raise SimulatedError("Simulated error for testing")
except Exception as e:
try:
result = healer.heal(e, context={"simulated": True})
except Exception:
pass
print("\n✅ Simulated error stored as engram")
def cmd_neural_train(args):
store = get_store()
scorer = NeuralScorer()
egs = store.get_all(limit=10000)
labeled = [e for e in egs if e.correctness.confirmed or e.correctness.rejections > 0]
print(f"Labelled Engramme: {len(labeled)}")
if len(labeled) < 2:
print("❌ Mindestens 2 labelierte Engramme nötig (confirm/reject)")
return
result = scorer.train(labeled, epochs=args.epochs)
print(f"✅ Training abgeschlossen")
print(json.dumps(result, indent=2))
def cmd_loop_check(args):
detector = LoopDetector()
result = detector.check(args.query, args.response)
print(json.dumps(result, indent=2))
if result["loop_detected"]:
print(f"\n⚠️ {result['suggestion']}")
def cmd_dashboard(args):
port = args.port
print(f"🚀 Starte Streamlit Dashboard auf Port {port}...")
script = Path(__file__).resolve().parent / "app_dashboard.py"
subprocess.run([sys.executable, "-m", "streamlit", "run", str(script), "--server.port", str(port)])
def main():
parser = argparse.ArgumentParser(description="Second Brain CLI")
sub = parser.add_subparsers(dest="cmd")
@@ -132,12 +246,15 @@ def main():
p_add.add_argument("--tag", action="append", default=[])
p_add.add_argument("--source", default="user")
p_add.add_argument("--grounding", choices=[g.name for g in Grounding])
p_add.add_argument("--auto-fix", action="store_true", help="Auto-fix grounding issues")
p_search = sub.add_parser("search", help="Search engrams")
p_search.add_argument("query", nargs="+")
p_search.add_argument("--limit", type=int, default=5)
p_search.add_argument("--min-confidence", type=float, default=0.0)
p_search.add_argument("--tag", default=None)
p_search.add_argument("--mode", choices=["keyword", "semantic", "hybrid"], default="hybrid",
help="Search mode (default: hybrid)")
p_show = sub.add_parser("show", help="Show engram details")
p_show.add_argument("id")
@@ -158,14 +275,39 @@ def main():
p_export = sub.add_parser("export", help="Export to JSONL")
p_export.add_argument("path")
p_graph = sub.add_parser("graph", help="Generate graph visualization")
p_graph.add_argument("--output", default=None, help="Output HTML path")
p_heal = sub.add_parser("heal", help="Show error healing stats")
p_heal.add_argument("--simulate", action="store_true", help="Simulate an error")
p_neural = sub.add_parser("neural-train", help="Train neural scorer")
p_neural.add_argument("--epochs", type=int, default=30)
p_loop = sub.add_parser("loop-check", help="Check for conversation loops")
p_loop.add_argument("query")
p_loop.add_argument("response")
p_dash = sub.add_parser("dashboard", help="Launch Streamlit dashboard")
p_dash.add_argument("--port", type=int, default=8501)
args = parser.parse_args()
if not args.cmd:
parser.print_help()
return
{"add": cmd_add, "search": cmd_search, "show": cmd_show,
handlers = {
"add": cmd_add, "search": cmd_search, "show": cmd_show,
"confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list,
"stats": cmd_stats, "export": cmd_export}[args.cmd](args)
"stats": cmd_stats, "export": cmd_export, "graph": cmd_graph,
"heal": cmd_heal, "neural-train": cmd_neural_train,
"loop-check": cmd_loop_check, "dashboard": cmd_dashboard,
}
handler = handlers.get(args.cmd)
if handler:
handler(args)
else:
parser.print_help()
if __name__ == "__main__":

View File

@@ -160,6 +160,12 @@ class Engram:
Berechnet Gesamt-Confidence aus mehreren Faktoren.
Kein Neuronales Netz nötig - Heuristik für Phase 1.
"""
# Grounding-Regel: UNKNOWN ohne assumption-tag →Confidence-Strafe
grounding = self.metadata.get("grounding", 0)
if grounding == Grounding.UNKNOWN.value and "assumption" not in self.metadata.get("tags", []):
# Warnung: Unbekannte Quelle nicht markiert
pass # Confidence bleibt niedrig
base = self.metadata.get("confidence", 0.5)
# Korrektheit
correctness_score = self.correctness.score()
@@ -169,7 +175,7 @@ class Engram:
age_days = _age_days(self.metadata.get("created", _now()))
recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0
# Grounding
grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2
grounding_boost = (grounding / 4) * 0.2
combined = (
base * 0.3 +
@@ -180,6 +186,36 @@ class Engram:
)
return min(max(combined, 0.0), 1.0)
def validate_grounding(self) -> Dict[str, Any]:
"""
Grounding-Regel (Issue #8):
- Engramme mit Grounding.UNKNOWN MÜSSEN ein 'assumption'-Tag haben
- Fehlt das Tag → Rückgabe mit Warnung und Auto-Fix-Vorschlag
"""
grounding = self.metadata.get("grounding", Grounding.UNKNOWN.value)
tags = self.metadata.get("tags", [])
if grounding == Grounding.UNKNOWN.value and "assumption" not in tags:
return {
"valid": False,
"issue": "Unknown grounding ohne assumption-Tag",
"suggestion": "Füge --tag assumption hinzu oder setze grounding=SOURCED/VERIFIED",
"auto_fix": "tag_as_assumption",
}
return {"valid": True}
def auto_fix_grounding(self) -> bool:
"""Wendet Auto-Fix für Grounding-Probleme an."""
validation = self.validate_grounding()
if not validation["valid"] and validation.get("auto_fix") == "tag_as_assumption":
tags = self.metadata.get("tags", [])
if "assumption" not in tags:
tags.append("assumption")
self.metadata["tags"] = tags
self.metadata["grounding"] = Grounding.ASSUMPTION.value
return True
return False
def to_dict(self) -> dict:
return {
"id": str(self.id),

211
src/error_healer.py Normal file
View File

@@ -0,0 +1,211 @@
"""
error_healer.py - Selbstheilung durch Fehlererkennung & Auto-Korrektur.
Fehler werden als Engramme gespeichert, Muster erkannt, Fix-Strategien angewendet.
"""
import re
import traceback
import json
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime, timezone
from pathlib import Path
from .engram import Engram, Grounding
from .store import EngramStore
from .retriever import Retriever
_HEAL_LOG = Path(__file__).resolve().parent.parent / "data" / "heal_log.jsonl"
class ErrorHealer:
"""
Heilt wiederkehrende Fehler durch:
1. Speichern von Fehlern als Engramme
2. Mustererkennung (gleicher Fehler-Typ, gleicher Kontext)
3. Auto-Fix (Fallback-Strategien, alternative Ansätze)
4. Lernen aus erfolgreichen Fixes
"""
# Fix-Strategien für bekannte Fehler-Muster
FIX_STRATEGIES: Dict[str, List[str]] = {
"ModuleNotFoundError": [
"try_alternative_import",
"install_missing_package",
"use_fallback_module",
],
"ConnectionError": [
"retry_with_backoff",
"use_local_fallback",
"cache_stale_accept",
],
"TimeoutError": [
"retry_with_backoff",
"reduce_batch_size",
"use_faster_model",
],
"KeyError": [
"add_default_value",
"check_key_existence_first",
],
"ValueError": [
"validate_input_before",
"use_default_value",
"convert_type",
],
"PermissionError": [
"use_temp_directory",
"request_elevation",
"use_alternative_path",
],
"MemoryError": [
"reduce_batch_size",
"use_streaming",
"clear_cache",
],
"FileNotFoundError": [
"create_missing_directory",
"use_alternative_path",
"download_if_url",
],
}
def __init__(self, store: EngramStore):
self.store = store
self.retriever = Retriever(store)
self._heal_count = 0
self._recent_errors: List[Dict] = []
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _extract_error_type(self, exc: Exception) -> str:
return type(exc).__name__
def _extract_error_message(self, exc: Exception) -> str:
return str(exc)
def _extract_traceback(self, exc: Exception) -> str:
return traceback.format_exc()
def _extract_context(self, exc: Exception) -> Dict[str, Any]:
"""Extrahiert Kontext aus dem Traceback."""
tb_str = traceback.format_exc()
# Extrahiere Datei und Zeilennummer
match = re.search(r'File "([^"]+)", line (\d+)', tb_str)
if match:
return {"file": match.group(1), "line": int(match.group(2))}
return {}
def heal(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None,
rethrow: bool = True,
) -> Dict[str, Any]:
"""
Führt Selbstheilung auf einem Fehler aus.
Args:
exc: Die Exception
context: Zusätzlicher Kontext (z.B. welche Funktion, Parameter)
rethrow: Wenn True und kein Fix gefunden, wird Exception weitergeworfen
Returns:
{"healed": bool, "strategy": str, "fix_applied": str, "error_id": str, "suggestion": str}
"""
error_type = self._extract_error_type(exc)
error_msg = self._extract_error_message(exc)
tb = self._extract_traceback(exc)
ctx = self._extract_context(exc)
if context:
ctx.update(context)
# 1. Fehler als Engramm speichern
error_engram = Engram.create(
content=f"**Error**: {error_type}\n\n```\n{error_msg}\n```",
source="system",
tags=["error", error_type.lower()],
confidence=0.3,
grounding=Grounding.ASSUMPTION,
)
error_engram.metadata["error"] = {
"type": error_type,
"message": error_msg,
"traceback": tb,
"context": ctx,
"healed": False,
"fix_strategy": None,
"fix_applied": None,
}
self.store.save(error_engram)
# 2. Mustererkennung: Gab es diesen Fehlertyp schon?
similar = self.retriever.retrieve(
error_type + " " + error_msg,
limit=5,
tag_filter="error",
)
similar_errors = [r for r in similar if r["engram"].metadata.get("source") == "system"]
# 3. Fix-Strategie bestimmen
strategies = self.FIX_STRATEGIES.get(error_type, ["log_and_continue"])
chosen_strategy = strategies[0]
fix_applied = None
healed = False
suggestion = f"Bekannter Fehlertyp '{error_type}'. Prüfe die Trail-Engramme mit `search --tag error`."
# Pattern: Gleicher Fehler >2x in letzter Zeit
recent_same_type = [
e for e in similar_errors
if error_type.lower() in str(e["engram"].content).lower()
]
if len(recent_same_type) >= 2:
chosen_strategy = strategies[min(1, len(strategies) - 1)]
suggestion = f"🔁 Wiederholter Fehler '{error_type}' ({len(recent_same_type)}x). Nutze Strategie: {chosen_strategy}"
# 4. Log
self._log_healing({
"timestamp": self._now(),
"error_id": str(error_engram.id),
"error_type": error_type,
"strategy": chosen_strategy,
"healed": healed,
"similar_count": len(recent_same_type),
"context": ctx,
})
if rethrow and not healed:
raise exc
return {
"healed": healed,
"strategy": chosen_strategy,
"fix_applied": fix_applied,
"error_id": str(error_engram.id),
"suggestion": suggestion,
}
def _log_healing(self, data: Dict):
_HEAL_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(_HEAL_LOG, "a", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
def get_fix_suggestion(self, error_type: str) -> str:
"""Gibt eine Fix-Suggestion für einen Fehlertyp zurück."""
strategies = self.FIX_STRATEGIES.get(error_type, ["Unbekannter Fehlertyp. Debuggen und als Engramm speichern."])
return f"Mögliche Strategien für {error_type}: {', '.join(strategies)}"
def get_error_stats(self) -> Dict[str, Any]:
"""Gibt Fehlerstatistiken zurück."""
all_eg = self.store.get_all(limit=1000)
errors = [e for e in all_eg if "error" in e.metadata.get("tags", [])]
types = {}
for e in errors:
err = e.metadata.get("error", {})
t = err.get("type", "Unknown")
types[t] = types.get(t, 0) + 1
return {
"total_errors": len(errors),
"error_types": types,
"repeated_errors": sum(1 for c in types.values() if c > 1),
}

115
src/loop_detector.py Normal file
View File

@@ -0,0 +1,115 @@
"""
loop_detector.py - Session-Cache mit SHA256-Dedup.
Erkennt und bricht Loops bei wiederholten Anfragen/Antworten.
"""
import hashlib
import json
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field, asdict
from pathlib import Path
_CACHE_PATH = Path(__file__).resolve().parent.parent / "data" / "loop_cache.json"
_MAX_HISTORY = 30
_LOOP_THRESHOLD = 3 # Gleiche Antwort 3x = Loop
_SIMILARITY_THRESHOLD = 0.92
def _sha(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
def _normalize(text: str) -> str:
"""Entfernt Variationen für besseren Vergleich."""
return " ".join(text.lower().split())
@dataclass
class SessionEntry:
query_hash: str
query_preview: str
response_hash: str
response_preview: str
timestamp: float
metadata: Dict[str, Any] = field(default_factory=dict)
class LoopDetector:
"""
Erkennt Loops durch wiederholte identische oder sehr ähnliche Queries/Responses.
"""
def __init__(self, cache_path: Optional[str] = None):
self.path = Path(cache_path) if cache_path else _CACHE_PATH
self.path.parent.mkdir(parents=True, exist_ok=True)
self._history: list = []
self._load()
def _load(self):
if self.path.exists():
try:
with open(self.path, "r", encoding="utf-8") as f:
self._history = json.load(f)
except Exception:
self._history = []
def _save(self):
with open(self.path, "w", encoding="utf-8") as f:
json.dump(self._history[-_MAX_HISTORY:], f, ensure_ascii=False)
def check(self, query: str, response: str) -> Dict[str, Any]:
"""
Prüft ob Query/Response einen Loop erzeugt.
Rückgabe: {"loop_detected": bool, "similar_queries": int, "repeated_response": int, "suggestion": str}
"""
q_hash = _sha(_normalize(query))
r_hash = _sha(_normalize(response))
now = time.time()
similar_queries = 0
repeated_response = 0
for entry in self._history:
# Query ähnlich?
if entry.get("query_hash") == q_hash:
similar_queries += 1
# Response identisch?
if entry.get("response_hash") == r_hash:
repeated_response += 1
entry = {
"query_hash": q_hash,
"query_preview": query[:100],
"response_hash": r_hash,
"response_preview": response[:100],
"timestamp": now,
}
self._history.append(entry)
self._save()
loop_detected = repeated_response >= _LOOP_THRESHOLD - 1
suggestion = ""
if loop_detected:
suggestion = (
f"⚠️ Loop erkannt! Diese Antwort wurde {repeated_response}x wiederholt. "
"Versuch eine alternative Herangehensweise oder frage nach Klärung."
)
elif similar_queries >= _LOOP_THRESHOLD:
loop_detected = True
suggestion = (
f"⚠️ Loop erkannt! Ähnliche Anfrage {similar_queries}x gestellt. "
"Prüfe ob die Aufgabe sich geändert hat oder ob ein Problem blockiert."
)
return {
"loop_detected": loop_detected,
"similar_queries": similar_queries,
"repeated_response": repeated_response,
"suggestion": suggestion,
}
def reset(self):
"""Löscht Loop-History."""
self._history = []
self._save()