Repositories / agent-snapshot.git
tests/test_agent_snapshot.py
Clone (read-only): git clone http://git.guha-anderson.com/git/agent-snapshot.git
import json
import os
import pty
import select
import shutil
import signal
import subprocess
import time
from pathlib import Path
import pandas as pd
import pytest
ROOT = Path(__file__).resolve().parents[1]
BIN = ROOT / "_build" / "default" / "src" / "ocaml" / "agent_snapshot.exe"
TESTDATA = ROOT / "testdata"
WORKTREE = TESTDATA / "runtime_repo"
# Use the system Python rather than uv's managed interpreter. The snapshotter
# intentionally observes interpreter and loader activity too, and a uv-managed
# Python in the user's home directory can be writable by the current user. That
# would force blobs for the interpreter itself and make the tests about uv's
# environment instead of Agent Snapshot's file classification rules.
PYTHON = "/usr/bin/python3"
def run(cmd, **kwargs):
return subprocess.run(cmd, cwd=ROOT, text=True, check=True, **kwargs)
@pytest.fixture(scope="session", autouse=True)
def build_agent_snapshot():
# The tests exercise the real CLI binary instead of calling internal helper
# functions. That keeps the acceptance criteria aligned with ptrace behavior,
# process launch, Dune wiring, and manifest writing as users will run them.
run(["bash", "-lc", ". /scratch/arjun/ocaml/env.sh && dune build src/ocaml/agent_snapshot.exe"])
assert BIN.exists()
@pytest.fixture(autouse=True)
def runtime_git_repo(monkeypatch):
# Most snapshot policy depends on Git state, but tests should not mutate the
# project checkout's own index or .git directory. Each test gets a disposable
# repository under testdata and the helper programs operate there via env.
shutil.rmtree(WORKTREE, ignore_errors=True)
WORKTREE.mkdir(parents=True)
(WORKTREE / "clean.txt").write_text("clean tracked fixture\nline two\n")
(WORKTREE / "dirty.txt").write_text("dirty tracked fixture original\n")
(WORKTREE / "nested").mkdir()
(WORKTREE / "nested" / "info.txt").write_text("nested tracked fixture\n")
subprocess.run(["git", "init"], cwd=WORKTREE, text=True, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "tests@example.invalid"], cwd=WORKTREE, check=True)
subprocess.run(["git", "config", "user.name", "Agent Snapshot Tests"], cwd=WORKTREE, check=True)
subprocess.run(["git", "add", "."], cwd=WORKTREE, check=True)
subprocess.run(
["git", "commit", "-m", "Initial fixture"],
cwd=WORKTREE,
text=True,
check=True,
capture_output=True,
)
monkeypatch.setenv("AGENT_SNAPSHOT_TEST_REPO", str(WORKTREE))
yield
shutil.rmtree(WORKTREE, ignore_errors=True)
@pytest.fixture(autouse=True)
def ignore_config(tmp_path, monkeypatch):
config_home = tmp_path / "xdg-config"
config_path = config_home / "agent-snapshot" / "ignore.json"
config_path.parent.mkdir(parents=True)
config_path.write_text("[]\n")
monkeypatch.setenv("HOME", str(ROOT))
monkeypatch.setenv("XDG_CONFIG_HOME", str(config_home))
return config_path
class Snapshot:
# Small manifest assertion helper. Tests should read like snapshot behavior,
# not like repeated JSON tree walking, because the important contract is
# "this path was observed and classified this way".
def __init__(self, path: Path):
self.path = path
self.manifest = json.loads((path / "manifest.json").read_text())
def file(self, path: Path):
target = str(path.resolve())
for item in self.manifest["files"]:
if item["path"] == target:
return item
raise AssertionError(f"{target} not present in snapshot")
def _blobs_frame(self) -> pd.DataFrame:
blobs = self.path / "blobs.parquet"
if not blobs.exists():
return pd.DataFrame(columns=["key", "content"])
return pd.read_parquet(blobs)
def blob_bytes(self, key: str) -> bytes:
state, absolute_path = key.split(":", 1)
assert state in {"before", "after"}
assert Path(absolute_path).is_absolute()
df = self._blobs_frame()
row = df.loc[df["key"] == key, "content"]
assert len(row) == 1, f"blob key {key!r} not found or not unique in Parquet store"
raw = row.iloc[0]
if isinstance(raw, bytes):
return raw
if isinstance(raw, memoryview):
return raw.tobytes()
return bytes(raw)
def blob_text(self, key: str) -> str:
return self.blob_bytes(key).decode()
def capture(tmp_path: Path, *command: str) -> Snapshot:
# Every test gets a fresh bundle directory so stale blobs cannot mask capture
# bugs.
out = tmp_path / "snapshot"
run([str(BIN), "--snapshot-dir", str(out), *command])
return Snapshot(out)
def test_missing_ignore_config_creates_defaults(tmp_path, ignore_config):
ignore_config.unlink()
assert not ignore_config.exists()
out = tmp_path / "snapshot"
completed = subprocess.run(
[str(BIN), "--snapshot-dir", str(out), PYTHON, "test_programs/read_clean.py"],
cwd=ROOT,
text=True,
check=True,
capture_output=True,
)
assert ignore_config.exists()
assert f"Created default ignore file: {ignore_config}\n" in completed.stderr
data = json.loads(ignore_config.read_text())
assert data == [
"$HOME/.cache",
"$HOME/.claude",
"$HOME/.codex",
"$HOME/.cursor",
"$XDG_CONFIG_HOME/agent-snapshot/ignore.json",
"/tmp/scratch-output",
"/proc",
"/dev",
"/usr",
"/bin",
]
def test_ignore_config_suppresses_files_directories_and_itself(tmp_path, ignore_config):
ignored_file = WORKTREE / "ignored_file.txt"
ignored_dir = WORKTREE / "ignored_dir"
ignored_file.write_text("ignored file payload\n")
ignored_dir.mkdir()
(ignored_dir / "nested.txt").write_text("ignored nested payload\n")
ignore_config.write_text(json.dumps([str(ignored_file), str(ignored_dir)]) + "\n")
snap = capture(tmp_path, PYTHON, "test_programs/read_ignored_paths.py")
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert str((WORKTREE / "clean.txt").resolve()) in manifest_paths
assert str(ignored_file.resolve()) not in manifest_paths
assert str((ignored_dir / "nested.txt").resolve()) not in manifest_paths
assert str(ignore_config.resolve()) not in manifest_paths
def test_ignore_config_expands_home_prefix(tmp_path, ignore_config):
ignored_file = WORKTREE / "ignored_file.txt"
ignored_dir = WORKTREE / "ignored_dir"
ignored_file.write_text("ignored file payload\n")
ignored_dir.mkdir()
(ignored_dir / "nested.txt").write_text("nested payload\n")
ignore_config.write_text(json.dumps(["$HOME/testdata/runtime_repo/ignored_file.txt"]) + "\n")
snap = capture(tmp_path, PYTHON, "test_programs/read_ignored_paths.py")
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert str(ignored_file.resolve()) not in manifest_paths
def test_ignore_config_expands_xdg_config_home_fallback(tmp_path, ignore_config):
home = tmp_path / "home"
config_path = home / ".config" / "agent-snapshot" / "ignore.json"
ignored_file = WORKTREE / "ignored_file.txt"
ignored_dir = WORKTREE / "ignored_dir"
ignored_file.write_text("ignored file payload\n")
ignored_dir.mkdir()
(ignored_dir / "nested.txt").write_text("nested payload\n")
config_path.parent.mkdir(parents=True)
config_path.write_text(json.dumps(["$XDG_CONFIG_HOME/agent-snapshot/ignore.json"]) + "\n")
env = os.environ.copy()
env.pop("XDG_CONFIG_HOME", None)
env["HOME"] = str(home)
out = tmp_path / "snapshot"
subprocess.run(
[str(BIN), "--snapshot-dir", str(out), PYTHON, "test_programs/read_ignored_paths.py"],
cwd=ROOT,
text=True,
check=True,
env=env,
)
manifest = json.loads((out / "manifest.json").read_text())
manifest_paths = {item["path"] for item in manifest["files"]}
assert str(config_path.resolve()) not in manifest_paths
def test_clean_git_tracked_read_records_repo_without_blob(tmp_path):
# A clean tracked file is the primary compactness case. The program really
# reads testdata/clean.txt, but the snapshot should rely on Git repository
# root + HEAD + relative path instead of copying file contents into blobs.
snap = capture(tmp_path, PYTHON, "test_programs/read_clean.py")
clean = snap.file(WORKTREE / "clean.txt")
assert "read" in clean["operations"]
assert clean["git"]["tracked"] is True
assert clean["git"]["dirty"] is False
assert "size" not in clean["before"]
assert "size" not in clean["after"]
assert clean["before"].get("blob") is None
assert clean["after"].get("blob") is None
assert any(repo["root"] == str(WORKTREE.resolve()) for repo in snap.manifest["git_repositories"])
def test_git_internal_directory_writes_are_ignored(tmp_path):
snap = capture(tmp_path, PYTHON, "test_programs/write_git_internal.py")
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert str((WORKTREE / ".git" / "delete_me").resolve()) not in manifest_paths
def test_proc_dev_and_special_files_are_ignored(tmp_path):
fifo = WORKTREE / "runtime.fifo"
os.mkfifo(fifo)
snap = capture(tmp_path, PYTHON, "test_programs/read_proc_dev_special.py")
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert not any(path.startswith("/proc/") for path in manifest_paths)
assert not any(path.startswith("/dev/") for path in manifest_paths)
assert str(fifo.resolve()) not in manifest_paths
def test_non_directory_path_component_does_not_crash(tmp_path):
snap = capture(tmp_path, PYTHON, "test_programs/read_non_directory_component.py")
impossible_path = (WORKTREE / "not_directory" / "2851767" / "ns").resolve(strict=False)
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert str(impossible_path) not in manifest_paths
def test_written_clean_git_tracked_file_gets_after_blob(tmp_path):
snap = capture(tmp_path, PYTHON, "test_programs/rewrite_clean_tracked.py")
clean = snap.file(WORKTREE / "clean.txt")
assert "write" in clean["operations"]
assert clean["git"]["tracked"] is True
assert clean["git"]["dirty"] is False
assert clean["after"]["blob"] == f"after:{WORKTREE / 'clean.txt'}"
assert snap.blob_text(clean["after"]["blob"]) == (WORKTREE / "clean.txt").read_text()
def test_file_created_and_committed_by_tracee_still_gets_after_blob(tmp_path):
snap = capture(tmp_path, PYTHON, "test_programs/create_and_commit_file.py")
created = snap.file(WORKTREE / "committed_by_program.txt")
assert "write" in created["operations"]
assert created["git"]["tracked"] is True
assert created["git"]["dirty"] is False
assert snap.blob_text(created["after"]["blob"]) == "created and committed by traced program\n"
assert not any("/.git/" in item["path"] for item in snap.manifest["files"])
def test_dirty_untracked_created_and_deleted_files_are_captured(tmp_path):
# This test covers the cases where Git metadata is not enough:
# - dirty tracked files differ from HEAD, so their content must be blobbed
# - untracked files have no commit object to reconstruct from
# - created files need before=false and after content
# - deleted files need a tombstone so restore can reproduce non-existence
(WORKTREE / "dirty.txt").write_text("dirty tracked fixture changed before run\n")
(WORKTREE / "untracked_runtime.txt").write_text("untracked input\n")
(WORKTREE / "deleted_by_program.txt").write_text("delete me\n")
snap = capture(tmp_path, PYTHON, "test_programs/dirty_untracked_write.py")
dirty = snap.file(WORKTREE / "dirty.txt")
assert dirty["git"]["tracked"] is True
assert dirty["git"]["dirty"] is True
assert dirty["before"]["blob"] == f"before:{WORKTREE / 'dirty.txt'}"
assert snap.blob_text(dirty["before"]["blob"]) == "dirty tracked fixture changed before run\n"
untracked = snap.file(WORKTREE / "untracked_runtime.txt")
assert untracked["git"]["tracked"] is False
assert snap.blob_text(untracked["before"]["blob"]) == "untracked input\n"
created = snap.file(WORKTREE / "created_by_program.txt")
assert "write" in created["operations"]
assert created["before"]["exists"] is False
assert snap.blob_text(created["after"]["blob"]) == "created final\n"
deleted = snap.file(WORKTREE / "deleted_by_program.txt")
assert "delete" in deleted["operations"]
assert deleted["after"]["exists"] is False
assert deleted["after"]["tombstone"] is True
def test_created_then_deleted_file_is_not_manifested_or_blobbed(tmp_path):
transient = WORKTREE / "transient_runtime.txt"
snap = capture(tmp_path, PYTHON, "test_programs/create_read_delete_transient.py")
manifest_paths = {item["path"] for item in snap.manifest["files"]}
blob_keys = set(snap._blobs_frame()["key"])
assert not transient.exists()
assert str(transient.resolve()) not in manifest_paths
assert not any(str(transient.resolve()) in key for key in blob_keys)
def test_fork_usr_and_directory_traversal(tmp_path):
# ptrace must follow the process tree, not just the initial pid. The helper
# forks and writes from the child; missing that write means fork/clone events
# are not being attached early enough. The same helper reads /usr/bin/env to
# assert that root-owned, non-writable system files are excluded, and iterates
# testdata to verify directory traversal is recorded.
snap = capture(tmp_path, PYTHON, "test_programs/fork_and_usr.py")
child = snap.file(WORKTREE / "child_output.txt")
assert "write" in child["operations"]
assert snap.blob_text(child["after"]["blob"]) == "child final\n"
manifest_paths = {item["path"] for item in snap.manifest["files"]}
assert str(Path("/usr/bin/env").resolve()) not in manifest_paths
directory = snap.file(WORKTREE)
assert "directory" in directory["operations"]
def test_restore_word_is_treated_as_traced_command(tmp_path, monkeypatch):
restore_bin = tmp_path / "bin" / "restore"
restore_bin.parent.mkdir()
restore_bin.write_text("#!/bin/sh\nprintf 'restore program ran\\n'\n")
restore_bin.chmod(0o755)
monkeypatch.setenv("PATH", f"{restore_bin.parent}:{os.environ['PATH']}")
out = tmp_path / "snapshot"
completed = subprocess.run(
[str(BIN), "--snapshot-dir", str(out), "restore"],
cwd=ROOT,
text=True,
check=True,
capture_output=True,
)
assert completed.stdout == "restore program ran\n"
assert (out / "manifest.json").exists()
def test_traced_command_options_do_not_need_separator(tmp_path):
out = tmp_path / "snapshot"
completed = subprocess.run(
[str(BIN), "--snapshot-dir", str(out), PYTHON, "-c", "print('direct command')"],
cwd=ROOT,
text=True,
check=True,
capture_output=True,
)
assert completed.stdout == "direct command\n"
assert Snapshot(out).manifest["command"] == [PYTHON, "-c", "print('direct command')"]
def test_interactive_shell_children_are_not_reported_as_stopped_jobs(tmp_path):
out = tmp_path / "snapshot"
pid, fd = pty.fork()
if pid == 0:
os.chdir(WORKTREE)
os.environ["HISTFILE"] = "/dev/null"
os.execv(str(BIN), [str(BIN), "--snapshot-dir", str(out), "bash", "-i"])
output = bytearray()
def read_available(deadline: float) -> str:
while time.monotonic() < deadline:
ready, _, _ = select.select([fd], [], [], 0.05)
if fd not in ready:
continue
try:
chunk = os.read(fd, 4096)
except OSError:
break
if not chunk:
break
output.extend(chunk)
return output.decode(errors="replace")
try:
read_available(time.monotonic() + 1.0)
os.write(fd, b"git diff >/tmp/agent-snapshot-pty-gitdiff.txt 2>&1; printf 'rc:%s\\n' \"$?\"; exit\n")
text = ""
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
text = read_available(time.monotonic() + 0.2)
if "rc:" in text:
break
assert "rc:0" in text
assert "Stopped" not in text
finally:
try:
os.close(fd)
except OSError:
pass
try:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
pass
try:
os.waitpid(pid, 0)
except ChildProcessError:
pass
def test_run_prints_stderr_summary(tmp_path):
out = tmp_path / "snapshot"
completed = subprocess.run(
[str(BIN), "--snapshot-dir", str(out), "/bin/true"],
cwd=ROOT,
text=True,
check=True,
capture_output=True,
)
assert completed.stderr.endswith(
f"Snapshot directory: {out}\n"
"Worked in 0 repositories. Saved 0 updated files. Saved 0 read files in the snapshot that were not committed.\n"
)
def test_help_includes_copyright_and_license():
completed = subprocess.run(
[str(BIN), "--help=plain"],
cwd=ROOT,
text=True,
check=True,
capture_output=True,
)
assert "Copyright (c) 2026 Arjun Guha" in completed.stdout
assert "BSD-3 licensed." in completed.stdout
def test_no_args_prints_plain_usage_error():
completed = subprocess.run(
[str(BIN)],
cwd=ROOT,
text=True,
capture_output=True,
)
assert completed.returncode == 1
assert completed.stderr == "agent-snapshot: usage: agent-snapshot [--snapshot-dir SNAPDIR] command args...\n"
def test_rename_records_source_tombstone_and_destination_content(tmp_path):
# Rename is not just a write: a replay-equivalent snapshot needs to know that
# the source path stopped existing and that the destination path acquired the
# content. This catches implementations that only model the destination open.
(WORKTREE / "rename_source.txt").write_text("renamed payload\n")
snap = capture(tmp_path, PYTHON, "test_programs/rename_paths.py")
source = snap.file(WORKTREE / "rename_source.txt")
assert "delete" in source["operations"]
assert source["before"]["exists"] is True
assert source["after"]["exists"] is False
assert source["after"]["tombstone"] is True
destination = snap.file(WORKTREE / "rename destination.txt")
assert "write" in destination["operations"]
assert destination["before"]["exists"] is False
assert snap.blob_text(destination["after"]["blob"]) == "renamed payload\n"
def test_staged_and_unstaged_dirty_git_files_are_both_captured(tmp_path):
# Git has more than one kind of "dirty". A staged change and an unstaged
# worktree change are both unreconstructable from HEAD alone, so both should
# receive blobs even though their status bits differ.
staged_path = WORKTREE / "clean.txt"
unstaged_path = WORKTREE / "dirty.txt"
staged_path.write_text("staged dirty content\n")
subprocess.run(["git", "add", "clean.txt"], cwd=WORKTREE, check=True)
unstaged_path.write_text("unstaged dirty content\n")
snap = capture(tmp_path, PYTHON, "test_programs/read_git_dirty_modes.py")
staged = snap.file(staged_path)
assert staged["git"]["tracked"] is True
assert staged["git"]["dirty"] is True
assert snap.blob_text(staged["before"]["blob"]) == "staged dirty content\n"
unstaged = snap.file(unstaged_path)
assert unstaged["git"]["tracked"] is True
assert unstaged["git"]["dirty"] is True
assert snap.blob_text(unstaged["before"]["blob"]) == "unstaged dirty content\n"
def test_text_peculiar_file_names_are_recorded_and_blobbed(tmp_path):
# Spaces and embedded newlines are valid UTF-8 paths and should work with the
# current manifest design. The raw non-UTF-8 byte case is kept separate below
# because nlohmann/json rejects invalid UTF-8 in JSON strings.
(WORKTREE / "name with spaces.txt").write_text("space payload\n")
(WORKTREE / "name with\nnewline.txt").write_text("newline payload\n")
snap = capture(tmp_path, PYTHON, "test_programs/read_peculiar_text_names.py")
spaced = snap.file(WORKTREE / "name with spaces.txt")
assert "read" in spaced["operations"]
assert snap.blob_text(spaced["before"]["blob"]) == "space payload\n"
newline = snap.file(WORKTREE / "name with\nnewline.txt")
assert "read" in newline["operations"]
assert snap.blob_text(newline["before"]["blob"]) == "newline payload\n"
def test_non_utf8_filename_exposes_json_string_limitation(tmp_path):
bytes_path = os.path.join(os.fsencode(WORKTREE), b"non-utf8-\xff.txt")
with open(bytes_path, "wb") as handle:
handle.write(b"non utf8 payload\n")
capture(tmp_path, PYTHON, "test_programs/read_non_utf8_filename.py")