Crdt.Rga
RGA (Replicated Growable Array) core implementation
This module implements the core RGA data structure used by str, bin, and arr nodes. The RGA provides:
- Insert after reference ID with deterministic concurrent ordering
- Delete by ID range with tombstone handling
- Convergence: all replicas that apply the same operations reach the same state

Key concepts:
- Each element has a unique timestamp ID
- Insert specifies which existing element to insert after
- Concurrent inserts at same position are ordered by timestamp comparison
- Deletes mark elements as tombstones (not removed)

The implementation uses chunks for efficiency: contiguous elements with sequential IDs are grouped together. Chunks may be split during delete operations and mid-chunk inserts.

Types
type 'a chunk = {
  id : Clock.timestamp; (* Starting timestamp of this chunk *)
  span : int; (* Number of elements (IDs consumed) *)
  data : 'a; (* The actual data *)
  deleted : bool; (* Tombstone flag *)
  parent : Clock.timestamp option; (* What this chunk was inserted after (None for root/splits) *)
}
A chunk of contiguous elements in the RGA.
- For str: data is string, span is UTF-16 code unit count
- For bin: data is bytes, span is byte count
- For arr: data is timestamp (element reference), span is always 1
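
To make the span bookkeeping concrete, here is a minimal sketch of which IDs a chunk consumes. It assumes that Clock.timestamp is a (session ID, logical time) pair, which this page does not confirm. The local timestamp type and the helpers chunk_last_time and covers are hypothetical and are not part of Crdt.Rga.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }

(* Mirrors the documented chunk record, using the local timestamp type. *)
type 'a chunk = {
  id : timestamp;
  span : int;
  data : 'a;
  deleted : bool;
  parent : timestamp option;
}

(* Hypothetical helper: logical time of the last ID the chunk consumes. *)
let chunk_last_time c = c.id.time + c.span - 1

(* Hypothetical helper: does the chunk cover the given ID? *)
let covers c ts =
  ts.sid = c.id.sid && ts.time >= c.id.time && ts.time <= chunk_last_time c

(* A str chunk holding "hello": five UTF-16 code units, times 10 to 14. *)
let hello =
  { id = { sid = 1; time = 10 }; span = 5; data = "hello";
    deleted = false; parent = None }
```

Under these assumptions, a single chunk record stands in for five element IDs, which is why a delete or insert that targets the middle of the range has to split the chunk.
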
type 'a t = {
  chunks : 'a chunk Dynarray.t; (* Main chunk sequence *)
  mutable pending : 'a chunk list; (* Pending appends (in reverse order) *)
  mutable last_chunk : 'a chunk option; (* Cached last chunk for fast appends *)
  mutable search_hint : int option; (* Advisory last-found chunk index for nearby timestamp lookups. *)
}
The RGA container: a sequence of chunks. The Dynarray spine avoids allocating list prefixes on mid-sequence insertions; pending keeps pure append workloads cheap until a full sequence operation is required.
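
The pending-append idea can be illustrated with a simplified stand-in. This is a sketch only: a plain list replaces the Dynarray spine so that it runs on any OCaml version, and the names seq, create, append, and to_list are hypothetical. The real consolidate may work differently.

```ocaml
(* Simplified stand-in for the container. *)
type 'a seq = {
  mutable main : 'a list;     (* consolidated sequence, in order *)
  mutable pending : 'a list;  (* appended items, newest first *)
}

let create () = { main = []; pending = [] }

(* Constant-time append: push onto the reversed pending list. *)
let append s x = s.pending <- x :: s.pending

(* Fold pending items into the main sequence before full-sequence work. *)
let consolidate s =
  match s.pending with
  | [] -> ()
  | p ->
    s.main <- s.main @ List.rev p;
    s.pending <- []

(* Any operation that needs the whole sequence consolidates first. *)
let to_list s =
  consolidate s;
  s.main
```

A run of appends touches only the pending list; the cost of merging is paid once, when an operation needs the full sequence.
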
val consolidate : 'a t -> unit
val chunk_count : 'a t -> int
val chunk_at : 'a t -> int -> 'a chunk
val append_chunk : 'a t -> 'a chunk -> unit
val insert_chunk_at : 'a t -> int -> 'a chunk -> unit
val insert_two_chunks_at : 'a t -> int -> 'a chunk -> 'a chunk -> unit
val replace_chunks : 'a t -> 'a chunk list -> unit

Constructors
val empty : unit -> 'a t
Create an empty RGA.

val copy : 'a t -> 'a t
Independent copy: shares the (immutable) chunk records but no mutable structure, so further edits to either side are isolated.

val singleton :
  id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  parent:Clock.timestamp option ->
  'a t
Create an RGA with initial data.
parameter id Starting timestamp
parameter data The data
parameter span_fn Function to compute span from data

val from_chunks : 'a chunk list -> 'a t
Build an RGA from a list of chunks in order. Use this when decoding snapshots where chunks are already in correct order.

Queries
val is_empty : 'a t -> bool
Check if RGA is empty (no chunks, or all deleted with no content).

val total_span : 'a t -> int
Get total span including deleted elements.

val visible_span : 'a t -> int
Get visible span (excluding deleted elements).

val last_id : 'a t -> Clock.timestamp option
Get the last ID in the RGA (inclusive).

val find_chunk_containing : 'a t -> Clock.timestamp -> 'a chunk option
Find a chunk containing the given timestamp. Returns None if not found.

val contains : 'a t -> Clock.timestamp -> bool
Check if a timestamp exists in the RGA.

Insert Operations
val compare_for_rga : Clock.timestamp -> Clock.timestamp -> int
Compare two timestamps for RGA ordering. Higher timestamp wins (inserted later in logical time). Tie-break by session ID (higher wins).
NOTE: This may not match json-joy's exact ordering for concurrent inserts. See issue crdt-4pq for json-joy compatibility work.
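
The ordering rule described above can be sketched as follows. Everything here is an assumption for illustration: the timestamp record shape, the sign convention (positive when the first argument wins), and the reading of "wins" as "sorts first among concurrent siblings". The function compare_sketch is not the library's compare_for_rga.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }

(* Positive when [a] wins over [b]: later logical time first,
   then higher session ID. The sign convention is an assumption. *)
let compare_sketch a b =
  match Int.compare a.time b.time with
  | 0 -> Int.compare a.sid b.sid
  | c -> c

(* Order concurrent siblings so that the winner comes first. *)
let order_siblings ts = List.sort (fun a b -> compare_sketch b a) ts
```

Because the comparison is a total order over distinct IDs, every replica that sees the same set of concurrent inserts sorts them the same way, which is what convergence requires.
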
type insert_pos =
  | Pos_at of int (* Insert at this chunk index *)
  | Pos_split of int * int (* Split chunk at index, insert after offset within chunk *)
Result of finding insertion position. May require splitting a chunk.

val chunk_contains_ts : 'a chunk -> Clock.timestamp -> bool

val find_insert_position :
  'a t ->
  after_ts:Clock.timestamp option ->
  new_ts:Clock.timestamp ->
  insert_pos
Find the insertion position for a new chunk.
parameter after_ts The timestamp to insert after (None means insert at head)
parameter new_ts The timestamp of the new chunk being inserted

val is_append_position : 'a t -> Clock.timestamp option -> bool
Check if we can use the fast append path. Returns true if 'after' points to the last element of the last chunk.
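
The append check can be sketched against a cached last chunk. This is a hedged illustration, assuming a (session ID, logical time) timestamp; the names last_id_of and is_append_sketch are hypothetical, and treating an empty RGA or a head insert as "not an append" is a choice made for this sketch only.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }
type 'a chunk = { id : timestamp; span : int; data : 'a }

(* ID of the last element in a chunk: start time plus span minus one. *)
let last_id_of c = { c.id with time = c.id.time + c.span - 1 }

(* [after] must name the final element of the final chunk. *)
let is_append_sketch last_chunk after =
  match last_chunk, after with
  | Some c, Some ts -> ts = last_id_of c
  | _ -> false
```
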

val insert_with_split :
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  split_fn:('a -> int -> 'a * 'a) ->
  unit
Insert a new chunk into the RGA.
parameter rga The RGA to modify
parameter after The timestamp to insert after (or None for head)
parameter chunk_id The timestamp for the new chunk
parameter data The data to insert
parameter span_fn Function to compute span from data
parameter split_fn Function to split data at offset (needed for mid-chunk inserts)

val insert :
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:'a ->
  span_fn:('a -> int) ->
  unit
Insert without split support (for backward compatibility).

Delete Operations
val split_chunk_at :
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  int ->
  'a chunk option * 'a chunk option
Split a chunk at a given offset within the chunk. Returns (before, after) where 'before' contains elements 0..offset-1 and 'after' contains elements offset..span-1.
parameter split_data Function to split the data at offset
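
A chunk split along these lines might look like the sketch below. Several details are assumptions rather than documented behaviour: the timestamp record shape, returning None for the empty side when the offset is at or beyond either end, and giving the 'after' half no parent (the chunk record's comment mentions None for splits). The names split_chunk_sketch and split_ascii are hypothetical.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }
type 'a chunk = {
  id : timestamp; span : int; data : 'a;
  deleted : bool; parent : timestamp option;
}

let split_chunk_sketch ~split_data c offset =
  if offset <= 0 then (None, Some c)
  else if offset >= c.span then (Some c, None)
  else
    let left, right = split_data c.data offset in
    (* 'before' keeps the original starting ID and a shorter span. *)
    let before = { c with span = offset; data = left } in
    (* 'after' starts [offset] IDs later and covers the rest. *)
    let after =
      { c with
        id = { c.id with time = c.id.time + offset };
        span = c.span - offset;
        data = right;
        parent = None }
    in
    (Some before, Some after)

(* ASCII-only splitter for the demo; real str data needs UTF-16-aware offsets. *)
let split_ascii s n = (String.sub s 0 n, String.sub s n (String.length s - n))
```

Splitting "hello" (starting at time 10) at offset 2 yields "he" at time 10 with span 2 and "llo" at time 12 with span 3, so every element keeps its original ID.
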

val apply_deletes_to_chunk :
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan list ->
  'a chunk list
Apply delete spans to a single chunk, splitting as needed.

val process_single_chunk :
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan ->
  'a chunk list

val span_overlaps_chunk : Clock.timespan -> 'a chunk -> bool

val prepend_single_delete :
  split_data:('a -> int -> 'a * 'a) ->
  'a chunk ->
  Clock.timespan ->
  'a chunk list ->
  'a chunk list

val find_single_overlapping_chunk : 'a t -> Clock.timespan -> int option

val remember_overlap : 'a t -> int -> int option

val find_first_overlapping_chunk_forward : 'a t -> Clock.timespan -> int option

val find_first_overlapping_chunk_reverse : 'a t -> Clock.timespan -> int option

val find_first_overlapping_chunk_near :
  'a t ->
  Clock.timespan ->
  int ->
  int option

val find_first_overlapping_chunk : 'a t -> Clock.timespan -> int option

val replace_single_chunk : 'a t -> int -> 'a chunk list -> unit

val delete :
  'a t ->
  spans:Clock.timespan list ->
  split_data:('a -> int -> 'a * 'a) ->
  unit
Delete a range of IDs from the RGA. This marks elements as deleted (tombstones) and may split chunks.
parameter spans List of timespans to delete
parameter split_data Function to split chunk data at offset
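
The effect of one delete span on one chunk can be sketched as below. The shapes of timestamp and timespan are assumptions (Clock is not documented on this page), and slice and delete_in_chunk are hypothetical helpers that take a substring-style function instead of the library's split_data. The demo uses String.sub, so it is only valid for ASCII data.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }
(* Assumption: a timespan is a starting ID plus a length. *)
type timespan = { start : timestamp; len : int }
type 'a chunk = { id : timestamp; span : int; data : 'a; deleted : bool }

(* Cut elements [off, off + n) out of a chunk, shifting the ID to match. *)
let slice ~sub c off n deleted =
  { id = { c.id with time = c.id.time + off };
    span = n; data = sub c.data off n; deleted }

(* Tombstone the part of [c] covered by [d]; return the pieces in order. *)
let delete_in_chunk ~sub c d =
  let lo = max c.id.time d.start.time in
  let hi = min (c.id.time + c.span) (d.start.time + d.len) in
  if d.start.sid <> c.id.sid || lo >= hi || c.deleted then [ c ]
  else
    let a = lo - c.id.time and b = hi - c.id.time in
    (* Up to three pieces: live prefix, tombstoned middle, live suffix. *)
    List.filter (fun p -> p.span > 0)
      [ slice ~sub c 0 a false;
        slice ~sub c a (b - a) true;
        slice ~sub c b (c.span - b) false ]
```

Deleting times 11 to 13 from "hello" (times 10 to 14) leaves "h" and "o" visible around a tombstoned "ell". The tombstone keeps its IDs, so later operations that reference those IDs still resolve.
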
Iteration and Views
val iter : ('a chunk -> unit) -> 'a t -> unit
Iterate over all chunks.

val iter_visible : ('a chunk -> unit) -> 'a t -> unit
Iterate over visible (non-deleted) chunks.

val fold : ('a -> 'b chunk -> 'a) -> 'a -> 'b t -> 'a
Fold over all chunks.

val fold_visible : ('a -> 'b chunk -> 'a) -> 'a -> 'b t -> 'a
Fold over visible chunks.

val to_list : 'a t -> 'a chunk list
Get list of all chunks.

val to_visible_list : 'a t -> 'a chunk list
Get list of visible chunks.
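
As an illustration of how the visible folds relate to the views, the sketch below folds over a plain chunk list instead of an RGA. The reduced chunk record and the names fold_visible_sketch, visible_span_sketch, and view_sketch are hypothetical stand-ins, not the library functions.

```ocaml
(* Reduced chunk record: only the fields the fold needs. *)
type 'a chunk = { span : int; data : 'a; deleted : bool }

(* Fold over chunks, skipping tombstones. *)
let fold_visible_sketch f init chunks =
  List.fold_left (fun acc c -> if c.deleted then acc else f acc c) init chunks

(* Visible span: sum of the spans of non-deleted chunks. *)
let visible_span_sketch chunks =
  fold_visible_sketch (fun acc c -> acc + c.span) 0 chunks

(* Visible string: concatenation of non-deleted chunk data, in order. *)
let view_sketch chunks =
  String.concat ""
    (List.rev (fold_visible_sketch (fun acc c -> c.data :: acc) [] chunks))
```
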
Data-Specific Helpers
val utf16_offset_to_byte : string -> int -> int
Return the byte index for a UTF-16 code unit offset in a UTF-8 string.

val split_string : string -> int -> string * string
Split function for strings. json-joy string RGA IDs advance in UTF-16 code units, while OCaml strings are UTF-8 encoded.
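
The UTF-16 to UTF-8 mapping can be sketched with the standard library decoder (String.get_utf_8_uchar, available from OCaml 4.14). The functions utf16_units, utf16_span, utf16_to_byte, and split_utf16 are hypothetical illustrations, not the library's implementations. In this sketch, an offset that lands inside a surrogate pair rounds up to the end of that character; whether the library does the same is not stated here.

```ocaml
(* UTF-16 code units needed for one Unicode scalar value. *)
let utf16_units u = if Uchar.to_int u >= 0x10000 then 2 else 1

(* Span of a UTF-8 string measured in UTF-16 code units. *)
let utf16_span s =
  let rec go i acc =
    if i >= String.length s then acc
    else
      let d = String.get_utf_8_uchar s i in
      go (i + Uchar.utf_decode_length d)
        (acc + utf16_units (Uchar.utf_decode_uchar d))
  in
  go 0 0

(* Byte index reached after consuming [target] UTF-16 code units. *)
let utf16_to_byte s target =
  let rec go i units =
    if units >= target || i >= String.length s then i
    else
      let d = String.get_utf_8_uchar s i in
      go (i + Uchar.utf_decode_length d)
        (units + utf16_units (Uchar.utf_decode_uchar d))
  in
  go 0 0

(* Split a UTF-8 string at a UTF-16 code unit offset. *)
let split_utf16 s offset =
  let b = utf16_to_byte s offset in
  (String.sub s 0 b, String.sub s b (String.length s - b))
```

For a string made of "a", "é" (two UTF-8 bytes, one UTF-16 unit), an emoji outside the Basic Multilingual Plane (four bytes, two units), and "b", the byte length is 8 but the span is 5, so byte offsets and ID offsets diverge as soon as non-ASCII text appears.
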

val split_bytes : bytes -> int -> bytes * bytes
Split function for bytes.

val split_single : 'a -> 'b -> 'c
Split function for single-element arrays (no split needed, returns dummy).

val split_list : 'a list -> int -> 'a list * 'a list

val string_span : string -> int
Span function for strings in json-joy's UTF-16 code units.

val bytes_span : bytes -> int
Span function for bytes.

val element_span : 'a -> int
Span function for array elements (always 1).

val list_span : 'a list -> int

Convenience Functions for Common Types
val insert_string :
  string t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  text:string ->
  unit
Insert a string into an RGA.

val insert_bytes :
  bytes t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  data:bytes ->
  unit
Insert bytes into an RGA.

val insert_element :
  'a t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  value:'a ->
  unit
Insert an element reference into an RGA.

val insert_elements :
  'a list t ->
  after:Clock.timestamp option ->
  chunk_id:Clock.timestamp ->
  values:'a list ->
  unit

val delete_string : string t -> spans:Clock.timespan list -> unit
Delete spans from a string RGA.

val delete_bytes : bytes t -> spans:Clock.timespan list -> unit
Delete spans from a bytes RGA.

val delete_elements : 'a t -> spans:Clock.timespan list -> unit
Delete spans from an element RGA.

val delete_element_lists : 'a list t -> spans:Clock.timespan list -> unit

View Functions
val iter_string_chunks : 'a t -> ('a -> unit) -> unit
Iterate over visible string chunks in order.

val string_byte_length : string t -> int
Total byte length of visible string chunks. This is byte length, not UTF-16 span.

val write_string_to_bytes : string t -> dst:bytes -> dst_pos:int -> int
Write visible string chunks into dst starting at dst_pos. Returns the next write offset.
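
The write-and-return-offset contract can be sketched over a plain chunk list. The reduced chunk record and the function write_visible are hypothetical; the sketch assumes the caller has sized dst from the visible byte length, as string_byte_length suggests.

```ocaml
(* Reduced chunk record: only the fields the writer needs. *)
type chunk = { data : string; deleted : bool }

(* Copy each visible chunk into [dst]; return the next free offset. *)
let write_visible chunks ~dst ~dst_pos =
  List.fold_left
    (fun pos c ->
      if c.deleted then pos
      else begin
        let n = String.length c.data in
        Bytes.blit_string c.data 0 dst pos n;
        pos + n
      end)
    dst_pos chunks
```

Returning the next offset lets a caller write several values back to back into one buffer without recomputing lengths.
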

val write_string_to_bigstring :
  string t ->
  dst:(char, 'a, 'b) Bigarray.Array1.t ->
  dst_pos:int ->
  int
Write visible string chunks into a Bigarray-backed byte buffer starting at dst_pos. Returns the next write offset.

val view_string : string t -> string
Get the visible string content.

val bytes_length : bytes t -> int
Total byte length of visible bytes chunks.

val write_bytes_to_bytes : bytes t -> dst:bytes -> dst_pos:int -> int
Write visible bytes chunks into dst starting at dst_pos. Returns the next write offset.

val view_bytes : bytes t -> bytes
Get the visible bytes content.

val view_elements : 'a t -> 'a list
Get the visible element references.

Position and Span Finding
val visible_length : 'a t -> int
Get visible length (count of non-deleted elements).

val find_last_id : 'a t -> Clock.timestamp option
Find the last ID in the RGA (for append operations).

val find_position_string : string t -> int -> Clock.timestamp option
Find the ID to insert after for a given character position in a string RGA.
parameter pos Character position (0 = before first char, 1 = after first char, etc.)
returns The timestamp to insert after, or None for inserting at head
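
The position lookup can be sketched as a walk over visible chunks. The timestamp shape, the reduced chunk record, and find_position_sketch are assumptions for illustration. Positions are counted in span units here, and a position past the end returns None in this sketch, which may differ from the library.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }
type chunk = { id : timestamp; span : int; deleted : bool }

(* Map a visible position to the ID to insert after; 0 means head (None). *)
let find_position_sketch chunks pos =
  let rec go remaining = function
    | [] -> None
    | c :: rest when c.deleted -> go remaining rest
    | c :: rest ->
      if remaining <= c.span then
        (* The target is the [remaining]-th element of this chunk. *)
        Some { c.id with time = c.id.time + remaining - 1 }
      else go (remaining - c.span) rest
  in
  if pos <= 0 then None else go pos chunks
```

Tombstoned chunks are skipped without consuming any of the position, so the returned ID always names a visible element.
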

val find_position_bytes : bytes t -> int -> Clock.timestamp option
Find the ID to insert after for a given byte position in a bytes RGA.

val find_position_arr : 'a t -> int -> Clock.timestamp option
Find the ID to insert after for a given index in an array RGA.

val find_spans_string : string t -> pos:int -> len:int -> Clock.timespan list
Find timespans to delete for a range in a string RGA.
parameter pos Start position
parameter len Number of characters to delete
returns List of timespans covering the range
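
A visible range can map to several timespans, because the elements may live in different chunks with tombstones in between. The sketch below shows one way to collect them; the timestamp and timespan shapes and the function find_spans_sketch are assumptions, and positions are counted in span units.

```ocaml
(* Assumption: a timestamp is a session ID plus a logical time. *)
type timestamp = { sid : int; time : int }
(* Assumption: a timespan is a starting ID plus a length. *)
type timespan = { start : timestamp; len : int }
type chunk = { id : timestamp; span : int; deleted : bool }

(* Collect ID ranges covering visible positions [pos, pos + len). *)
let find_spans_sketch chunks ~pos ~len =
  let rec go skip want acc = function
    | [] -> List.rev acc
    | _ when want <= 0 -> List.rev acc
    | c :: rest when c.deleted -> go skip want acc rest
    | c :: rest when skip >= c.span -> go (skip - c.span) want acc rest
    | c :: rest ->
      (* Take as much of this chunk as the range still needs. *)
      let take = min want (c.span - skip) in
      let s = { start = { c.id with time = c.id.time + skip }; len = take } in
      go 0 (want - take) (s :: acc) rest
  in
  go pos len [] chunks
```

In the test data for this sketch, deleting three elements starting at position 1 produces two spans: one inside the first chunk and one at the start of the last chunk, with the tombstoned chunk between them left untouched.
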

val find_spans_bytes : bytes t -> pos:int -> len:int -> Clock.timespan list
Find timespans to delete for a range in a bytes RGA.

val find_spans_arr : 'a t -> pos:int -> len:int -> Clock.timespan list
Find timespans to delete for a range in an array RGA.

Debug/Testing
val pp_chunk :
  (Format.formatter -> 'a -> unit) ->
  Format.formatter ->
  'a chunk ->
  unit
Pretty print an RGA for debugging.

val pp : (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a t -> unit