Second-brain 2.0: hybrid retrieval, obsidian bridge, vector watermark, tests
This commit is contained in:
159
scripts/discover_obsidian_vault.py
Normal file
159
scripts/discover_obsidian_vault.py
Normal file
@@ -0,0 +1,159 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Auto-discover an Obsidian vault on this server and (optionally) write it into:
|
||||
second-brain/data/obsidian_config.json
|
||||
|
||||
Safety:
|
||||
- Only writes when exactly one vault is detected (unambiguous).
|
||||
- A "vault" is a directory that contains a `.obsidian/` folder.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
# Workspace root; the paths below are derived from it.
WORKSPACE = Path("/root/.openclaw/workspace")
# Directory of the second-brain project inside the workspace.
BRAIN_DIR = WORKSPACE / "second-brain"
# JSON config file whose `vault_path` key is updated by `main` when `--write` is given.
CONFIG_PATH = BRAIN_DIR / "data" / "obsidian_config.json"
|
||||
|
||||
|
||||
def _iter_common_candidates() -> Iterable[Path]:
    """Yield conventional locations where an Obsidian vault might live.

    The ``OBSIDIAN_VAULT_PATH`` environment variable, when set to a non-empty
    value, is yielded first (with ``~`` expanded).  It is followed by a fixed
    list of directories under the home directory, ``/srv``, ``/data`` and the
    workspace.  Candidates are yielded as-is; nothing is checked for existence.
    """
    override = os.environ.get("OBSIDIAN_VAULT_PATH")
    if override:
        yield Path(override).expanduser()

    home_dir = Path.home()
    under_home = (
        ("Obsidian",),
        ("ObsidianVault",),
        ("Vault",),
        ("Vaults",),
        ("Documents", "Obsidian"),
        ("Documents", "Vaults"),
        ("Syncthing", "Obsidian"),
    )
    system_wide = ("/srv/obsidian", "/srv/Obsidian", "/data/obsidian", "/data/Obsidian")
    under_workspace = ("obsidian", "vault", "vaults")

    yield from (home_dir.joinpath(*parts) for parts in under_home)
    yield from (Path(location) for location in system_wide)
    yield from (WORKSPACE / leaf for leaf in under_workspace)
|
||||
|
||||
|
||||
def _is_vault_dir(p: Path) -> bool:
    """Return True when *p* is an Obsidian vault.

    A vault is a directory that contains a `.obsidian/` directory.  Checking
    that the `.obsidian` child is a directory is sufficient: it can only be
    one if *p* itself exists and is a directory.

    Args:
        p: Candidate vault directory.

    Returns:
        True if ``p / ".obsidian"`` is a directory; False otherwise, including
        when the filesystem refuses the lookup (e.g. permission denied).
    """
    try:
        return (p / ".obsidian").is_dir()
    except (OSError, ValueError):
        # Unreadable or malformed paths are simply "not a vault"; other
        # exception types indicate a programming error and are not hidden.
        return False
|
||||
|
||||
|
||||
def _bounded_find_obsidian_dirs(root: Path, *, max_depth: int) -> list[Path]:
    """Find `.obsidian` directories below *root*, limited by depth to keep runtime bounded.

    *root* is depth 0.  Directories down to depth ``max_depth`` have their
    entries listed, so a `.obsidian` directory is reported at most
    ``max_depth + 1`` levels below *root*.  The scan is best-effort: entries
    that cannot be listed or inspected are skipped rather than aborting.

    Not descended into: hidden directories, a fixed set of build/cache/system
    directory names, symlinked directories, and `.obsidian` itself.

    Args:
        root: Directory to scan; it is resolved before scanning.
        max_depth: Deepest directory level (relative to *root*) whose entries
            are listed.  A negative value scans nothing.

    Returns:
        The `.obsidian` directories found, in breadth-first order with the
        entries of each directory visited in sorted order.  Empty when *root*
        cannot be resolved or is not an existing directory.
    """
    from collections import deque  # function-local: the module import block stays untouched

    pruned_names = frozenset(
        {".git", "node_modules", "__pycache__", ".cache", ".venv", "venv", "tmp", "proc", "sys", "dev"}
    )
    results: list[Path] = []

    try:
        root = root.resolve()
        if not root.is_dir():
            return results
    except (OSError, RuntimeError, ValueError):
        return results
    if max_depth < 0:
        return results

    # Each queue item carries its own depth, so nothing has to be recomputed
    # from the path and too-deep directories are never queued.
    queue = deque([(root, 0)])
    while queue:
        current, depth = queue.popleft()
        try:
            # Materialised inside the try so that listing errors are caught
            # here; sorted so the result order does not depend on the filesystem.
            entries = sorted(current.iterdir())
        except OSError:
            continue

        for entry in entries:
            name = entry.name
            if name in pruned_names:
                continue
            if name.startswith(".") and name != ".obsidian":
                continue
            try:
                if not entry.is_dir():
                    continue
                if name == ".obsidian":
                    results.append(entry)
                elif depth < max_depth and not entry.is_symlink():
                    queue.append((entry, depth + 1))
            except OSError:
                # A single entry that cannot be stat'ed must not abort the scan.
                continue

    return results
