192 lines
6.2 KiB
Python
192 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import hashlib
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
|
|
|
from src.engram import Engram, Grounding
|
|
from src.store import EngramStore
|
|
|
|
|
|
def _now_utc_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
|
|
def _hash16(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 digest of *text*.

    The text is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    full_hex = hasher.hexdigest()
    return full_hex[:16]
|
|
|
|
|
|
def _iter_jsonl(path: Path) -> Iterable[Dict[str, Any]]:
    """Yield every JSON object found in a JSON Lines file.

    Blank lines are skipped, and so are lines whose JSON value is not an
    object (lists, strings, numbers, ...).

    Args:
        path: JSONL file to read; it is decoded as UTF-8.

    Yields:
        One dict per non-blank line that parses to a JSON object.

    Raises:
        SystemExit: If a non-blank line cannot be parsed as JSON. The message
            names the file and the 1-based line number, followed by the
            parser's own error text; the parser error is chained as the cause.
    """
    with path.open("r", encoding="utf-8") as f:
        for line_no, raw_line in enumerate(f, start=1):
            text = raw_line.strip()
            if not text:
                continue
            try:
                obj = json.loads(text)
            except (ValueError, RecursionError) as exc:
                # json.JSONDecodeError is a ValueError; RecursionError covers
                # pathologically nested input. Anything else is a real bug and
                # should surface instead of being reported as "invalid JSON".
                raise SystemExit(f"Invalid JSON at {path}:{line_no}: {exc}") from exc
            if isinstance(obj, dict):
                yield obj
|
|
|
|
|
|
def _marker_to_content(marker_obj: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Render a marker record as text and collect its source evidence.

    The text starts with a ``WEBDEV_MARKER: <marker>`` line, followed by
    optional ``Details:``, ``Checks:`` (first 8 entries of the list) and
    ``Sources:`` (first 12 evidence entries) sections separated by blank lines.

    Args:
        marker_obj: Parsed marker record. The keys read are ``marker``,
            ``details``, ``checks`` and ``sources``.

    Returns:
        A ``(content, evidence)`` tuple. ``evidence`` holds one
        ``{"url": ..., "title": ...}`` dict per source that has a non-empty
        string URL; it is not capped at 12 even though the rendered text is.

    Raises:
        ValueError: If ``marker`` is missing, null, or blank.
        TypeError: If ``sources`` is a truthy value that cannot be iterated.
    """

    def _text(value: Any) -> str:
        # A JSON null must read as "absent"; str(None) would otherwise put the
        # literal word "None" into the rendered content.
        return "" if value is None else str(value).strip()

    marker = _text(marker_obj.get("marker"))
    if not marker:
        raise ValueError("missing marker")

    details = _text(marker_obj.get("details"))
    checks = marker_obj.get("checks") or []
    sources = marker_obj.get("sources") or []

    evidence: List[Dict[str, Any]] = []
    for src in sources:
        if not isinstance(src, dict):
            continue
        raw_url = src.get("url")
        raw_title = src.get("title")
        # Non-string values are treated as absent so one malformed source
        # cannot make .strip() fail and invalidate the whole marker.
        url = raw_url.strip() if isinstance(raw_url, str) else ""
        title = raw_title.strip() if isinstance(raw_title, str) else ""
        if url:
            evidence.append({"url": url, "title": title})

    lines: List[str] = [f"WEBDEV_MARKER: {marker}"]
    if details:
        lines += ["", f"Details: {details}"]
    if isinstance(checks, list) and checks:
        lines += ["", "Checks:"]
        for check in checks[:8]:
            item = _text(check)
            if item:
                lines.append(f"- {item}")
    if evidence:
        lines += ["", "Sources:"]
        for ev in evidence[:12]:
            # Evidence entries were normalised above, so no re-stripping here.
            if ev["title"]:
                lines.append(f"- {ev['title']}: {ev['url']}")
            else:
                lines.append(f"- {ev['url']}")
    return "\n".join(lines).strip(), evidence
|
|
|
|
|
|
def _tags_for(marker_obj: Dict[str, Any]) -> List[str]:
    """Return the tag list for a marker record.

    Every marker gets the base tags ``web_design``, ``web_development`` and
    ``mobile``. The record's ``area`` value, when present and non-blank, is
    appended unless it is already one of the base tags.

    Args:
        marker_obj: Parsed marker record; only the ``area`` key is read.

    Returns:
        A new list of tag strings.
    """
    tags = ["web_design", "web_development", "mobile"]
    raw_area = marker_obj.get("area")
    # A JSON null must not be stringified into a bogus "None" tag.
    area = "" if raw_area is None else str(raw_area).strip()
    if area and area not in tags:
        tags.append(area)
    return tags
|
|
|
|
|
|
def import_markers(
    db_path: Path,
    jsonl_paths: List[Path],
    source: str,
    verdict: str,
    agent_id: str,
    dry_run: bool,
) -> Dict[str, int]:
    """Import ``web_design_marker`` records from JSONL files into the store.

    Records whose ``kind`` is not ``web_design_marker`` are ignored. Each
    remaining record is rendered to text, hashed, and skipped when the same
    hash was already seen in this run or was found in the store's existing
    metadata.

    Args:
        db_path: Path handed to ``EngramStore`` (as a string).
        jsonl_paths: Marker files to read, processed in the given order.
        source: Value passed as the engram ``source``.
        verdict: Verdict passed to ``correctness.set_verdict``.
        agent_id: Agent id recorded on the engram and the verdict; when empty,
            the record's own ``agent_id`` is used for the engram and
            ``"importer"`` for the verdict.
        dry_run: When true, nothing is saved to the store.

    Returns:
        Counters keyed ``seen``, ``imported``, ``skipped_dup`` and
        ``skipped_invalid``.
    """
    store = EngramStore(str(db_path))
    stats = dict.fromkeys(("seen", "imported", "skipped_dup", "skipped_invalid"), 0)

    # Hashes already known: preloaded from the store, then extended as this run
    # accepts new records. Preloading is best-effort; if the query fails
    # (schema mismatch), carry on with an empty set.
    known_hashes: set[str] = set()
    try:
        rows = store._conn.execute("SELECT metadata_json FROM engrams").fetchall()  # noqa: SLF001
    except Exception:
        rows = []
    for row in rows:
        try:
            stored_hash = json.loads(row["metadata_json"]).get("hash")
        except Exception:
            # Unreadable metadata on one row must not block the import.
            continue
        if isinstance(stored_hash, str) and stored_hash:
            known_hashes.add(stored_hash)

    for path in jsonl_paths:
        for marker_obj in _iter_jsonl(path):
            kind = marker_obj.get("kind") or ""
            if kind != "web_design_marker":
                continue
            stats["seen"] += 1

            try:
                content, evidence = _marker_to_content(marker_obj)
            except Exception:
                stats["skipped_invalid"] += 1
                continue

            digest = _hash16(content)
            if digest in known_hashes:
                stats["skipped_dup"] += 1
                continue
            known_hashes.add(digest)

            fallback_agent = str(marker_obj.get("agent_id") or "")
            eg = Engram.create(
                content=content,
                source=source,
                confidence=0.75,
                tags=_tags_for(marker_obj),
                session_id=None,
                agent_id=agent_id or fallback_agent,
                grounding=Grounding.SOURCED,
            )

            # Force the stored hash to match the hash used for de-duplication.
            meta = eg.metadata
            meta["hash"] = digest
            meta["modified"] = _now_utc_iso()
            meta["created"] = marker_obj.get("created_at") or meta["created"]

            eg.correctness.set_verdict(
                by=agent_id or "importer",
                verdict=verdict,
                note=f"Imported from {path.name}",
                evidence=evidence,
            )

            if not dry_run:
                store.save(eg)
            # NOTE(review): the counter is bumped for dry runs as well, so the
            # summary reports what would be written — confirm this is intended.
            stats["imported"] += 1

    return stats
|
|
|
|
|
|
def main() -> None:
    """Parse command-line options, run the import, and print a JSON summary.

    Raises:
        SystemExit: If no file matches the ``--glob`` pattern.
    """
    parser = argparse.ArgumentParser(description="Import web_design_marker JSONL files into brain.sqlite")
    parser.add_argument("--db", default="second-brain/data/brain.sqlite", help="Path to brain.sqlite")
    parser.add_argument("--glob", default="/tmp/web_design_markers_*.jsonl", help="Glob for marker JSONL files")
    parser.add_argument("--source", default="web_research", help="Engram source")
    parser.add_argument("--verdict", default="probable_true", help="Correctness verdict")
    parser.add_argument("--agent-id", default="web_research_import", help="Agent id to record")
    parser.add_argument("--dry-run", action="store_true", help="Parse/dedupe but do not write to DB")
    args = parser.parse_args()

    db_path = Path(args.db)

    # Path.glob() only accepts relative patterns, so an absolute pattern is
    # anchored at the filesystem root with its leading slashes removed.
    pattern = args.glob
    if pattern.startswith("/"):
        matches = Path("/").glob(pattern.lstrip("/"))
    else:
        matches = Path(".").glob(pattern)
    jsonl_paths = sorted(matches)
    if not jsonl_paths:
        raise SystemExit(f"No files match glob: {args.glob}")

    stats = import_markers(
        db_path=db_path,
        jsonl_paths=jsonl_paths,
        source=args.source,
        verdict=args.verdict,
        agent_id=args.agent_id,
        dry_run=bool(args.dry_run),
    )

    summary = {
        "db": str(db_path),
        "files": [str(file_path) for file_path in jsonl_paths],
        "stats": stats,
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
|
|
|
|
|
|
# Script entry point: run the importer only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|