#!/usr/bin/env python3
"""
Second Brain FastAPI Dashboard

Goals:
- "Release-ready" defaults (no hardcoded absolute paths)
- Minimal config via env vars
- Serves the existing static dashboard (templates/dashboard.html + static/)
"""

import json
import os
import sqlite3
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse

from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

# ─── Config ──────────────────────────────────────────────────────────────────

# Directory containing this file; used as the workspace when none is configured.
REPO_ROOT = Path(__file__).resolve().parent
# Root for the data/, static/ and templates/ lookups below.
WORKSPACE = Path(os.environ.get("SECOND_BRAIN_WORKSPACE", str(REPO_ROOT))).resolve()
# SQLite database holding the `engrams` table (default: <workspace>/data/brain.sqlite).
DB_PATH = Path(
    os.environ.get("SECOND_BRAIN_DB_PATH", str(WORKSPACE / "data" / "brain.sqlite"))
).resolve()
# SECOND_BRAIN_PORT takes precedence over the generic PORT variable; default 8501.
PORT = int(os.environ.get("SECOND_BRAIN_PORT", os.environ.get("PORT", "8501")))
# NOTE(review): binds every interface by default, and the API exposes
# unauthenticated write endpoints (confirm/reject/create). Consider setting
# SECOND_BRAIN_HOST=127.0.0.1 outside trusted networks.
HOST = os.environ.get("SECOND_BRAIN_HOST", "0.0.0.0")


def create_app() -> FastAPI:
    """Build the FastAPI app, mounting <workspace>/static at /static when it exists."""
    app = FastAPI(title="Second Brain Dashboard")
    static_dir = WORKSPACE / "static"
    if static_dir.is_dir():
        app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    return app


app = create_app()

# ─── Helpers ─────────────────────────────────────────────────────────────────


def get_db():
    """Open a new connection to DB_PATH whose rows are addressable by column name.

    The caller owns the returned connection and must close it.

    Raises:
        FileNotFoundError: if DB_PATH does not exist. This is checked up front
            because ``sqlite3.connect`` would otherwise create an empty database.
    """
    if not DB_PATH.exists():
        raise FileNotFoundError(f"DB not found: {DB_PATH}")
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def _json_object(raw) -> dict:
    """Decode a JSON column value into a dict.

    Returns ``{}`` when the value is NULL or empty, or when it decodes to
    something other than a JSON object.

    Raises:
        ValueError: (``json.JSONDecodeError``) if *raw* is not valid JSON.
        TypeError: if *raw* is not a str/bytes value.
    """
    value = json.loads(raw or "{}")
    return value if isinstance(value, dict) else {}


def parse_engram(row: sqlite3.Row) -> dict:
    """Flatten an `engrams` row into the dict shape served by the API.

    Fields missing from metadata_json / correctness_json fall back to defaults:
    confidence 0.0, not confirmed, zero counters, empty tags and history, and
    source "unknown". The created/modified timestamps fall back to the row's
    created_at / modified_at columns.

    Raises:
        ValueError: if either JSON column contains malformed JSON.
    """
    meta = _json_object(row["metadata_json"])
    correctness = _json_object(row["correctness_json"])
    return {
        "id": row["id"],
        "content": row["content"],
        "confidence": meta.get("confidence", 0.0),
        "confirmed": correctness.get("confirmed", False),
        "confirmations": correctness.get("confirmations", 0),
        "rejections": correctness.get("rejections", 0),
        "tags": meta.get("tags", []),
        "created": meta.get("created", row["created_at"]),
        "modified": meta.get("modified", row["modified_at"]),
        "last_reviewed": correctness.get("last_reviewed"),
        "review_history": correctness.get("review_history", []),
        "source": meta.get("source", "unknown"),
        "access_count": meta.get("access_count", 0),
        "grounding": meta.get("grounding", 0),
    }


def _safe_json_extract_tags(meta_json: str) -> list[str]:
    """Return the string tags stored in a metadata JSON blob.

    Never raises. Malformed JSON, a non-object document, or a ``tags`` value
    that is not a list all yield ``[]``. Non-string list items are skipped.
    """
    try:
        tags = _json_object(meta_json).get("tags") or []
    except (ValueError, TypeError):
        return []
    # A bare string would otherwise be iterated character by character.
    if not isinstance(tags, list):
        return []
    return [t for t in tags if isinstance(t, str)]


