#!/usr/bin/env python3
"""
Auto-discover an Obsidian vault on this server and (optionally) write it into:
  second-brain/data/obsidian_config.json

Safety:
- Only writes when exactly one vault is detected (unambiguous).
- A "vault" is a directory that contains a `.obsidian/` folder.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
# Fixed locations used by this script: the agent workspace, the second-brain
# project inside it, and the JSON config file that receives the vault path.
WORKSPACE = Path("/root/.openclaw/workspace")
BRAIN_DIR = WORKSPACE.joinpath("second-brain")
CONFIG_PATH = BRAIN_DIR.joinpath("data", "obsidian_config.json")
|
|
|
|
|
|
def _iter_common_candidates() -> Iterable[Path]:
|
|
env = os.environ.get("OBSIDIAN_VAULT_PATH")
|
|
if env:
|
|
yield Path(env).expanduser()
|
|
|
|
home = Path.home()
|
|
for p in [
|
|
home / "Obsidian",
|
|
home / "ObsidianVault",
|
|
home / "Vault",
|
|
home / "Vaults",
|
|
home / "Documents" / "Obsidian",
|
|
home / "Documents" / "Vaults",
|
|
home / "Syncthing" / "Obsidian",
|
|
Path("/srv/obsidian"),
|
|
Path("/srv/Obsidian"),
|
|
Path("/data/obsidian"),
|
|
Path("/data/Obsidian"),
|
|
WORKSPACE / "obsidian",
|
|
WORKSPACE / "vault",
|
|
WORKSPACE / "vaults",
|
|
]:
|
|
yield p
|
|
|
|
|
|
def _is_vault_dir(p: Path) -> bool:
|
|
try:
|
|
return p.exists() and p.is_dir() and (p / ".obsidian").exists() and (p / ".obsidian").is_dir()
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _bounded_find_obsidian_dirs(root: Path, *, max_depth: int) -> list[Path]:
|
|
"""
|
|
Find `.obsidian` directories below root, limited by depth to keep runtime bounded.
|
|
"""
|
|
results: list[Path] = []
|
|
try:
|
|
root = root.resolve()
|
|
except Exception:
|
|
return results
|
|
|
|
if not root.exists() or not root.is_dir():
|
|
return results
|
|
|
|
def depth_of(path: Path) -> int:
|
|
try:
|
|
return len(path.relative_to(root).parts)
|
|
except Exception:
|
|
return 9999
|
|
|
|
# Breadth-first-ish scan with pruning
|
|
queue = [root]
|
|
while queue:
|
|
current = queue.pop(0)
|
|
if depth_of(current) > max_depth:
|
|
continue
|
|
try:
|
|
entries = list(current.iterdir())
|
|
except Exception:
|
|
continue
|
|
|
|
for e in entries:
|
|
name = e.name
|
|
if name in (".git", "node_modules", "__pycache__", ".cache", ".venv", "venv", "tmp", "proc", "sys", "dev"):
|
|
continue
|
|
if name.startswith(".") and name not in (".obsidian",):
|
|
continue
|
|
if name == ".obsidian" and e.is_dir():
|
|
results.append(e)
|
|
continue
|
|
if e.is_dir() and not e.is_symlink():
|
|
queue.append(e)
|
|
|
|
return results
|
|
|
|
|
|
def discover(*, roots: list[Path], max_depth: int) -> list[Path]:
    """Collect every detected vault directory as a sorted list of resolved paths.

    Two sources are combined and de-duplicated: the conventional candidate
    locations that pass the vault check, and the parents of every `.obsidian`
    directory found by the bounded scan of each entry in ``roots``.

    Args:
        roots: Directories to scan for `.obsidian` folders.
        max_depth: Depth limit passed through to the bounded scan.

    Returns:
        Unique resolved vault paths in sorted order.
    """
    found = {
        candidate.resolve()
        for candidate in _iter_common_candidates()
        if _is_vault_dir(candidate)
    }

    for scan_root in roots:
        markers = _bounded_find_obsidian_dirs(scan_root, max_depth=max_depth)
        found.update(marker.parent.resolve() for marker in markers)

    return sorted(found)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Discover an Obsidian vault and optionally record it in the config file.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read them from ``sys.argv[1:]``.

    Returns:
        0 when exactly one vault was detected (and written, if ``--write`` was
        given), 1 when no vault was found, 2 when several were found and the
        config was therefore left untouched.

    Raises:
        SystemExit: With ``--write``, when the config file is missing, is not
            valid JSON, or does not contain a JSON object.
    """
    parser = argparse.ArgumentParser(description="Discover Obsidian vault and optionally write config")
    parser.add_argument("--write", action="store_true", help="Write detected vault_path into obsidian_config.json")
    parser.add_argument(
        "--roots",
        nargs="*",
        default=[str(Path.home()), "/srv", "/data", "/mnt", str(WORKSPACE)],
        help="Roots to scan (bounded). Default: home,/srv,/data,/mnt,workspace",
    )
    parser.add_argument("--max-depth", type=int, default=4, help="Max directory depth to scan under each root")
    args = parser.parse_args(argv)

    roots = [Path(r).expanduser() for r in args.roots]
    vaults = discover(roots=roots, max_depth=args.max_depth)

    if not vaults:
        print("No Obsidian vault found (no `.obsidian/` directories detected).")
        return 1

    if len(vaults) > 1:
        # Ambiguous result: never guess which vault the user meant.
        print("Multiple Obsidian vaults found; refusing to write config:")
        for v in vaults:
            print(f"- {v}")
        return 2

    vault = vaults[0]
    print(f"Detected Obsidian vault: {vault}")

    if not args.write:
        return 0

    # Read first and handle the failure, rather than checking existence and
    # then reading (the file could vanish in between).
    try:
        raw = CONFIG_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise SystemExit(f"Missing config file: {CONFIG_PATH}") from None

    try:
        cfg = json.loads(raw)
    except json.JSONDecodeError as err:
        raise SystemExit(f"Invalid JSON in config file {CONFIG_PATH}: {err}") from err

    # Only a JSON object can carry a "vault_path" key; reject lists, strings, etc.
    if not isinstance(cfg, dict):
        raise SystemExit(f"Config file must contain a JSON object: {CONFIG_PATH}")

    cfg["vault_path"] = str(vault)
    CONFIG_PATH.write_text(json.dumps(cfg, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    print(f"Wrote vault_path to: {CONFIG_PATH}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|
|