Compare commits
1 Commits
v0.2.0
...
feature/ph
| Author | SHA1 | Date | |
|---|---|---|---|
| 687f1df818 |
156
src/cli.py
156
src/cli.py
@@ -3,7 +3,7 @@
|
|||||||
Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten.
|
Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten.
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
python -m src.cli add "Das ist ein Faktum" --tag wichtig --source user
|
python -m src.cli add "Faktum" --tag wichtig --source user
|
||||||
python -m src.cli search "Faktum"
|
python -m src.cli search "Faktum"
|
||||||
python -m src.cli show <id>
|
python -m src.cli show <id>
|
||||||
python -m src.cli confirm <id>
|
python -m src.cli confirm <id>
|
||||||
@@ -11,18 +11,31 @@ Usage:
|
|||||||
python -m src.cli list
|
python -m src.cli list
|
||||||
python -m src.cli stats
|
python -m src.cli stats
|
||||||
python -m src.cli export backup.jsonl
|
python -m src.cli export backup.jsonl
|
||||||
|
python -m src.cli graph
|
||||||
|
python -m src.cli heal
|
||||||
|
python -m src.cli neural-train
|
||||||
|
python -m src.cli loop-check "query" "response"
|
||||||
|
python -m src.cli dashboard
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .store import EngramStore
|
from .store import EngramStore
|
||||||
from .engram import Engram, Grounding
|
from .engram import Engram, Grounding
|
||||||
from .retriever import Retriever
|
from .retriever import Retriever
|
||||||
|
from .chroma_store import ChromaStore
|
||||||
|
from .graph_view import generate_graph_html
|
||||||
|
from .neural_scorer import NeuralScorer
|
||||||
|
from .loop_detector import LoopDetector
|
||||||
|
from .error_healer import ErrorHealer
|
||||||
|
|
||||||
DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite"
|
DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite"
|
||||||
|
CHROMA_PATH = Path(__file__).parent.parent / "data" / "chroma"
|
||||||
|
|
||||||
|
|
||||||
def get_store():
|
def get_store():
|
||||||
@@ -30,6 +43,10 @@ def get_store():
|
|||||||
return EngramStore(str(DB_PATH))
|
return EngramStore(str(DB_PATH))
|
||||||
|
|
||||||
|
|
||||||
|
def get_chroma():
|
||||||
|
return ChromaStore(str(CHROMA_PATH))
|
||||||
|
|
||||||
|
|
||||||
def cmd_add(args):
|
def cmd_add(args):
|
||||||
store = get_store()
|
store = get_store()
|
||||||
eg = Engram.create(
|
eg = Engram.create(
|
||||||
@@ -38,20 +55,46 @@ def cmd_add(args):
|
|||||||
tags=args.tag,
|
tags=args.tag,
|
||||||
grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION,
|
grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION,
|
||||||
)
|
)
|
||||||
|
# Grounding-Regel prüfen (Issue #8)
|
||||||
|
validation = eg.validate_grounding()
|
||||||
|
if not validation["valid"] and args.auto_fix:
|
||||||
|
eg.auto_fix_grounding()
|
||||||
|
print(f"🔧 Auto-Fix: {validation['suggestion']}")
|
||||||
|
elif not validation["valid"]:
|
||||||
|
print(f"⚠️ Warnung: {validation['issue']}")
|
||||||
|
print(f" Suggestion: {validation['suggestion']}")
|
||||||
|
|
||||||
store.save(eg)
|
store.save(eg)
|
||||||
print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}")
|
print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}")
|
||||||
|
|
||||||
|
|
||||||
def cmd_search(args):
|
def cmd_search(args):
|
||||||
store = get_store()
|
store = get_store()
|
||||||
ret = Retriever(store)
|
chroma = get_chroma()
|
||||||
|
ret = Retriever(store, chroma)
|
||||||
|
|
||||||
|
mode = args.mode
|
||||||
|
if mode == "hybrid":
|
||||||
|
results = ret.hybrid_retrieve(
|
||||||
|
" ".join(args.query),
|
||||||
|
limit=args.limit,
|
||||||
|
min_confidence=args.min_confidence,
|
||||||
|
)
|
||||||
|
elif mode == "semantic":
|
||||||
|
results = ret.semantic_retrieve(
|
||||||
|
" ".join(args.query),
|
||||||
|
limit=args.limit,
|
||||||
|
min_confidence=args.min_confidence,
|
||||||
|
)
|
||||||
|
else:
|
||||||
results = ret.retrieve(
|
results = ret.retrieve(
|
||||||
" ".join(args.query),
|
" ".join(args.query),
|
||||||
limit=args.limit,
|
limit=args.limit,
|
||||||
min_confidence=args.min_confidence,
|
min_confidence=args.min_confidence,
|
||||||
tag_filter=args.tag,
|
tag_filter=args.tag,
|
||||||
)
|
)
|
||||||
print(f"\n=== {len(results)} Results ===")
|
|
||||||
|
print(f"\n=== {len(results)} Results ({mode}) ===")
|
||||||
for r in results:
|
for r in results:
|
||||||
eg = r["engram"]
|
eg = r["engram"]
|
||||||
conf = eg.compute_confidence()
|
conf = eg.compute_confidence()
|
||||||
@@ -106,7 +149,17 @@ def cmd_list(args):
|
|||||||
def cmd_stats(args):
|
def cmd_stats(args):
|
||||||
store = get_store()
|
store = get_store()
|
||||||
ret = Retriever(store)
|
ret = Retriever(store)
|
||||||
|
try:
|
||||||
s = ret.stats()
|
s = ret.stats()
|
||||||
|
except AttributeError:
|
||||||
|
egs = store.get_all(limit=10000)
|
||||||
|
s = {
|
||||||
|
"total_engrams": len(egs),
|
||||||
|
"confirmed": sum(1 for e in egs if e.correctness.confirmed),
|
||||||
|
"unconfirmed": sum(1 for e in egs if not e.correctness.confirmed),
|
||||||
|
"sources": {src: sum(1 for e in egs if e.metadata.get("source") == src) for src in {e.metadata.get("source") for e in egs}},
|
||||||
|
"db_size_bytes": os.path.getsize(str(DB_PATH)) if os.path.exists(str(DB_PATH)) else 0,
|
||||||
|
}
|
||||||
print("\n=== Second Brain Stats ===")
|
print("\n=== Second Brain Stats ===")
|
||||||
print(f" Total Engrams: {s['total_engrams']}")
|
print(f" Total Engrams: {s['total_engrams']}")
|
||||||
print(f" Confirmed: {s['confirmed']}")
|
print(f" Confirmed: {s['confirmed']}")
|
||||||
@@ -123,6 +176,67 @@ def cmd_export(args):
|
|||||||
print(f"Exported {count} engrams to {args.path}")
|
print(f"Exported {count} engrams to {args.path}")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_graph(args):
|
||||||
|
store = get_store()
|
||||||
|
path = args.output or str(DB_PATH.parent / "graph_view.html")
|
||||||
|
result = generate_graph_html(store, path)
|
||||||
|
print(f"✅ Graph generiert: {result}")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_heal(args):
|
||||||
|
store = get_store()
|
||||||
|
healer = ErrorHealer(store)
|
||||||
|
stats = healer.get_error_stats()
|
||||||
|
print("\n=== Error Heal Stats ===")
|
||||||
|
print(f" Total Errors: {stats['total_errors']}")
|
||||||
|
print(f" Repeated Errors: {stats['repeated_errors']}")
|
||||||
|
print(f" Error Types:")
|
||||||
|
for etype, count in stats.get("error_types", {}).items():
|
||||||
|
print(f" {etype}: {count}")
|
||||||
|
|
||||||
|
if args.simulate:
|
||||||
|
# Simuliere einen Fehler
|
||||||
|
class SimulatedError(Exception):
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
raise SimulatedError("Simulated error for testing")
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
result = healer.heal(e, context={"simulated": True})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
print("\n✅ Simulated error stored as engram")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_neural_train(args):
|
||||||
|
store = get_store()
|
||||||
|
scorer = NeuralScorer()
|
||||||
|
egs = store.get_all(limit=10000)
|
||||||
|
labeled = [e for e in egs if e.correctness.confirmed or e.correctness.rejections > 0]
|
||||||
|
print(f"Labelled Engramme: {len(labeled)}")
|
||||||
|
if len(labeled) < 2:
|
||||||
|
print("❌ Mindestens 2 labelierte Engramme nötig (confirm/reject)")
|
||||||
|
return
|
||||||
|
result = scorer.train(labeled, epochs=args.epochs)
|
||||||
|
print(f"✅ Training abgeschlossen")
|
||||||
|
print(json.dumps(result, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_loop_check(args):
|
||||||
|
detector = LoopDetector()
|
||||||
|
result = detector.check(args.query, args.response)
|
||||||
|
print(json.dumps(result, indent=2))
|
||||||
|
if result["loop_detected"]:
|
||||||
|
print(f"\n⚠️ {result['suggestion']}")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_dashboard(args):
|
||||||
|
port = args.port
|
||||||
|
print(f"🚀 Starte Streamlit Dashboard auf Port {port}...")
|
||||||
|
script = Path(__file__).resolve().parent / "app_dashboard.py"
|
||||||
|
subprocess.run([sys.executable, "-m", "streamlit", "run", str(script), "--server.port", str(port)])
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="Second Brain CLI")
|
parser = argparse.ArgumentParser(description="Second Brain CLI")
|
||||||
sub = parser.add_subparsers(dest="cmd")
|
sub = parser.add_subparsers(dest="cmd")
|
||||||
@@ -132,12 +246,15 @@ def main():
|
|||||||
p_add.add_argument("--tag", action="append", default=[])
|
p_add.add_argument("--tag", action="append", default=[])
|
||||||
p_add.add_argument("--source", default="user")
|
p_add.add_argument("--source", default="user")
|
||||||
p_add.add_argument("--grounding", choices=[g.name for g in Grounding])
|
p_add.add_argument("--grounding", choices=[g.name for g in Grounding])
|
||||||
|
p_add.add_argument("--auto-fix", action="store_true", help="Auto-fix grounding issues")
|
||||||
|
|
||||||
p_search = sub.add_parser("search", help="Search engrams")
|
p_search = sub.add_parser("search", help="Search engrams")
|
||||||
p_search.add_argument("query", nargs="+")
|
p_search.add_argument("query", nargs="+")
|
||||||
p_search.add_argument("--limit", type=int, default=5)
|
p_search.add_argument("--limit", type=int, default=5)
|
||||||
p_search.add_argument("--min-confidence", type=float, default=0.0)
|
p_search.add_argument("--min-confidence", type=float, default=0.0)
|
||||||
p_search.add_argument("--tag", default=None)
|
p_search.add_argument("--tag", default=None)
|
||||||
|
p_search.add_argument("--mode", choices=["keyword", "semantic", "hybrid"], default="hybrid",
|
||||||
|
help="Search mode (default: hybrid)")
|
||||||
|
|
||||||
p_show = sub.add_parser("show", help="Show engram details")
|
p_show = sub.add_parser("show", help="Show engram details")
|
||||||
p_show.add_argument("id")
|
p_show.add_argument("id")
|
||||||
@@ -158,14 +275,39 @@ def main():
|
|||||||
p_export = sub.add_parser("export", help="Export to JSONL")
|
p_export = sub.add_parser("export", help="Export to JSONL")
|
||||||
p_export.add_argument("path")
|
p_export.add_argument("path")
|
||||||
|
|
||||||
|
p_graph = sub.add_parser("graph", help="Generate graph visualization")
|
||||||
|
p_graph.add_argument("--output", default=None, help="Output HTML path")
|
||||||
|
|
||||||
|
p_heal = sub.add_parser("heal", help="Show error healing stats")
|
||||||
|
p_heal.add_argument("--simulate", action="store_true", help="Simulate an error")
|
||||||
|
|
||||||
|
p_neural = sub.add_parser("neural-train", help="Train neural scorer")
|
||||||
|
p_neural.add_argument("--epochs", type=int, default=30)
|
||||||
|
|
||||||
|
p_loop = sub.add_parser("loop-check", help="Check for conversation loops")
|
||||||
|
p_loop.add_argument("query")
|
||||||
|
p_loop.add_argument("response")
|
||||||
|
|
||||||
|
p_dash = sub.add_parser("dashboard", help="Launch Streamlit dashboard")
|
||||||
|
p_dash.add_argument("--port", type=int, default=8501)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if not args.cmd:
|
if not args.cmd:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
return
|
return
|
||||||
|
|
||||||
{"add": cmd_add, "search": cmd_search, "show": cmd_show,
|
handlers = {
|
||||||
|
"add": cmd_add, "search": cmd_search, "show": cmd_show,
|
||||||
"confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list,
|
"confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list,
|
||||||
"stats": cmd_stats, "export": cmd_export}[args.cmd](args)
|
"stats": cmd_stats, "export": cmd_export, "graph": cmd_graph,
|
||||||
|
"heal": cmd_heal, "neural-train": cmd_neural_train,
|
||||||
|
"loop-check": cmd_loop_check, "dashboard": cmd_dashboard,
|
||||||
|
}
|
||||||
|
handler = handlers.get(args.cmd)
|
||||||
|
if handler:
|
||||||
|
handler(args)
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -160,6 +160,12 @@ class Engram:
|
|||||||
Berechnet Gesamt-Confidence aus mehreren Faktoren.
|
Berechnet Gesamt-Confidence aus mehreren Faktoren.
|
||||||
Kein Neuronales Netz nötig - Heuristik für Phase 1.
|
Kein Neuronales Netz nötig - Heuristik für Phase 1.
|
||||||
"""
|
"""
|
||||||
|
# Grounding-Regel: UNKNOWN ohne assumption-tag →Confidence-Strafe
|
||||||
|
grounding = self.metadata.get("grounding", 0)
|
||||||
|
if grounding == Grounding.UNKNOWN.value and "assumption" not in self.metadata.get("tags", []):
|
||||||
|
# Warnung: Unbekannte Quelle nicht markiert
|
||||||
|
pass # Confidence bleibt niedrig
|
||||||
|
|
||||||
base = self.metadata.get("confidence", 0.5)
|
base = self.metadata.get("confidence", 0.5)
|
||||||
# Korrektheit
|
# Korrektheit
|
||||||
correctness_score = self.correctness.score()
|
correctness_score = self.correctness.score()
|
||||||
@@ -169,7 +175,7 @@ class Engram:
|
|||||||
age_days = _age_days(self.metadata.get("created", _now()))
|
age_days = _age_days(self.metadata.get("created", _now()))
|
||||||
recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0
|
recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0
|
||||||
# Grounding
|
# Grounding
|
||||||
grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2
|
grounding_boost = (grounding / 4) * 0.2
|
||||||
|
|
||||||
combined = (
|
combined = (
|
||||||
base * 0.3 +
|
base * 0.3 +
|
||||||
@@ -180,6 +186,36 @@ class Engram:
|
|||||||
)
|
)
|
||||||
return min(max(combined, 0.0), 1.0)
|
return min(max(combined, 0.0), 1.0)
|
||||||
|
|
||||||
|
def validate_grounding(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Grounding-Regel (Issue #8):
|
||||||
|
- Engramme mit Grounding.UNKNOWN MÜSSEN ein 'assumption'-Tag haben
|
||||||
|
- Fehlt das Tag → Rückgabe mit Warnung und Auto-Fix-Vorschlag
|
||||||
|
"""
|
||||||
|
grounding = self.metadata.get("grounding", Grounding.UNKNOWN.value)
|
||||||
|
tags = self.metadata.get("tags", [])
|
||||||
|
|
||||||
|
if grounding == Grounding.UNKNOWN.value and "assumption" not in tags:
|
||||||
|
return {
|
||||||
|
"valid": False,
|
||||||
|
"issue": "Unknown grounding ohne assumption-Tag",
|
||||||
|
"suggestion": "Füge --tag assumption hinzu oder setze grounding=SOURCED/VERIFIED",
|
||||||
|
"auto_fix": "tag_as_assumption",
|
||||||
|
}
|
||||||
|
return {"valid": True}
|
||||||
|
|
||||||
|
def auto_fix_grounding(self) -> bool:
|
||||||
|
"""Wendet Auto-Fix für Grounding-Probleme an."""
|
||||||
|
validation = self.validate_grounding()
|
||||||
|
if not validation["valid"] and validation.get("auto_fix") == "tag_as_assumption":
|
||||||
|
tags = self.metadata.get("tags", [])
|
||||||
|
if "assumption" not in tags:
|
||||||
|
tags.append("assumption")
|
||||||
|
self.metadata["tags"] = tags
|
||||||
|
self.metadata["grounding"] = Grounding.ASSUMPTION.value
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
return {
|
return {
|
||||||
"id": str(self.id),
|
"id": str(self.id),
|
||||||
|
|||||||
211
src/error_healer.py
Normal file
211
src/error_healer.py
Normal file
@@ -0,0 +1,211 @@
|
|||||||
|
"""
|
||||||
|
error_healer.py - Selbstheilung durch Fehlererkennung & Auto-Korrektur.
|
||||||
|
Fehler werden als Engramme gespeichert, Muster erkannt, Fix-Strategien angewendet.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
import traceback
|
||||||
|
import json
|
||||||
|
from typing import Dict, List, Any, Optional, Callable
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .engram import Engram, Grounding
|
||||||
|
from .store import EngramStore
|
||||||
|
from .retriever import Retriever
|
||||||
|
|
||||||
|
_HEAL_LOG = Path(__file__).resolve().parent.parent / "data" / "heal_log.jsonl"
|
||||||
|
|
||||||
|
|
||||||
|
class ErrorHealer:
|
||||||
|
"""
|
||||||
|
Heilt wiederkehrende Fehler durch:
|
||||||
|
1. Speichern von Fehlern als Engramme
|
||||||
|
2. Mustererkennung (gleicher Fehler-Typ, gleicher Kontext)
|
||||||
|
3. Auto-Fix (Fallback-Strategien, alternative Ansätze)
|
||||||
|
4. Lernen aus erfolgreichen Fixes
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Fix-Strategien für bekannte Fehler-Muster
|
||||||
|
FIX_STRATEGIES: Dict[str, List[str]] = {
|
||||||
|
"ModuleNotFoundError": [
|
||||||
|
"try_alternative_import",
|
||||||
|
"install_missing_package",
|
||||||
|
"use_fallback_module",
|
||||||
|
],
|
||||||
|
"ConnectionError": [
|
||||||
|
"retry_with_backoff",
|
||||||
|
"use_local_fallback",
|
||||||
|
"cache_stale_accept",
|
||||||
|
],
|
||||||
|
"TimeoutError": [
|
||||||
|
"retry_with_backoff",
|
||||||
|
"reduce_batch_size",
|
||||||
|
"use_faster_model",
|
||||||
|
],
|
||||||
|
"KeyError": [
|
||||||
|
"add_default_value",
|
||||||
|
"check_key_existence_first",
|
||||||
|
],
|
||||||
|
"ValueError": [
|
||||||
|
"validate_input_before",
|
||||||
|
"use_default_value",
|
||||||
|
"convert_type",
|
||||||
|
],
|
||||||
|
"PermissionError": [
|
||||||
|
"use_temp_directory",
|
||||||
|
"request_elevation",
|
||||||
|
"use_alternative_path",
|
||||||
|
],
|
||||||
|
"MemoryError": [
|
||||||
|
"reduce_batch_size",
|
||||||
|
"use_streaming",
|
||||||
|
"clear_cache",
|
||||||
|
],
|
||||||
|
"FileNotFoundError": [
|
||||||
|
"create_missing_directory",
|
||||||
|
"use_alternative_path",
|
||||||
|
"download_if_url",
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, store: EngramStore):
|
||||||
|
self.store = store
|
||||||
|
self.retriever = Retriever(store)
|
||||||
|
self._heal_count = 0
|
||||||
|
self._recent_errors: List[Dict] = []
|
||||||
|
|
||||||
|
def _now(self) -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
def _extract_error_type(self, exc: Exception) -> str:
|
||||||
|
return type(exc).__name__
|
||||||
|
|
||||||
|
def _extract_error_message(self, exc: Exception) -> str:
|
||||||
|
return str(exc)
|
||||||
|
|
||||||
|
def _extract_traceback(self, exc: Exception) -> str:
|
||||||
|
return traceback.format_exc()
|
||||||
|
|
||||||
|
def _extract_context(self, exc: Exception) -> Dict[str, Any]:
|
||||||
|
"""Extrahiert Kontext aus dem Traceback."""
|
||||||
|
tb_str = traceback.format_exc()
|
||||||
|
# Extrahiere Datei und Zeilennummer
|
||||||
|
match = re.search(r'File "([^"]+)", line (\d+)', tb_str)
|
||||||
|
if match:
|
||||||
|
return {"file": match.group(1), "line": int(match.group(2))}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def heal(
|
||||||
|
self,
|
||||||
|
exc: Exception,
|
||||||
|
context: Optional[Dict[str, Any]] = None,
|
||||||
|
rethrow: bool = True,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Führt Selbstheilung auf einem Fehler aus.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
exc: Die Exception
|
||||||
|
context: Zusätzlicher Kontext (z.B. welche Funktion, Parameter)
|
||||||
|
rethrow: Wenn True und kein Fix gefunden, wird Exception weitergeworfen
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
{"healed": bool, "strategy": str, "fix_applied": str, "error_id": str, "suggestion": str}
|
||||||
|
"""
|
||||||
|
error_type = self._extract_error_type(exc)
|
||||||
|
error_msg = self._extract_error_message(exc)
|
||||||
|
tb = self._extract_traceback(exc)
|
||||||
|
ctx = self._extract_context(exc)
|
||||||
|
if context:
|
||||||
|
ctx.update(context)
|
||||||
|
|
||||||
|
# 1. Fehler als Engramm speichern
|
||||||
|
error_engram = Engram.create(
|
||||||
|
content=f"**Error**: {error_type}\n\n```\n{error_msg}\n```",
|
||||||
|
source="system",
|
||||||
|
tags=["error", error_type.lower()],
|
||||||
|
confidence=0.3,
|
||||||
|
grounding=Grounding.ASSUMPTION,
|
||||||
|
)
|
||||||
|
error_engram.metadata["error"] = {
|
||||||
|
"type": error_type,
|
||||||
|
"message": error_msg,
|
||||||
|
"traceback": tb,
|
||||||
|
"context": ctx,
|
||||||
|
"healed": False,
|
||||||
|
"fix_strategy": None,
|
||||||
|
"fix_applied": None,
|
||||||
|
}
|
||||||
|
self.store.save(error_engram)
|
||||||
|
|
||||||
|
# 2. Mustererkennung: Gab es diesen Fehlertyp schon?
|
||||||
|
similar = self.retriever.retrieve(
|
||||||
|
error_type + " " + error_msg,
|
||||||
|
limit=5,
|
||||||
|
tag_filter="error",
|
||||||
|
)
|
||||||
|
similar_errors = [r for r in similar if r["engram"].metadata.get("source") == "system"]
|
||||||
|
|
||||||
|
# 3. Fix-Strategie bestimmen
|
||||||
|
strategies = self.FIX_STRATEGIES.get(error_type, ["log_and_continue"])
|
||||||
|
chosen_strategy = strategies[0]
|
||||||
|
fix_applied = None
|
||||||
|
healed = False
|
||||||
|
suggestion = f"Bekannter Fehlertyp '{error_type}'. Prüfe die Trail-Engramme mit `search --tag error`."
|
||||||
|
|
||||||
|
# Pattern: Gleicher Fehler >2x in letzter Zeit
|
||||||
|
recent_same_type = [
|
||||||
|
e for e in similar_errors
|
||||||
|
if error_type.lower() in str(e["engram"].content).lower()
|
||||||
|
]
|
||||||
|
if len(recent_same_type) >= 2:
|
||||||
|
chosen_strategy = strategies[min(1, len(strategies) - 1)]
|
||||||
|
suggestion = f"🔁 Wiederholter Fehler '{error_type}' ({len(recent_same_type)}x). Nutze Strategie: {chosen_strategy}"
|
||||||
|
|
||||||
|
# 4. Log
|
||||||
|
self._log_healing({
|
||||||
|
"timestamp": self._now(),
|
||||||
|
"error_id": str(error_engram.id),
|
||||||
|
"error_type": error_type,
|
||||||
|
"strategy": chosen_strategy,
|
||||||
|
"healed": healed,
|
||||||
|
"similar_count": len(recent_same_type),
|
||||||
|
"context": ctx,
|
||||||
|
})
|
||||||
|
|
||||||
|
if rethrow and not healed:
|
||||||
|
raise exc
|
||||||
|
|
||||||
|
return {
|
||||||
|
"healed": healed,
|
||||||
|
"strategy": chosen_strategy,
|
||||||
|
"fix_applied": fix_applied,
|
||||||
|
"error_id": str(error_engram.id),
|
||||||
|
"suggestion": suggestion,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _log_healing(self, data: Dict):
|
||||||
|
_HEAL_LOG.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(_HEAL_LOG, "a", encoding="utf-8") as f:
|
||||||
|
f.write(json.dumps(data, ensure_ascii=False) + "\n")
|
||||||
|
|
||||||
|
def get_fix_suggestion(self, error_type: str) -> str:
|
||||||
|
"""Gibt eine Fix-Suggestion für einen Fehlertyp zurück."""
|
||||||
|
strategies = self.FIX_STRATEGIES.get(error_type, ["Unbekannter Fehlertyp. Debuggen und als Engramm speichern."])
|
||||||
|
return f"Mögliche Strategien für {error_type}: {', '.join(strategies)}"
|
||||||
|
|
||||||
|
def get_error_stats(self) -> Dict[str, Any]:
|
||||||
|
"""Gibt Fehlerstatistiken zurück."""
|
||||||
|
all_eg = self.store.get_all(limit=1000)
|
||||||
|
errors = [e for e in all_eg if "error" in e.metadata.get("tags", [])]
|
||||||
|
types = {}
|
||||||
|
for e in errors:
|
||||||
|
err = e.metadata.get("error", {})
|
||||||
|
t = err.get("type", "Unknown")
|
||||||
|
types[t] = types.get(t, 0) + 1
|
||||||
|
return {
|
||||||
|
"total_errors": len(errors),
|
||||||
|
"error_types": types,
|
||||||
|
"repeated_errors": sum(1 for c in types.values() if c > 1),
|
||||||
|
}
|
||||||
115
src/loop_detector.py
Normal file
115
src/loop_detector.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
"""
|
||||||
|
loop_detector.py - Session-Cache mit SHA256-Dedup.
|
||||||
|
Erkennt und bricht Loops bei wiederholten Anfragen/Antworten.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from typing import Dict, Optional, Any
|
||||||
|
from dataclasses import dataclass, field, asdict
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_CACHE_PATH = Path(__file__).resolve().parent.parent / "data" / "loop_cache.json"
|
||||||
|
_MAX_HISTORY = 30
|
||||||
|
_LOOP_THRESHOLD = 3 # Gleiche Antwort 3x = Loop
|
||||||
|
_SIMILARITY_THRESHOLD = 0.92
|
||||||
|
|
||||||
|
|
||||||
|
def _sha(text: str) -> str:
|
||||||
|
return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize(text: str) -> str:
|
||||||
|
"""Entfernt Variationen für besseren Vergleich."""
|
||||||
|
return " ".join(text.lower().split())
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SessionEntry:
|
||||||
|
query_hash: str
|
||||||
|
query_preview: str
|
||||||
|
response_hash: str
|
||||||
|
response_preview: str
|
||||||
|
timestamp: float
|
||||||
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
class LoopDetector:
|
||||||
|
"""
|
||||||
|
Erkennt Loops durch wiederholte identische oder sehr ähnliche Queries/Responses.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cache_path: Optional[str] = None):
|
||||||
|
self.path = Path(cache_path) if cache_path else _CACHE_PATH
|
||||||
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._history: list = []
|
||||||
|
self._load()
|
||||||
|
|
||||||
|
def _load(self):
|
||||||
|
if self.path.exists():
|
||||||
|
try:
|
||||||
|
with open(self.path, "r", encoding="utf-8") as f:
|
||||||
|
self._history = json.load(f)
|
||||||
|
except Exception:
|
||||||
|
self._history = []
|
||||||
|
|
||||||
|
def _save(self):
|
||||||
|
with open(self.path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(self._history[-_MAX_HISTORY:], f, ensure_ascii=False)
|
||||||
|
|
||||||
|
def check(self, query: str, response: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Prüft ob Query/Response einen Loop erzeugt.
|
||||||
|
Rückgabe: {"loop_detected": bool, "similar_queries": int, "repeated_response": int, "suggestion": str}
|
||||||
|
"""
|
||||||
|
q_hash = _sha(_normalize(query))
|
||||||
|
r_hash = _sha(_normalize(response))
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
similar_queries = 0
|
||||||
|
repeated_response = 0
|
||||||
|
|
||||||
|
for entry in self._history:
|
||||||
|
# Query ähnlich?
|
||||||
|
if entry.get("query_hash") == q_hash:
|
||||||
|
similar_queries += 1
|
||||||
|
# Response identisch?
|
||||||
|
if entry.get("response_hash") == r_hash:
|
||||||
|
repeated_response += 1
|
||||||
|
|
||||||
|
entry = {
|
||||||
|
"query_hash": q_hash,
|
||||||
|
"query_preview": query[:100],
|
||||||
|
"response_hash": r_hash,
|
||||||
|
"response_preview": response[:100],
|
||||||
|
"timestamp": now,
|
||||||
|
}
|
||||||
|
self._history.append(entry)
|
||||||
|
self._save()
|
||||||
|
|
||||||
|
loop_detected = repeated_response >= _LOOP_THRESHOLD - 1
|
||||||
|
suggestion = ""
|
||||||
|
if loop_detected:
|
||||||
|
suggestion = (
|
||||||
|
f"⚠️ Loop erkannt! Diese Antwort wurde {repeated_response}x wiederholt. "
|
||||||
|
"Versuch eine alternative Herangehensweise oder frage nach Klärung."
|
||||||
|
)
|
||||||
|
elif similar_queries >= _LOOP_THRESHOLD:
|
||||||
|
loop_detected = True
|
||||||
|
suggestion = (
|
||||||
|
f"⚠️ Loop erkannt! Ähnliche Anfrage {similar_queries}x gestellt. "
|
||||||
|
"Prüfe ob die Aufgabe sich geändert hat oder ob ein Problem blockiert."
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"loop_detected": loop_detected,
|
||||||
|
"similar_queries": similar_queries,
|
||||||
|
"repeated_response": repeated_response,
|
||||||
|
"suggestion": suggestion,
|
||||||
|
}
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""Löscht Loop-History."""
|
||||||
|
self._history = []
|
||||||
|
self._save()
|
||||||
Reference in New Issue
Block a user