Crdt.Rga

RGA (Replicated Growable Array) core implementation

This module implements the core RGA data structure used by str, bin, and arr nodes. The RGA provides:

  • Insert after a reference ID, with deterministic ordering of concurrent inserts
  • Delete by ID range with tombstone handling
  • Convergence: all replicas that apply the same operations reach the same state

Key concepts:

  • Each element has a unique timestamp ID
  • An insert specifies which existing element to insert after
  • Concurrent inserts at the same position are ordered by timestamp comparison
  • Deletes mark elements as tombstones; the elements are not removed

The implementation uses chunks for efficiency: contiguous elements with sequential IDs are grouped together. Chunks may be split during insert and delete operations.
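The concepts above can be illustrated with a deliberately simplified, list-based sketch. It stores one element per node instead of chunks, and the timestamp record with its sid and time fields is a hypothetical stand-in for Clock.timestamp, so this is not the module's actual implementation. It only shows why two replicas that receive the same concurrent inserts in different orders end up with the same sequence.

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

(* One element per node; the real module groups elements into chunks. *)
type elt = { id : timestamp; ch : char; deleted : bool }

(* [newer a b]: a has the higher logical time, or the same time and a
   higher session ID. *)
let newer a b = a.time > b.time || (a.time = b.time && a.sid > b.sid)

(* Place [e] after the element [after] (None = head), stepping past any
   following elements whose IDs are newer than [e]. *)
let insert_after ~after e seq =
  let rec skip = function
    | x :: rest when newer x.id e.id -> x :: skip rest
    | rest -> e :: rest
  in
  match after with
  | None -> skip seq
  | Some r ->
    let rec go = function
      | [] -> [ e ] (* unknown reference: this sketch just appends *)
      | x :: rest when x.id = r -> x :: skip rest
      | x :: rest -> x :: go rest
    in
    go seq

(* Deleting keeps the element in place and only sets the tombstone flag. *)
let delete id seq =
  List.map (fun x -> if x.id = id then { x with deleted = true } else x) seq

(* The visible content skips tombstones. *)
let view seq =
  seq
  |> List.filter (fun x -> not x.deleted)
  |> List.map (fun x -> x.ch)
  |> List.to_seq |> String.of_seq

let ts sid time = { sid; time }
let a = { id = ts 1 1; ch = 'a'; deleted = false }
let b = { id = ts 1 2; ch = 'b'; deleted = false }
let c = { id = ts 2 2; ch = 'c'; deleted = false }

(* Two replicas receive the concurrent inserts b and c in opposite orders. *)
let r1 =
  [ a ] |> insert_after ~after:(Some a.id) b |> insert_after ~after:(Some a.id) c

let r2 =
  [ a ] |> insert_after ~after:(Some a.id) c |> insert_after ~after:(Some a.id) b
```

Both r1 and r2 hold the elements in the order a, c, b, because c has the same logical time as b but the higher session ID. Deleting b leaves it in the list as a tombstone, so later operations can still refer to its ID.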

Types

type 'a chunk = {
  id : Clock.timestamp; (* Starting timestamp of this chunk *)
  span : int; (* Number of elements (IDs consumed) *)
  data : 'a; (* The actual data *)
  deleted : bool; (* Tombstone flag *)
  parent : Clock.timestamp option; (* What this chunk was inserted after (None for root/splits) *)
}

A chunk of contiguous elements in the RGA.

  • For str: data is string, span is the UTF-16 code unit count
  • For bin: data is bytes, span is the byte count
  • For arr: data is timestamp (element reference), span is always 1
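As an illustration of how span relates to data, here is a small sketch with a local copy of the chunk record. The timestamp record (sid, time) is a hypothetical stand-in for Clock.timestamp, and the covers helper assumes that a chunk's IDs run from id.time to id.time + span - 1 within one session, which is how "sequential IDs" is read here.

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

(* Local copy of the documented chunk record. *)
type 'a chunk = {
  id : timestamp;
  span : int;
  data : 'a;
  deleted : bool;
  parent : timestamp option;
}

(* A str chunk: "héllo" is 6 bytes of UTF-8 but 5 UTF-16 code units. *)
let str_chunk =
  { id = { sid = 1; time = 10 }; span = 5; data = "h\xC3\xA9llo";
    deleted = false; parent = None }

(* An arr chunk: data is a reference to another node, span is always 1. *)
let arr_chunk =
  { id = { sid = 1; time = 20 }; span = 1; data = { sid = 1; time = 15 };
    deleted = false; parent = Some { sid = 1; time = 14 } }

(* Assumed containment rule: same session, time within [id.time, id.time + span). *)
let covers chunk ts =
  ts.sid = chunk.id.sid
  && ts.time >= chunk.id.time
  && ts.time < chunk.id.time + chunk.span
```

Under these assumptions str_chunk covers the IDs with times 10 through 14, so the next free ID in that session would have time 15.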

type 'a t = {
  chunks : 'a chunk Dynarray.t; (* Main chunk sequence *)
  mutable pending : 'a chunk list; (* Pending appends (in reverse order) *)
  mutable last_chunk : 'a chunk option; (* Cached last chunk for fast appends *)
  mutable search_hint : int option; (* Advisory last-found chunk index for nearby timestamp lookups. *)
}

The RGA container - a sequence of chunks. The Dynarray spine avoids allocating list prefixes on mid-sequence insertions; pending keeps pure append workloads cheap until a full sequence operation is required.
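The role of pending can be sketched as follows. This is an assumption-laden illustration, not the module's code: the buffered type is hypothetical, and a plain list stands in for the Dynarray spine so the sketch does not depend on a recent compiler. Appends push onto a reversed list in constant time, and the list is folded into the main sequence only when a full view is needed.

```ocaml
(* Hypothetical simplified container: [main] stands in for the Dynarray
   spine, [pending] holds appended items newest-first. *)
type 'a buffered = { mutable main : 'a list; mutable pending : 'a list }

(* Constant-time append: just cons onto the reversed pending list. *)
let append s x = s.pending <- x :: s.pending

(* Move pending items into the main sequence, restoring insertion order. *)
let consolidate s =
  match s.pending with
  | [] -> ()
  | p ->
    s.main <- s.main @ List.rev p;
    s.pending <- []

(* Any operation that needs the whole sequence consolidates first. *)
let to_list s =
  consolidate s;
  s.main

let s = { main = [ "a" ]; pending = [] }
let () = List.iter (append s) [ "b"; "c"; "d" ]
```

After the three appends, s.pending holds "d", "c", "b" in that order, and to_list s yields the items in insertion order while emptying pending.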

val consolidate : 'a t -> unit
val chunk_count : 'a t -> int
val chunk_at : 'a t -> int -> 'a chunk
val append_chunk : 'a t -> 'a chunk -> unit
val insert_chunk_at : 'a t -> int -> 'a chunk -> unit
val insert_two_chunks_at : 'a t -> int -> 'a chunk -> 'a chunk -> unit
val replace_chunks : 'a t -> 'a chunk list -> unit

Constructors

val empty : unit -> 'a t

Create an empty RGA

val copy : 'a t -> 'a t

Independent copy: shares the (immutable) chunk records but no mutable structure, so further edits to either side are isolated.

val singleton : 
  id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  parent:Clock.timestamp option ->
  'a t

Create an RGA with initial data.

parameter id Starting timestamp
parameter data The data
parameter span_fn Function to compute span from data

val from_chunks : 'a chunk list -> 'a t

Build an RGA from a list of chunks in order. Use this when decoding snapshots where chunks are already in correct order.

Queries

val is_empty : 'a t -> bool

Check if RGA is empty (no chunks, or all deleted with no content)

val total_span : 'a t -> int

Get total span including deleted elements

val visible_span : 'a t -> int

Get visible span (excluding deleted elements)
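The difference between the two span queries can be shown with a short sketch over a plain list of simplified chunks. The record below keeps only the two fields that matter here and is not the module's chunk type.

```ocaml
(* Simplified chunk: only the fields needed to compute spans. *)
type chunk = { span : int; deleted : bool }

(* Every chunk counts, tombstones included. *)
let total_span_sketch chunks =
  List.fold_left (fun acc c -> acc + c.span) 0 chunks

(* Only chunks that are not tombstones count. *)
let visible_span_sketch chunks =
  List.fold_left (fun acc c -> if c.deleted then acc else acc + c.span) 0 chunks

let chunks =
  [ { span = 3; deleted = false };
    { span = 2; deleted = true };
    { span = 4; deleted = false } ]
```

For this list the total span is 9 and the visible span is 7.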

val last_id : 'a t -> Clock.timestamp option

Get the last ID in the RGA (inclusive)

val find_chunk_containing : 'a t -> Clock.timestamp -> 'a chunk option

Find a chunk containing the given timestamp. Returns None if not found.

val contains : 'a t -> Clock.timestamp -> bool

Check if a timestamp exists in the RGA

Insert Operations

val compare_for_rga : Clock.timestamp -> Clock.timestamp -> int

Compare two timestamps for RGA ordering. Higher timestamp wins (inserted later in logical time). Tie-break by session ID (higher wins).

NOTE: This may not match json-joy's exact ordering for concurrent inserts. See issue crdt-4pq for json-joy compatibility work.
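A minimal sketch of the ordering described for compare_for_rga follows. The timestamp record is a hypothetical stand-in for Clock.timestamp, and the sign convention (a positive result means the first argument wins) is an assumption; the real function may encode the result differently.

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

(* Positive when [a] wins over [b]: higher logical time first,
   then higher session ID as the tie-break. *)
let compare_sketch a b =
  let c = Int.compare a.time b.time in
  if c <> 0 then c else Int.compare a.sid b.sid
```

With this convention, a timestamp with time 5 beats one with time 4 regardless of session ID, and between two timestamps with time 5 the one with the higher session ID wins.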

type insert_pos = 
  | Pos_at of int (* Insert at this chunk index *)
  | Pos_split of int * int (* Split chunk at index, insert after offset within chunk *)

Result of finding insertion position. May require splitting a chunk.

val chunk_contains_ts : 'a chunk -> Clock.timestamp -> bool
val find_insert_position : 
  'a t ->
  after_ts:Clock.timestamp option ->
  new_ts:Clock.timestamp ->
  insert_pos

Find the insertion position for a new chunk.

parameter after_ts The timestamp to insert after (None means insert at head)
parameter new_ts The timestamp of the new chunk being inserted

val is_append_position : 'a t -> Clock.timestamp option -> bool

Check if we can use the fast append path. Returns true if 'after' points to the last element of the last chunk.
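The append check can be sketched over a plain list of chunks. The timestamp record is a hypothetical stand-in for Clock.timestamp, and the sketch assumes that the last element of a chunk has time id.time + span - 1. How the real function treats None or an empty RGA is not documented here; the sketch simply returns false in those cases.

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

type 'a chunk = {
  id : timestamp;
  span : int;
  data : 'a;
  deleted : bool;
  parent : timestamp option;
}

(* ID of the last element in a chunk, assuming sequential times. *)
let last_id_of_chunk c = { c.id with time = c.id.time + c.span - 1 }

(* True when [after] names the last element of the last chunk. *)
let is_append_sketch chunks after =
  match (List.rev chunks, after) with
  | last :: _, Some ts -> ts = last_id_of_chunk last
  | _ -> false

let chunks =
  [ { id = { sid = 1; time = 10 }; span = 5; data = "hello";
      deleted = false; parent = None } ]
```

Here the chunk covers times 10 through 14, so only an insert after the ID with time 14 qualifies for the fast path in this sketch.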

val insert_with_split : 
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  split_fn:('a -> int -> 'a * 'a) ->
  unit

Insert a new chunk into the RGA.

parameter rga The RGA to modify
parameter after The timestamp to insert after (or None for head)
parameter chunk_id The timestamp for the new chunk
parameter data The data to insert
parameter span_fn Function to compute span from data
parameter split_fn Function to split data at offset (needed for mid-chunk inserts)

val insert : 
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  unit

Insert without split support (for backward compatibility)

Delete Operations

val split_chunk_at : 
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  int ->
  'a chunk option * 'a chunk option

Split a chunk at a given offset within the chunk. Returns (before, after) where 'before' contains elements 0..offset-1 and 'after' contains elements offset..span-1.

parameter split_data Function to split the data at offset
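One plausible reading of the (before, after) result is sketched below. Several details are assumptions rather than documented behavior: the timestamp record is a hypothetical stand-in for Clock.timestamp, an offset at either edge of the chunk yields None on the empty side, the second half starts at id.time + offset, and the second half gets parent = None (following the "None for root/splits" comment on the chunk type).

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

type 'a chunk = {
  id : timestamp;
  span : int;
  data : 'a;
  deleted : bool;
  parent : timestamp option;
}

(* Split [chunk] so that [before] holds elements 0..offset-1 and
   [after] holds elements offset..span-1. *)
let split_chunk_sketch ~split_data chunk offset =
  if offset <= 0 then (None, Some chunk)
  else if offset >= chunk.span then (Some chunk, None)
  else
    let left, right = split_data chunk.data offset in
    let before = { chunk with span = offset; data = left } in
    let after =
      { chunk with
        id = { chunk.id with time = chunk.id.time + offset };
        span = chunk.span - offset;
        data = right;
        parent = None }
    in
    (Some before, Some after)

(* Byte-based split; adequate for ASCII-only data in this sketch. *)
let split_ascii s n = (String.sub s 0 n, String.sub s n (String.length s - n))

let hello =
  { id = { sid = 1; time = 10 }; span = 5; data = "hello";
    deleted = false; parent = None }

let before, after = split_chunk_sketch ~split_data:split_ascii hello 2
```

Splitting "hello" at offset 2 gives a first half "he" with span 2 at time 10 and a second half "llo" with span 3 at time 12, so every element keeps its original ID.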

val apply_deletes_to_chunk : 
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan list ->
  'a chunk list

Apply delete spans to a single chunk, splitting as needed

val process_single_chunk : 
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan ->
  'a chunk list
val span_overlaps_chunk : Clock.timespan -> 'a chunk -> bool
val prepend_single_delete : 
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan ->
  'a chunk list ->
  'a chunk list
val find_single_overlapping_chunk : 'a t -> Clock.timespan -> int option
val remember_overlap : 'a t -> int -> int option
val find_first_overlapping_chunk_forward : 'a t -> Clock.timespan -> int option
val find_first_overlapping_chunk_reverse : 'a t -> Clock.timespan -> int option
val find_first_overlapping_chunk_near : 
  'a t ->
  Clock.timespan ->
  int ->
  int option
val find_first_overlapping_chunk : 'a t -> Clock.timespan -> int option
val replace_single_chunk : 'a t -> int -> 'a chunk list -> unit
val delete : 
  'a t ->
  spans:Clock.timespan list ->
  split_data:('a -> int -> 'a * 'a) ->
  unit

Delete a range of IDs from the RGA. This marks elements as deleted (tombstones) and may split chunks.

parameter spans List of timespans to delete
parameter split_data Function to split chunk data at offset
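To show how a delete can both tombstone elements and split a chunk, here is a sketch that applies one delete span to one string chunk. The timestamp and timespan records are hypothetical stand-ins for Clock.timestamp and Clock.timespan, the chunk record is simplified, and the data is assumed to be ASCII so that byte offsets equal span offsets.

```ocaml
(* Hypothetical stand-ins for Clock.timestamp and Clock.timespan. *)
type timestamp = { sid : int; time : int }
type timespan = { start : timestamp; len : int }

(* Simplified string chunk (no parent field). *)
type chunk = { id : timestamp; span : int; data : string; deleted : bool }

(* Apply delete span [d] to chunk [c]. The result has up to three pieces:
   an untouched prefix, a tombstoned middle, and an untouched suffix. *)
let delete_in_chunk c d =
  if d.start.sid <> c.id.sid then [ c ]
  else
    let lo = max c.id.time d.start.time in
    let hi = min (c.id.time + c.span) (d.start.time + d.len) in
    if lo >= hi then [ c ] (* no overlap *)
    else
      let piece from until deleted =
        if from >= until then []
        else
          [ { id = { c.id with time = from };
              span = until - from;
              data = String.sub c.data (from - c.id.time) (until - from);
              deleted = deleted || c.deleted } ]
      in
      piece c.id.time lo false @ piece lo hi true
      @ piece hi (c.id.time + c.span) false

let hello = { id = { sid = 1; time = 10 }; span = 5; data = "hello"; deleted = false }
let pieces = delete_in_chunk hello { start = { sid = 1; time = 11 }; len = 3 }
```

Deleting the three IDs starting at time 11 turns the single chunk into "h", a tombstoned "ell", and "o". The total span is still 5; only the visible content shrinks.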

Iteration and Views

val iter : ('a chunk -> unit) -> 'a t -> unit

Iterate over all chunks

val iter_visible : ('a chunk -> unit) -> 'a t -> unit

Iterate over visible (non-deleted) chunks

val fold : ('a -> 'b chunk -> 'a) -> 'a -> 'b t -> 'a

Fold over all chunks

val fold_visible : ('a -> 'b chunk -> 'a) -> 'a -> 'b t -> 'a

Fold over visible chunks

val to_list : 'a t -> 'a chunk list

Get list of all chunks

val to_visible_list : 'a t -> 'a chunk list

Get list of visible chunks

Data-Specific Helpers

val utf16_offset_to_byte : string -> int -> int

Return the byte index for a UTF-16 code unit offset in a UTF-8 string.

val split_string : string -> int -> string * string

Split function for strings. json-joy string RGA IDs advance in UTF-16 code units, while OCaml strings are UTF-8 encoded.
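Because spans count UTF-16 code units while OCaml strings hold UTF-8 bytes, a split has to translate between the two. The sketch below shows one way to do that by inspecting lead bytes. It assumes well-formed UTF-8, and the names ending in _sketch are illustrative rather than the module's own functions. An offset that falls inside a surrogate pair is rounded up past the whole code point, which may differ from the real behavior.

```ocaml
(* For the UTF-8 sequence starting with lead byte [c], return
   (bytes in the sequence, UTF-16 code units it needs).
   Assumes well-formed UTF-8. *)
let utf8_step c =
  let b = Char.code c in
  if b < 0x80 then (1, 1)
  else if b < 0xE0 then (2, 1)
  else if b < 0xF0 then (3, 1)
  else (4, 2) (* outside the BMP: encoded as a surrogate pair in UTF-16 *)

(* Byte index reached after consuming [units] UTF-16 code units. *)
let utf16_offset_to_byte_sketch s units =
  let rec go byte seen =
    if seen >= units || byte >= String.length s then byte
    else
      let bytes, u = utf8_step s.[byte] in
      go (byte + bytes) (seen + u)
  in
  go 0 0

(* Split a UTF-8 string at a UTF-16 code unit offset. *)
let split_string_sketch s units =
  let i = min (utf16_offset_to_byte_sketch s units) (String.length s) in
  (String.sub s 0 i, String.sub s i (String.length s - i))

(* "a", "é" (2 bytes), U+1F600 (4 bytes, 2 code units), "b" *)
let sample = "a\xC3\xA9\xF0\x9F\x98\x80b"
```

In sample, UTF-16 offset 2 corresponds to byte 3 and offset 4 to byte 7, so splitting at offset 2 separates "aé" from the emoji and the trailing "b".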

val split_bytes : bytes -> int -> bytes * bytes

Split function for bytes

val split_single : 'a -> 'b -> 'c

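Split function for single-element arrays. A span-1 chunk never needs splitting, so this is only a placeholder; its fully polymorphic result type 'c suggests that it raises rather than returning a usable value if it is ever called.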

val split_list : 'a list -> int -> 'a list * 'a list
val string_span : string -> int

Span function for strings in json-joy's UTF-16 code units.
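Counting UTF-16 code units in a UTF-8 string can be done in one pass over the bytes. The following is a sketch of that idea, assuming well-formed UTF-8; string_span_sketch is an illustrative name, not the module's function.

```ocaml
(* Number of UTF-16 code units needed for a well-formed UTF-8 string. *)
let string_span_sketch s =
  let n = ref 0 in
  String.iter
    (fun c ->
      let b = Char.code c in
      (* Any byte that is not a continuation byte starts a code point. *)
      if b land 0xC0 <> 0x80 then incr n;
      (* A 4-byte sequence is outside the BMP and needs a second unit. *)
      if b >= 0xF0 then incr n)
    s;
  !n

(* "a", "é" (2 bytes), U+1F600 (4 bytes), "b" *)
let sample = "a\xC3\xA9\xF0\x9F\x98\x80b"
```

For sample, String.length reports 8 bytes while the span is 5, which is why byte length and span must not be mixed when computing IDs.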

val bytes_span : bytes -> int

Span function for bytes

val element_span : 'a -> int

Span function for array elements (always 1)

val list_span : 'a list -> int

Convenience Functions for Common Types

val insert_string : 
  string t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  text:string ->
  unit

Insert a string into an RGA

val insert_bytes : 
  bytes t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:bytes ->
  unit

Insert bytes into an RGA

val insert_element : 
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  value:'a ->
  unit

Insert an element reference into an RGA

val insert_elements : 
  'a list t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  values:'a list ->
  unit
val delete_string : string t -> spans:Clock.timespan list -> unit

Delete spans from a string RGA

val delete_bytes : bytes t -> spans:Clock.timespan list -> unit

Delete spans from a bytes RGA

val delete_elements : 'a t -> spans:Clock.timespan list -> unit

Delete spans from an element RGA

val delete_element_lists : 'a list t -> spans:Clock.timespan list -> unit

View Functions

val iter_string_chunks : 'a t -> ('a -> unit) -> unit

Iterate over visible string chunks in order.

val string_byte_length : string t -> int

Total byte length of visible string chunks. This is byte length, not UTF-16 span.

val write_string_to_bytes : string t -> dst:bytes -> dst_pos:int -> int

Write visible string chunks into dst starting at dst_pos. Returns the next write offset.
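The length-then-write pattern suggested by string_byte_length and write_string_to_bytes can be sketched over a plain list of simplified chunks. The chunk record and function names here are illustrative only. The standard library's Bytes.blit_string does the copying.

```ocaml
(* Simplified string chunk: only data and the tombstone flag. *)
type chunk = { data : string; deleted : bool }

(* Total byte length of the visible chunks. *)
let byte_length_sketch chunks =
  List.fold_left
    (fun acc c -> if c.deleted then acc else acc + String.length c.data)
    0 chunks

(* Copy visible chunks into [dst] from [dst_pos]; return the next offset. *)
let write_to_bytes_sketch chunks ~dst ~dst_pos =
  List.fold_left
    (fun pos c ->
      if c.deleted then pos
      else begin
        let len = String.length c.data in
        Bytes.blit_string c.data 0 dst pos len;
        pos + len
      end)
    dst_pos chunks

let chunks =
  [ { data = "he"; deleted = false };
    { data = "XX"; deleted = true };
    { data = "llo"; deleted = false } ]

(* Allocate once, then write every visible chunk into place. *)
let dst = Bytes.make (byte_length_sketch chunks) ' '
let next = write_to_bytes_sketch chunks ~dst ~dst_pos:0
```

Sizing the buffer first means the content is copied exactly once, and returning the next offset lets a caller write several values back to back into the same buffer.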

val write_string_to_bigstring : 
  string t ->
  dst:(char, 'a, 'b) Bigarray.Array1.t ->
  dst_pos:int ->
  int

Write visible string chunks into a Bigarray-backed byte buffer starting at dst_pos. Returns the next write offset.

val view_string : string t -> string

Get the visible string content

val bytes_length : bytes t -> int

Total byte length of visible bytes chunks.

val write_bytes_to_bytes : bytes t -> dst:bytes -> dst_pos:int -> int

Write visible bytes chunks into dst starting at dst_pos. Returns the next write offset.

val view_bytes : bytes t -> bytes

Get the visible bytes content

val view_elements : 'a t -> 'a list

Get the visible element references

Position and Span Finding

val visible_length : 'a t -> int

Get visible length (count of non-deleted elements)

val find_last_id : 'a t -> Clock.timestamp option

Find the last ID in the RGA (for append operations)

val find_position_string : string t -> int -> Clock.timestamp option

Find the ID to insert after for a given character position in a string RGA.

parameter pos Character position (0 = before first char, 1 = after first char, etc.)
returns The timestamp to insert after, or None for inserting at head
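Mapping a visible position to an ID amounts to walking the chunks and skipping tombstones. The sketch below uses a simplified chunk record and a hypothetical timestamp record in place of Clock.timestamp, and it assumes that positions are measured in the same units as span. What the real function returns for a position past the end is not documented here; the sketch returns None.

```ocaml
(* Hypothetical stand-in for Clock.timestamp; the real field names may differ. *)
type timestamp = { sid : int; time : int }

(* Simplified chunk: only the fields needed for position lookup. *)
type chunk = { id : timestamp; span : int; deleted : bool }

(* ID of the visible element just before position [pos], or None for head. *)
let find_position_sketch chunks pos =
  if pos <= 0 then None
  else
    let rec go remaining = function
      | [] -> None (* past the end: behavior of the real function unknown *)
      | c :: rest when c.deleted -> go remaining rest
      | c :: rest ->
        if remaining <= c.span then
          Some { c.id with time = c.id.time + remaining - 1 }
        else go (remaining - c.span) rest
    in
    go pos chunks

let chunks =
  [ { id = { sid = 1; time = 10 }; span = 2; deleted = false };
    { id = { sid = 1; time = 12 }; span = 2; deleted = true };
    { id = { sid = 1; time = 20 }; span = 2; deleted = false } ]
```

With the middle chunk tombstoned, position 2 maps to the ID with time 11 and position 3 maps to the ID with time 20, so an insert at position 3 lands after the first element of the third chunk.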

val find_position_bytes : bytes t -> int -> Clock.timestamp option

Find the ID to insert after for a given byte position in a bytes RGA.

val find_position_arr : 'a t -> int -> Clock.timestamp option

Find the ID to insert after for a given index in an array RGA.

val find_spans_string : string t -> pos:int -> len:int -> Clock.timespan list

Find timespans to delete for a range in a string RGA.

parameter pos Start position
parameter len Number of characters to delete
returns List of timespans covering the range
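A visible range can cross chunk boundaries and tombstones, so it generally maps to several timespans. The sketch below walks a plain list of simplified chunks. The timestamp and timespan records are hypothetical stand-ins for the Clock types, and positions are assumed to use the same units as span.

```ocaml
(* Hypothetical stand-ins for Clock.timestamp and Clock.timespan. *)
type timestamp = { sid : int; time : int }
type timespan = { start : timestamp; len : int }

(* Simplified chunk: only the fields needed for range lookup. *)
type chunk = { id : timestamp; span : int; deleted : bool }

(* Timespans covering [len] visible elements starting at position [pos]. *)
let find_spans_sketch chunks ~pos ~len =
  let rec go skip need acc = function
    | [] -> List.rev acc
    | _ when need <= 0 -> List.rev acc
    | c :: rest when c.deleted -> go skip need acc rest
    | c :: rest ->
      if skip >= c.span then go (skip - c.span) need acc rest
      else
        let take = min need (c.span - skip) in
        let sp =
          { start = { c.id with time = c.id.time + skip }; len = take }
        in
        go 0 (need - take) (sp :: acc) rest
  in
  go pos len [] chunks

let chunks =
  [ { id = { sid = 1; time = 10 }; span = 2; deleted = false };
    { id = { sid = 1; time = 12 }; span = 2; deleted = true };
    { id = { sid = 1; time = 20 }; span = 2; deleted = false } ]

let spans = find_spans_sketch chunks ~pos:1 ~len:2
```

Deleting two visible elements from position 1 yields two one-element timespans, starting at times 11 and 20, because the tombstoned chunk between them is skipped.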

val find_spans_bytes : bytes t -> pos:int -> len:int -> Clock.timespan list

Find timespans to delete for a range in a bytes RGA.

val find_spans_arr : 'a t -> pos:int -> len:int -> Clock.timespan list

Find timespans to delete for a range in an array RGA.

Debug/Testing

val pp_chunk : 
  (Format.formatter -> 'a -> unit) ->
  Format.formatter ->
  'a chunk ->
  unit

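Pretty-printers for debugging: pp_chunk prints a single chunk and pp prints a whole RGA. Each takes a printer for the chunk data as its first argument.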

val pp : (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a t -> unit