Crdt.Rx_client
JSON-Rx client with request/response correlation
This module implements client-side RPC infrastructure for the JSON-Rx protocol:
- Request/response correlation
- Subscription management with callbacks
- Notification sending
- Uses OCaml 5.4 effects for transport abstraction
See the JSON-Rx specification: https://jsonjoy.com/specs/json-rx
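To illustrate what transport abstraction via effects can look like, here is a minimal sketch. The effect name `Send`, the use of `string` as a message type, and the helper `with_recording_transport` are all assumptions made for illustration; this page does not show the effects the module actually declares.

```ocaml
(* Hypothetical effect; the real module's effect declarations are not shown
   in this reference. Messages are plain strings here for simplicity. *)
type _ Effect.t += Send : string -> unit Effect.t

(* Sending performs the effect; whoever installs the handler decides
   what "sending" means (socket, in-memory buffer, test double). *)
let send_message (msg : string) : unit = Effect.perform (Send msg)

(* Run [f] under a handler that records every sent message in order
   instead of writing it to a network connection. *)
let with_recording_transport (f : unit -> unit) : string list =
  let sent = ref [] in
  Effect.Deep.try_with f ()
    { effc = (fun (type a) (eff : a Effect.t) ->
        match eff with
        | Send msg ->
          Some (fun (k : (a, unit) Effect.Deep.continuation) ->
            sent := msg :: !sent;
            Effect.Deep.continue k ())
        | _ -> None) };
  List.rev !sent

let recorded =
  with_recording_transport (fun () ->
    send_message "ping";
    send_message "pong")
```

Because the code that sends never names a concrete transport, the same client logic could run against a real connection or a test handler like the one above.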
Types
type response_result =
  | Ok of Value.t
  | Error of Value.t
Response result type.

type pending_request = {
  id : int;
  mutable result : response_result option;
  mutable callback : (response_result -> unit) option;
}
Pending request state.

type subscription = {
  id : int;
  channel : string;
  on_data : Value.t -> unit;
  on_complete : unit -> unit;
  mutable active : bool;
}
Subscription state.

type t = {
  mutable next_id : int;
  pending : (int, pending_request) Hashtbl.t;
  subscriptions : (int, subscription) Hashtbl.t;
  mutable connected : bool;
}
Client state.
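
The types above suggest how request/response correlation may work: each request gets a fresh ID, is stored in a table keyed by that ID, and is resolved when a reply carrying the same ID arrives. The following is a sketch under stated assumptions, not the module's implementation. `value` stands in for `Value.t`, the names `register` and `resolve` are hypothetical, IDs are assumed to start at 1, and removing an entry once it is resolved is also an assumption.

```ocaml
(* Stand-in for Value.t, which this page does not define. *)
type value = string

type response_result = Ok of value | Error of value

type pending_request = {
  id : int;
  mutable result : response_result option;
  mutable callback : (response_result -> unit) option;
}

(* Reduced client state: only the parts needed for correlation. *)
type client = {
  mutable next_id : int;
  pending : (int, pending_request) Hashtbl.t;
}

let create () = { next_id = 1; pending = Hashtbl.create 16 }

(* Allocate an ID and remember the request until a reply arrives. *)
let register client =
  let id = client.next_id in
  client.next_id <- id + 1;
  let req = { id; result = None; callback = None } in
  Hashtbl.replace client.pending id req;
  req

(* Match a reply to its request by ID; replies with unknown IDs are ignored. *)
let resolve client id result =
  match Hashtbl.find_opt client.pending id with
  | None -> ()
  | Some req ->
    req.result <- Some result;
    Hashtbl.remove client.pending id;
    Option.iter (fun cb -> cb result) req.callback

let demo_client = create ()
let first = register demo_client
let second = register demo_client

(* Replies may arrive out of order; the ID decides which request completes. *)
let () = resolve demo_client second.id (Ok "two")
```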
Client Creation
val create : unit -> t
Create a new client.

val next_request_id : t -> int
Get a unique request ID.
Sending Messages
val send_message : Rx.message -> unit
Send a message to the server using effects.

val request : t -> method_:string -> ?data:Value.t -> unit -> pending_request
Send a request and register it to receive a response.

val request_async :
  t ->
  method_:string ->
  ?data:Value.t ->
  callback:(response_result -> unit) ->
  unit ->
  unit
Send a request with a callback that is invoked when the response arrives.

val notify : 'a -> method_:string -> ?data:Value.t -> unit -> unit
Send a notification (fire and forget).

val subscribe :
  t ->
  channel:string ->
  on_data:(Value.t -> unit) ->
  ?on_complete:(unit -> unit) ->
  unit ->
  int
Subscribe to a channel.

val unsubscribe : t -> int -> unit
Unsubscribe from a channel.
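
As an illustration of subscription management with callbacks, the sketch below keeps subscriptions in a table keyed by ID and delivers data only to active entries. It is a simplified model with assumptions: `value` stands in for `Value.t`, the table and counter are module-level rather than fields of `t`, the `deliver` function and the channel name `"doc.changes"` are hypothetical, and no message is sent to a server.

```ocaml
(* Stand-in for Value.t. *)
type value = string

type subscription = {
  id : int;
  channel : string;
  on_data : value -> unit;
  on_complete : unit -> unit;
  mutable active : bool;
}

let subscriptions : (int, subscription) Hashtbl.t = Hashtbl.create 16
let next_id = ref 1

(* Register callbacks for a channel and return the subscription ID.
   [on_complete] defaults to a no-op, mirroring the optional argument. *)
let subscribe ~channel ~on_data ?(on_complete = (fun () -> ())) () =
  let id = !next_id in
  incr next_id;
  Hashtbl.replace subscriptions id
    { id; channel; on_data; on_complete; active = true };
  id

(* Deactivate and forget a subscription; unknown IDs are ignored. *)
let unsubscribe id =
  match Hashtbl.find_opt subscriptions id with
  | Some sub ->
    sub.active <- false;
    Hashtbl.remove subscriptions id
  | None -> ()

(* Hypothetical delivery step: invoke [on_data] only for active subscriptions. *)
let deliver id v =
  match Hashtbl.find_opt subscriptions id with
  | Some sub when sub.active -> sub.on_data v
  | _ -> ()

let seen : value list ref = ref []
let sub_id =
  subscribe ~channel:"doc.changes" ~on_data:(fun v -> seen := v :: !seen) ()

let () = deliver sub_id "patch-1"
let () = unsubscribe sub_id
(* Data arriving after unsubscribe is dropped. *)
let () = deliver sub_id "patch-2"
```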
Receiving Messages
val handle_response : t -> int -> Value.t -> unit
Handle a response message.

val handle_error : t -> int -> Value.t -> unit
Handle an error message.

val handle_data : t -> int -> Value.t -> unit
Handle subscription data.

val handle_complete : t -> int -> unit
Handle subscription completion.

val handle_message : t -> Rx.message -> unit
Handle an incoming message.

val receive_one : t -> bool
Process one incoming message. Returns false if the connection is closed.

val await_response : t -> pending_request -> response_result
Process messages until a specific request completes or times out.

val run_receive_loop : t -> unit
Run the receive loop until disconnected.
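
The relationship between processing one message and waiting for a specific response can be sketched as a loop that keeps receiving until the wanted reply is stored or the stream ends. This is a simplified model under assumptions: the `message` type is hypothetical (the constructors of `Rx.message` are not listed on this page), messages come from an in-memory queue rather than an effect-based transport, an empty queue is treated as a closed connection, and timeouts are not modelled.

```ocaml
(* Hypothetical message shape for illustration only. *)
type message =
  | Response of int * string
  | Closed

let inbox : message Queue.t = Queue.create ()
let results : (int, string) Hashtbl.t = Hashtbl.create 16

(* Process one message; return false once the connection is closed
   (an empty inbox counts as closed in this sketch). *)
let receive_one () =
  match Queue.take_opt inbox with
  | None | Some Closed -> false
  | Some (Response (id, data)) ->
    Hashtbl.replace results id data;
    true

(* Keep receiving until the reply for [id] is available or the stream ends. *)
let rec await_response id =
  match Hashtbl.find_opt results id with
  | Some data -> Some data
  | None -> if receive_one () then await_response id else None

let () =
  Queue.add (Response (1, "a")) inbox;
  Queue.add (Response (2, "b")) inbox;
  Queue.add Closed inbox

(* Waiting for request 2 also processes the reply to request 1 on the way. *)
let reply_2 = await_response 2
(* Request 3 never gets a reply before the connection closes. *)
let reply_3 = await_response 3
```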
Synchronous API
Blocking request/response for simple use cases.
val call : t -> method_:string -> ?data:Value.t -> unit -> response_result
Send a request and wait for the response.
Batch Operations
For testing: process messages without effects.
val process_messages : t -> Rx.message list -> unit
Process a list of incoming messages.

val get_pending_result : t -> int -> response_result option
Get the result of a pending request (for testing).
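
Batch processing without effects can be pictured as iterating a message handler over a list. The sketch below shows that shape with a dispatcher that picks a handler per message kind. The `message` variants are hypothetical, since the real constructors of `Rx.message` are not listed on this page, and the handlers only record a trace instead of updating client state.

```ocaml
(* Hypothetical message variants for illustration only. *)
type message =
  | Response of int * string
  | Error_msg of int * string
  | Data of int * string
  | Complete of int

let log : string list ref = ref []
let record s = log := s :: !log

(* Route one message to a handler chosen by its kind. *)
let handle_message = function
  | Response (id, v) -> record (Printf.sprintf "response %d: %s" id v)
  | Error_msg (id, e) -> record (Printf.sprintf "error %d: %s" id e)
  | Data (id, v) -> record (Printf.sprintf "data %d: %s" id v)
  | Complete id -> record (Printf.sprintf "complete %d" id)

(* Batch processing is then a plain iteration, with no transport involved. *)
let process_messages msgs = List.iter handle_message msgs

let () = process_messages [ Data (7, "x"); Complete 7; Response (1, "done") ]

(* Messages are handled in list order. *)
let trace = List.rev !log
```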
Statistics
val pending_count : t -> int
Get the number of pending requests.

val subscription_count : t -> int
Get the number of active subscriptions.

val is_connected : t -> bool
Check whether the client is connected.

val disconnect : t -> unit
Mark the client as disconnected.
Client-Server Integration
For in-process testing without an actual network.
val create_test_pair :
  unit ->
  t * Rx_server.t * (Rx.message -> unit) * (unit -> unit)
Create a connected client-server pair for testing. The returned tuple begins with (client, server, process_fn), where process_fn routes client messages through the server and responses back to the client. The signature also has a fourth component of type unit -> unit, whose role is not described here.