def _host_from_meta(meta_json: str) -> str | None:
    """Return the hostname of the URL an engram is grounded at, if any.

    ``grounding.url`` (when grounding is an object with a string url) takes
    precedence over a top-level ``url``. Returns None when no usable URL is
    found or the metadata/URL cannot be parsed.
    """
    try:
        meta = _json_object(meta_json)
    except (ValueError, TypeError):
        return None
    url = meta.get("url")
    grounding = meta.get("grounding")
    if isinstance(grounding, dict) and isinstance(grounding.get("url"), str):
        url = grounding["url"]
    if not isinstance(url, str):
        return None
    try:
        return urlparse(url).hostname
    except ValueError:  # e.g. an unbalanced IPv6 literal such as "http://[::1"
        return None


def _systemd_unit_state(unit: str) -> dict:
    """
    Best-effort systemd status snapshot for a known unit.

    Never raises. Any failure (systemctl missing, non-zero exit, or the 2 s
    timeout) is reported as ``{"unit": unit, "error": "..."}``.
    """
    props = (
        "ActiveState",
        "SubState",
        "Result",
        "ExecMainStatus",
        "ExecMainCode",
        "ExecMainStartTimestamp",
        "ExecMainExitTimestamp",
    )
    try:
        out = subprocess.check_output(
            ["systemctl", "show", unit, "--no-pager", "--property=" + ",".join(props)],
            text=True,
            stderr=subprocess.STDOUT,
            timeout=2,
        )
    except Exception as e:  # deliberate best-effort boundary (see docstring)
        return {"unit": unit, "error": str(e)}

    # `systemctl show` prints one KEY=VALUE pair per line.
    kv = dict(line.split("=", 1) for line in out.splitlines() if "=" in line)
    return {
        "unit": unit,
        "active": kv.get("ActiveState"),
        "sub": kv.get("SubState"),
        "result": kv.get("Result"),
        "exit_status": kv.get("ExecMainStatus"),
        "exit_code": kv.get("ExecMainCode"),
        "start_ts": kv.get("ExecMainStartTimestamp"),
        "exit_ts": kv.get("ExecMainExitTimestamp"),
    }


def _dir_size_bytes(path: Path) -> int:
    """Return the total size in bytes of the files under *path*, recursively.

    Best-effort. Entries that vanish or cannot be stat'ed are skipped. If the
    walk itself fails part-way, the total accumulated so far is returned.
    """
    total = 0
    try:
        for p in path.rglob("*"):
            try:
                if p.is_file():
                    total += p.stat().st_size
            except OSError:
                continue
    except OSError:
        pass
    return total


# ─── API Endpoints ───────────────────────────────────────────────────────────


@app.get("/healthz", response_class=PlainTextResponse)
def healthz():
    """Liveness probe. Always returns "ok" and never touches the DB."""
    return "ok"
def _open_db():
    """Return a context manager that yields a DB connection and always closes it.

    Using ``sqlite3.Connection`` as a context manager only commits or rolls
    back; it does not close the connection. Wrapping it in
    ``contextlib.closing`` releases the handle even when a query raises
    mid-request. The sqlite3.Error handler then renders the 500.

    Raises:
        FileNotFoundError: propagated from ``get_db`` when the DB is missing.
    """
    from contextlib import closing

    return closing(get_db())


# Treat missing JSON fields the way parse_engram reports them (confidence 0.0,
# confirmed False), so list filters, counters and rendered rows agree.
_CONFIDENCE_SQL = "COALESCE(json_extract(metadata_json, '$.confidence'), 0.0)"
_CONFIRMED_SQL = "COALESCE(json_extract(correctness_json, '$.confirmed'), 0)"


def _like_pattern(text: str) -> str:
    """Return a pattern for ``LIKE ? ESCAPE '\\'`` that matches *text* as a literal substring."""
    escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


def _fts_rowids(c: sqlite3.Cursor, query: str) -> list[int] | None:
    """Return up to 200 ``engrams_fts`` rowids whose content matches *query*.

    Returns None when the FTS query itself fails (e.g. the table is missing or
    *query* is not valid MATCH syntax), so callers can fall back to LIKE.
    """
    try:
        rows = c.execute(
            "SELECT rowid FROM engrams_fts WHERE content MATCH ? LIMIT 200", (query,)
        ).fetchall()
    except sqlite3.Error:
        return None
    return [r[0] for r in rows]


