Repositories / agent-snapshot.git

tests/test_agent_snapshot.py

Clone (read-only): git clone http://git.guha-anderson.com/git/agent-snapshot.git

Branch
21049 bytes · 0be46589c585
# Acceptance tests for the agent-snapshot CLI. Every test runs the real
# compiled binary against a disposable Git repository under testdata/ and then
# asserts on the manifest.json / blobs.parquet bundle it writes.
import json
import os
import pty
import select
import shutil
import signal
import subprocess
import time
from pathlib import Path

import pandas as pd
import pytest

ROOT = Path(__file__).resolve().parents[1]
BIN = ROOT / "_build" / "default" / "src" / "ocaml" / "agent_snapshot.exe"
TESTDATA = ROOT / "testdata"
WORKTREE = TESTDATA / "runtime_repo"

# Use the system Python rather than uv's managed interpreter. The snapshotter
# intentionally observes interpreter and loader activity too, and a uv-managed
# Python in the user's home directory can be writable by the current user. That
# would force blobs for the interpreter itself and make the tests about uv's
# environment instead of Agent Snapshot's file classification rules.
PYTHON = "/usr/bin/python3"


def run(cmd, **kwargs):
    """Run *cmd* from the repository root, raising on a nonzero exit."""
    return subprocess.run(cmd, cwd=ROOT, text=True, check=True, **kwargs)


@pytest.fixture(scope="session", autouse=True)
def build_agent_snapshot():
    """Build the CLI binary once per session before any test runs."""
    # The tests exercise the real CLI binary instead of calling internal helper
    # functions. That keeps the acceptance criteria aligned with ptrace behavior,
    # process launch, Dune wiring, and manifest writing as users will run them.
    run(["bash", "-lc", ". /scratch/arjun/ocaml/env.sh && dune build src/ocaml/agent_snapshot.exe"])
    assert BIN.exists()


@pytest.fixture(autouse=True)
def runtime_git_repo(monkeypatch):
    """Create a throwaway Git repository with known fixtures for each test."""
    # Most snapshot policy depends on Git state, but tests should not mutate the
    # project checkout's own index or .git directory. Each test gets a disposable
    # repository under testdata and the helper programs operate there via env.
    shutil.rmtree(WORKTREE, ignore_errors=True)
    WORKTREE.mkdir(parents=True)
    (WORKTREE / "clean.txt").write_text("clean tracked fixture\nline two\n")
    (WORKTREE / "dirty.txt").write_text("dirty tracked fixture original\n")
    (WORKTREE / "nested").mkdir()
    (WORKTREE / "nested" / "info.txt").write_text("nested tracked fixture\n")
    subprocess.run(["git", "init"], cwd=WORKTREE, text=True, check=True, capture_output=True)
    subprocess.run(["git", "config", "user.email", "tests@example.invalid"], cwd=WORKTREE, check=True)
    subprocess.run(["git", "config", "user.name", "Agent Snapshot Tests"], cwd=WORKTREE, check=True)
    subprocess.run(["git", "add", "."], cwd=WORKTREE, check=True)
    subprocess.run(
        ["git", "commit", "-m", "Initial fixture"],
        cwd=WORKTREE,
        text=True,
        check=True,
        capture_output=True,
    )
    # Helper programs under test_programs/ locate the repo via this variable.
    monkeypatch.setenv("AGENT_SNAPSHOT_TEST_REPO", str(WORKTREE))
    yield
    shutil.rmtree(WORKTREE, ignore_errors=True)


@pytest.fixture(autouse=True)
def ignore_config(tmp_path, monkeypatch):
    """Point HOME/XDG_CONFIG_HOME at a fresh, empty ignore configuration.

    Returns the path of the per-test ignore.json so tests can rewrite or
    delete it to exercise the configuration-loading paths.
    """
    config_home = tmp_path / "xdg-config"
    config_path = config_home / "agent-snapshot" / "ignore.json"
    config_path.parent.mkdir(parents=True)
    config_path.write_text("[]\n")
    # HOME is set to the project root so $HOME-relative ignore patterns can
    # reach the testdata worktree.
    monkeypatch.setenv("HOME", str(ROOT))
    monkeypatch.setenv("XDG_CONFIG_HOME", str(config_home))
    return config_path