|
||||
|
||||
|
||||
def discover(*, roots: list[Path], max_depth: int) -> list[Path]:
    """Return every distinct vault directory that can be found, sorted.

    Two sources are combined: the conventional candidate locations that turn
    out to be vaults, and the parent of every `.obsidian` directory found by a
    depth-bounded scan of each entry in *roots*.  All paths are resolved, so a
    vault reached through both sources is reported once.

    Args:
        roots: Directories to scan.
        max_depth: Depth limit passed to the bounded scan of each root.

    Returns:
        Sorted list of resolved vault directories (possibly empty).
    """
    found: set[Path] = {
        candidate.resolve()
        for candidate in _iter_common_candidates()
        if _is_vault_dir(candidate)
    }

    for scan_root in roots:
        hits = _bounded_find_obsidian_dirs(scan_root, max_depth=max_depth)
        found.update(hit.parent.resolve() for hit in hits)

    return sorted(found)
|
||||
|
||||
|
||||
def main() -> int:
    """Command-line entry point: discover a vault and optionally persist its path.

    The config file is only modified when `--write` is given and exactly one
    vault was detected.

    Returns:
        0 when exactly one vault was found (and written, if requested),
        1 when no vault was found, 2 when several vaults were found.

    Raises:
        SystemExit: with an explanatory message when `--write` is given but the
            config file is missing, is not valid JSON, or does not contain a
            JSON object.
    """
    ap = argparse.ArgumentParser(description="Discover Obsidian vault and optionally write config")
    ap.add_argument("--write", action="store_true", help="Write detected vault_path into obsidian_config.json")
    ap.add_argument(
        "--roots",
        nargs="*",
        default=[str(Path.home()), "/srv", "/data", "/mnt", str(WORKSPACE)],
        help="Roots to scan (bounded). Default: home,/srv,/data,/mnt,workspace",
    )
    ap.add_argument("--max-depth", type=int, default=4, help="Max directory depth to scan under each root")
    args = ap.parse_args()

    roots = [Path(r).expanduser() for r in args.roots]
    # argparse already converted --max-depth via type=int.
    vaults = discover(roots=roots, max_depth=args.max_depth)

    if not vaults:
        print("No Obsidian vault found (no `.obsidian/` directories detected).")
        return 1

    if len(vaults) > 1:
        print("Multiple Obsidian vaults found; refusing to write config:")
        for v in vaults:
            print(f"- {v}")
        return 2

    vault = vaults[0]
    print(f"Detected Obsidian vault: {vault}")

    if not args.write:
        return 0

    # Read first and handle absence afterwards, so there is no window between
    # an existence check and the read.
    try:
        raw_config = CONFIG_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise SystemExit(f"Missing config file: {CONFIG_PATH}") from None

    try:
        cfg = json.loads(raw_config)
    except json.JSONDecodeError as err:
        raise SystemExit(f"Invalid JSON in config file {CONFIG_PATH}: {err}") from err
    if not isinstance(cfg, dict):
        # Only a JSON object can receive a "vault_path" key.
        raise SystemExit(f"Config file must contain a JSON object: {CONFIG_PATH}")

    cfg["vault_path"] = str(vault)
    CONFIG_PATH.write_text(json.dumps(cfg, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    print(f"Wrote vault_path to: {CONFIG_PATH}")
    return 0
|
||||
|
||||
|
||||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
|
||||
Reference in New Issue
Block a user