#!/usr/bin/env python3
"""Import ``web_design_marker`` JSONL records into an engram SQLite store."""
import argparse
import json
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from src.engram import Engram, Grounding
from src.store import EngramStore

# Only JSONL objects whose "kind" equals this value are imported.
_MARKER_KIND = "web_design_marker"
# Upper bounds on how many checks / sources are rendered into the content text.
_MAX_CHECKS = 8
_MAX_SOURCES = 12


def _now_utc_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _hash16(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of *text* (UTF-8)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


def _text(value: Any) -> str:
    """Return *value* as a stripped string, mapping ``None`` to ``""``.

    A plain ``str(value)`` would turn a JSON ``null`` into the literal text
    ``"None"``, which then looks like real data to the callers.
    """
    if value is None:
        return ""
    return str(value).strip()


def _iter_jsonl(path: Path) -> Iterable[Dict[str, Any]]:
    """Yield every JSON object found in the JSONL file at *path*.

    Blank lines and lines holding non-object JSON values (lists, numbers, ...)
    are skipped.

    Raises:
        SystemExit: if a non-blank line is not valid JSON; the message names
            the file and the 1-based line number.
    """
    with path.open("r", encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
                raise SystemExit(f"Invalid JSON at {path}:{line_no}") from exc
            if isinstance(obj, dict):
                yield obj


def _marker_to_content(marker_obj: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Render a marker record as text and collect its source evidence.

    Returns:
        ``(content, evidence)`` where *content* is the multi-line text stored
        (and hashed for de-duplication) and *evidence* is a list of
        ``{"url": ..., "title": ...}`` dicts, one per source with a URL.

    Raises:
        ValueError: if the record has no (or a null / blank) ``marker``.
    """
    marker = _text(marker_obj.get("marker"))
    if not marker:
        raise ValueError("missing marker")
    details = _text(marker_obj.get("details"))
    checks = marker_obj.get("checks") or []
    sources = marker_obj.get("sources") or []

    evidence: List[Dict[str, Any]] = []
    for src in sources:
        if not isinstance(src, dict):
            continue
        url = (src.get("url") or "").strip()
        if not url:
            continue
        title = (src.get("title") or "").strip()
        evidence.append({"url": url, "title": title})

    lines: List[str] = [f"WEBDEV_MARKER: {marker}"]
    if details:
        lines += ["", f"Details: {details}"]
    if isinstance(checks, list) and checks:
        lines += ["", "Checks:"]
        # The cap applies before blank entries are dropped.
        for check in checks[:_MAX_CHECKS]:
            check_text = _text(check)
            if check_text:
                lines.append(f"- {check_text}")
    if evidence:
        lines += ["", "Sources:"]
        # Only the text is capped; the returned evidence list stays complete.
        for ev in evidence[:_MAX_SOURCES]:
            if ev["title"]:
                lines.append(f"- {ev['title']}: {ev['url']}")
            else:
                lines.append(f"- {ev['url']}")
    return "\n".join(lines).strip(), evidence


def _tags_for(marker_obj: Dict[str, Any]) -> List[str]:
    """Return the fixed base tags plus the record's ``area``, when it has one."""
    tags = ["web_design", "web_development", "mobile"]
    area = _text(marker_obj.get("area"))
    if area:
        tags.append(area)
    return tags


def _load_existing_hashes(store: EngramStore) -> Set[str]:
    """Best-effort read of the content hashes already present in *store*.

    Returns an empty set when the query itself fails, so the import proceeds
    without the preload instead of aborting.
    """
    hashes: Set[str] = set()
    try:
        cur = store._conn.execute("SELECT metadata_json FROM engrams")  # noqa: SLF001
        for row in cur.fetchall():
            try:
                meta = json.loads(row["metadata_json"])
            except (TypeError, ValueError, KeyError, IndexError):
                # Unreadable row or undecodable metadata: ignore this row only.
                continue
            if not isinstance(meta, dict):
                continue
            h = meta.get("hash")
            if isinstance(h, str) and h:
                hashes.add(h)
    except Exception:
        # Deliberately broad: if this fails (e.g. schema mismatch), proceed
        # without preload.
        return set()
    return hashes


def import_markers(
    db_path: Path,
    jsonl_paths: List[Path],
    source: str,
    verdict: str,
    agent_id: str,
    dry_run: bool,
) -> Dict[str, int]:
    """Import marker records from *jsonl_paths* into the store at *db_path*.

    Records are de-duplicated on a 16-hex-character hash of their rendered
    content, both within this run and against hashes already in the store.

    Args:
        db_path: location of the SQLite database handed to ``EngramStore``.
        jsonl_paths: JSONL files to read, processed in the given order.
        source: value recorded as each engram's source.
        verdict: correctness verdict attached to each engram.
        agent_id: agent recorded on the engram and the verdict; when empty the
            record's own ``agent_id`` (engram) and ``"importer"`` (verdict)
            are used instead.
        dry_run: when true, do everything except saving to the store.

    Returns:
        Counters keyed ``seen``, ``imported``, ``skipped_dup`` and
        ``skipped_invalid``.

    Raises:
        SystemExit: propagated from the JSONL reader on an unparsable line.
    """
    store = EngramStore(str(db_path))
    stats = {"seen": 0, "imported": 0, "skipped_dup": 0, "skipped_invalid": 0}
    seen_hashes: Set[str] = set()

    # Preload existing hashes (fast-ish; avoids duplicate spam).
    existing_hashes = _load_existing_hashes(store)

    for path in jsonl_paths:
        for marker_obj in _iter_jsonl(path):
            if (marker_obj.get("kind") or "") != _MARKER_KIND:
                continue
            stats["seen"] += 1

            try:
                content, evidence = _marker_to_content(marker_obj)
            except (ValueError, TypeError, AttributeError):
                # Missing marker, or fields of an unexpected type.
                stats["skipped_invalid"] += 1
                continue

            h = _hash16(content)
            if h in seen_hashes or h in existing_hashes:
                stats["skipped_dup"] += 1
                continue
            seen_hashes.add(h)

            eg = Engram.create(
                content=content,
                source=source,
                confidence=0.75,
                tags=_tags_for(marker_obj),
                session_id=None,
                agent_id=agent_id or str(marker_obj.get("agent_id") or ""),
                grounding=Grounding.SOURCED,
            )
            # Overwrite hash to exactly match our content representation.
            eg.metadata["hash"] = h
            eg.metadata["modified"] = _now_utc_iso()
            eg.metadata["created"] = marker_obj.get("created_at") or eg.metadata["created"]
            eg.correctness.set_verdict(
                by=agent_id or "importer",
                verdict=verdict,
                note=f"Imported from {path.name}",
                evidence=evidence,
            )

            if not dry_run:
                store.save(eg)
            # Counted in dry-run mode as well, so the stats show what a real
            # run would import.
            stats["imported"] += 1

    return stats


def main() -> None:
    """Parse command-line options, run the import and print a JSON summary."""
    parser = argparse.ArgumentParser(description="Import web_design_marker JSONL files into brain.sqlite")
    parser.add_argument("--db", default="second-brain/data/brain.sqlite", help="Path to brain.sqlite")
    parser.add_argument("--glob", default="/tmp/web_design_markers_*.jsonl", help="Glob for marker JSONL files")
    parser.add_argument("--source", default="web_research", help="Engram source")
    parser.add_argument("--verdict", default="probable_true", help="Correctness verdict")
    parser.add_argument("--agent-id", default="web_research_import", help="Agent id to record")
    parser.add_argument("--dry-run", action="store_true", help="Parse/dedupe but do not write to DB")
    args = parser.parse_args()

    db_path = Path(args.db)
    if args.glob.startswith("/"):
        # Path.glob() does not accept absolute patterns, so anchor at the
        # filesystem root and match the remainder relative to it.
        matches = Path("/").glob(args.glob.lstrip("/"))
    else:
        matches = Path(".").glob(args.glob)
    jsonl_paths = sorted(matches)
    if not jsonl_paths:
        raise SystemExit(f"No files match glob: {args.glob}")

    stats = import_markers(
        db_path=db_path,
        jsonl_paths=jsonl_paths,
        source=args.source,
        verdict=args.verdict,
        agent_id=args.agent_id,
        dry_run=bool(args.dry_run),
    )
    summary = {"db": str(db_path), "files": [str(path) for path in jsonl_paths], "stats": stats}
    print(json.dumps(summary, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()