class Snapshot:
    # Small manifest assertion helper. Tests should read like snapshot behavior,
    # not like repeated JSON tree walking, because the important contract is
    # "this path was observed and classified this way".

    def __init__(self, path: Path):
        """Load the manifest from a completed snapshot bundle directory."""
        self.path = path
        self.manifest = json.loads((path / "manifest.json").read_text())

    def file(self, path: Path):
        """Return the manifest entry for *path*, or fail the test."""
        target = str(path.resolve())
        for item in self.manifest["files"]:
            if item["path"] == target:
                return item
        raise AssertionError(f"{target} not present in snapshot")

    def _blobs_frame(self) -> pd.DataFrame:
        """Read the Parquet blob store; empty frame if none was written."""
        blobs = self.path / "blobs.parquet"
        if not blobs.exists():
            return pd.DataFrame(columns=["key", "content"])
        return pd.read_parquet(blobs)

    def blob_bytes(self, key: str) -> bytes:
        """Return the stored content for a "before:<abs-path>"/"after:<abs-path>" key."""
        state, absolute_path = key.split(":", 1)
        assert state in {"before", "after"}
        assert Path(absolute_path).is_absolute()
        df = self._blobs_frame()
        row = df.loc[df["key"] == key, "content"]
        assert len(row) == 1, f"blob key {key!r} not found or not unique in Parquet store"
        raw = row.iloc[0]
        # Parquet readers may hand back bytes, memoryview, or another
        # buffer-like type; normalize all of them to bytes.
        if isinstance(raw, bytes):
            return raw
        if isinstance(raw, memoryview):
            return raw.tobytes()
        return bytes(raw)

    def blob_text(self, key: str) -> str:
        """Decode a blob as UTF-8 text."""
        return self.blob_bytes(key).decode()


def capture(tmp_path: Path, *command: str) -> Snapshot:
    """Trace *command* with the CLI and return the resulting Snapshot."""
    # Every test gets a fresh bundle directory so stale blobs cannot mask capture
    # bugs.
    out = tmp_path / "snapshot"
    run([str(BIN), "--snapshot-dir", str(out), *command])
    return Snapshot(out)


def test_missing_ignore_config_creates_defaults(tmp_path, ignore_config):
    """Deleting ignore.json makes the CLI write a default one and say so."""
    ignore_config.unlink()
    assert not ignore_config.exists()
    out = tmp_path / "snapshot"
    completed = subprocess.run(
        [str(BIN), "--snapshot-dir", str(out), PYTHON, "test_programs/read_clean.py"],
        cwd=ROOT,
        text=True,
        check=True,
        capture_output=True,
    )
    assert ignore_config.exists()
    assert f"Created default ignore file: {ignore_config}\n" in completed.stderr
    data = json.loads(ignore_config.read_text())
    assert data == [
        "$HOME/.cache",
        "$HOME/.claude",
        "$HOME/.codex",
        "$HOME/.cursor",
        "$XDG_CONFIG_HOME/agent-snapshot/ignore.json",
        "/tmp/scratch-output",
        "/proc",
        "/dev",
        "/usr",
        "/bin",
    ]


def test_ignore_config_suppresses_files_directories_and_itself(tmp_path, ignore_config):
    """Ignored files, ignored directory trees, and the config itself are all excluded."""
    ignored_file = WORKTREE / "ignored_file.txt"
    ignored_dir = WORKTREE / "ignored_dir"
    ignored_file.write_text("ignored file payload\n")
    ignored_dir.mkdir()
    (ignored_dir / "nested.txt").write_text("ignored nested payload\n")
    ignore_config.write_text(json.dumps([str(ignored_file), str(ignored_dir)]) + "\n")
    snap = capture(tmp_path, PYTHON, "test_programs/read_ignored_paths.py")
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    # A non-ignored read in the same run still shows up, so the exclusions
    # below cannot be explained by the capture simply seeing nothing.
    assert str((WORKTREE / "clean.txt").resolve()) in manifest_paths
    assert str(ignored_file.resolve()) not in manifest_paths
    assert str((ignored_dir / "nested.txt").resolve()) not in manifest_paths
    assert str(ignore_config.resolve()) not in manifest_paths


