Files
second-brain/fastapi_app.py
Otto 0ff6db73ea feat(dashboard): integrate link suggestions and predictive links into UI
- FastAPI: parse_engram now includes link_suggestions and predictive_links from metadata
- FastAPI: add POST /api/links/accept to create links from suggestions
- Dashboard: new renderCardsWithSuggestions() displays suggestions in each card
- Dashboard: acceptLink() function handles click-to-link
- Dashboard: loadCards() calls renderCardsWithSuggestions()
- Systemd: remove DirectoryNotEmpty from ingest path unit (already present)

Refs: #25 #27
2026-05-31 15:42:46 +02:00

1057 lines
35 KiB
Python

#!/usr/bin/env python3
"""
Second Brain FastAPI Dashboard
Goals:
- "Release-ready" defaults (no hardcoded absolute paths)
- Minimal config via env vars
- Serves the existing static dashboard (templates/dashboard.html + static/)
"""
import json
import os
import sqlite3
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4
from urllib.parse import urlparse
from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
# ─── Config ──────────────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent
WORKSPACE = Path(os.environ.get("SECOND_BRAIN_WORKSPACE", str(REPO_ROOT))).resolve()
DB_PATH = Path(os.environ.get("SECOND_BRAIN_DB_PATH", str(WORKSPACE / "data" / "brain.sqlite"))).resolve()
PORT = int(os.environ.get("SECOND_BRAIN_PORT", os.environ.get("PORT", "8501")))
HOST = os.environ.get("SECOND_BRAIN_HOST", "0.0.0.0")
def create_app() -> FastAPI:
app = FastAPI(title="Second Brain Dashboard")
static_dir = WORKSPACE / "static"
if static_dir.is_dir():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
return app
app = create_app()
# ─── Helpers ─────────────────────────────────────────────────────────────────
def get_db():
if not DB_PATH.exists():
raise FileNotFoundError(f"DB not found: {DB_PATH}")
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def parse_engram(row: sqlite3.Row) -> dict:
meta = json.loads(row["metadata_json"] or "{}")
correctness = json.loads(row["correctness_json"] or "{}")
verdict = correctness.get("verdict")
if not isinstance(verdict, str) or not verdict:
# Back-compat inference for older rows
if correctness.get("confirmed", False):
verdict = "confirmed_true"
elif int(correctness.get("rejections", 0) or 0) > 0:
verdict = "confirmed_false"
else:
verdict = "unknown"
result = {
"id": row["id"],
"content": row["content"],
"confidence": meta.get("confidence", 0.0),
"confirmed": correctness.get("confirmed", False),
"verdict": verdict,
"evidence": correctness.get("evidence", []),
"confirmations": correctness.get("confirmations", 0),
"rejections": correctness.get("rejections", 0),
"tags": meta.get("tags", []),
"created": meta.get("created", row["created_at"]),
"modified": meta.get("modified", row["modified_at"]),
"last_reviewed": correctness.get("last_reviewed"),
"review_history": correctness.get("review_history", []),
"source": meta.get("source", "unknown"),
"access_count": meta.get("access_count", 0),
"grounding": meta.get("grounding", 0),
}
# Vorschläge aus metadata
if "link_suggestions" in meta:
result["link_suggestions"] = meta["link_suggestions"]
if "predictive_links" in meta:
result["predictive_links"] = meta["predictive_links"]
return result
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _update_correctness(engram_id: str, *, action: str, reason: str | None = None) -> dict:
"""
Update correctness_json for an engram. action: confirm|reject
"""
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT correctness_json FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
raise FileNotFoundError(f"Engram not found: {engram_id}")
corr = json.loads(row["correctness_json"] or "{}")
corr.setdefault("verdict", None)
corr.setdefault("evidence", [])
corr.setdefault("confirmed", False)
corr.setdefault("confirmations", 0)
corr.setdefault("rejections", 0)
corr.setdefault("review_history", [])
corr["last_reviewed"] = _now_iso()
entry = {
"by": "dashboard",
"action": action,
"at": corr["last_reviewed"],
"note": (reason or "").strip(),
}
try:
corr["review_history"].append(entry)
except Exception:
corr["review_history"] = [entry]
if action == "confirm":
corr["verdict"] = "confirmed_true"
corr["confirmed"] = True
corr["confirmations"] = int(corr.get("confirmations", 0) or 0) + 1
elif action == "reject":
corr["verdict"] = "confirmed_false"
corr["rejections"] = int(corr.get("rejections", 0) or 0) + 1
corr["confirmed"] = False
# Store minimal evidence for dashboard-driven actions.
try:
ev = corr.get("evidence")
if not isinstance(ev, list):
ev = []
ev.append(
{
"kind": "human",
"by": "dashboard",
"at": corr["last_reviewed"],
"action": action,
}
)
corr["evidence"] = ev[-50:] # cap growth
except Exception:
pass
c.execute(
"UPDATE engrams SET correctness_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(corr, ensure_ascii=False), corr["last_reviewed"], engram_id),
)
conn.commit()
conn.close()
return {"ok": True}
def _bump_access(engram_id: str) -> dict:
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT metadata_json FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
raise FileNotFoundError(f"Engram not found: {engram_id}")
meta = json.loads(row["metadata_json"] or "{}")
meta["access_count"] = int(meta.get("access_count", 0) or 0) + 1
meta["last_accessed"] = _now_iso()
c.execute(
"UPDATE engrams SET metadata_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(meta, ensure_ascii=False), meta["last_accessed"], engram_id),
)
conn.commit()
conn.close()
return {"ok": True}
def _safe_json_extract_tags(meta_json: str) -> list[str]:
try:
d = json.loads(meta_json or "{}")
tags = d.get("tags") or []
return [t for t in tags if isinstance(t, str)]
except Exception:
return []
def _host_from_meta(meta_json: str) -> str | None:
try:
d = json.loads(meta_json or "{}")
grounding = d.get("grounding")
url = d.get("url")
if isinstance(grounding, dict) and isinstance(grounding.get("url"), str):
url = grounding.get("url")
if not isinstance(url, str):
return None
parsed = urlparse(url)
return parsed.hostname
except Exception:
return None
def _systemd_unit_state(unit: str) -> dict:
"""
Best-effort systemd status snapshot for a known unit.
Never raises; returns minimal fields.
"""
try:
out = subprocess.check_output(
["systemctl", "show", unit, "--no-page", "--property=ActiveState,SubState,Result,ExecMainStatus,ExecMainCode,ExecMainStartTimestamp,ExecMainExitTimestamp"],
text=True,
stderr=subprocess.STDOUT,
timeout=2,
)
kv = {}
for line in out.splitlines():
if "=" in line:
k, v = line.split("=", 1)
kv[k] = v
return {
"unit": unit,
"active": kv.get("ActiveState"),
"sub": kv.get("SubState"),
"result": kv.get("Result"),
"exit_status": kv.get("ExecMainStatus"),
"start_ts": kv.get("ExecMainStartTimestamp"),
"exit_ts": kv.get("ExecMainExitTimestamp"),
}
except Exception as e:
return {"unit": unit, "error": str(e)}
def _dir_size_bytes(path: Path) -> int:
total = 0
try:
for p in path.rglob("*"):
try:
if p.is_file():
total += p.stat().st_size
except Exception:
pass
except Exception:
pass
return total
# ─── API Endpoints ───────────────────────────────────────────────────────────
@app.get("/healthz", response_class=PlainTextResponse)
def healthz():
return "ok"
@app.get("/api/config")
def api_config():
return {
"workspace": str(WORKSPACE),
"db_path": str(DB_PATH),
}
@app.get("/api/db_info")
def api_db_info():
if not DB_PATH.exists():
raise FileNotFoundError(f"DB not found: {DB_PATH}")
st = DB_PATH.stat()
return {
"db_path": str(DB_PATH),
"size_bytes": st.st_size,
"mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).isoformat(),
}
@app.get("/api/storage_stats")
def api_storage_stats():
conn = get_db()
c = conn.cursor()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed_true = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_true'
OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)
)
"""
).fetchone()[0]
confirmed_false = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_false'
OR (json_extract(correctness_json, '$.verdict') IS NULL
AND json_extract(correctness_json, '$.confirmed') = 0
AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) > 0)
)
"""
).fetchone()[0]
sources = {
r[0]: r[1]
for r in c.execute(
"SELECT json_extract(metadata_json, '$.source') AS src, COUNT(*) FROM engrams GROUP BY src ORDER BY COUNT(*) DESC"
).fetchall()
if r[0] is not None
}
conn.close()
chroma_dir = WORKSPACE / "data" / "chroma"
emb_cache_dir = WORKSPACE / "data" / "embedding_cache"
vec_state_path = WORKSPACE / "data" / "vector_index_state.json"
vec_state = {}
if vec_state_path.exists():
try:
vec_state = json.loads(vec_state_path.read_text())
except Exception:
vec_state = {}
obsidian_cfg_path = WORKSPACE / "data" / "obsidian_config.json"
obsidian_cfg = None
if obsidian_cfg_path.exists():
try:
obsidian_cfg = json.loads(obsidian_cfg_path.read_text())
except Exception:
obsidian_cfg = {"raw": obsidian_cfg_path.read_text()[:2000]}
backup_files = sorted((WORKSPACE / "data").glob("backup_*.jsonl"))
return {
"sql": {
"total_engrams": total,
"confirmed": confirmed_true,
"rejected": confirmed_false,
"pending": total - confirmed_true - confirmed_false,
"by_source": sources,
},
"vector": {
"chroma_dir": str(chroma_dir),
"chroma_size_bytes": _dir_size_bytes(chroma_dir) if chroma_dir.exists() else 0,
"embedding_cache_dir": str(emb_cache_dir),
"embedding_cache_files": len(list(emb_cache_dir.glob("*.json"))) if emb_cache_dir.exists() else 0,
"vector_state": vec_state,
},
"obsidian": {
"config_path": str(obsidian_cfg_path),
"configured": bool(obsidian_cfg),
"config": obsidian_cfg,
},
"backups": {
"count": len(backup_files),
"latest": str(backup_files[-1]) if backup_files else None,
},
}
@app.get("/api/jobs")
def api_jobs():
# Known units that influence "freshness" of the brain.
units = [
"openclaw-secondbrain-ingest-memory.service",
"openclaw-secondbrain-index-vectors.service",
"openclaw-secondbrain-review.service",
"openclaw-secondbrain-heartbeat.service",
"openclaw-secondbrain-verify-pending.service",
]
return {"items": [_systemd_unit_state(u) for u in units]}
@app.get("/api/insights")
def api_insights(limit: int = Query(8, ge=1, le=50)):
conn = get_db()
c = conn.cursor()
rows = c.execute(
"SELECT id, metadata_json, correctness_json, created_at, modified_at FROM engrams ORDER BY created_at DESC LIMIT 2000"
).fetchall()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed_true = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_true'
OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)
)
"""
).fetchone()[0]
confirmed_false = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_false'
OR (json_extract(correctness_json, '$.verdict') IS NULL
AND json_extract(correctness_json, '$.confirmed') = 0
AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) > 0)
)
"""
).fetchone()[0]
pending = total - confirmed_true - confirmed_false
tag_counts: dict[str, int] = {}
source_counts: dict[str, int] = {}
host_counts: dict[str, int] = {}
active: list[dict] = []
forgotten: list[dict] = []
for r in rows:
try:
meta = json.loads(r["metadata_json"] or "{}")
except Exception:
meta = {}
src = meta.get("source", "unknown")
source_counts[src] = source_counts.get(src, 0) + 1
for t in (meta.get("tags") or []):
if isinstance(t, str):
tag_counts[t] = tag_counts.get(t, 0) + 1
try:
host = _host_from_meta(r["metadata_json"])
except Exception:
host = None
if host:
host_counts[host] = host_counts.get(host, 0) + 1
access_count = int(meta.get("access_count", 0) or 0)
created = meta.get("created", r["created_at"])
if access_count >= 5 and len(active) < limit:
active.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
if access_count == 0 and len(forgotten) < limit:
forgotten.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
def top_k(d: dict[str, int]) -> list[dict]:
return [
{"key": k, "count": v}
for k, v in sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:limit]
]
conn.close()
return {
"total": total,
"confirmed": confirmed_true,
"rejected": confirmed_false,
"pending": pending,
"top_tags": top_k(tag_counts),
"top_sources": top_k(source_counts),
"top_hosts": top_k(host_counts),
"active_engrams": active,
"forgotten_engrams": forgotten,
}
@app.get("/api/graph")
def api_graph(
limit_nodes: int = Query(0, ge=0, le=50000),
limit_edges: int = Query(0, ge=0, le=200000),
):
"""
Returns a lightweight graph view:
- Nodes: engrams + tag:<tag> + host:<hostname>
- Edges: engram->tag and engram->host plus explicit engrams_links edges.
"""
conn = get_db()
c = conn.cursor()
if limit_nodes > 0:
# Fetch a bigger window than the final node cap so trim can keep hubs + neighbors.
engram_fetch = min(50000, max(1000, int(limit_nodes * 3)))
else:
engram_fetch = None
if limit_edges > 0:
link_fetch = limit_edges
elif limit_nodes > 0:
link_fetch = min(200000, max(2000, int(limit_nodes * 10)))
else:
link_fetch = None
if engram_fetch is None:
rows = c.execute("SELECT id, metadata_json, correctness_json, created_at, modified_at FROM engrams ORDER BY created_at DESC").fetchall()
else:
rows = c.execute(
"SELECT id, metadata_json, correctness_json, created_at, modified_at FROM engrams ORDER BY created_at DESC LIMIT ?",
(engram_fetch,),
).fetchall()
if link_fetch is None:
link_rows = c.execute("SELECT from_id, to_id FROM engrams_links ORDER BY rowid DESC").fetchall()
else:
link_rows = c.execute(
"SELECT from_id, to_id FROM engrams_links ORDER BY rowid DESC LIMIT ?",
(link_fetch,),
).fetchall()
conn.close()
nodes: dict[str, dict] = {}
edges: list[dict] = []
def add_node(nid: str, kind: str, label: str | None = None, weight: float | None = None):
if nid not in nodes:
nodes[nid] = {"id": nid, "kind": kind}
if label is not None:
nodes[nid]["label"] = label
if weight is not None:
nodes[nid]["weight"] = weight
for r in rows:
eid = r["id"]
try:
meta = json.loads(r["metadata_json"] or "{}")
except Exception:
meta = {}
try:
corr = json.loads(r["correctness_json"] or "{}")
except Exception:
corr = {}
verdict = corr.get("verdict")
if not isinstance(verdict, str) or not verdict:
if corr.get("confirmed", False):
verdict = "confirmed_true"
elif int(corr.get("rejections", 0) or 0) > 0:
verdict = "confirmed_false"
else:
verdict = "unknown"
add_node(
eid,
"engram",
label=eid[:8],
weight=float(meta.get("access_count", 0) or 0),
)
nodes[eid].update(
{
"source": meta.get("source", "unknown"),
"confidence": float(meta.get("confidence", 0.0) or 0.0),
"created": meta.get("created", r["created_at"]),
"modified": meta.get("modified", r["modified_at"]),
"last_accessed": meta.get("last_accessed"),
"verdict": verdict,
"confirmed": bool(corr.get("confirmed", False)),
"rejections": int(corr.get("rejections", 0) or 0),
"grounding": meta.get("grounding", 0),
"predict_locked": bool(meta.get("predict_locked", False)),
}
)
for t in _safe_json_extract_tags(r["metadata_json"]):
tid = f"tag:{t}"
add_node(tid, "tag", label=t)
edges.append({"from": eid, "to": tid, "kind": "has_tag", "weight": 0.35})
host = _host_from_meta(r["metadata_json"])
if host:
hid = f"host:{host}"
add_node(hid, "host", label=host)
edges.append({"from": eid, "to": hid, "kind": "grounded_at", "weight": 0.25})
for fr, to in link_rows:
add_node(fr, "engram")
add_node(to, "engram")
edges.append({"from": fr, "to": to, "kind": "link", "weight": 1.0})
# Trim nodes to keep payload bounded (prefer engrams and connected tags/hosts)
if limit_nodes > 0 and len(nodes) > limit_nodes:
# Keep a balanced subset: many engrams plus the most-connected non-engrams.
kept: dict[str, dict] = {}
engram_budget = int(limit_nodes * 0.7)
# 1) Keep newest engrams first (they appear first in `rows` loop insertion order)
for r in rows:
eid = r["id"]
if eid in nodes:
kept[eid] = nodes[eid]
if len(kept) >= engram_budget:
break
# 2) Rank remaining nodes by degree within current edge set
degree: dict[str, int] = {}
for e in edges:
degree[e["from"]] = degree.get(e["from"], 0) + 1
degree[e["to"]] = degree.get(e["to"], 0) + 1
remaining = [nid for nid in nodes.keys() if nid not in kept]
remaining.sort(key=lambda nid: degree.get(nid, 0), reverse=True)
for nid in remaining:
kept[nid] = nodes[nid]
if len(kept) >= limit_nodes:
break
nodes = kept
edges = [e for e in edges if e["from"] in nodes and e["to"] in nodes]
return {"nodes": list(nodes.values()), "edges": edges}
@app.get("/api/events")
def api_events():
"""
Server-Sent Events stream for lightweight real-time UI refresh.
"""
import time
def gen():
while True:
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
"stats": api_stats(),
"storage": api_storage_stats(),
"jobs": api_jobs(),
"insights": api_insights(limit=8),
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
time.sleep(5)
return StreamingResponse(gen(), media_type="text/event-stream")
@app.exception_handler(FileNotFoundError)
def handle_file_not_found(request: Request, exc: FileNotFoundError):
return JSONResponse(
status_code=503,
content={
"error": str(exc),
"hint": "Set SECOND_BRAIN_DB_PATH or SECOND_BRAIN_WORKSPACE to a valid location.",
},
)
@app.exception_handler(sqlite3.Error)
def handle_sqlite_error(request: Request, exc: sqlite3.Error):
return JSONResponse(status_code=500, content={"error": f"sqlite error: {exc}"})
@app.get("/api/stats")
def api_stats():
conn = get_db()
c = conn.cursor()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed_true = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_true'
OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)
)
"""
).fetchone()[0]
confirmed_false = c.execute(
"""
SELECT COUNT(*) FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') = 'confirmed_false'
OR (json_extract(correctness_json, '$.verdict') IS NULL
AND json_extract(correctness_json, '$.confirmed') = 0
AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) > 0)
)
"""
).fetchone()[0]
pending = total - confirmed_true - confirmed_false
errors = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(metadata_json, '$.tags') LIKE '%error%'"
).fetchone()[0]
avg_conf = c.execute(
"SELECT AVG(json_extract(metadata_json, '$.confidence')) FROM engrams"
).fetchone()[0] or 0.0
conn.close()
return {
"total": total,
"confirmed": confirmed_true,
"rejected": confirmed_false,
"pending": pending,
"errors": errors,
"avg_confidence": round(avg_conf, 2),
}
@app.get("/api/engrams")
def api_engrams(
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
tag: str = Query(None),
confirmed: bool = Query(None),
verdict: str = Query(None),
search: str = Query(None),
min_confidence: float = Query(0.0),
):
conn = get_db()
c = conn.cursor()
where_clauses = ["json_extract(metadata_json, '$.confidence') >= ?"]
params = [min_confidence]
if tag:
where_clauses.append("json_extract(metadata_json, '$.tags') LIKE ?")
params.append(f'%"{tag}"%')
if confirmed is not None:
if confirmed:
# confirmed == statement is true (verdict confirmed_true)
where_clauses.append(
"("
"json_extract(correctness_json, '$.verdict') = 'confirmed_true' "
"OR (json_extract(correctness_json, '$.verdict') IS NULL AND json_extract(correctness_json, '$.confirmed') = 1)"
")"
)
else:
# pending/unresolved (unknown/probable) but exclude confirmed_false.
where_clauses.append(
"("
"json_extract(correctness_json, '$.verdict') IN ('unknown','probable_true','probable_false') "
"OR (json_extract(correctness_json, '$.verdict') IS NULL "
" AND json_extract(correctness_json, '$.confirmed') = 0 "
" AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) = 0)"
")"
)
if verdict:
v = verdict.strip()
if v in ("unknown", "probable_true", "probable_false", "confirmed_true", "confirmed_false"):
where_clauses.append("json_extract(correctness_json, '$.verdict') = ?")
params.append(v)
if search:
# Use FTS
try:
ids = [
r[0] for r in c.execute(
"SELECT rowid FROM engrams_fts WHERE content MATCH ? LIMIT 200",
(search,)
).fetchall()
]
if ids:
placeholders = ",".join("?" * len(ids))
where_clauses.append(f"id IN ({placeholders})")
params.extend(ids)
else:
# Full-text fallback on content
where_clauses.append("content LIKE ?")
params.append(f"%{search}%")
except Exception:
where_clauses.append("content LIKE ?")
params.append(f"%{search}%")
where_sql = " AND ".join(where_clauses)
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE {where_sql}
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "limit": limit, "offset": offset}
@app.get("/api/engrams/{engram_id}")
def api_engram_detail(engram_id: str):
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT * FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
# Links
links = c.execute(
"SELECT to_id FROM engrams_links WHERE from_id = ?", (engram_id,)
).fetchall()
result = parse_engram(row)
result["links"] = [r[0] for r in links]
conn.close()
return result
@app.get("/api/pending")
def api_pending(
limit: int = Query(20, ge=1, le=200),
offset: int = Query(0, ge=0),
source: str | None = Query(None),
):
conn = get_db()
c = conn.cursor()
where = ["json_extract(correctness_json, '$.confirmed') = 0"]
params: list = []
if source:
where.append("json_extract(metadata_json, '$.source') = ?")
params.append(source)
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE {' AND '.join(where)}
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
items = [parse_engram(r) for r in rows]
conn.close()
return {"items": items, "limit": limit, "offset": offset}
@app.post("/api/engrams")
def api_create_engram(content: str = Form(...), tags: str = Form("")):
content = (content or "").strip()
if not content:
return JSONResponse({"error": "content required"}, status_code=400)
tag_list = [t.strip() for t in (tags or "").split(",") if t.strip()]
now = _now_iso()
engram_id = str(uuid4())
meta = {
"source": "user",
"confidence": 0.7,
"created": now,
"modified": now,
"access_count": 0,
"last_accessed": now,
"tags": tag_list,
"session_id": None,
"agent_id": None,
"grounding": 0,
}
corr = {
"confirmed": False,
"confirmations": 0,
"rejections": 0,
"last_reviewed": None,
"review_history": [],
}
conn = get_db()
c = conn.cursor()
c.execute(
"""
INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, embedding_json, created_at, modified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
engram_id,
content,
json.dumps(meta, ensure_ascii=False),
json.dumps(corr, ensure_ascii=False),
"[]",
"{}",
None,
now,
now,
),
)
conn.commit()
conn.close()
return {"id": engram_id}
@app.post("/api/engrams/{engram_id}/confirm")
def api_confirm_engram(engram_id: str, reason: str = Form("")):
try:
return _update_correctness(engram_id, action="confirm", reason=reason)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/reject")
def api_reject_engram(engram_id: str, reason: str = Form("")):
try:
return _update_correctness(engram_id, action="reject", reason=reason)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/refresh")
def api_refresh_engram(engram_id: str):
try:
return _bump_access(engram_id)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/refresh")
def api_refresh(engram_id: str):
conn = get_db()
c = conn.cursor()
row = c.execute(
"SELECT metadata_json, correctness_json FROM engrams WHERE id = ?", (engram_id,)
).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
meta = json.loads(row["metadata_json"] or "{}")
correctness = json.loads(row["correctness_json"] or "{}")
# Simple heuristic: confidence based on confirmations vs rejections
conf = 0.5
conf += 0.1 * correctness.get("confirmations", 0)
conf -= 0.15 * correctness.get("rejections", 0)
conf = max(0.1, min(1.0, conf))
meta["confidence"] = round(conf, 2)
meta["modified"] = datetime.now(timezone.utc).isoformat()
c.execute(
"UPDATE engrams SET metadata_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(meta), datetime.now(timezone.utc).isoformat(), engram_id),
)
conn.commit()
conn.close()
return {"success": True, "new_confidence": round(conf, 2)}
@app.post("/api/links/accept")
def api_accept_link(from_id: str = Form(...), to_id: str = Form(...)):
"""Erstelle einen Link zwischen zwei Engrammen (aus Vorschlag)."""
conn = get_db()
c = conn.cursor()
# Prüfe Existenz beider Engramme
for eid in (from_id, to_id):
if not c.execute("SELECT 1 FROM engrams WHERE id = ?", (eid,)).fetchone():
conn.close()
return JSONResponse({"error": f"Engram {eid} not found"}, status_code=404)
# Vermeide Duplikate
c.execute("SELECT 1 FROM engrams_links WHERE from_id = ? AND to_id = ?", (from_id, to_id))
if c.fetchone():
conn.close()
return {"ok": True, "message": "link already exists"}
# Link erstellen
c.execute(
"INSERT INTO engrams_links (from_id, to_id) VALUES (?, ?)",
(from_id, to_id)
)
conn.commit()
conn.close()
return {"ok": True}
@app.post("/api/engrams")
def api_create_engram(content: str = Form(...), tags: str = Form(""), source: str = Form("web")):
engram_id = f"web-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S-%f')[:20]}"
now = datetime.now(timezone.utc).isoformat()
meta = {
"source": source,
"confidence": 0.5,
"created": now,
"modified": now,
"access_count": 0,
"last_accessed": now,
"tags": [t.strip() for t in tags.split(",") if t.strip()] or ["web"],
"session_id": None,
"agent_id": None,
"grounding": 0,
"hash": "",
}
correctness = {
"verdict": "unknown",
"evidence": [],
"confirmed": False,
"confirmations": 0,
"rejections": 0,
"last_reviewed": None,
"review_history": [],
}
conn = get_db()
c = conn.cursor()
c.execute(
"""
INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, created_at, modified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(engram_id, content, json.dumps(meta), json.dumps(correctness), "[]", '{"parent": null, "children": [], "depth": 0}', now, now),
)
conn.commit()
conn.close()
return {"success": True, "engram_id": engram_id}
@app.get("/api/pending")
def api_pending(limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0)):
conn = get_db()
c = conn.cursor()
rows = c.execute(
"""
SELECT * FROM engrams
WHERE (
json_extract(correctness_json, '$.verdict') IN ('unknown','probable_true','probable_false')
OR (json_extract(correctness_json, '$.verdict') IS NULL
AND json_extract(correctness_json, '$.confirmed') = 0
AND COALESCE(json_extract(correctness_json, '$.rejections'), 0) = 0)
)
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "limit": limit, "offset": offset}
@app.get("/api/search")
def api_search(
q: str = Query(..., min_length=1),
min_confidence: float = Query(0.0),
limit: int = Query(20, ge=1, le=100),
):
conn = get_db()
c = conn.cursor()
try:
ids = [
r[0] for r in c.execute(
"SELECT rowid FROM engrams_fts WHERE content MATCH ? LIMIT 200",
(q,)
).fetchall()
]
if ids:
placeholders = ",".join("?" * len(ids))
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE id IN ({placeholders})
AND json_extract(metadata_json, '$.confidence') >= ?
ORDER BY created_at DESC
LIMIT ?
""",
ids + [min_confidence, limit],
).fetchall()
else:
rows = []
except Exception:
rows = c.execute(
"""
SELECT * FROM engrams
WHERE content LIKE ? AND json_extract(metadata_json, '$.confidence') >= ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{q}%", min_confidence, limit),
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "query": q}
# ─── Frontend ────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request):
with open(WORKSPACE / "templates" / "dashboard.html", "r", encoding="utf-8") as f:
html = f.read()
return HTMLResponse(content=html)
# ─── Main ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run("fastapi_app:app", host=HOST, port=PORT)