def _search_clause(c: sqlite3.Cursor, text: str) -> tuple[str, list]:
    """Build a WHERE fragment and its params that select engrams matching *text*.

    Uses the full-text hits when FTS finds any. Otherwise falls back to a
    literal substring LIKE on ``content``.
    """
    ids = _fts_rowids(c, text)
    if ids:
        # NOTE(review): assumes engrams_fts rowids mirror engrams.rowid (the usual
        # external-content FTS setup). The previous `id IN (...)` compared these
        # integers with the text engram id, so ids such as "web-..." never matched.
        placeholders = ",".join("?" * len(ids))
        return f"rowid IN ({placeholders})", list(ids)
    return "content LIKE ? ESCAPE '\\'", [_like_pattern(text)]


@app.get("/api/config")
def api_config():
    """Expose the resolved workspace and DB paths (handy for debugging env config)."""
    return {
        "workspace": str(WORKSPACE),
        "db_path": str(DB_PATH),
    }


@app.get("/api/db_info")
def api_db_info():
    """Report the DB file's path, size, and modification time (UTC ISO-8601).

    Raises:
        FileNotFoundError: when the DB is missing (rendered as 503 by the handler).
    """
    if not DB_PATH.exists():
        raise FileNotFoundError(f"DB not found: {DB_PATH}")
    st = DB_PATH.stat()
    return {
        "db_path": str(DB_PATH),
        "size_bytes": st.st_size,
        "mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).isoformat(),
    }


@app.get("/api/storage_stats")
def api_storage_stats():
    """Summarise SQL counts, vector-index files, Obsidian config and JSONL backups."""
    with _open_db() as conn:
        c = conn.cursor()
        total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
        confirmed = c.execute(
            f"SELECT COUNT(*) FROM engrams WHERE {_CONFIRMED_SQL} = 1"
        ).fetchone()[0]
        sources = {
            r[0]: r[1]
            for r in c.execute(
                "SELECT json_extract(metadata_json, '$.source') AS src, COUNT(*) "
                "FROM engrams GROUP BY src ORDER BY COUNT(*) DESC"
            ).fetchall()
            if r[0] is not None
        }

    data_dir = WORKSPACE / "data"
    chroma_dir = data_dir / "chroma"
    emb_cache_dir = data_dir / "embedding_cache"

    vec_state_path = data_dir / "vector_index_state.json"
    vec_state = {}
    if vec_state_path.exists():
        try:
            vec_state = json.loads(vec_state_path.read_text())
        except (OSError, ValueError):
            vec_state = {}

    obsidian_cfg_path = data_dir / "obsidian_config.json"
    obsidian_cfg = None
    if obsidian_cfg_path.exists():
        try:
            obsidian_cfg = json.loads(obsidian_cfg_path.read_text())
        except (OSError, ValueError):
            # Surface the unparsable file. errors="replace" ensures a non-UTF-8
            # file cannot raise a second time from inside this fallback.
            obsidian_cfg = {"raw": obsidian_cfg_path.read_text(errors="replace")[:2000]}

    backup_files = sorted(data_dir.glob("backup_*.jsonl"))

    return {
        "sql": {
            "total_engrams": total,
            "confirmed": confirmed,
            "pending": total - confirmed,
            "by_source": sources,
        },
        "vector": {
            "chroma_dir": str(chroma_dir),
            "chroma_size_bytes": _dir_size_bytes(chroma_dir) if chroma_dir.exists() else 0,
            "embedding_cache_dir": str(emb_cache_dir),
            "embedding_cache_files": len(list(emb_cache_dir.glob("*.json")))
            if emb_cache_dir.exists()
            else 0,
            "vector_state": vec_state,
        },
        "obsidian": {
            "config_path": str(obsidian_cfg_path),
            "configured": bool(obsidian_cfg),
            "config": obsidian_cfg,
        },
        "backups": {
            "count": len(backup_files),
            "latest": str(backup_files[-1]) if backup_files else None,
        },
    }


@app.get("/api/jobs")
def api_jobs():
    """Return a systemd state snapshot for each background job unit."""
    # Known units that influence "freshness" of the brain.
    units = [
        "openclaw-secondbrain-ingest-memory.service",
        "openclaw-secondbrain-index-vectors.service",
        "openclaw-secondbrain-review.service",
        "openclaw-secondbrain-heartbeat.service",
        "openclaw-secondbrain-verify-pending.service",
    ]
    return {"items": [_systemd_unit_state(u) for u in units]}