def test_ignore_config_expands_home_prefix(tmp_path, ignore_config):
    """A $HOME-prefixed ignore pattern matches the expanded absolute path."""
    ignored_file = WORKTREE / "ignored_file.txt"
    ignored_dir = WORKTREE / "ignored_dir"
    ignored_file.write_text("ignored file payload\n")
    ignored_dir.mkdir()
    (ignored_dir / "nested.txt").write_text("nested payload\n")
    # HOME is the project root (see ignore_config fixture), so this resolves
    # to the worktree fixture file.
    ignore_config.write_text(json.dumps(["$HOME/testdata/runtime_repo/ignored_file.txt"]) + "\n")
    snap = capture(tmp_path, PYTHON, "test_programs/read_ignored_paths.py")
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    assert str(ignored_file.resolve()) not in manifest_paths


def test_ignore_config_expands_xdg_config_home_fallback(tmp_path, ignore_config):
    """With XDG_CONFIG_HOME unset, $XDG_CONFIG_HOME falls back to $HOME/.config."""
    home = tmp_path / "home"
    config_path = home / ".config" / "agent-snapshot" / "ignore.json"
    ignored_file = WORKTREE / "ignored_file.txt"
    ignored_dir = WORKTREE / "ignored_dir"
    ignored_file.write_text("ignored file payload\n")
    ignored_dir.mkdir()
    (ignored_dir / "nested.txt").write_text("nested payload\n")
    config_path.parent.mkdir(parents=True)
    config_path.write_text(json.dumps(["$XDG_CONFIG_HOME/agent-snapshot/ignore.json"]) + "\n")
    env = os.environ.copy()
    env.pop("XDG_CONFIG_HOME", None)
    env["HOME"] = str(home)
    out = tmp_path / "snapshot"
    subprocess.run(
        [str(BIN), "--snapshot-dir", str(out), PYTHON, "test_programs/read_ignored_paths.py"],
        cwd=ROOT,
        text=True,
        check=True,
        env=env,
    )
    manifest = json.loads((out / "manifest.json").read_text())
    manifest_paths = {item["path"] for item in manifest["files"]}
    assert str(config_path.resolve()) not in manifest_paths


def test_clean_git_tracked_read_records_repo_without_blob(tmp_path):
    # A clean tracked file is the primary compactness case. The program really
    # reads testdata/clean.txt, but the snapshot should rely on Git repository
    # root + HEAD + relative path instead of copying file contents into blobs.
    snap = capture(tmp_path, PYTHON, "test_programs/read_clean.py")
    clean = snap.file(WORKTREE / "clean.txt")
    assert "read" in clean["operations"]
    assert clean["git"]["tracked"] is True
    assert clean["git"]["dirty"] is False
    assert "size" not in clean["before"]
    assert "size" not in clean["after"]
    assert clean["before"].get("blob") is None
    assert clean["after"].get("blob") is None
    assert any(repo["root"] == str(WORKTREE.resolve()) for repo in snap.manifest["git_repositories"])


def test_git_internal_directory_writes_are_ignored(tmp_path):
    """Writes under .git/ are snapshot plumbing, not user files."""
    snap = capture(tmp_path, PYTHON, "test_programs/write_git_internal.py")
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    assert str((WORKTREE / ".git" / "delete_me").resolve()) not in manifest_paths


def test_proc_dev_and_special_files_are_ignored(tmp_path):
    """/proc, /dev, and non-regular files like FIFOs never enter the manifest."""
    fifo = WORKTREE / "runtime.fifo"
    os.mkfifo(fifo)
    snap = capture(tmp_path, PYTHON, "test_programs/read_proc_dev_special.py")
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    assert not any(path.startswith("/proc/") for path in manifest_paths)
    assert not any(path.startswith("/dev/") for path in manifest_paths)
    assert str(fifo.resolve()) not in manifest_paths


def test_non_directory_path_component_does_not_crash(tmp_path):
    """A path that routes through a non-directory component is dropped, not fatal."""
    snap = capture(tmp_path, PYTHON, "test_programs/read_non_directory_component.py")
    # strict=False: the path cannot exist, we only need its canonical string.
    impossible_path = (WORKTREE / "not_directory" / "2851767" / "ns").resolve(strict=False)
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    assert str(impossible_path) not in manifest_paths


