Crdt.Rx_client

crdt · API reference

JSON-Rx client with request/response correlation

JSON-Rx client.

This module implements client-side RPC infrastructure for JSON-Rx protocol:

  • Request/response correlation
  • Subscription management with callbacks
  • Notification sending
  • Uses OCaml 5.4 effects for transport abstraction see https://jsonjoy.com/specs/json-rx JSON-Rx specification

Types

type response_result = 
  | Ok of Value.t
  | Error of Value.t

Response result type

type pending_request = {
  id : int;
  mutable result : response_result option;
  mutable callback : (response_result -> unit) option;
}

Pending request state

type subscription = {
  id : int;
  channel : string;
  on_data : Value.t -> unit;
  on_complete : unit -> unit;
  mutable active : bool;
}

Subscription state

type t = {
  mutable next_id : int;
  pending : (int, pending_request) Hashtbl.t;
  subscriptions : (int, subscription) Hashtbl.t;
  mutable connected : bool;
}

Client state

Client Creation

val create : unit -> t

Create a new client

val next_request_id : t -> int

Get a unique request ID

Sending Messages

val send_message : Rx.message -> unit

Send a message to the server using effects

val request : t -> method_:string -> ?data:Value.t -> unit -> pending_request

Send a request and register for response

val request_async : 
  t ->
  method_:string ->
  ?data:Value.t ->
  callback:(response_result -> unit) ->
  unit ->
  unit

Send a request with callback for async response

val notify : 'a -> method_:string -> ?data:Value.t -> unit -> unit

Send a notification (fire and forget)

val subscribe : 
  t ->
  channel:string ->
  on_data:(Value.t -> unit) ->
  ?on_complete:(unit -> unit) ->
  unit ->
  int

Subscribe to a channel

val unsubscribe : t -> int -> unit

Unsubscribe from a channel

Receiving Messages

val handle_response : t -> int -> Value.t -> unit

Handle a response message

val handle_error : t -> int -> Value.t -> unit

Handle an error message

val handle_data : t -> int -> Value.t -> unit

Handle subscription data

val handle_complete : t -> int -> unit

Handle subscription complete

val handle_message : t -> Rx.message -> unit

Handle an incoming message

val receive_one : t -> bool

Process one incoming message. Returns false if connection closed.

val await_response : t -> pending_request -> response_result

Process messages until a specific request completes or times out

val run_receive_loop : t -> unit

Run the receive loop until disconnected

Synchronous API

Blocking request/response for simple use cases.

val call : t -> method_:string -> ?data:Value.t -> unit -> response_result

Send a request and wait for response

Batch Operations

For testing - process messages without effects.

val process_messages : t -> Rx.message list -> unit

Process a list of incoming messages

val get_pending_result : t -> int -> response_result option

Get result of a pending request (for testing)

Statistics

val pending_count : t -> int

Get number of pending requests

val subscription_count : t -> int

Get number of active subscriptions

val is_connected : t -> bool

Check if client is connected

val disconnect : t -> unit

Mark client as disconnected

Client-Server Integration

For in-process testing without actual network.

val create_test_pair : 
  unit ->
  t * Rx_server.t * (Rx.message -> unit) * (unit -> unit)

Create a connected client-server pair for testing. Returns (client, server, process_fn) where process_fn routes client messages through the server and responses back to client.