Files
second-brain/fastapi_app.py

905 lines
29 KiB
Python

#!/usr/bin/env python3
"""
Second Brain FastAPI Dashboard
Goals:
- "Release-ready" defaults (no hardcoded absolute paths)
- Minimal config via env vars
- Serves the existing static dashboard (templates/dashboard.html + static/)
"""
import json
import os
import sqlite3
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4
from urllib.parse import urlparse
from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
# ─── Config ──────────────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent
WORKSPACE = Path(os.environ.get("SECOND_BRAIN_WORKSPACE", str(REPO_ROOT))).resolve()
DB_PATH = Path(os.environ.get("SECOND_BRAIN_DB_PATH", str(WORKSPACE / "data" / "brain.sqlite"))).resolve()
PORT = int(os.environ.get("SECOND_BRAIN_PORT", os.environ.get("PORT", "8501")))
HOST = os.environ.get("SECOND_BRAIN_HOST", "0.0.0.0")
def create_app() -> FastAPI:
app = FastAPI(title="Second Brain Dashboard")
static_dir = WORKSPACE / "static"
if static_dir.is_dir():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
return app
app = create_app()
# ─── Helpers ─────────────────────────────────────────────────────────────────
def get_db():
if not DB_PATH.exists():
raise FileNotFoundError(f"DB not found: {DB_PATH}")
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def parse_engram(row: sqlite3.Row) -> dict:
meta = json.loads(row["metadata_json"] or "{}")
correctness = json.loads(row["correctness_json"] or "{}")
return {
"id": row["id"],
"content": row["content"],
"confidence": meta.get("confidence", 0.0),
"confirmed": correctness.get("confirmed", False),
"confirmations": correctness.get("confirmations", 0),
"rejections": correctness.get("rejections", 0),
"tags": meta.get("tags", []),
"created": meta.get("created", row["created_at"]),
"modified": meta.get("modified", row["modified_at"]),
"last_reviewed": correctness.get("last_reviewed"),
"review_history": correctness.get("review_history", []),
"source": meta.get("source", "unknown"),
"access_count": meta.get("access_count", 0),
"grounding": meta.get("grounding", 0),
}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _update_correctness(engram_id: str, *, action: str, reason: str | None = None) -> dict:
"""
Update correctness_json for an engram. action: confirm|reject
"""
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT correctness_json FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
raise FileNotFoundError(f"Engram not found: {engram_id}")
corr = json.loads(row["correctness_json"] or "{}")
corr.setdefault("confirmed", False)
corr.setdefault("confirmations", 0)
corr.setdefault("rejections", 0)
corr.setdefault("review_history", [])
corr["last_reviewed"] = _now_iso()
entry = {
"by": "dashboard",
"action": action,
"at": corr["last_reviewed"],
"note": (reason or "").strip(),
}
try:
corr["review_history"].append(entry)
except Exception:
corr["review_history"] = [entry]
if action == "confirm":
corr["confirmed"] = True
corr["confirmations"] = int(corr.get("confirmations", 0) or 0) + 1
elif action == "reject":
corr["rejections"] = int(corr.get("rejections", 0) or 0) + 1
c.execute(
"UPDATE engrams SET correctness_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(corr, ensure_ascii=False), corr["last_reviewed"], engram_id),
)
conn.commit()
conn.close()
return {"ok": True}
def _bump_access(engram_id: str) -> dict:
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT metadata_json FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
raise FileNotFoundError(f"Engram not found: {engram_id}")
meta = json.loads(row["metadata_json"] or "{}")
meta["access_count"] = int(meta.get("access_count", 0) or 0) + 1
meta["last_accessed"] = _now_iso()
c.execute(
"UPDATE engrams SET metadata_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(meta, ensure_ascii=False), meta["last_accessed"], engram_id),
)
conn.commit()
conn.close()
return {"ok": True}
def _safe_json_extract_tags(meta_json: str) -> list[str]:
try:
d = json.loads(meta_json or "{}")
tags = d.get("tags") or []
return [t for t in tags if isinstance(t, str)]
except Exception:
return []
def _host_from_meta(meta_json: str) -> str | None:
try:
d = json.loads(meta_json or "{}")
grounding = d.get("grounding")
url = d.get("url")
if isinstance(grounding, dict) and isinstance(grounding.get("url"), str):
url = grounding.get("url")
if not isinstance(url, str):
return None
parsed = urlparse(url)
return parsed.hostname
except Exception:
return None
def _systemd_unit_state(unit: str) -> dict:
"""
Best-effort systemd status snapshot for a known unit.
Never raises; returns minimal fields.
"""
try:
out = subprocess.check_output(
["systemctl", "show", unit, "--no-page", "--property=ActiveState,SubState,Result,ExecMainStatus,ExecMainCode,ExecMainStartTimestamp,ExecMainExitTimestamp"],
text=True,
stderr=subprocess.STDOUT,
timeout=2,
)
kv = {}
for line in out.splitlines():
if "=" in line:
k, v = line.split("=", 1)
kv[k] = v
return {
"unit": unit,
"active": kv.get("ActiveState"),
"sub": kv.get("SubState"),
"result": kv.get("Result"),
"exit_status": kv.get("ExecMainStatus"),
"start_ts": kv.get("ExecMainStartTimestamp"),
"exit_ts": kv.get("ExecMainExitTimestamp"),
}
except Exception as e:
return {"unit": unit, "error": str(e)}
def _dir_size_bytes(path: Path) -> int:
total = 0
try:
for p in path.rglob("*"):
try:
if p.is_file():
total += p.stat().st_size
except Exception:
pass
except Exception:
pass
return total
# ─── API Endpoints ───────────────────────────────────────────────────────────
@app.get("/healthz", response_class=PlainTextResponse)
def healthz():
return "ok"
@app.get("/api/config")
def api_config():
return {
"workspace": str(WORKSPACE),
"db_path": str(DB_PATH),
}
@app.get("/api/db_info")
def api_db_info():
if not DB_PATH.exists():
raise FileNotFoundError(f"DB not found: {DB_PATH}")
st = DB_PATH.stat()
return {
"db_path": str(DB_PATH),
"size_bytes": st.st_size,
"mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).isoformat(),
}
@app.get("/api/storage_stats")
def api_storage_stats():
conn = get_db()
c = conn.cursor()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.confirmed') = 1"
).fetchone()[0]
sources = {
r[0]: r[1]
for r in c.execute(
"SELECT json_extract(metadata_json, '$.source') AS src, COUNT(*) FROM engrams GROUP BY src ORDER BY COUNT(*) DESC"
).fetchall()
if r[0] is not None
}
conn.close()
chroma_dir = WORKSPACE / "data" / "chroma"
emb_cache_dir = WORKSPACE / "data" / "embedding_cache"
vec_state_path = WORKSPACE / "data" / "vector_index_state.json"
vec_state = {}
if vec_state_path.exists():
try:
vec_state = json.loads(vec_state_path.read_text())
except Exception:
vec_state = {}
obsidian_cfg_path = WORKSPACE / "data" / "obsidian_config.json"
obsidian_cfg = None
if obsidian_cfg_path.exists():
try:
obsidian_cfg = json.loads(obsidian_cfg_path.read_text())
except Exception:
obsidian_cfg = {"raw": obsidian_cfg_path.read_text()[:2000]}
backup_files = sorted((WORKSPACE / "data").glob("backup_*.jsonl"))
return {
"sql": {
"total_engrams": total,
"confirmed": confirmed,
"pending": total - confirmed,
"by_source": sources,
},
"vector": {
"chroma_dir": str(chroma_dir),
"chroma_size_bytes": _dir_size_bytes(chroma_dir) if chroma_dir.exists() else 0,
"embedding_cache_dir": str(emb_cache_dir),
"embedding_cache_files": len(list(emb_cache_dir.glob("*.json"))) if emb_cache_dir.exists() else 0,
"vector_state": vec_state,
},
"obsidian": {
"config_path": str(obsidian_cfg_path),
"configured": bool(obsidian_cfg),
"config": obsidian_cfg,
},
"backups": {
"count": len(backup_files),
"latest": str(backup_files[-1]) if backup_files else None,
},
}
@app.get("/api/jobs")
def api_jobs():
# Known units that influence "freshness" of the brain.
units = [
"openclaw-secondbrain-ingest-memory.service",
"openclaw-secondbrain-index-vectors.service",
"openclaw-secondbrain-review.service",
"openclaw-secondbrain-heartbeat.service",
"openclaw-secondbrain-verify-pending.service",
]
return {"items": [_systemd_unit_state(u) for u in units]}
@app.get("/api/insights")
def api_insights(limit: int = Query(8, ge=1, le=50)):
conn = get_db()
c = conn.cursor()
rows = c.execute(
"SELECT id, metadata_json, correctness_json, created_at, modified_at FROM engrams ORDER BY created_at DESC LIMIT 2000"
).fetchall()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.confirmed') = 1"
).fetchone()[0]
pending = total - confirmed
tag_counts: dict[str, int] = {}
source_counts: dict[str, int] = {}
host_counts: dict[str, int] = {}
active: list[dict] = []
forgotten: list[dict] = []
for r in rows:
meta = json.loads(r["metadata_json"] or "{}")
src = meta.get("source", "unknown")
source_counts[src] = source_counts.get(src, 0) + 1
for t in (meta.get("tags") or []):
if isinstance(t, str):
tag_counts[t] = tag_counts.get(t, 0) + 1
host = _host_from_meta(r["metadata_json"])
if host:
host_counts[host] = host_counts.get(host, 0) + 1
access_count = int(meta.get("access_count", 0) or 0)
created = meta.get("created", r["created_at"])
if access_count >= 5 and len(active) < limit:
active.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
if access_count == 0 and len(forgotten) < limit:
forgotten.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
def top_k(d: dict[str, int]) -> list[dict]:
return [
{"key": k, "count": v}
for k, v in sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:limit]
]
conn.close()
return {
"total": total,
"confirmed": confirmed,
"pending": pending,
"top_tags": top_k(tag_counts),
"top_sources": top_k(source_counts),
"top_hosts": top_k(host_counts),
"active_engrams": active,
"forgotten_engrams": forgotten,
}
@app.get("/api/graph")
def api_graph(limit_nodes: int = Query(200, ge=50, le=1000)):
"""
Returns a lightweight graph view:
- Nodes: engrams + tag:<tag> + host:<hostname>
- Edges: engram->tag and engram->host plus explicit engrams_links edges.
"""
conn = get_db()
c = conn.cursor()
rows = c.execute("SELECT id, metadata_json FROM engrams ORDER BY created_at DESC LIMIT 1000").fetchall()
link_rows = c.execute("SELECT from_id, to_id FROM engrams_links ORDER BY rowid DESC LIMIT 2000").fetchall()
conn.close()
nodes: dict[str, dict] = {}
edges: list[dict] = []
def add_node(nid: str, kind: str, label: str | None = None, weight: float | None = None):
if nid not in nodes:
nodes[nid] = {"id": nid, "kind": kind}
if label is not None:
nodes[nid]["label"] = label
if weight is not None:
nodes[nid]["weight"] = weight
for r in rows:
eid = r["id"]
add_node(eid, "engram", label=eid[:8])
for t in _safe_json_extract_tags(r["metadata_json"]):
tid = f"tag:{t}"
add_node(tid, "tag", label=t)
edges.append({"from": eid, "to": tid, "kind": "has_tag"})
host = _host_from_meta(r["metadata_json"])
if host:
hid = f"host:{host}"
add_node(hid, "host", label=host)
edges.append({"from": eid, "to": hid, "kind": "grounded_at"})
for fr, to in link_rows:
add_node(fr, "engram")
add_node(to, "engram")
edges.append({"from": fr, "to": to, "kind": "link"})
# Trim nodes to keep payload bounded (prefer engrams and connected tags/hosts)
if len(nodes) > limit_nodes:
# Keep a balanced subset: many engrams plus the most-connected non-engrams.
kept: dict[str, dict] = {}
engram_budget = int(limit_nodes * 0.7)
# 1) Keep newest engrams first (they appear first in `rows` loop insertion order)
for r in rows:
eid = r["id"]
if eid in nodes:
kept[eid] = nodes[eid]
if len(kept) >= engram_budget:
break
# 2) Rank remaining nodes by degree within current edge set
degree: dict[str, int] = {}
for e in edges:
degree[e["from"]] = degree.get(e["from"], 0) + 1
degree[e["to"]] = degree.get(e["to"], 0) + 1
remaining = [nid for nid in nodes.keys() if nid not in kept]
remaining.sort(key=lambda nid: degree.get(nid, 0), reverse=True)
for nid in remaining:
kept[nid] = nodes[nid]
if len(kept) >= limit_nodes:
break
nodes = kept
edges = [e for e in edges if e["from"] in nodes and e["to"] in nodes]
return {"nodes": list(nodes.values()), "edges": edges}
@app.get("/api/events")
def api_events():
"""
Server-Sent Events stream for lightweight real-time UI refresh.
"""
import time
def gen():
while True:
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
"stats": api_stats(),
"storage": api_storage_stats(),
"jobs": api_jobs(),
"insights": api_insights(limit=8),
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
time.sleep(5)
return StreamingResponse(gen(), media_type="text/event-stream")
@app.exception_handler(FileNotFoundError)
def handle_file_not_found(request: Request, exc: FileNotFoundError):
return JSONResponse(
status_code=503,
content={
"error": str(exc),
"hint": "Set SECOND_BRAIN_DB_PATH or SECOND_BRAIN_WORKSPACE to a valid location.",
},
)
@app.exception_handler(sqlite3.Error)
def handle_sqlite_error(request: Request, exc: sqlite3.Error):
return JSONResponse(status_code=500, content={"error": f"sqlite error: {exc}"})
@app.get("/api/stats")
def api_stats():
conn = get_db()
c = conn.cursor()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.confirmed') = 1"
).fetchone()[0]
pending = total - confirmed
errors = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(metadata_json, '$.tags') LIKE '%error%'"
).fetchone()[0]
avg_conf = c.execute(
"SELECT AVG(json_extract(metadata_json, '$.confidence')) FROM engrams"
).fetchone()[0] or 0.0
conn.close()
return {
"total": total,
"confirmed": confirmed,
"pending": pending,
"errors": errors,
"avg_confidence": round(avg_conf, 2),
}
@app.get("/api/engrams")
def api_engrams(
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
tag: str = Query(None),
confirmed: bool = Query(None),
search: str = Query(None),
min_confidence: float = Query(0.0),
):
conn = get_db()
c = conn.cursor()
where_clauses = ["json_extract(metadata_json, '$.confidence') >= ?"]
params = [min_confidence]
if tag:
where_clauses.append("json_extract(metadata_json, '$.tags') LIKE ?")
params.append(f'%"{tag}"%')
if confirmed is not None:
where_clauses.append(
f"json_extract(correctness_json, '$.confirmed') = {int(confirmed)}"
)
if search:
# Use FTS
try:
ids = [
r[0] for r in c.execute(
"SELECT rowid FROM engrams_fts WHERE content MATCH ? LIMIT 200",
(search,)
).fetchall()
]
if ids:
placeholders = ",".join("?" * len(ids))
where_clauses.append(f"id IN ({placeholders})")
params.extend(ids)
else:
# Full-text fallback on content
where_clauses.append("content LIKE ?")
params.append(f"%{search}%")
except Exception:
where_clauses.append("content LIKE ?")
params.append(f"%{search}%")
where_sql = " AND ".join(where_clauses)
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE {where_sql}
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "limit": limit, "offset": offset}
@app.get("/api/engrams/{engram_id}")
def api_engram_detail(engram_id: str):
conn = get_db()
c = conn.cursor()
row = c.execute("SELECT * FROM engrams WHERE id = ?", (engram_id,)).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
# Links
links = c.execute(
"SELECT to_id FROM engrams_links WHERE from_id = ?", (engram_id,)
).fetchall()
result = parse_engram(row)
result["links"] = [r[0] for r in links]
conn.close()
return result
@app.get("/api/pending")
def api_pending(
limit: int = Query(20, ge=1, le=200),
offset: int = Query(0, ge=0),
source: str | None = Query(None),
):
conn = get_db()
c = conn.cursor()
where = ["json_extract(correctness_json, '$.confirmed') = 0"]
params: list = []
if source:
where.append("json_extract(metadata_json, '$.source') = ?")
params.append(source)
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE {' AND '.join(where)}
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
items = [parse_engram(r) for r in rows]
conn.close()
return {"items": items, "limit": limit, "offset": offset}
@app.post("/api/engrams")
def api_create_engram(content: str = Form(...), tags: str = Form("")):
content = (content or "").strip()
if not content:
return JSONResponse({"error": "content required"}, status_code=400)
tag_list = [t.strip() for t in (tags or "").split(",") if t.strip()]
now = _now_iso()
engram_id = str(uuid4())
meta = {
"source": "user",
"confidence": 0.7,
"created": now,
"modified": now,
"access_count": 0,
"last_accessed": now,
"tags": tag_list,
"session_id": None,
"agent_id": None,
"grounding": 0,
}
corr = {
"confirmed": False,
"confirmations": 0,
"rejections": 0,
"last_reviewed": None,
"review_history": [],
}
conn = get_db()
c = conn.cursor()
c.execute(
"""
INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, embedding_json, created_at, modified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
engram_id,
content,
json.dumps(meta, ensure_ascii=False),
json.dumps(corr, ensure_ascii=False),
"[]",
"{}",
None,
now,
now,
),
)
conn.commit()
conn.close()
return {"id": engram_id}
@app.post("/api/engrams/{engram_id}/confirm")
def api_confirm_engram(engram_id: str, reason: str = Form("")):
try:
return _update_correctness(engram_id, action="confirm", reason=reason)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/reject")
def api_reject_engram(engram_id: str, reason: str = Form("")):
try:
return _update_correctness(engram_id, action="reject", reason=reason)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/refresh")
def api_refresh_engram(engram_id: str):
try:
return _bump_access(engram_id)
except FileNotFoundError as e:
return JSONResponse({"error": str(e)}, status_code=404)
@app.post("/api/engrams/{engram_id}/confirm")
def api_confirm(engram_id: str, reason: str = Form("")):
conn = get_db()
c = conn.cursor()
row = c.execute(
"SELECT correctness_json FROM engrams WHERE id = ?", (engram_id,)
).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
correctness = json.loads(row["correctness_json"] or "{}")
correctness["confirmed"] = True
correctness["confirmations"] = correctness.get("confirmations", 0) + 1
correctness["last_reviewed"] = datetime.now(timezone.utc).isoformat()
review_history = correctness.get("review_history", [])
review_history.append({
"by": "web",
"action": "confirm",
"at": datetime.now(timezone.utc).isoformat(),
"note": reason or "confirmed via dashboard",
})
correctness["review_history"] = review_history
c.execute(
"UPDATE engrams SET correctness_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(correctness), datetime.now(timezone.utc).isoformat(), engram_id),
)
conn.commit()
conn.close()
return {"success": True, "engram_id": engram_id}
@app.post("/api/engrams/{engram_id}/reject")
def api_reject(engram_id: str, reason: str = Form("")):
conn = get_db()
c = conn.cursor()
row = c.execute(
"SELECT correctness_json FROM engrams WHERE id = ?", (engram_id,)
).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
correctness = json.loads(row["correctness_json"] or "{}")
correctness["confirmed"] = False
correctness["rejections"] = correctness.get("rejections", 0) + 1
correctness["last_reviewed"] = datetime.now(timezone.utc).isoformat()
review_history = correctness.get("review_history", [])
review_history.append({
"by": "web",
"action": "reject",
"at": datetime.now(timezone.utc).isoformat(),
"note": reason or "rejected via dashboard",
})
correctness["review_history"] = review_history
c.execute(
"UPDATE engrams SET correctness_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(correctness), datetime.now(timezone.utc).isoformat(), engram_id),
)
conn.commit()
conn.close()
return {"success": True, "engram_id": engram_id}
@app.post("/api/engrams/{engram_id}/refresh")
def api_refresh(engram_id: str):
conn = get_db()
c = conn.cursor()
row = c.execute(
"SELECT metadata_json, correctness_json FROM engrams WHERE id = ?", (engram_id,)
).fetchone()
if not row:
conn.close()
return JSONResponse({"error": "Not found"}, status_code=404)
meta = json.loads(row["metadata_json"] or "{}")
correctness = json.loads(row["correctness_json"] or "{}")
# Simple heuristic: confidence based on confirmations vs rejections
conf = 0.5
conf += 0.1 * correctness.get("confirmations", 0)
conf -= 0.15 * correctness.get("rejections", 0)
conf = max(0.1, min(1.0, conf))
meta["confidence"] = round(conf, 2)
meta["modified"] = datetime.now(timezone.utc).isoformat()
c.execute(
"UPDATE engrams SET metadata_json = ?, modified_at = ? WHERE id = ?",
(json.dumps(meta), datetime.now(timezone.utc).isoformat(), engram_id),
)
conn.commit()
conn.close()
return {"success": True, "new_confidence": round(conf, 2)}
@app.post("/api/engrams")
def api_create_engram(content: str = Form(...), tags: str = Form(""), source: str = Form("web")):
engram_id = f"web-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S-%f')[:20]}"
now = datetime.now(timezone.utc).isoformat()
meta = {
"source": source,
"confidence": 0.5,
"created": now,
"modified": now,
"access_count": 0,
"last_accessed": now,
"tags": [t.strip() for t in tags.split(",") if t.strip()] or ["web"],
"session_id": None,
"agent_id": None,
"grounding": 0,
"hash": "",
}
correctness = {
"confirmed": False,
"confirmations": 0,
"rejections": 0,
"last_reviewed": None,
"review_history": [],
}
conn = get_db()
c = conn.cursor()
c.execute(
"""
INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, created_at, modified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(engram_id, content, json.dumps(meta), json.dumps(correctness), "[]", '{"parent": null, "children": [], "depth": 0}', now, now),
)
conn.commit()
conn.close()
return {"success": True, "engram_id": engram_id}
@app.get("/api/pending")
def api_pending(limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0)):
conn = get_db()
c = conn.cursor()
rows = c.execute(
"""
SELECT * FROM engrams
WHERE json_extract(correctness_json, '$.confirmed') = 0
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "limit": limit, "offset": offset}
@app.get("/api/search")
def api_search(
q: str = Query(..., min_length=1),
min_confidence: float = Query(0.0),
limit: int = Query(20, ge=1, le=100),
):
conn = get_db()
c = conn.cursor()
try:
ids = [
r[0] for r in c.execute(
"SELECT rowid FROM engrams_fts WHERE content MATCH ? LIMIT 200",
(q,)
).fetchall()
]
if ids:
placeholders = ",".join("?" * len(ids))
rows = c.execute(
f"""
SELECT * FROM engrams
WHERE id IN ({placeholders})
AND json_extract(metadata_json, '$.confidence') >= ?
ORDER BY created_at DESC
LIMIT ?
""",
ids + [min_confidence, limit],
).fetchall()
else:
rows = []
except Exception:
rows = c.execute(
"""
SELECT * FROM engrams
WHERE content LIKE ? AND json_extract(metadata_json, '$.confidence') >= ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{q}%", min_confidence, limit),
).fetchall()
result = [parse_engram(r) for r in rows]
conn.close()
return {"items": result, "query": q}
# ─── Frontend ────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request):
with open(WORKSPACE / "templates" / "dashboard.html", "r", encoding="utf-8") as f:
html = f.read()
return HTMLResponse(content=html)
# ─── Main ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run("fastapi_app:app", host=HOST, port=PORT)