def test_written_clean_git_tracked_file_gets_after_blob(tmp_path):
    """Rewriting a clean tracked file forces an after-blob of the new content."""
    snap = capture(tmp_path, PYTHON, "test_programs/rewrite_clean_tracked.py")
    clean = snap.file(WORKTREE / "clean.txt")
    assert "write" in clean["operations"]
    assert clean["git"]["tracked"] is True
    assert clean["git"]["dirty"] is False
    assert clean["after"]["blob"] == f"after:{WORKTREE / 'clean.txt'}"
    assert snap.blob_text(clean["after"]["blob"]) == (WORKTREE / "clean.txt").read_text()


def test_file_created_and_committed_by_tracee_still_gets_after_blob(tmp_path):
    """Even a file the traced program itself commits keeps its after-blob."""
    snap = capture(tmp_path, PYTHON, "test_programs/create_and_commit_file.py")
    created = snap.file(WORKTREE / "committed_by_program.txt")
    assert "write" in created["operations"]
    assert created["git"]["tracked"] is True
    assert created["git"]["dirty"] is False
    assert snap.blob_text(created["after"]["blob"]) == "created and committed by traced program\n"
    # The tracee's own git invocation must not leak .git internals in.
    assert not any("/.git/" in item["path"] for item in snap.manifest["files"])


def test_dirty_untracked_created_and_deleted_files_are_captured(tmp_path):
    # This test covers the cases where Git metadata is not enough:
    # - dirty tracked files differ from HEAD, so their content must be blobbed
    # - untracked files have no commit object to reconstruct from
    # - created files need before=false and after content
    # - deleted files need a tombstone so restore can reproduce non-existence
    (WORKTREE / "dirty.txt").write_text("dirty tracked fixture changed before run\n")
    (WORKTREE / "untracked_runtime.txt").write_text("untracked input\n")
    (WORKTREE / "deleted_by_program.txt").write_text("delete me\n")
    snap = capture(tmp_path, PYTHON, "test_programs/dirty_untracked_write.py")
    dirty = snap.file(WORKTREE / "dirty.txt")
    assert dirty["git"]["tracked"] is True
    assert dirty["git"]["dirty"] is True
    assert dirty["before"]["blob"] == f"before:{WORKTREE / 'dirty.txt'}"
    assert snap.blob_text(dirty["before"]["blob"]) == "dirty tracked fixture changed before run\n"
    untracked = snap.file(WORKTREE / "untracked_runtime.txt")
    assert untracked["git"]["tracked"] is False
    assert snap.blob_text(untracked["before"]["blob"]) == "untracked input\n"
    created = snap.file(WORKTREE / "created_by_program.txt")
    assert "write" in created["operations"]
    assert created["before"]["exists"] is False
    assert snap.blob_text(created["after"]["blob"]) == "created final\n"
    deleted = snap.file(WORKTREE / "deleted_by_program.txt")
    assert "delete" in deleted["operations"]
    assert deleted["after"]["exists"] is False
    assert deleted["after"]["tombstone"] is True


def test_created_then_deleted_file_is_not_manifested_or_blobbed(tmp_path):
    """A file that exists only during the run leaves no trace in the bundle."""
    transient = WORKTREE / "transient_runtime.txt"
    snap = capture(tmp_path, PYTHON, "test_programs/create_read_delete_transient.py")
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    blob_keys = set(snap._blobs_frame()["key"])
    assert not transient.exists()
    assert str(transient.resolve()) not in manifest_paths
    assert not any(str(transient.resolve()) in key for key in blob_keys)


def test_fork_usr_and_directory_traversal(tmp_path):
    # ptrace must follow the process tree, not just the initial pid. The helper
    # forks and writes from the child; missing that write means fork/clone events
    # are not being attached early enough. The same helper reads /usr/bin/env to
    # assert that root-owned, non-writable system files are excluded, and iterates
    # testdata to verify directory traversal is recorded.
    snap = capture(tmp_path, PYTHON, "test_programs/fork_and_usr.py")
    child = snap.file(WORKTREE / "child_output.txt")
    assert "write" in child["operations"]
    assert snap.blob_text(child["after"]["blob"]) == "child final\n"
    manifest_paths = {item["path"] for item in snap.manifest["files"]}
    assert str(Path("/usr/bin/env").resolve()) not in manifest_paths
    directory = snap.file(WORKTREE)
    assert "directory" in directory["operations"]