@app.get("/api/insights")
def api_insights(limit: int = Query(8, ge=1, le=50)):
    """Aggregate insights over the 2000 newest engrams.

    Returns top tags, sources and grounding hosts (each capped at *limit*).
    "Active" engrams are those with access_count >= 5; "forgotten" ones have
    access_count == 0. Both lists are in newest-first order and capped at
    *limit*. Totals are computed over the whole table.
    """
    with _open_db() as conn:
        c = conn.cursor()
        rows = c.execute(
            "SELECT id, metadata_json, correctness_json, created_at, modified_at "
            "FROM engrams ORDER BY created_at DESC LIMIT 2000"
        ).fetchall()
        total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
        confirmed = c.execute(
            f"SELECT COUNT(*) FROM engrams WHERE {_CONFIRMED_SQL} = 1"
        ).fetchone()[0]
    pending = total - confirmed

    tag_counts: dict[str, int] = {}
    source_counts: dict[str, int] = {}
    host_counts: dict[str, int] = {}
    active: list[dict] = []
    forgotten: list[dict] = []

    for r in rows:
        meta = json.loads(r["metadata_json"] or "{}")
        if not isinstance(meta, dict):
            meta = {}
        src = meta.get("source", "unknown")
        source_counts[src] = source_counts.get(src, 0) + 1

        for t in meta.get("tags") or []:
            if isinstance(t, str):
                tag_counts[t] = tag_counts.get(t, 0) + 1

        host = _host_from_meta(r["metadata_json"])
        if host:
            host_counts[host] = host_counts.get(host, 0) + 1

        access_count = int(meta.get("access_count", 0) or 0)
        created = meta.get("created", r["created_at"])
        summary = {
            "id": r["id"],
            "access_count": access_count,
            "source": src,
            "created": created,
        }
        if access_count >= 5 and len(active) < limit:
            active.append(summary)
        if access_count == 0 and len(forgotten) < limit:
            forgotten.append(dict(summary))

    def top_k(d: dict[str, int]) -> list[dict]:
        # sorted() is stable, so ties keep first-seen (newest-first) order.
        return [
            {"key": k, "count": v}
            for k, v in sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:limit]
        ]

    return {
        "total": total,
        "confirmed": confirmed,
        "pending": pending,
        "top_tags": top_k(tag_counts),
        "top_sources": top_k(source_counts),
        "top_hosts": top_k(host_counts),
        "active_engrams": active,
        "forgotten_engrams": forgotten,
    }


@app.get("/api/graph")
def api_graph(limit_nodes: int = Query(200, ge=50, le=1000)):
    """
    Returns a lightweight graph view:
    - Nodes: engrams + tag: + host:
    - Edges: engram->tag and engram->host plus explicit engrams_links edges.

    Built from the 1000 newest engrams and the 2000 most recent links. When
    there are more than *limit_nodes* nodes, int(limit_nodes * 0.7) slots go to
    the newest engrams. The remaining slots go to the highest-degree leftover
    nodes of any kind. Edges with a trimmed endpoint are dropped.
    """
    with _open_db() as conn:
        c = conn.cursor()
        rows = c.execute(
            "SELECT id, metadata_json FROM engrams ORDER BY created_at DESC LIMIT 1000"
        ).fetchall()
        link_rows = c.execute(
            "SELECT from_id, to_id FROM engrams_links ORDER BY rowid DESC LIMIT 2000"
        ).fetchall()

    nodes: dict[str, dict] = {}
    edges: list[dict] = []

    def add_node(nid: str, kind: str, label: str | None = None, weight: float | None = None):
        # First insertion wins; later calls for the same id are no-ops.
        if nid not in nodes:
            nodes[nid] = {"id": nid, "kind": kind}
            if label is not None:
                nodes[nid]["label"] = label
            if weight is not None:
                nodes[nid]["weight"] = weight

    for r in rows:
        eid = r["id"]
        add_node(eid, "engram", label=eid[:8])
        for t in _safe_json_extract_tags(r["metadata_json"]):
            tid = f"tag:{t}"
            add_node(tid, "tag", label=t)
            edges.append({"from": eid, "to": tid, "kind": "has_tag"})
        host = _host_from_meta(r["metadata_json"])
        if host:
            hid = f"host:{host}"
            add_node(hid, "host", label=host)
            edges.append({"from": eid, "to": hid, "kind": "grounded_at"})

    for fr, to in link_rows:
        add_node(fr, "engram")
        add_node(to, "engram")
        edges.append({"from": fr, "to": to, "kind": "link"})

    # Trim nodes to keep payload bounded (prefer engrams and connected tags/hosts)
    if len(nodes) > limit_nodes:
        kept: dict[str, dict] = {}
        engram_budget = int(limit_nodes * 0.7)

        # 1) Newest engrams first (`rows` is ordered newest-first).
        for r in rows:
            eid = r["id"]
            if eid in nodes:
                kept[eid] = nodes[eid]
            if len(kept) >= engram_budget:
                break

        # 2) Fill the rest by degree within the current edge set.
        degree: dict[str, int] = {}
        for e in edges:
            degree[e["from"]] = degree.get(e["from"], 0) + 1
            degree[e["to"]] = degree.get(e["to"], 0) + 1

        remaining = [nid for nid in nodes if nid not in kept]
        remaining.sort(key=lambda nid: degree.get(nid, 0), reverse=True)
        for nid in remaining:
            kept[nid] = nodes[nid]
            if len(kept) >= limit_nodes:
                break

        nodes = kept
        edges = [e for e in edges if e["from"] in nodes and e["to"] in nodes]

    return {"nodes": list(nodes.values()), "edges": edges}


