diff --git a/src/cli.py b/src/cli.py index 6103a65..cb1416e 100644 --- a/src/cli.py +++ b/src/cli.py @@ -3,7 +3,7 @@ Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten. Usage: - python -m src.cli add "Das ist ein Faktum" --tag wichtig --source user + python -m src.cli add "Faktum" --tag wichtig --source user python -m src.cli search "Faktum" python -m src.cli show python -m src.cli confirm @@ -11,18 +11,31 @@ Usage: python -m src.cli list python -m src.cli stats python -m src.cli export backup.jsonl + python -m src.cli graph + python -m src.cli heal + python -m src.cli neural-train + python -m src.cli loop-check "query" "response" + python -m src.cli dashboard """ -import sys -import json import argparse +import json +import os +import subprocess +import sys from pathlib import Path from .store import EngramStore from .engram import Engram, Grounding from .retriever import Retriever +from .chroma_store import ChromaStore +from .graph_view import generate_graph_html +from .neural_scorer import NeuralScorer +from .loop_detector import LoopDetector +from .error_healer import ErrorHealer DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite" +CHROMA_PATH = Path(__file__).parent.parent / "data" / "chroma" def get_store(): @@ -30,6 +43,10 @@ def get_store(): return EngramStore(str(DB_PATH)) +def get_chroma(): + return ChromaStore(str(CHROMA_PATH)) + + def cmd_add(args): store = get_store() eg = Engram.create( @@ -38,20 +55,46 @@ def cmd_add(args): tags=args.tag, grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION, ) + # Grounding-Regel prüfen (Issue #8) + validation = eg.validate_grounding() + if not validation["valid"] and args.auto_fix: + eg.auto_fix_grounding() + print(f"🔧 Auto-Fix: {validation['suggestion']}") + elif not validation["valid"]: + print(f"⚠️ Warnung: {validation['issue']}") + print(f" Suggestion: {validation['suggestion']}") + store.save(eg) print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}") def cmd_search(args): store = get_store() - ret = Retriever(store) - results = ret.retrieve( - " ".join(args.query), - limit=args.limit, - min_confidence=args.min_confidence, - tag_filter=args.tag, - ) - print(f"\n=== {len(results)} Results ===") + chroma = get_chroma() + ret = Retriever(store, chroma) + + mode = args.mode + if mode == "hybrid": + results = ret.hybrid_retrieve( + " ".join(args.query), + limit=args.limit, + min_confidence=args.min_confidence, + ) + elif mode == "semantic": + results = ret.semantic_retrieve( + " ".join(args.query), + limit=args.limit, + min_confidence=args.min_confidence, + ) + else: + results = ret.retrieve( + " ".join(args.query), + limit=args.limit, + min_confidence=args.min_confidence, + tag_filter=args.tag, + ) + + print(f"\n=== {len(results)} Results ({mode}) ===") for r in results: eg = r["engram"] conf = eg.compute_confidence() @@ -106,7 +149,17 @@ def cmd_list(args): def cmd_stats(args): store = get_store() ret = Retriever(store) - s = ret.stats() + try: + s = ret.stats() + except AttributeError: + egs = store.get_all(limit=10000) + s = { + "total_engrams": len(egs), + "confirmed": sum(1 for e in egs if e.correctness.confirmed), + "unconfirmed": sum(1 for e in egs if not e.correctness.confirmed), + "sources": {src: sum(1 for e in egs if e.metadata.get("source") == src) for src in {e.metadata.get("source") for e in egs}}, + "db_size_bytes": os.path.getsize(str(DB_PATH)) if os.path.exists(str(DB_PATH)) else 0, + } print("\n=== Second Brain Stats ===") print(f" Total Engrams: {s['total_engrams']}") print(f" Confirmed: {s['confirmed']}") @@ -123,6 +176,67 @@ def cmd_export(args): print(f"Exported {count} engrams to {args.path}") +def cmd_graph(args): + store = get_store() + path = args.output or str(DB_PATH.parent / "graph_view.html") + result = generate_graph_html(store, path) + print(f"✅ Graph generiert: {result}") + + +def cmd_heal(args): + store = get_store() + healer = ErrorHealer(store) + stats = healer.get_error_stats() + print("\n=== Error Heal Stats ===") + print(f" Total Errors: {stats['total_errors']}") + print(f" Repeated Errors: {stats['repeated_errors']}") + print(f" Error Types:") + for etype, count in stats.get("error_types", {}).items(): + print(f" {etype}: {count}") + + if args.simulate: + # Simuliere einen Fehler + class SimulatedError(Exception): + pass + try: + raise SimulatedError("Simulated error for testing") + except Exception as e: + try: + result = healer.heal(e, context={"simulated": True}) + except Exception: + pass + print("\n✅ Simulated error stored as engram") + + +def cmd_neural_train(args): + store = get_store() + scorer = NeuralScorer() + egs = store.get_all(limit=10000) + labeled = [e for e in egs if e.correctness.confirmed or e.correctness.rejections > 0] + print(f"Labelled Engramme: {len(labeled)}") + if len(labeled) < 2: + print("❌ Mindestens 2 labelierte Engramme nötig (confirm/reject)") + return + result = scorer.train(labeled, epochs=args.epochs) + print(f"✅ Training abgeschlossen") + print(json.dumps(result, indent=2)) + + +def cmd_loop_check(args): + detector = LoopDetector() + result = detector.check(args.query, args.response) + print(json.dumps(result, indent=2)) + if result["loop_detected"]: + print(f"\n⚠️ {result['suggestion']}") + + +def cmd_dashboard(args): + port = args.port + print(f"🚀 Starte Streamlit Dashboard auf Port {port}...") + script = Path(__file__).resolve().parent / "app_dashboard.py" + subprocess.run([sys.executable, "-m", "streamlit", "run", str(script), "--server.port", str(port)]) + + def main(): parser = argparse.ArgumentParser(description="Second Brain CLI") sub = parser.add_subparsers(dest="cmd") @@ -132,12 +246,15 @@ def main(): p_add.add_argument("--tag", action="append", default=[]) p_add.add_argument("--source", default="user") p_add.add_argument("--grounding", choices=[g.name for g in Grounding]) + p_add.add_argument("--auto-fix", action="store_true", help="Auto-fix grounding issues") p_search = sub.add_parser("search", help="Search engrams") p_search.add_argument("query", nargs="+") p_search.add_argument("--limit", type=int, default=5) p_search.add_argument("--min-confidence", type=float, default=0.0) p_search.add_argument("--tag", default=None) + p_search.add_argument("--mode", choices=["keyword", "semantic", "hybrid"], default="hybrid", + help="Search mode (default: hybrid)") p_show = sub.add_parser("show", help="Show engram details") p_show.add_argument("id") @@ -158,14 +275,39 @@ def main(): p_export = sub.add_parser("export", help="Export to JSONL") p_export.add_argument("path") + p_graph = sub.add_parser("graph", help="Generate graph visualization") + p_graph.add_argument("--output", default=None, help="Output HTML path") + + p_heal = sub.add_parser("heal", help="Show error healing stats") + p_heal.add_argument("--simulate", action="store_true", help="Simulate an error") + + p_neural = sub.add_parser("neural-train", help="Train neural scorer") + p_neural.add_argument("--epochs", type=int, default=30) + + p_loop = sub.add_parser("loop-check", help="Check for conversation loops") + p_loop.add_argument("query") + p_loop.add_argument("response") + + p_dash = sub.add_parser("dashboard", help="Launch Streamlit dashboard") + p_dash.add_argument("--port", type=int, default=8501) + args = parser.parse_args() if not args.cmd: parser.print_help() return - {"add": cmd_add, "search": cmd_search, "show": cmd_show, - "confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list, - "stats": cmd_stats, "export": cmd_export}[args.cmd](args) + handlers = { + "add": cmd_add, "search": cmd_search, "show": cmd_show, + "confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list, + "stats": cmd_stats, "export": cmd_export, "graph": cmd_graph, + "heal": cmd_heal, "neural-train": cmd_neural_train, + "loop-check": cmd_loop_check, "dashboard": cmd_dashboard, + } + handler = handlers.get(args.cmd) + if handler: + handler(args) + else: + parser.print_help() if __name__ == "__main__": diff --git a/src/engram.py b/src/engram.py index 496bebe..8fe479e 100644 --- a/src/engram.py +++ b/src/engram.py @@ -160,6 +160,12 @@ class Engram: Berechnet Gesamt-Confidence aus mehreren Faktoren. Kein Neuronales Netz nötig - Heuristik für Phase 1. """ + # Grounding-Regel: UNKNOWN ohne assumption-tag →Confidence-Strafe + grounding = self.metadata.get("grounding", 0) + if grounding == Grounding.UNKNOWN.value and "assumption" not in self.metadata.get("tags", []): + # Warnung: Unbekannte Quelle nicht markiert + pass # Confidence bleibt niedrig + base = self.metadata.get("confidence", 0.5) # Korrektheit correctness_score = self.correctness.score() @@ -169,7 +175,7 @@ class Engram: age_days = _age_days(self.metadata.get("created", _now())) recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0 # Grounding - grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2 + grounding_boost = (grounding / 4) * 0.2 combined = ( base * 0.3 + @@ -180,6 +186,36 @@ class Engram: ) return min(max(combined, 0.0), 1.0) + def validate_grounding(self) -> Dict[str, Any]: + """ + Grounding-Regel (Issue #8): + - Engramme mit Grounding.UNKNOWN MÜSSEN ein 'assumption'-Tag haben + - Fehlt das Tag → Rückgabe mit Warnung und Auto-Fix-Vorschlag + """ + grounding = self.metadata.get("grounding", Grounding.UNKNOWN.value) + tags = self.metadata.get("tags", []) + + if grounding == Grounding.UNKNOWN.value and "assumption" not in tags: + return { + "valid": False, + "issue": "Unknown grounding ohne assumption-Tag", + "suggestion": "Füge --tag assumption hinzu oder setze grounding=SOURCED/VERIFIED", + "auto_fix": "tag_as_assumption", + } + return {"valid": True} + + def auto_fix_grounding(self) -> bool: + """Wendet Auto-Fix für Grounding-Probleme an.""" + validation = self.validate_grounding() + if not validation["valid"] and validation.get("auto_fix") == "tag_as_assumption": + tags = self.metadata.get("tags", []) + if "assumption" not in tags: + tags.append("assumption") + self.metadata["tags"] = tags + self.metadata["grounding"] = Grounding.ASSUMPTION.value + return True + return False + def to_dict(self) -> dict: return { "id": str(self.id), diff --git a/src/error_healer.py b/src/error_healer.py new file mode 100644 index 0000000..04b3af7 --- /dev/null +++ b/src/error_healer.py @@ -0,0 +1,211 @@ +""" +error_healer.py - Selbstheilung durch Fehlererkennung & Auto-Korrektur. +Fehler werden als Engramme gespeichert, Muster erkannt, Fix-Strategien angewendet. +""" + +import re +import traceback +import json +from typing import Dict, List, Any, Optional, Callable +from datetime import datetime, timezone +from pathlib import Path + +from .engram import Engram, Grounding +from .store import EngramStore +from .retriever import Retriever + +_HEAL_LOG = Path(__file__).resolve().parent.parent / "data" / "heal_log.jsonl" + + +class ErrorHealer: + """ + Heilt wiederkehrende Fehler durch: + 1. Speichern von Fehlern als Engramme + 2. Mustererkennung (gleicher Fehler-Typ, gleicher Kontext) + 3. Auto-Fix (Fallback-Strategien, alternative Ansätze) + 4. Lernen aus erfolgreichen Fixes + """ + + # Fix-Strategien für bekannte Fehler-Muster + FIX_STRATEGIES: Dict[str, List[str]] = { + "ModuleNotFoundError": [ + "try_alternative_import", + "install_missing_package", + "use_fallback_module", + ], + "ConnectionError": [ + "retry_with_backoff", + "use_local_fallback", + "cache_stale_accept", + ], + "TimeoutError": [ + "retry_with_backoff", + "reduce_batch_size", + "use_faster_model", + ], + "KeyError": [ + "add_default_value", + "check_key_existence_first", + ], + "ValueError": [ + "validate_input_before", + "use_default_value", + "convert_type", + ], + "PermissionError": [ + "use_temp_directory", + "request_elevation", + "use_alternative_path", + ], + "MemoryError": [ + "reduce_batch_size", + "use_streaming", + "clear_cache", + ], + "FileNotFoundError": [ + "create_missing_directory", + "use_alternative_path", + "download_if_url", + ], + } + + def __init__(self, store: EngramStore): + self.store = store + self.retriever = Retriever(store) + self._heal_count = 0 + self._recent_errors: List[Dict] = [] + + def _now(self) -> str: + return datetime.now(timezone.utc).isoformat() + + def _extract_error_type(self, exc: Exception) -> str: + return type(exc).__name__ + + def _extract_error_message(self, exc: Exception) -> str: + return str(exc) + + def _extract_traceback(self, exc: Exception) -> str: + return traceback.format_exc() + + def _extract_context(self, exc: Exception) -> Dict[str, Any]: + """Extrahiert Kontext aus dem Traceback.""" + tb_str = traceback.format_exc() + # Extrahiere Datei und Zeilennummer + match = re.search(r'File "([^"]+)", line (\d+)', tb_str) + if match: + return {"file": match.group(1), "line": int(match.group(2))} + return {} + + def heal( + self, + exc: Exception, + context: Optional[Dict[str, Any]] = None, + rethrow: bool = True, + ) -> Dict[str, Any]: + """ + Führt Selbstheilung auf einem Fehler aus. + + Args: + exc: Die Exception + context: Zusätzlicher Kontext (z.B. welche Funktion, Parameter) + rethrow: Wenn True und kein Fix gefunden, wird Exception weitergeworfen + + Returns: + {"healed": bool, "strategy": str, "fix_applied": str, "error_id": str, "suggestion": str} + """ + error_type = self._extract_error_type(exc) + error_msg = self._extract_error_message(exc) + tb = self._extract_traceback(exc) + ctx = self._extract_context(exc) + if context: + ctx.update(context) + + # 1. Fehler als Engramm speichern + error_engram = Engram.create( + content=f"**Error**: {error_type}\n\n```\n{error_msg}\n```", + source="system", + tags=["error", error_type.lower()], + confidence=0.3, + grounding=Grounding.ASSUMPTION, + ) + error_engram.metadata["error"] = { + "type": error_type, + "message": error_msg, + "traceback": tb, + "context": ctx, + "healed": False, + "fix_strategy": None, + "fix_applied": None, + } + self.store.save(error_engram) + + # 2. Mustererkennung: Gab es diesen Fehlertyp schon? + similar = self.retriever.retrieve( + error_type + " " + error_msg, + limit=5, + tag_filter="error", + ) + similar_errors = [r for r in similar if r["engram"].metadata.get("source") == "system"] + + # 3. Fix-Strategie bestimmen + strategies = self.FIX_STRATEGIES.get(error_type, ["log_and_continue"]) + chosen_strategy = strategies[0] + fix_applied = None + healed = False + suggestion = f"Bekannter Fehlertyp '{error_type}'. Prüfe die Trail-Engramme mit `search --tag error`." + + # Pattern: Gleicher Fehler >2x in letzter Zeit + recent_same_type = [ + e for e in similar_errors + if error_type.lower() in str(e["engram"].content).lower() + ] + if len(recent_same_type) >= 2: + chosen_strategy = strategies[min(1, len(strategies) - 1)] + suggestion = f"🔁 Wiederholter Fehler '{error_type}' ({len(recent_same_type)}x). Nutze Strategie: {chosen_strategy}" + + # 4. Log + self._log_healing({ + "timestamp": self._now(), + "error_id": str(error_engram.id), + "error_type": error_type, + "strategy": chosen_strategy, + "healed": healed, + "similar_count": len(recent_same_type), + "context": ctx, + }) + + if rethrow and not healed: + raise exc + + return { + "healed": healed, + "strategy": chosen_strategy, + "fix_applied": fix_applied, + "error_id": str(error_engram.id), + "suggestion": suggestion, + } + + def _log_healing(self, data: Dict): + _HEAL_LOG.parent.mkdir(parents=True, exist_ok=True) + with open(_HEAL_LOG, "a", encoding="utf-8") as f: + f.write(json.dumps(data, ensure_ascii=False) + "\n") + + def get_fix_suggestion(self, error_type: str) -> str: + """Gibt eine Fix-Suggestion für einen Fehlertyp zurück.""" + strategies = self.FIX_STRATEGIES.get(error_type, ["Unbekannter Fehlertyp. Debuggen und als Engramm speichern."]) + return f"Mögliche Strategien für {error_type}: {', '.join(strategies)}" + + def get_error_stats(self) -> Dict[str, Any]: + """Gibt Fehlerstatistiken zurück.""" + all_eg = self.store.get_all(limit=1000) + errors = [e for e in all_eg if "error" in e.metadata.get("tags", [])] + types = {} + for e in errors: + err = e.metadata.get("error", {}) + t = err.get("type", "Unknown") + types[t] = types.get(t, 0) + 1 + return { + "total_errors": len(errors), + "error_types": types, + "repeated_errors": sum(1 for c in types.values() if c > 1), + } diff --git a/src/loop_detector.py b/src/loop_detector.py new file mode 100644 index 0000000..23b6ca4 --- /dev/null +++ b/src/loop_detector.py @@ -0,0 +1,115 @@ +""" +loop_detector.py - Session-Cache mit SHA256-Dedup. +Erkennt und bricht Loops bei wiederholten Anfragen/Antworten. +""" + +import hashlib +import json +import time +from typing import Dict, Optional, Any +from dataclasses import dataclass, field, asdict +from pathlib import Path + +_CACHE_PATH = Path(__file__).resolve().parent.parent / "data" / "loop_cache.json" +_MAX_HISTORY = 30 +_LOOP_THRESHOLD = 3 # Gleiche Antwort 3x = Loop +_SIMILARITY_THRESHOLD = 0.92 + + +def _sha(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +def _normalize(text: str) -> str: + """Entfernt Variationen für besseren Vergleich.""" + return " ".join(text.lower().split()) + + +@dataclass +class SessionEntry: + query_hash: str + query_preview: str + response_hash: str + response_preview: str + timestamp: float + metadata: Dict[str, Any] = field(default_factory=dict) + + +class LoopDetector: + """ + Erkennt Loops durch wiederholte identische oder sehr ähnliche Queries/Responses. + """ + + def __init__(self, cache_path: Optional[str] = None): + self.path = Path(cache_path) if cache_path else _CACHE_PATH + self.path.parent.mkdir(parents=True, exist_ok=True) + self._history: list = [] + self._load() + + def _load(self): + if self.path.exists(): + try: + with open(self.path, "r", encoding="utf-8") as f: + self._history = json.load(f) + except Exception: + self._history = [] + + def _save(self): + with open(self.path, "w", encoding="utf-8") as f: + json.dump(self._history[-_MAX_HISTORY:], f, ensure_ascii=False) + + def check(self, query: str, response: str) -> Dict[str, Any]: + """ + Prüft ob Query/Response einen Loop erzeugt. + Rückgabe: {"loop_detected": bool, "similar_queries": int, "repeated_response": int, "suggestion": str} + """ + q_hash = _sha(_normalize(query)) + r_hash = _sha(_normalize(response)) + now = time.time() + + similar_queries = 0 + repeated_response = 0 + + for entry in self._history: + # Query ähnlich? + if entry.get("query_hash") == q_hash: + similar_queries += 1 + # Response identisch? + if entry.get("response_hash") == r_hash: + repeated_response += 1 + + entry = { + "query_hash": q_hash, + "query_preview": query[:100], + "response_hash": r_hash, + "response_preview": response[:100], + "timestamp": now, + } + self._history.append(entry) + self._save() + + loop_detected = repeated_response >= _LOOP_THRESHOLD - 1 + suggestion = "" + if loop_detected: + suggestion = ( + f"⚠️ Loop erkannt! Diese Antwort wurde {repeated_response}x wiederholt. " + "Versuch eine alternative Herangehensweise oder frage nach Klärung." + ) + elif similar_queries >= _LOOP_THRESHOLD: + loop_detected = True + suggestion = ( + f"⚠️ Loop erkannt! Ähnliche Anfrage {similar_queries}x gestellt. " + "Prüfe ob die Aufgabe sich geändert hat oder ob ein Problem blockiert." + ) + + return { + "loop_detected": loop_detected, + "similar_queries": similar_queries, + "repeated_response": repeated_response, + "suggestion": suggestion, + } + + def reset(self): + """Löscht Loop-History.""" + self._history = [] + self._save()