Crdt.Sync
crdt · API reference
CRDT document synchronization protocol
CRDT document synchronization over JSON-Rx.
This module implements the document sync protocol using JSON-Rx:
- Subscribe to document changes (real-time updates)
- Send/receive patches
- Initial sync (snapshot + pending patches)
- Conflict-free merge (CRDT semantics)
See the JSON CRDT specification: https://jsonjoy.com/specs/json-crdt
Types
type doc_state = {
doc_id : string;
clock : Clock.clock_vector;
patch_count : int; (* Number of patches applied *)
}
Document state summary for sync
Protocol Constants
module Methods : sig ... end
RPC method names
module Channels : sig ... end
Subscription channels
Server-Side Sync
type server_doc = {
id : string;
model : Model.t;
mutable patches : Patch.t list; (* Patches applied in order *)
mutable subscribers : (Value.t -> unit) list;
}
Synchronized document on server
type server = {
docs : (string, server_doc) Hashtbl.t;
rx_server : Rx_server.t;
}
Server sync state
val create_server : unit -> server
Create a new sync server
val register_doc : server -> doc_id:string -> model:Model.t -> server_doc
Register a document for sync
val get_doc_state : server_doc -> doc_state
Get document state
val clock_to_list : Clock.clock_vector -> (int * int) list
Clock to list for encoding
val encode_doc_state : doc_state -> Value.t
Encode document state to Value.t for JSON-Rx
val decode_doc_state : Value.t -> doc_state option
Decode document state from Value.t
val apply_patch : server -> string -> string -> (doc_state, string) result
Apply patch to server document and broadcast to subscribers
val subscribe_doc :
server ->
string ->
(Value.t -> unit) ->
(doc_state, string) result
Subscribe to document changes
val unsubscribe_doc : server -> string -> (Value.t -> unit) -> unit
Unsubscribe from document changes
val get_patches_since : server_doc -> Clock.clock_vector -> Patch.t list
Get patches since a given clock (for initial sync)
val encode_patch : Patch.t -> string
Encode a patch for transmission
Client-Side Sync
type client_doc = {
id : string;
model : Model.t;
mutable pending_patches : Patch.t list; (* Patches not yet acked by server *)
mutable on_change : (Patch.t -> unit) option;
}
Synchronized document on client
type client = {
docs : (string, client_doc) Hashtbl.t;
rx_client : Rx_client.t;
subscription_ids : (string, int) Hashtbl.t;
}
Client sync state
val create_client : Rx_client.t -> client
Create a new sync client
val open_doc : client -> doc_id:string -> model:Model.t -> client_doc
Open a document for sync
val on_doc_change : client_doc -> (Patch.t -> unit) -> unit
Set change callback for a document
val handle_incoming_patch : client -> string -> string -> (unit, string) result
Handle incoming patch from server
Sync Protocol Helpers
type sync_comparison =
| Equal
| Behind
| Ahead
| Concurrent
Compare two document states
val compare_states : doc_state -> doc_state -> sync_comparison
Testing Helpers
val create_sync_pair :
doc_id:string ->
session1:int ->
session2:int ->
server * server_doc * client * client_doc
Create a pair of documents that can sync
val client_doc_patches : client_doc -> Patch.t list
Helper to get pending patches from client doc
val add_client_patch : client_doc -> Patch.t -> unit
Add a patch to client pending list
val is_dominated_by_clock : Clock.timestamp -> Clock.clock_vector -> bool
Check if timestamp is dominated by clock
val get_client_patches_since : client_doc -> Clock.clock_vector -> Patch.t list
Get patches from client that server doesn't have
val sync_server_to_client : server_doc -> client_doc -> int
Simulate sync: apply server patches to client
val sync_client_to_server : client_doc -> server_doc -> int
Simulate sync: apply client pending patches to server
val bidirectional_sync : server_doc -> client_doc -> int * int
Two-way sync between server and client
val server_doc_view : server_doc -> Value.t
Get view of server document
val client_doc_view : client_doc -> Value.t
Get view of client document