@app.get("/api/events")
def api_events():
    """
    Server-Sent Events stream for lightweight real-time UI refresh.

    Emits a snapshot (stats, storage, jobs, insights) every 5 seconds. Route
    exception handlers cannot run once streaming has started, so a failing
    snapshot is sent as ``{"ts": ..., "error": "..."}`` instead of killing the
    stream.
    """
    import time

    def gen():
        while True:
            ts = datetime.now(timezone.utc).isoformat()
            try:
                payload = {
                    "ts": ts,
                    "stats": api_stats(),
                    "storage": api_storage_stats(),
                    "jobs": api_jobs(),
                    "insights": api_insights(limit=8),
                }
            except Exception as exc:  # stream boundary: report instead of aborting
                payload = {"ts": ts, "error": str(exc)}
            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
            time.sleep(5)

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )


@app.exception_handler(FileNotFoundError)
def handle_file_not_found(request: Request, exc: FileNotFoundError):
    """Render a missing DB or template as 503 with a configuration hint."""
    return JSONResponse(
        status_code=503,
        content={
            "error": str(exc),
            "hint": "Set SECOND_BRAIN_DB_PATH or SECOND_BRAIN_WORKSPACE to a valid location.",
        },
    )


@app.exception_handler(sqlite3.Error)
def handle_sqlite_error(request: Request, exc: sqlite3.Error):
    """Render any uncaught sqlite3 error as a JSON 500."""
    return JSONResponse(status_code=500, content={"error": f"sqlite error: {exc}"})


@app.get("/api/stats")
def api_stats():
    """Headline counters for the dashboard.

    "errors" counts engrams whose tags JSON contains the substring "error"
    (matched case-insensitively by LIKE). "avg_confidence" ignores engrams
    without a confidence value and is rounded to 2 places (0.0 when none).
    """
    with _open_db() as conn:
        c = conn.cursor()
        total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
        confirmed = c.execute(
            f"SELECT COUNT(*) FROM engrams WHERE {_CONFIRMED_SQL} = 1"
        ).fetchone()[0]
        errors = c.execute(
            "SELECT COUNT(*) FROM engrams WHERE json_extract(metadata_json, '$.tags') LIKE '%error%'"
        ).fetchone()[0]
        avg_conf = c.execute(
            "SELECT AVG(json_extract(metadata_json, '$.confidence')) FROM engrams"
        ).fetchone()[0] or 0.0
    return {
        "total": total,
        "confirmed": confirmed,
        "pending": total - confirmed,
        "errors": errors,
        "avg_confidence": round(avg_conf, 2),
    }


