1 Commits

4 changed files with 520 additions and 16 deletions

View File

@@ -3,7 +3,7 @@
Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten. Second Brain CLI - direkte Nutzung ohne externe Abhängigkeiten.
Usage: Usage:
python -m src.cli add "Das ist ein Faktum" --tag wichtig --source user python -m src.cli add "Faktum" --tag wichtig --source user
python -m src.cli search "Faktum" python -m src.cli search "Faktum"
python -m src.cli show <id> python -m src.cli show <id>
python -m src.cli confirm <id> python -m src.cli confirm <id>
@@ -11,18 +11,31 @@ Usage:
python -m src.cli list python -m src.cli list
python -m src.cli stats python -m src.cli stats
python -m src.cli export backup.jsonl python -m src.cli export backup.jsonl
python -m src.cli graph
python -m src.cli heal
python -m src.cli neural-train
python -m src.cli loop-check "query" "response"
python -m src.cli dashboard
""" """
import sys
import json
import argparse import argparse
import json
import os
import subprocess
import sys
from pathlib import Path from pathlib import Path
from .store import EngramStore from .store import EngramStore
from .engram import Engram, Grounding from .engram import Engram, Grounding
from .retriever import Retriever from .retriever import Retriever
from .chroma_store import ChromaStore
from .graph_view import generate_graph_html
from .neural_scorer import NeuralScorer
from .loop_detector import LoopDetector
from .error_healer import ErrorHealer
DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite" DB_PATH = Path(__file__).parent.parent / "data" / "brain.sqlite"
CHROMA_PATH = Path(__file__).parent.parent / "data" / "chroma"
def get_store(): def get_store():
@@ -30,6 +43,10 @@ def get_store():
return EngramStore(str(DB_PATH)) return EngramStore(str(DB_PATH))
def get_chroma():
return ChromaStore(str(CHROMA_PATH))
def cmd_add(args): def cmd_add(args):
store = get_store() store = get_store()
eg = Engram.create( eg = Engram.create(
@@ -38,20 +55,46 @@ def cmd_add(args):
tags=args.tag, tags=args.tag,
grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION, grounding=Grounding[args.grounding] if args.grounding else Grounding.ASSUMPTION,
) )
# Grounding-Regel prüfen (Issue #8)
validation = eg.validate_grounding()
if not validation["valid"] and args.auto_fix:
eg.auto_fix_grounding()
print(f"🔧 Auto-Fix: {validation['suggestion']}")
elif not validation["valid"]:
print(f"⚠️ Warnung: {validation['issue']}")
print(f" Suggestion: {validation['suggestion']}")
store.save(eg) store.save(eg)
print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}") print(f"Created: {eg.id}\n Content: {eg.content[:100]}\n Confidence: {eg.compute_confidence():.2f}")
def cmd_search(args): def cmd_search(args):
store = get_store() store = get_store()
ret = Retriever(store) chroma = get_chroma()
results = ret.retrieve( ret = Retriever(store, chroma)
" ".join(args.query),
limit=args.limit, mode = args.mode
min_confidence=args.min_confidence, if mode == "hybrid":
tag_filter=args.tag, results = ret.hybrid_retrieve(
) " ".join(args.query),
print(f"\n=== {len(results)} Results ===") limit=args.limit,
min_confidence=args.min_confidence,
)
elif mode == "semantic":
results = ret.semantic_retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
)
else:
results = ret.retrieve(
" ".join(args.query),
limit=args.limit,
min_confidence=args.min_confidence,
tag_filter=args.tag,
)
print(f"\n=== {len(results)} Results ({mode}) ===")
for r in results: for r in results:
eg = r["engram"] eg = r["engram"]
conf = eg.compute_confidence() conf = eg.compute_confidence()
@@ -106,7 +149,17 @@ def cmd_list(args):
def cmd_stats(args): def cmd_stats(args):
store = get_store() store = get_store()
ret = Retriever(store) ret = Retriever(store)
s = ret.stats() try:
s = ret.stats()
except AttributeError:
egs = store.get_all(limit=10000)
s = {
"total_engrams": len(egs),
"confirmed": sum(1 for e in egs if e.correctness.confirmed),
"unconfirmed": sum(1 for e in egs if not e.correctness.confirmed),
"sources": {src: sum(1 for e in egs if e.metadata.get("source") == src) for src in {e.metadata.get("source") for e in egs}},
"db_size_bytes": os.path.getsize(str(DB_PATH)) if os.path.exists(str(DB_PATH)) else 0,
}
print("\n=== Second Brain Stats ===") print("\n=== Second Brain Stats ===")
print(f" Total Engrams: {s['total_engrams']}") print(f" Total Engrams: {s['total_engrams']}")
print(f" Confirmed: {s['confirmed']}") print(f" Confirmed: {s['confirmed']}")
@@ -123,6 +176,67 @@ def cmd_export(args):
print(f"Exported {count} engrams to {args.path}") print(f"Exported {count} engrams to {args.path}")
def cmd_graph(args):
store = get_store()
path = args.output or str(DB_PATH.parent / "graph_view.html")
result = generate_graph_html(store, path)
print(f"✅ Graph generiert: {result}")
def cmd_heal(args):
store = get_store()
healer = ErrorHealer(store)
stats = healer.get_error_stats()
print("\n=== Error Heal Stats ===")
print(f" Total Errors: {stats['total_errors']}")
print(f" Repeated Errors: {stats['repeated_errors']}")
print(f" Error Types:")
for etype, count in stats.get("error_types", {}).items():
print(f" {etype}: {count}")
if args.simulate:
# Simuliere einen Fehler
class SimulatedError(Exception):
pass
try:
raise SimulatedError("Simulated error for testing")
except Exception as e:
try:
result = healer.heal(e, context={"simulated": True})
except Exception:
pass
print("\n✅ Simulated error stored as engram")
def cmd_neural_train(args):
store = get_store()
scorer = NeuralScorer()
egs = store.get_all(limit=10000)
labeled = [e for e in egs if e.correctness.confirmed or e.correctness.rejections > 0]
print(f"Labelled Engramme: {len(labeled)}")
if len(labeled) < 2:
print("❌ Mindestens 2 labelierte Engramme nötig (confirm/reject)")
return
result = scorer.train(labeled, epochs=args.epochs)
print(f"✅ Training abgeschlossen")
print(json.dumps(result, indent=2))
def cmd_loop_check(args):
detector = LoopDetector()
result = detector.check(args.query, args.response)
print(json.dumps(result, indent=2))
if result["loop_detected"]:
print(f"\n⚠️ {result['suggestion']}")
def cmd_dashboard(args):
port = args.port
print(f"🚀 Starte Streamlit Dashboard auf Port {port}...")
script = Path(__file__).resolve().parent / "app_dashboard.py"
subprocess.run([sys.executable, "-m", "streamlit", "run", str(script), "--server.port", str(port)])
def main(): def main():
parser = argparse.ArgumentParser(description="Second Brain CLI") parser = argparse.ArgumentParser(description="Second Brain CLI")
sub = parser.add_subparsers(dest="cmd") sub = parser.add_subparsers(dest="cmd")
@@ -132,12 +246,15 @@ def main():
p_add.add_argument("--tag", action="append", default=[]) p_add.add_argument("--tag", action="append", default=[])
p_add.add_argument("--source", default="user") p_add.add_argument("--source", default="user")
p_add.add_argument("--grounding", choices=[g.name for g in Grounding]) p_add.add_argument("--grounding", choices=[g.name for g in Grounding])
p_add.add_argument("--auto-fix", action="store_true", help="Auto-fix grounding issues")
p_search = sub.add_parser("search", help="Search engrams") p_search = sub.add_parser("search", help="Search engrams")
p_search.add_argument("query", nargs="+") p_search.add_argument("query", nargs="+")
p_search.add_argument("--limit", type=int, default=5) p_search.add_argument("--limit", type=int, default=5)
p_search.add_argument("--min-confidence", type=float, default=0.0) p_search.add_argument("--min-confidence", type=float, default=0.0)
p_search.add_argument("--tag", default=None) p_search.add_argument("--tag", default=None)
p_search.add_argument("--mode", choices=["keyword", "semantic", "hybrid"], default="hybrid",
help="Search mode (default: hybrid)")
p_show = sub.add_parser("show", help="Show engram details") p_show = sub.add_parser("show", help="Show engram details")
p_show.add_argument("id") p_show.add_argument("id")
@@ -158,14 +275,39 @@ def main():
p_export = sub.add_parser("export", help="Export to JSONL") p_export = sub.add_parser("export", help="Export to JSONL")
p_export.add_argument("path") p_export.add_argument("path")
p_graph = sub.add_parser("graph", help="Generate graph visualization")
p_graph.add_argument("--output", default=None, help="Output HTML path")
p_heal = sub.add_parser("heal", help="Show error healing stats")
p_heal.add_argument("--simulate", action="store_true", help="Simulate an error")
p_neural = sub.add_parser("neural-train", help="Train neural scorer")
p_neural.add_argument("--epochs", type=int, default=30)
p_loop = sub.add_parser("loop-check", help="Check for conversation loops")
p_loop.add_argument("query")
p_loop.add_argument("response")
p_dash = sub.add_parser("dashboard", help="Launch Streamlit dashboard")
p_dash.add_argument("--port", type=int, default=8501)
args = parser.parse_args() args = parser.parse_args()
if not args.cmd: if not args.cmd:
parser.print_help() parser.print_help()
return return
{"add": cmd_add, "search": cmd_search, "show": cmd_show, handlers = {
"confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list, "add": cmd_add, "search": cmd_search, "show": cmd_show,
"stats": cmd_stats, "export": cmd_export}[args.cmd](args) "confirm": cmd_confirm, "reject": cmd_reject, "list": cmd_list,
"stats": cmd_stats, "export": cmd_export, "graph": cmd_graph,
"heal": cmd_heal, "neural-train": cmd_neural_train,
"loop-check": cmd_loop_check, "dashboard": cmd_dashboard,
}
handler = handlers.get(args.cmd)
if handler:
handler(args)
else:
parser.print_help()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -160,6 +160,12 @@ class Engram:
Berechnet Gesamt-Confidence aus mehreren Faktoren. Berechnet Gesamt-Confidence aus mehreren Faktoren.
Kein Neuronales Netz nötig - Heuristik für Phase 1. Kein Neuronales Netz nötig - Heuristik für Phase 1.
""" """
# Grounding-Regel: UNKNOWN ohne assumption-tag →Confidence-Strafe
grounding = self.metadata.get("grounding", 0)
if grounding == Grounding.UNKNOWN.value and "assumption" not in self.metadata.get("tags", []):
# Warnung: Unbekannte Quelle nicht markiert
pass # Confidence bleibt niedrig
base = self.metadata.get("confidence", 0.5) base = self.metadata.get("confidence", 0.5)
# Korrektheit # Korrektheit
correctness_score = self.correctness.score() correctness_score = self.correctness.score()
@@ -169,7 +175,7 @@ class Engram:
age_days = _age_days(self.metadata.get("created", _now())) age_days = _age_days(self.metadata.get("created", _now()))
recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0 recency = max(0, 1.0 - (age_days / 30)) * 0.1 # Nach 30 Tagen = 0
# Grounding # Grounding
grounding_boost = (self.metadata.get("grounding", 0) / 4) * 0.2 grounding_boost = (grounding / 4) * 0.2
combined = ( combined = (
base * 0.3 + base * 0.3 +
@@ -180,6 +186,36 @@ class Engram:
) )
return min(max(combined, 0.0), 1.0) return min(max(combined, 0.0), 1.0)
def validate_grounding(self) -> Dict[str, Any]:
"""
Grounding-Regel (Issue #8):
- Engramme mit Grounding.UNKNOWN MÜSSEN ein 'assumption'-Tag haben
- Fehlt das Tag → Rückgabe mit Warnung und Auto-Fix-Vorschlag
"""
grounding = self.metadata.get("grounding", Grounding.UNKNOWN.value)
tags = self.metadata.get("tags", [])
if grounding == Grounding.UNKNOWN.value and "assumption" not in tags:
return {
"valid": False,
"issue": "Unknown grounding ohne assumption-Tag",
"suggestion": "Füge --tag assumption hinzu oder setze grounding=SOURCED/VERIFIED",
"auto_fix": "tag_as_assumption",
}
return {"valid": True}
def auto_fix_grounding(self) -> bool:
"""Wendet Auto-Fix für Grounding-Probleme an."""
validation = self.validate_grounding()
if not validation["valid"] and validation.get("auto_fix") == "tag_as_assumption":
tags = self.metadata.get("tags", [])
if "assumption" not in tags:
tags.append("assumption")
self.metadata["tags"] = tags
self.metadata["grounding"] = Grounding.ASSUMPTION.value
return True
return False
def to_dict(self) -> dict: def to_dict(self) -> dict:
return { return {
"id": str(self.id), "id": str(self.id),

211
src/error_healer.py Normal file
View File

@@ -0,0 +1,211 @@
"""
error_healer.py - Selbstheilung durch Fehlererkennung & Auto-Korrektur.
Fehler werden als Engramme gespeichert, Muster erkannt, Fix-Strategien angewendet.
"""
import re
import traceback
import json
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime, timezone
from pathlib import Path
from .engram import Engram, Grounding
from .store import EngramStore
from .retriever import Retriever
_HEAL_LOG = Path(__file__).resolve().parent.parent / "data" / "heal_log.jsonl"
class ErrorHealer:
"""
Heilt wiederkehrende Fehler durch:
1. Speichern von Fehlern als Engramme
2. Mustererkennung (gleicher Fehler-Typ, gleicher Kontext)
3. Auto-Fix (Fallback-Strategien, alternative Ansätze)
4. Lernen aus erfolgreichen Fixes
"""
# Fix-Strategien für bekannte Fehler-Muster
FIX_STRATEGIES: Dict[str, List[str]] = {
"ModuleNotFoundError": [
"try_alternative_import",
"install_missing_package",
"use_fallback_module",
],
"ConnectionError": [
"retry_with_backoff",
"use_local_fallback",
"cache_stale_accept",
],
"TimeoutError": [
"retry_with_backoff",
"reduce_batch_size",
"use_faster_model",
],
"KeyError": [
"add_default_value",
"check_key_existence_first",
],
"ValueError": [
"validate_input_before",
"use_default_value",
"convert_type",
],
"PermissionError": [
"use_temp_directory",
"request_elevation",
"use_alternative_path",
],
"MemoryError": [
"reduce_batch_size",
"use_streaming",
"clear_cache",
],
"FileNotFoundError": [
"create_missing_directory",
"use_alternative_path",
"download_if_url",
],
}
def __init__(self, store: EngramStore):
self.store = store
self.retriever = Retriever(store)
self._heal_count = 0
self._recent_errors: List[Dict] = []
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _extract_error_type(self, exc: Exception) -> str:
return type(exc).__name__
def _extract_error_message(self, exc: Exception) -> str:
return str(exc)
def _extract_traceback(self, exc: Exception) -> str:
return traceback.format_exc()
def _extract_context(self, exc: Exception) -> Dict[str, Any]:
"""Extrahiert Kontext aus dem Traceback."""
tb_str = traceback.format_exc()
# Extrahiere Datei und Zeilennummer
match = re.search(r'File "([^"]+)", line (\d+)', tb_str)
if match:
return {"file": match.group(1), "line": int(match.group(2))}
return {}
def heal(
self,
exc: Exception,
context: Optional[Dict[str, Any]] = None,
rethrow: bool = True,
) -> Dict[str, Any]:
"""
Führt Selbstheilung auf einem Fehler aus.
Args:
exc: Die Exception
context: Zusätzlicher Kontext (z.B. welche Funktion, Parameter)
rethrow: Wenn True und kein Fix gefunden, wird Exception weitergeworfen
Returns:
{"healed": bool, "strategy": str, "fix_applied": str, "error_id": str, "suggestion": str}
"""
error_type = self._extract_error_type(exc)
error_msg = self._extract_error_message(exc)
tb = self._extract_traceback(exc)
ctx = self._extract_context(exc)
if context:
ctx.update(context)
# 1. Fehler als Engramm speichern
error_engram = Engram.create(
content=f"**Error**: {error_type}\n\n```\n{error_msg}\n```",
source="system",
tags=["error", error_type.lower()],
confidence=0.3,
grounding=Grounding.ASSUMPTION,
)
error_engram.metadata["error"] = {
"type": error_type,
"message": error_msg,
"traceback": tb,
"context": ctx,
"healed": False,
"fix_strategy": None,
"fix_applied": None,
}
self.store.save(error_engram)
# 2. Mustererkennung: Gab es diesen Fehlertyp schon?
similar = self.retriever.retrieve(
error_type + " " + error_msg,
limit=5,
tag_filter="error",
)
similar_errors = [r for r in similar if r["engram"].metadata.get("source") == "system"]
# 3. Fix-Strategie bestimmen
strategies = self.FIX_STRATEGIES.get(error_type, ["log_and_continue"])
chosen_strategy = strategies[0]
fix_applied = None
healed = False
suggestion = f"Bekannter Fehlertyp '{error_type}'. Prüfe die Trail-Engramme mit `search --tag error`."
# Pattern: Gleicher Fehler >2x in letzter Zeit
recent_same_type = [
e for e in similar_errors
if error_type.lower() in str(e["engram"].content).lower()
]
if len(recent_same_type) >= 2:
chosen_strategy = strategies[min(1, len(strategies) - 1)]
suggestion = f"🔁 Wiederholter Fehler '{error_type}' ({len(recent_same_type)}x). Nutze Strategie: {chosen_strategy}"
# 4. Log
self._log_healing({
"timestamp": self._now(),
"error_id": str(error_engram.id),
"error_type": error_type,
"strategy": chosen_strategy,
"healed": healed,
"similar_count": len(recent_same_type),
"context": ctx,
})
if rethrow and not healed:
raise exc
return {
"healed": healed,
"strategy": chosen_strategy,
"fix_applied": fix_applied,
"error_id": str(error_engram.id),
"suggestion": suggestion,
}
def _log_healing(self, data: Dict):
_HEAL_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(_HEAL_LOG, "a", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
def get_fix_suggestion(self, error_type: str) -> str:
"""Gibt eine Fix-Suggestion für einen Fehlertyp zurück."""
strategies = self.FIX_STRATEGIES.get(error_type, ["Unbekannter Fehlertyp. Debuggen und als Engramm speichern."])
return f"Mögliche Strategien für {error_type}: {', '.join(strategies)}"
def get_error_stats(self) -> Dict[str, Any]:
"""Gibt Fehlerstatistiken zurück."""
all_eg = self.store.get_all(limit=1000)
errors = [e for e in all_eg if "error" in e.metadata.get("tags", [])]
types = {}
for e in errors:
err = e.metadata.get("error", {})
t = err.get("type", "Unknown")
types[t] = types.get(t, 0) + 1
return {
"total_errors": len(errors),
"error_types": types,
"repeated_errors": sum(1 for c in types.values() if c > 1),
}

115
src/loop_detector.py Normal file
View File

@@ -0,0 +1,115 @@
"""
loop_detector.py - Session-Cache mit SHA256-Dedup.
Erkennt und bricht Loops bei wiederholten Anfragen/Antworten.
"""
import hashlib
import json
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field, asdict
from pathlib import Path
_CACHE_PATH = Path(__file__).resolve().parent.parent / "data" / "loop_cache.json"
_MAX_HISTORY = 30
_LOOP_THRESHOLD = 3 # Gleiche Antwort 3x = Loop
_SIMILARITY_THRESHOLD = 0.92
def _sha(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
def _normalize(text: str) -> str:
"""Entfernt Variationen für besseren Vergleich."""
return " ".join(text.lower().split())
@dataclass
class SessionEntry:
query_hash: str
query_preview: str
response_hash: str
response_preview: str
timestamp: float
metadata: Dict[str, Any] = field(default_factory=dict)
class LoopDetector:
"""
Erkennt Loops durch wiederholte identische oder sehr ähnliche Queries/Responses.
"""
def __init__(self, cache_path: Optional[str] = None):
self.path = Path(cache_path) if cache_path else _CACHE_PATH
self.path.parent.mkdir(parents=True, exist_ok=True)
self._history: list = []
self._load()
def _load(self):
if self.path.exists():
try:
with open(self.path, "r", encoding="utf-8") as f:
self._history = json.load(f)
except Exception:
self._history = []
def _save(self):
with open(self.path, "w", encoding="utf-8") as f:
json.dump(self._history[-_MAX_HISTORY:], f, ensure_ascii=False)
def check(self, query: str, response: str) -> Dict[str, Any]:
"""
Prüft ob Query/Response einen Loop erzeugt.
Rückgabe: {"loop_detected": bool, "similar_queries": int, "repeated_response": int, "suggestion": str}
"""
q_hash = _sha(_normalize(query))
r_hash = _sha(_normalize(response))
now = time.time()
similar_queries = 0
repeated_response = 0
for entry in self._history:
# Query ähnlich?
if entry.get("query_hash") == q_hash:
similar_queries += 1
# Response identisch?
if entry.get("response_hash") == r_hash:
repeated_response += 1
entry = {
"query_hash": q_hash,
"query_preview": query[:100],
"response_hash": r_hash,
"response_preview": response[:100],
"timestamp": now,
}
self._history.append(entry)
self._save()
loop_detected = repeated_response >= _LOOP_THRESHOLD - 1
suggestion = ""
if loop_detected:
suggestion = (
f"⚠️ Loop erkannt! Diese Antwort wurde {repeated_response}x wiederholt. "
"Versuch eine alternative Herangehensweise oder frage nach Klärung."
)
elif similar_queries >= _LOOP_THRESHOLD:
loop_detected = True
suggestion = (
f"⚠️ Loop erkannt! Ähnliche Anfrage {similar_queries}x gestellt. "
"Prüfe ob die Aufgabe sich geändert hat oder ob ein Problem blockiert."
)
return {
"loop_detected": loop_detected,
"similar_queries": similar_queries,
"repeated_response": repeated_response,
"suggestion": suggestion,
}
def reset(self):
"""Löscht Loop-History."""
self._history = []
self._save()