feat(dashboard): add status+graph views

This commit is contained in:
2026-05-27 00:11:44 +02:00
parent ec8870ea40
commit e5061b317f
3 changed files with 376 additions and 0 deletions

View File

@@ -11,8 +11,10 @@ Goals:
import json
import os
import sqlite3
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
@@ -68,6 +70,56 @@ def parse_engram(row: sqlite3.Row) -> dict:
"grounding": meta.get("grounding", 0),
}
def _safe_json_extract_tags(meta_json: str) -> list[str]:
try:
d = json.loads(meta_json or "{}")
tags = d.get("tags") or []
return [t for t in tags if isinstance(t, str)]
except Exception:
return []
def _host_from_meta(meta_json: str) -> str | None:
try:
d = json.loads(meta_json or "{}")
grounding = d.get("grounding")
url = d.get("url")
if isinstance(grounding, dict) and isinstance(grounding.get("url"), str):
url = grounding.get("url")
if not isinstance(url, str):
return None
parsed = urlparse(url)
return parsed.hostname
except Exception:
return None
def _systemd_unit_state(unit: str) -> dict:
"""
Best-effort systemd status snapshot for a known unit.
Never raises; returns minimal fields.
"""
try:
out = subprocess.check_output(
["systemctl", "show", unit, "--no-page", "--property=ActiveState,SubState,Result,ExecMainStatus,ExecMainCode,ExecMainStartTimestamp,ExecMainExitTimestamp"],
text=True,
stderr=subprocess.STDOUT,
timeout=2,
)
kv = {}
for line in out.splitlines():
if "=" in line:
k, v = line.split("=", 1)
kv[k] = v
return {
"unit": unit,
"active": kv.get("ActiveState"),
"sub": kv.get("SubState"),
"result": kv.get("Result"),
"exit_status": kv.get("ExecMainStatus"),
"start_ts": kv.get("ExecMainStartTimestamp"),
"exit_ts": kv.get("ExecMainExitTimestamp"),
}
except Exception as e:
return {"unit": unit, "error": str(e)}
# ─── API Endpoints ───────────────────────────────────────────────────────────
@@ -83,6 +135,168 @@ def api_config():
"db_path": str(DB_PATH),
}
@app.get("/api/db_info")
def api_db_info():
if not DB_PATH.exists():
raise FileNotFoundError(f"DB not found: {DB_PATH}")
st = DB_PATH.stat()
return {
"db_path": str(DB_PATH),
"size_bytes": st.st_size,
"mtime": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).isoformat(),
}
@app.get("/api/jobs")
def api_jobs():
# Known units that influence "freshness" of the brain.
units = [
"openclaw-secondbrain-ingest-memory.service",
"openclaw-secondbrain-index-vectors.service",
"openclaw-secondbrain-review.service",
"openclaw-secondbrain-heartbeat.service",
"openclaw-secondbrain-verify-pending.service",
]
return {"items": [_systemd_unit_state(u) for u in units]}
@app.get("/api/insights")
def api_insights(limit: int = Query(8, ge=1, le=50)):
conn = get_db()
c = conn.cursor()
rows = c.execute(
"SELECT id, metadata_json, correctness_json, created_at, modified_at FROM engrams ORDER BY created_at DESC LIMIT 2000"
).fetchall()
total = c.execute("SELECT COUNT(*) FROM engrams").fetchone()[0]
confirmed = c.execute(
"SELECT COUNT(*) FROM engrams WHERE json_extract(correctness_json, '$.confirmed') = 1"
).fetchone()[0]
pending = total - confirmed
tag_counts: dict[str, int] = {}
source_counts: dict[str, int] = {}
host_counts: dict[str, int] = {}
active: list[dict] = []
forgotten: list[dict] = []
for r in rows:
meta = json.loads(r["metadata_json"] or "{}")
src = meta.get("source", "unknown")
source_counts[src] = source_counts.get(src, 0) + 1
for t in (meta.get("tags") or []):
if isinstance(t, str):
tag_counts[t] = tag_counts.get(t, 0) + 1
host = _host_from_meta(r["metadata_json"])
if host:
host_counts[host] = host_counts.get(host, 0) + 1
access_count = int(meta.get("access_count", 0) or 0)
created = meta.get("created", r["created_at"])
if access_count >= 5 and len(active) < limit:
active.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
if access_count == 0 and len(forgotten) < limit:
forgotten.append(
{
"id": r["id"],
"access_count": access_count,
"source": src,
"created": created,
}
)
def top_k(d: dict[str, int]) -> list[dict]:
return [
{"key": k, "count": v}
for k, v in sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:limit]
]
conn.close()
return {
"total": total,
"confirmed": confirmed,
"pending": pending,
"top_tags": top_k(tag_counts),
"top_sources": top_k(source_counts),
"top_hosts": top_k(host_counts),
"active_engrams": active,
"forgotten_engrams": forgotten,
}
@app.get("/api/graph")
def api_graph(limit_nodes: int = Query(200, ge=50, le=1000)):
"""
Returns a lightweight graph view:
- Nodes: engrams + tag:<tag> + host:<hostname>
- Edges: engram->tag and engram->host plus explicit engrams_links edges.
"""
conn = get_db()
c = conn.cursor()
rows = c.execute("SELECT id, metadata_json FROM engrams ORDER BY created_at DESC LIMIT 1000").fetchall()
link_rows = c.execute("SELECT from_id, to_id FROM engrams_links ORDER BY rowid DESC LIMIT 2000").fetchall()
conn.close()
nodes: dict[str, dict] = {}
edges: list[dict] = []
def add_node(nid: str, kind: str):
if nid not in nodes:
nodes[nid] = {"id": nid, "kind": kind}
for r in rows:
eid = r["id"]
add_node(eid, "engram")
for t in _safe_json_extract_tags(r["metadata_json"]):
tid = f"tag:{t}"
add_node(tid, "tag")
edges.append({"from": eid, "to": tid, "kind": "has_tag"})
host = _host_from_meta(r["metadata_json"])
if host:
hid = f"host:{host}"
add_node(hid, "host")
edges.append({"from": eid, "to": hid, "kind": "grounded_at"})
for fr, to in link_rows:
add_node(fr, "engram")
add_node(to, "engram")
edges.append({"from": fr, "to": to, "kind": "link"})
# Trim nodes to keep payload bounded (prefer engrams and connected tags/hosts)
if len(nodes) > limit_nodes:
# Keep a balanced subset: many engrams plus the most-connected non-engrams.
kept: dict[str, dict] = {}
engram_budget = int(limit_nodes * 0.7)
# 1) Keep newest engrams first (they appear first in `rows` loop insertion order)
for r in rows:
eid = r["id"]
if eid in nodes:
kept[eid] = nodes[eid]
if len(kept) >= engram_budget:
break
# 2) Rank remaining nodes by degree within current edge set
degree: dict[str, int] = {}
for e in edges:
degree[e["from"]] = degree.get(e["from"], 0) + 1
degree[e["to"]] = degree.get(e["to"], 0) + 1
remaining = [nid for nid in nodes.keys() if nid not in kept]
remaining.sort(key=lambda nid: degree.get(nid, 0), reverse=True)
for nid in remaining:
kept[nid] = nodes[nid]
if len(kept) >= limit_nodes:
break
nodes = kept
edges = [e for e in edges if e["from"] in nodes and e["to"] in nodes]
return {"nodes": list(nodes.values()), "edges": edges}
@app.exception_handler(FileNotFoundError)
def handle_file_not_found(request: Request, exc: FileNotFoundError):