def test_restore_word_is_treated_as_traced_command(tmp_path, monkeypatch):
    """A command literally named "restore" is traced, not parsed as a subcommand."""
    restore_bin = tmp_path / "bin" / "restore"
    restore_bin.parent.mkdir()
    restore_bin.write_text("#!/bin/sh\nprintf 'restore program ran\\n'\n")
    restore_bin.chmod(0o755)
    monkeypatch.setenv("PATH", f"{restore_bin.parent}:{os.environ['PATH']}")
    out = tmp_path / "snapshot"
    completed = subprocess.run(
        [str(BIN), "--snapshot-dir", str(out), "restore"],
        cwd=ROOT,
        text=True,
        check=True,
        capture_output=True,
    )
    assert completed.stdout == "restore program ran\n"
    assert (out / "manifest.json").exists()


def test_traced_command_options_do_not_need_separator(tmp_path):
    """Flags after the command belong to the tracee; no "--" separator required."""
    out = tmp_path / "snapshot"
    completed = subprocess.run(
        [str(BIN), "--snapshot-dir", str(out), PYTHON, "-c", "print('direct command')"],
        cwd=ROOT,
        text=True,
        check=True,
        capture_output=True,
    )
    assert completed.stdout == "direct command\n"
    assert Snapshot(out).manifest["command"] == [PYTHON, "-c", "print('direct command')"]


def test_interactive_shell_children_are_not_reported_as_stopped_jobs(tmp_path):
    """Running bash -i under the tracer must not leave jobs shown as "Stopped"."""
    out = tmp_path / "snapshot"
    pid, fd = pty.fork()
    if pid == 0:
        # Child: exec the tracer with an interactive bash inside the pty.
        os.chdir(WORKTREE)
        os.environ["HISTFILE"] = "/dev/null"
        os.execv(str(BIN), [str(BIN), "--snapshot-dir", str(out), "bash", "-i"])
    output = bytearray()

    def read_available(deadline: float) -> str:
        # Accumulate whatever the pty produces until the deadline, tolerating
        # EIO/closed-fd conditions that occur when the shell exits.
        while time.monotonic() < deadline:
            ready, _, _ = select.select([fd], [], [], 0.05)
            if fd not in ready:
                continue
            try:
                chunk = os.read(fd, 4096)
            except OSError:
                break
            if not chunk:
                break
            output.extend(chunk)
        return output.decode(errors="replace")

    try:
        # Let the shell print its prompt before sending input.
        read_available(time.monotonic() + 1.0)
        os.write(fd, b"git diff >/tmp/agent-snapshot-pty-gitdiff.txt 2>&1; printf 'rc:%s\\n' \"$?\"; exit\n")
        text = ""
        deadline = time.monotonic() + 5.0
        while time.monotonic() < deadline:
            text = read_available(time.monotonic() + 0.2)
            if "rc:" in text:
                break
        assert "rc:0" in text
        assert "Stopped" not in text
    finally:
        # Best-effort teardown: the pty/child may already be gone.
        try:
            os.close(fd)
        except OSError:
            pass
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass
        try:
            os.waitpid(pid, 0)
        except ChildProcessError:
            pass


def test_run_prints_stderr_summary(tmp_path):
    """A no-op trace still ends with the two-line summary on stderr."""
    out = tmp_path / "snapshot"
    completed = subprocess.run(
        [str(BIN), "--snapshot-dir", str(out), "/bin/true"],
        cwd=ROOT,
        text=True,
        check=True,
        capture_output=True,
    )
    assert completed.stderr.endswith(
        f"Snapshot directory: {out}\n"
        "Worked in 0 repositories. Saved 0 updated files. "
        "Saved 0 read files in the snapshot that were not committed.\n"
    )


def test_help_includes_copyright_and_license():
    """--help output carries the copyright and license notices."""
    completed = subprocess.run(
        [str(BIN), "--help=plain"],
        cwd=ROOT,
        text=True,
        check=True,
        capture_output=True,
    )
    assert "Copyright (c) 2026 Arjun Guha" in completed.stdout
    assert "BSD-3 licensed." in completed.stdout


