agent-snapshot.git

Clone (read-only): git clone http://git.guha-anderson.com/git/agent-snapshot.git

Write blobs as Parquet row groups

Co-authored-by: Cursor <cursoragent@cursor.com>
Author: Arjun Guha <a.guha@northeastern.edu>
Date: 2026-05-03 12:32:29 -0400
Commit: 16f105d8e0360cdedc448e5cbacf03b51b85e9c0
README.md
index f863f74..f61fb49 100644
--- a/README.md
+++ b/README.md
@@ -60,15 +60,12 @@ A snapshot is a directory bundle:
 ```text
 snapshot-dir/
   manifest.json
-  blobs/
-    part-00000.parquet
-    part-00001.parquet
-    ...
+  blobs.parquet
 ```
 
-The `blobs/` directory is created only when at least one blob is captured. Each
-`part-*.parquet` file is a Parquet table with `key` and binary `content` columns
-(Snappy-compressed column chunks). Rows are written in bounded batches during
+`blobs.parquet` is created only when at least one blob is captured. It is a
+Parquet table with `key` and binary `content` columns
+(Snappy-compressed column chunks). Rows are written in bounded row groups during
 capture so the tool does not keep every blob in memory.
 
 `manifest.json` contains:
@@ -92,7 +89,7 @@ Each file record contains:
 Metadata records include whether the path exists, file type, mode, size, mtime,
 and optionally a `blob` key. Blob keys are state-qualified absolute paths such
 as `before:/repo/input.txt` or `after:/repo/generated.txt`; payloads for those
-keys are stored under `blobs/` as described above.
+keys are stored in `blobs.parquet` as described above.
 
 Clean Git-tracked reads typically have no blob:
 
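Note: the restore-side lookup the README describes can be sketched directly against the definitions this commit touches. A minimal sketch, assuming the `blob_store_entry` record, `arrow_blob_store_entry_of_table`, and `Arrow_c_api.Parquet_reader.table` from `src/ocaml/agent_snapshot.ml` below; `read_blob` itself is hypothetical, not part of this commit:

```ocaml
(* Hypothetical helper: fetch one state-qualified blob from a snapshot
   directory, returning None when no payload was captured for the key. *)
let read_blob (snapshot_dir : string) (key : string) : string option =
  let path = Filename.concat snapshot_dir "blobs.parquet" in
  if not (Sys.file_exists path) then None
  else
    let table = Arrow_c_api.Parquet_reader.table path in
    let entries = arrow_blob_store_entry_of_table table in
    Array.find_opt (fun (e : blob_store_entry) -> String.equal e.key key) entries
    |> Option.map (fun e -> e.content)
```

For example, `read_blob "snapshot-dir" "before:/repo/input.txt"` yields the captured bytes, and returns `None` for a clean Git-tracked read that carries no blob.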
src/ocaml/agent_snapshot.ml
index 9c18550..2cb5d54 100644
--- a/src/ocaml/agent_snapshot.ml
+++ b/src/ocaml/agent_snapshot.ml
@@ -338,27 +338,31 @@ type blob_store_entry =
   }
 [@@deriving arrow]
 
-(** Row batches written as separate Parquet files under [blobs/] so capture never
-    holds all blob payloads in memory. *)
+(** Row batches written as row groups so capture never holds all
+    blob payloads in memory. *)
 let blob_batch_max = 64
 
 let blob_batch_keys_rev : string list ref = ref []
 let blob_batch_contents_rev : string list ref = ref []
 let blob_batch_count : int ref = ref 0
-let blob_part_counter : int ref = ref 0
 let blob_dir_ready : bool ref = ref false
+let blob_row_group_writer : Arrow_c_api.Writer.Row_group_writer.t option ref = ref None
+let blob_row_groups_written : bool ref = ref false
 
 let reset_blob_writer () : unit =
   blob_batch_keys_rev := [];
   blob_batch_contents_rev := [];
   blob_batch_count := 0;
-  blob_part_counter := 0;
-  blob_dir_ready := false
+  blob_dir_ready := false;
+  blob_row_group_writer := None;
+  blob_row_groups_written := false
 
-let ensure_blob_dir () : unit =
-  if not !blob_dir_ready then (
-    mkdir_p (concat_path !snapshot_dir "blobs");
-    blob_dir_ready := true)
+let ensure_blob_dir () : unit = blob_dir_ready := true
+
+let active_blob_row_group_writer () : Arrow_c_api.Writer.Row_group_writer.t =
+  match !blob_row_group_writer with
+  | Some writer -> writer
+  | None -> invalid_arg "blob row group writer is not active"
 
 let flush_blob_batch () : unit =
   if !blob_batch_count = 0 then ()
@@ -368,14 +372,31 @@ let flush_blob_batch () : unit =
     blob_batch_keys_rev := [];
     blob_batch_contents_rev := [];
     blob_batch_count := 0;
-    let part = !blob_part_counter in
-    incr blob_part_counter;
-    let path =
-      concat_path (concat_path !snapshot_dir "blobs") (Printf.sprintf "part-%05d.parquet" part)
-    in
-    let entries = Array.init (Array.length keys) (fun i -> { key = keys.(i); content = contents.(i) }) in
-    let table = arrow_table_of_blob_store_entry entries in
-    Arrow_c_api.Table.write_parquet ~compression:Arrow_c_api.Compression.Snappy table path)
+    let writer = active_blob_row_group_writer () in
+    Arrow_c_api.Writer.Row_group_writer.write_exn
+      writer
+      ~cols:
+        [
+          Arrow_c_api.Writer.utf8 keys ~name:"key";
+          Arrow_c_api.Writer.binary contents ~name:"content";
+        ];
+    blob_row_groups_written := true)
+
+let close_blob_writer () : unit =
+  flush_blob_batch ();
+  blob_row_group_writer := None
+
+let with_blob_row_group_writer (f : unit -> 'a) : 'a =
+  ensure_blob_dir ();
+  let path = concat_path !snapshot_dir "blobs.parquet" in
+  try
+    Arrow_c_api.Writer.with_row_group_writer ~compression:Arrow_c_api.Compression.Snappy path ~f:(fun writer ->
+      blob_row_group_writer := Some writer;
+      Fun.protect ~finally:(fun () -> blob_row_group_writer := None) f)
+  with
+  | Invalid_argument msg when (not !blob_row_groups_written) && msg = "Writer.with_row_group_writer: no row groups were written" ->
+      if FileUtil.test FileUtil.Exists path then FileUtil.rm ~force:FileUtil.Force [ path ];
+      ()
 
 let blob_key (state : string) (path : string) : string = state ^ ":" ^ best_effort_canonical path
 
@@ -389,21 +410,14 @@ let store_blob (state : string) (path : string) : string =
   if !blob_batch_count >= blob_batch_max then flush_blob_batch ();
   key
 
-(** Restore reads every part file under [blobs/] into a map. This runs once per restore,
-    not during capture. *)
+(** Restore reads the blob store into a map. This runs once per restore, not during capture. *)
 let load_blob_store (dir : string) : (string, string) Hashtbl.t =
   let tbl = Hashtbl.create 128 in
-  let root = concat_path dir "blobs" in
-  if FileUtil.test FileUtil.Is_dir root then
-    Sys.readdir root
-    |> Array.to_list
-    |> List.sort String.compare
-    |> List.iter (fun name ->
-           if String.ends_with ~suffix:".parquet" name then (
-             let path = concat_path root name in
-             let table = Arrow_c_api.Parquet_reader.table path in
-             let entries = arrow_blob_store_entry_of_table table in
-             Array.iter (fun { key; content } -> Hashtbl.replace tbl key content) entries));
+  let path = concat_path dir "blobs.parquet" in
+  if FileUtil.test FileUtil.Exists path then (
+    let table = Arrow_c_api.Parquet_reader.table path in
+    let entries = arrow_blob_store_entry_of_table table in
+    Array.iter (fun { key; content } -> Hashtbl.replace tbl key content) entries);
   tbl
 
 let should_capture_content (path : string) (meta : metadata) (git : git_info) : bool =
@@ -697,9 +711,10 @@ let run_snapshot (output : string) (command : string list) : int =
   reset_blob_writer ();
   remove_all output;
   mkdir_p output;
-  trace_command command;
-  finalize_records ();
-  flush_blob_batch ();
+  with_blob_row_group_writer (fun () ->
+    trace_command command;
+    finalize_records ();
+    close_blob_writer ());
   write_manifest output command 0;
   0
 
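Note: the new writer protocol is easiest to see end to end. A minimal standalone sketch using only the `Arrow_c_api` entry points this diff already calls (`with_row_group_writer`, `Row_group_writer.write_exn`, `utf8`, `binary`, `Parquet_reader.table`); the path and payloads are illustrative:

```ocaml
(* Each write_exn call becomes one row group in a single Parquet file,
   mirroring how flush_blob_batch streams bounded batches during capture. *)
let () =
  let path = "/tmp/blobs-demo.parquet" in
  Arrow_c_api.Writer.with_row_group_writer
    ~compression:Arrow_c_api.Compression.Snappy path
    ~f:(fun writer ->
      List.iter
        (fun (keys, contents) ->
          Arrow_c_api.Writer.Row_group_writer.write_exn writer
            ~cols:
              [
                Arrow_c_api.Writer.utf8 keys ~name:"key";
                Arrow_c_api.Writer.binary contents ~name:"content";
              ])
        (* Two bounded batches -> two row groups in the same file. *)
        [
          ([| "before:/repo/a.txt" |], [| "old contents" |]);
          ([| "after:/repo/a.txt" |], [| "new contents" |]);
        ]);
  (* Restore side, as in load_blob_store: one read covers all row groups. *)
  ignore (Arrow_c_api.Parquet_reader.table path)
```

Relative to the old `part-*.parquet` layout, each flushed batch now lands as a row group in one file, so capture keeps its bounded-memory property while restore needs a single `Parquet_reader.table` call.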
tests/test_agent_snapshot.py
index f23dcc3..82cceec 100644
--- a/tests/test_agent_snapshot.py
+++ b/tests/test_agent_snapshot.py
@@ -106,13 +106,10 @@ class Snapshot:
         raise AssertionError(f"{target} not present in snapshot")
 
     def _blobs_frame(self) -> pd.DataFrame:
-        blob_dir = self.path / "blobs"
-        if not blob_dir.is_dir():
+        blobs = self.path / "blobs.parquet"
+        if not blobs.exists():
             return pd.DataFrame(columns=["key", "content"])
-        parts = sorted(blob_dir.glob("part-*.parquet"))
-        if not parts:
-            return pd.DataFrame(columns=["key", "content"])
-        return pd.concat([pd.read_parquet(p) for p in parts], ignore_index=True)
+        return pd.read_parquet(blobs)
 
     def blob_bytes(self, key: str) -> bytes:
         state, absolute_path = key.split(":", 1)
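Note: `blob_bytes` above splits on the first `:` only, since the absolute path after the state prefix may itself contain a colon. A hypothetical OCaml mirror of that parse, the inverse of `blob_key` in the OCaml diff:

```ocaml
(* Hypothetical: split "before:/repo/input.txt" into ("before", "/repo/input.txt"),
   cutting at the first ':' exactly as key.split(":", 1) does in the test. *)
let split_blob_key (key : string) : (string * string) option =
  match String.index_opt key ':' with
  | None -> None
  | Some i ->
      let state = String.sub key 0 i in
      let path = String.sub key (i + 1) (String.length key - i - 1) in
      Some (state, path)
```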