#!/usr/bin/env python3
"""
Auto-discover an Obsidian vault on this server and (optionally) write it into:
  second-brain/data/obsidian_config.json

Safety:
- Only writes when exactly one vault is detected (unambiguous).
- A "vault" is a directory that contains a `.obsidian/` folder.
"""

from __future__ import annotations

import argparse
import json
import os
from collections import deque
from pathlib import Path
from typing import Iterable

WORKSPACE = Path("/root/.openclaw/workspace")
BRAIN_DIR = WORKSPACE / "second-brain"
CONFIG_PATH = BRAIN_DIR / "data" / "obsidian_config.json"

# Name of the marker directory that identifies a vault.
_OBSIDIAN_DIR = ".obsidian"

# Directory names that are never descended into during the bounded scan.
_SKIP_DIR_NAMES = frozenset(
    {".git", "node_modules", "__pycache__", ".cache", ".venv", "venv", "tmp", "proc", "sys", "dev"}
)


def _iter_common_candidates() -> Iterable[Path]:
    """Yield well-known locations that may themselves be a vault.

    The path from the ``OBSIDIAN_VAULT_PATH`` environment variable (when set
    and non-empty) is yielded first, followed by a fixed list of directories
    under the home directory, ``/srv``, ``/data`` and the workspace.
    Nothing is checked for existence here.
    """
    env = os.environ.get("OBSIDIAN_VAULT_PATH")
    if env:
        yield Path(env).expanduser()

    home = Path.home()
    yield from (
        home / "Obsidian",
        home / "ObsidianVault",
        home / "Vault",
        home / "Vaults",
        home / "Documents" / "Obsidian",
        home / "Documents" / "Vaults",
        home / "Syncthing" / "Obsidian",
        Path("/srv/obsidian"),
        Path("/srv/Obsidian"),
        Path("/data/obsidian"),
        Path("/data/Obsidian"),
        WORKSPACE / "obsidian",
        WORKSPACE / "vault",
        WORKSPACE / "vaults",
    )


def _is_vault_dir(p: Path) -> bool:
    """Return True when *p* contains a `.obsidian` directory.

    Filesystem errors are treated as "not a vault".
    """
    try:
        # If `<p>/.obsidian` is a directory, `p` necessarily exists and is a
        # directory too, so this single check is sufficient.
        return (p / _OBSIDIAN_DIR).is_dir()
    except (OSError, ValueError):
        return False


def _bounded_find_obsidian_dirs(root: Path, *, max_depth: int) -> list[Path]:
    """
    Find `.obsidian` directories below root, limited by depth to keep runtime bounded.

    Directories are listed breadth-first. ``root`` has depth 0 and only
    directories whose depth is <= ``max_depth`` are listed, so a negative
    ``max_depth`` yields no results. Names in ``_SKIP_DIR_NAMES`` and hidden
    entries other than `.obsidian` are pruned, and symlinked directories are
    not descended into. A `.obsidian` directory is recorded but never
    descended into.

    Returns the `.obsidian` paths in scan order; unreadable directories and
    entries are skipped silently (best-effort scan).
    """
    results: list[Path] = []
    if max_depth < 0:
        return results
    try:
        root = root.resolve()
        if not root.is_dir():
            return results
    except (OSError, RuntimeError):
        # RuntimeError: symlink loop reported by resolve() on older Pythons.
        return results

    # Each queue item carries its own depth, so it never has to be recomputed.
    queue: deque[tuple[Path, int]] = deque([(root, 0)])
    while queue:
        current, depth = queue.popleft()
        try:
            entries = list(current.iterdir())
        except OSError:
            continue

        for entry in entries:
            name = entry.name
            if name in _SKIP_DIR_NAMES:
                continue
            if name.startswith(".") and name != _OBSIDIAN_DIR:
                continue
            try:
                if not entry.is_dir():
                    continue
                is_link = entry.is_symlink()
            except OSError:
                # One unreadable entry must not abort the whole scan.
                continue
            if name == _OBSIDIAN_DIR:
                results.append(entry)
                continue
            if not is_link and depth < max_depth:
                queue.append((entry, depth + 1))
    return results


def discover(*, roots: list[Path], max_depth: int) -> list[Path]:
    """Return the sorted, de-duplicated, resolved paths of all detected vaults.

    Vaults come from two sources: the common candidate locations that are
    themselves vaults, and the parents of every `.obsidian` directory found by
    the bounded scan of each entry in ``roots``.
    """
    vaults: set[Path] = {
        candidate.resolve()
        for candidate in _iter_common_candidates()
        if _is_vault_dir(candidate)
    }
    for root in roots:
        for obsidian_dir in _bounded_find_obsidian_dirs(root, max_depth=max_depth):
            vaults.add(obsidian_dir.parent.resolve())
    return sorted(vaults)


def _write_vault_path(config_path: Path, vault: Path) -> None:
    """Set ``vault_path`` in the existing JSON config at *config_path*.

    Raises SystemExit with a readable message when the file is missing,
    is not valid JSON, or does not contain a JSON object.
    """
    if not config_path.exists():
        raise SystemExit(f"Missing config file: {config_path}")
    try:
        cfg = json.loads(config_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as err:
        raise SystemExit(f"Invalid JSON in config file {config_path}: {err}") from err
    if not isinstance(cfg, dict):
        raise SystemExit(f"Config file must contain a JSON object: {config_path}")

    cfg["vault_path"] = str(vault)
    config_path.write_text(json.dumps(cfg, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")


def main() -> int:
    """Command-line entry point.

    Returns 0 when exactly one vault is detected (and, with ``--write``,
    recorded in the config), 1 when none is found, and 2 when several are
    found, in which case nothing is written.
    """
    ap = argparse.ArgumentParser(description="Discover Obsidian vault and optionally write config")
    ap.add_argument("--write", action="store_true", help="Write detected vault_path into obsidian_config.json")
    ap.add_argument(
        "--roots",
        nargs="*",
        default=[str(Path.home()), "/srv", "/data", "/mnt", str(WORKSPACE)],
        help="Roots to scan (bounded). Default: home,/srv,/data,/mnt,workspace",
    )
    ap.add_argument("--max-depth", type=int, default=4, help="Max directory depth to scan under each root")
    args = ap.parse_args()

    roots = [Path(r).expanduser() for r in args.roots]
    vaults = discover(roots=roots, max_depth=args.max_depth)

    if not vaults:
        print("No Obsidian vault found (no `.obsidian/` directories detected).")
        return 1

    if len(vaults) > 1:
        print("Multiple Obsidian vaults found; refusing to write config:")
        for v in vaults:
            print(f"- {v}")
        return 2

    vault = vaults[0]
    print(f"Detected Obsidian vault: {vault}")

    if not args.write:
        return 0

    _write_vault_path(CONFIG_PATH, vault)
    print(f"Wrote vault_path to: {CONFIG_PATH}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())