@app.get("/api/engrams")
def api_engrams(
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    tag: str | None = Query(None),
    confirmed: bool | None = Query(None),
    search: str | None = Query(None),
    min_confidence: float = Query(0.0),
):
    """List engrams newest-first with optional filters.

    Args:
        tag: exact tag match (ASCII case-insensitive) against metadata ``tags``.
        confirmed: filter on the confirmed flag; a missing flag counts as False.
        search: FTS match, falling back to a literal substring search on content.
        min_confidence: lower bound; a missing confidence counts as 0.0.
    """
    where = [f"{_CONFIDENCE_SQL} >= ?"]
    params: list = [min_confidence]

    with _open_db() as conn:
        c = conn.cursor()
        if tag:
            # json_each compares decoded values, so tags json.dumps stored as
            # \\uXXXX escapes still match, and %/_ in *tag* are not wildcards.
            where.append(
                "EXISTS (SELECT 1 FROM json_each(engrams.metadata_json, '$.tags') "
                "WHERE json_each.value = ? COLLATE NOCASE)"
            )
            params.append(tag)
        if confirmed is not None:
            where.append(f"{_CONFIRMED_SQL} = ?")
            params.append(int(confirmed))
        if search:
            clause, clause_params = _search_clause(c, search)
            where.append(clause)
            params.extend(clause_params)

        rows = c.execute(
            f"""
            SELECT * FROM engrams
            WHERE {" AND ".join(where)}
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
            """,
            [*params, limit, offset],
        ).fetchall()
        items = [parse_engram(r) for r in rows]

    return {"items": items, "limit": limit, "offset": offset}


@app.get("/api/engrams/{engram_id}")
def api_engram_detail(engram_id: str):
    """Return one engram plus the ids it links to (404 if unknown)."""
    with _open_db() as conn:
        c = conn.cursor()
        row = c.execute("SELECT * FROM engrams WHERE id = ?", (engram_id,)).fetchone()
        if not row:
            return JSONResponse({"error": "Not found"}, status_code=404)
        links = c.execute(
            "SELECT to_id FROM engrams_links WHERE from_id = ?", (engram_id,)
        ).fetchall()
        result = parse_engram(row)
    result["links"] = [r[0] for r in links]
    return result


def _record_review(engram_id: str, reason: str, *, confirmed: bool):
    """Shared implementation for confirm/reject.

    Sets the confirmed flag, bumps the matching counter, and stamps
    last_reviewed. It then appends a review_history entry and bumps the row's
    modified_at, all with one UTC timestamp. Returns a 404 JSONResponse for an
    unknown id.
    """
    if confirmed:
        action, counter, default_note = "confirm", "confirmations", "confirmed via dashboard"
    else:
        action, counter, default_note = "reject", "rejections", "rejected via dashboard"

    with _open_db() as conn:
        row = conn.execute(
            "SELECT correctness_json FROM engrams WHERE id = ?", (engram_id,)
        ).fetchone()
        if not row:
            return JSONResponse({"error": "Not found"}, status_code=404)

        correctness = json.loads(row["correctness_json"] or "{}")
        now = datetime.now(timezone.utc).isoformat()
        correctness["confirmed"] = confirmed
        correctness[counter] = correctness.get(counter, 0) + 1
        correctness["last_reviewed"] = now
        review_history = correctness.get("review_history", [])
        review_history.append({
            "by": "web",
            "action": action,
            "at": now,
            "note": reason or default_note,
        })
        correctness["review_history"] = review_history

        conn.execute(
            "UPDATE engrams SET correctness_json = ?, modified_at = ? WHERE id = ?",
            (json.dumps(correctness), now, engram_id),
        )
        conn.commit()
    return {"success": True, "engram_id": engram_id}


@app.post("/api/engrams/{engram_id}/confirm")
def api_confirm(engram_id: str, reason: str = Form("")):
    """Mark an engram as confirmed, recording *reason* in its review history."""
    return _record_review(engram_id, reason, confirmed=True)


@app.post("/api/engrams/{engram_id}/reject")
def api_reject(engram_id: str, reason: str = Form("")):
    """Mark an engram as rejected, recording *reason* in its review history."""
    return _record_review(engram_id, reason, confirmed=False)