def test_no_args_prints_plain_usage_error():
    """No arguments yields exit code 1 and a one-line usage message."""
    completed = subprocess.run(
        [str(BIN)],
        cwd=ROOT,
        text=True,
        capture_output=True,
    )
    assert completed.returncode == 1
    assert completed.stderr == "agent-snapshot: usage: agent-snapshot [--snapshot-dir SNAPDIR] command args...\n"


def test_rename_records_source_tombstone_and_destination_content(tmp_path):
    # Rename is not just a write: a replay-equivalent snapshot needs to know that
    # the source path stopped existing and that the destination path acquired the
    # content. This catches implementations that only model the destination open.
    (WORKTREE / "rename_source.txt").write_text("renamed payload\n")
    snap = capture(tmp_path, PYTHON, "test_programs/rename_paths.py")
    source = snap.file(WORKTREE / "rename_source.txt")
    assert "delete" in source["operations"]
    assert source["before"]["exists"] is True
    assert source["after"]["exists"] is False
    assert source["after"]["tombstone"] is True
    # NOTE(review): destination name contains a space rather than an
    # underscore; presumably it matches what rename_paths.py creates —
    # confirm against the helper program.
    destination = snap.file(WORKTREE / "rename destination.txt")
    assert "write" in destination["operations"]
    assert destination["before"]["exists"] is False
    assert snap.blob_text(destination["after"]["blob"]) == "renamed payload\n"


def test_staged_and_unstaged_dirty_git_files_are_both_captured(tmp_path):
    # Git has more than one kind of "dirty". A staged change and an unstaged
    # worktree change are both unreconstructable from HEAD alone, so both should
    # receive blobs even though their status bits differ.
    staged_path = WORKTREE / "clean.txt"
    unstaged_path = WORKTREE / "dirty.txt"
    staged_path.write_text("staged dirty content\n")
    subprocess.run(["git", "add", "clean.txt"], cwd=WORKTREE, check=True)
    unstaged_path.write_text("unstaged dirty content\n")
    snap = capture(tmp_path, PYTHON, "test_programs/read_git_dirty_modes.py")
    staged = snap.file(staged_path)
    assert staged["git"]["tracked"] is True
    assert staged["git"]["dirty"] is True
    assert snap.blob_text(staged["before"]["blob"]) == "staged dirty content\n"
    unstaged = snap.file(unstaged_path)
    assert unstaged["git"]["tracked"] is True
    assert unstaged["git"]["dirty"] is True
    assert snap.blob_text(unstaged["before"]["blob"]) == "unstaged dirty content\n"


def test_text_peculiar_file_names_are_recorded_and_blobbed(tmp_path):
    # Spaces and embedded newlines are valid UTF-8 paths and should work with the
    # current manifest design. The raw non-UTF-8 byte case is kept separate below
    # because nlohmann/json rejects invalid UTF-8 in JSON strings.
    (WORKTREE / "name with spaces.txt").write_text("space payload\n")
    (WORKTREE / "name with\nnewline.txt").write_text("newline payload\n")
    snap = capture(tmp_path, PYTHON, "test_programs/read_peculiar_text_names.py")
    spaced = snap.file(WORKTREE / "name with spaces.txt")
    assert "read" in spaced["operations"]
    assert snap.blob_text(spaced["before"]["blob"]) == "space payload\n"
    newline = snap.file(WORKTREE / "name with\nnewline.txt")
    assert "read" in newline["operations"]
    assert snap.blob_text(newline["before"]["blob"]) == "newline payload\n"


def test_non_utf8_filename_exposes_json_string_limitation(tmp_path):
    """A raw non-UTF-8 filename must not make the capture itself fail."""
    # os.fsencode keeps the path at the byte level so the \xff stays raw.
    bytes_path = os.path.join(os.fsencode(WORKTREE), b"non-utf8-\xff.txt")
    with open(bytes_path, "wb") as handle:
        handle.write(b"non utf8 payload\n")
    # Only a successful run is asserted; the manifest cannot represent the
    # name as a JSON string (see comment on the previous test).
    capture(tmp_path, PYTHON, "test_programs/read_non_utf8_filename.py")