@app.post("/api/engrams/{engram_id}/refresh")
def api_refresh(engram_id: str):
    """Recompute an engram's confidence from its review counters.

    confidence = clamp(0.5 + 0.1 * confirmations - 0.15 * rejections, 0.1, 1.0),
    rounded to 2 places and written to metadata_json.
    """
    with _open_db() as conn:
        row = conn.execute(
            "SELECT metadata_json, correctness_json FROM engrams WHERE id = ?", (engram_id,)
        ).fetchone()
        if not row:
            return JSONResponse({"error": "Not found"}, status_code=404)

        meta = json.loads(row["metadata_json"] or "{}")
        correctness = json.loads(row["correctness_json"] or "{}")

        # Simple heuristic: confidence based on confirmations vs rejections
        conf = 0.5
        conf += 0.1 * correctness.get("confirmations", 0)
        conf -= 0.15 * correctness.get("rejections", 0)
        conf = round(max(0.1, min(1.0, conf)), 2)

        now = datetime.now(timezone.utc).isoformat()
        meta["confidence"] = conf
        meta["modified"] = now
        conn.execute(
            "UPDATE engrams SET metadata_json = ?, modified_at = ? WHERE id = ?",
            (json.dumps(meta), now, engram_id),
        )
        conn.commit()
    return {"success": True, "new_confidence": conf}


@app.post("/api/engrams")
def api_create_engram(content: str = Form(...), tags: str = Form(""), source: str = Form("web")):
    """Insert a new unconfirmed engram created from the dashboard.

    *tags* is a comma-separated list; blanks are dropped and ["web"] is used
    when none remain.
    """
    # NOTE(review): [:20] keeps only 4 microsecond digits ("YYYYmmdd-HHMMSS-ffff"),
    # so two creates within the same 100 µs would collide on the id.
    engram_id = f"web-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S-%f')[:20]}"
    now = datetime.now(timezone.utc).isoformat()
    meta = {
        "source": source,
        "confidence": 0.5,
        "created": now,
        "modified": now,
        "access_count": 0,
        "last_accessed": now,
        "tags": [t.strip() for t in tags.split(",") if t.strip()] or ["web"],
        "session_id": None,
        "agent_id": None,
        "grounding": 0,
        "hash": "",
    }
    correctness = {
        "confirmed": False,
        "confirmations": 0,
        "rejections": 0,
        "last_reviewed": None,
        "review_history": [],
    }
    with _open_db() as conn:
        conn.execute(
            """
            INSERT INTO engrams (id, content, metadata_json, correctness_json, links_json, hierarchy_json, created_at, modified_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                engram_id,
                content,
                json.dumps(meta),
                json.dumps(correctness),
                "[]",
                '{"parent": null, "children": [], "depth": 0}',
                now,
                now,
            ),
        )
        conn.commit()
    return {"success": True, "engram_id": engram_id}


@app.get("/api/pending")
def api_pending(limit: int = Query(20, ge=1, le=100), offset: int = Query(0, ge=0)):
    """List unconfirmed engrams newest-first (a missing flag counts as unconfirmed).

    This matches the "pending" count reported by /api/stats.
    """
    with _open_db() as conn:
        rows = conn.execute(
            f"""
            SELECT * FROM engrams
            WHERE {_CONFIRMED_SQL} = 0
            ORDER BY created_at DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset),
        ).fetchall()
        items = [parse_engram(r) for r in rows]
    return {"items": items, "limit": limit, "offset": offset}


@app.get("/api/search")
def api_search(
    q: str = Query(..., min_length=1),
    min_confidence: float = Query(0.0),
    limit: int = Query(20, ge=1, le=100),
):
    """Search engrams newest-first.

    Uses full-text matching, and falls back to a literal substring search
    when FTS fails or finds nothing. Results are filtered by *min_confidence*,
    where a missing confidence counts as 0.0.
    """
    with _open_db() as conn:
        c = conn.cursor()
        clause, params = _search_clause(c, q)
        rows = c.execute(
            f"""
            SELECT * FROM engrams
            WHERE {clause} AND {_CONFIDENCE_SQL} >= ?
            ORDER BY created_at DESC
            LIMIT ?
            """,
            [*params, min_confidence, limit],
        ).fetchall()
        items = [parse_engram(r) for r in rows]
    return {"items": items, "query": q}


# ─── Frontend ────────────────────────────────────────────────────────────────


@app.get("/", response_class=HTMLResponse)
def dashboard(request: Request):
    """Serve <workspace>/templates/dashboard.html (503 via handler if missing)."""
    html = (WORKSPACE / "templates" / "dashboard.html").read_text(encoding="utf-8")
    return HTMLResponse(content=html)


# ─── Main ────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import uvicorn

    # Pass the app object instead of an import string, so this works whatever
    # this file is named (the old "fastapi_app:app" hard-coded the module name).
    uvicorn.run(app, host=HOST, port=PORT)