Crdt.Rx_server

JSON-Rx server router/dispatcher

This module implements server-side RPC infrastructure for the JSON-Rx protocol:

  • Method registration and routing
  • Request/response handling
  • Subscription management
  • Error handling

The server uses OCaml 5.4 effects for transport abstraction, allowing it to work with any IO backend (Eio, Lwt, Unix blocking).

See the JSON-Rx specification: https://jsonjoy.com/specs/json-rx

Handler Types

type 'a handler_result = 
  | Ok of 'a
  | Error of Value.t

Result type for method handlers

type method_handler = Value.t option -> Value.t handler_result

Method handler function type. Takes request data and returns a result or error.

type notification_handler = Value.t option -> unit

Notification handler function type. Takes notification data and returns unit (no response).

type subscription_handler = (Value.t -> unit) -> unit -> unit

Subscription handler function type. Takes a subscription callback and returns a cleanup function. The callback is called once for each data item to push to the client; the returned cleanup function is called when the subscription ends.
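The three handler shapes can be illustrated with a small self-contained sketch. The `Value` module here is a stand-in with assumed constructors (the real `Value.t` is not shown on this page), and the handler names `double`, `log_event`, and `counter` are hypothetical.

```ocaml
(* Stand-in for the library's Value.t; the real constructors are not shown
   in this reference, so these are assumptions. *)
module Value = struct
  type t = Null | Int of int | String of string
end

(* Type definitions copied from the signatures above. *)
type 'a handler_result = Ok of 'a | Error of Value.t
type method_handler = Value.t option -> Value.t handler_result
type notification_handler = Value.t option -> unit
type subscription_handler = (Value.t -> unit) -> unit -> unit

(* Method handler: doubles an integer payload, rejects anything else. *)
let double : method_handler = function
  | Some (Value.Int n) -> Ok (Value.Int (2 * n))
  | _ -> Error (Value.String "expected an integer")

(* Notification handler: records the payload and produces no response. *)
let seen : Value.t option list ref = ref []
let log_event : notification_handler = fun data -> seen := data :: !seen

(* Subscription handler: pushes two items immediately, then returns a
   cleanup function that flips a flag when the subscription ends. *)
let closed = ref false
let counter : subscription_handler = fun push ->
  push (Value.Int 1);
  push (Value.Int 2);
  fun () -> closed := true
```

A real subscription handler would more likely register the callback with an event source and push items later; the immediate pushes here only keep the sketch short.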

Server State

type subscription = {
  id : int;
  channel : string;
  cleanup : unit -> unit;
}

Active subscription

type t = {
  mutable methods : (string, method_handler) Hashtbl.t;
  mutable notifications : (string, notification_handler) Hashtbl.t;
  mutable channels : (string, subscription_handler) Hashtbl.t;
  mutable subscriptions : (int, subscription) Hashtbl.t;
  mutable next_sub_id : int;
}

Server state

Server Creation

val create : unit -> t

Create a new server

Method Registration

val register_method : t -> string -> method_handler -> unit

Register a method handler

val register_notification : t -> string -> notification_handler -> unit

Register a notification handler

val register_channel : t -> string -> subscription_handler -> unit

Register a subscription channel

val unregister_method : t -> string -> unit

Unregister a method

val unregister_notification : t -> string -> unit

Unregister a notification handler

val unregister_channel : t -> string -> unit

Unregister a channel

Message Handling

val handle_request : t -> int -> string -> Value.t option -> Rx.message

Handle a request message
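As a rough model of request dispatch, the sketch below looks up a handler by name and maps its result to a response message. The `message` constructors, the reduced `server` record, and the "unknown method" error payload are all assumptions; the real `Rx.message` type and error format are not documented on this page.

```ocaml
(* Stand-in for Value.t with assumed constructors. *)
module Value = struct
  type t = Null | String of string
end

type 'a handler_result = Ok of 'a | Error of Value.t
type method_handler = Value.t option -> Value.t handler_result

(* Hypothetical stand-in for Rx.message. *)
type message =
  | Response of int * Value.t
  | Response_error of int * Value.t

(* Hypothetical reduced server state: only the method table. *)
type server = { methods : (string, method_handler) Hashtbl.t }

(* Look up the handler; an unknown name or a handler error both become
   an error response carrying the request id. *)
let handle_request (s : server) (id : int) (name : string) data : message =
  match Hashtbl.find_opt s.methods name with
  | None -> Response_error (id, Value.String ("unknown method: " ^ name))
  | Some handler ->
    (match handler data with
     | Ok v -> Response (id, v)
     | Error e -> Response_error (id, e))

(* Usage: register an echo handler, then dispatch to it. *)
let server = { methods = Hashtbl.create 16 }
let () =
  Hashtbl.replace server.methods "echo" (function
    | Some v -> Ok v
    | None -> Ok Value.Null)
```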

val handle_notification : t -> string -> Value.t option -> unit

Handle a notification message (no response)

type send_fn = Rx.message -> unit

Send callback for subscriptions

val handle_subscribe : t -> send_fn -> int -> string -> unit

Handle a subscribe message

val handle_unsubscribe : t -> int -> unit

Handle an unsubscribe message
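The subscribe/unsubscribe lifecycle might look roughly like the following. This is a sketch under several assumptions: the `Data` message constructor is hypothetical, subscriptions are keyed by the id passed in (the real server also keeps a `next_sub_id` counter whose role is not described here), and an unknown channel is silently ignored, whereas the real server may report an error.

```ocaml
(* Stand-in for Value.t with an assumed constructor. *)
module Value = struct
  type t = Int of int
end

type subscription_handler = (Value.t -> unit) -> unit -> unit
type subscription = { id : int; channel : string; cleanup : unit -> unit }

(* Hypothetical stand-in for Rx.message. *)
type message = Data of int * Value.t
type send_fn = message -> unit

(* Hypothetical reduced server state. *)
type server = {
  channels : (string, subscription_handler) Hashtbl.t;
  subscriptions : (int, subscription) Hashtbl.t;
}

(* Start the channel's handler, wrapping each pushed value in a message
   tagged with the subscription id, and remember the cleanup function. *)
let handle_subscribe s (send : send_fn) id channel =
  match Hashtbl.find_opt s.channels channel with
  | None -> ()  (* assumption: unknown channels are ignored *)
  | Some handler ->
    let cleanup = handler (fun v -> send (Data (id, v))) in
    Hashtbl.replace s.subscriptions id { id; channel; cleanup }

(* Run the stored cleanup function and forget the subscription. *)
let handle_unsubscribe s id =
  match Hashtbl.find_opt s.subscriptions id with
  | None -> ()
  | Some sub ->
    sub.cleanup ();
    Hashtbl.remove s.subscriptions id
```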

val handle_message : t -> send_fn -> Rx.message -> Rx.message option

Handle an incoming message and return optional response

Server Loop

Main server loop that processes messages from a connection.

val process_one : t -> send_fn -> bool

Process a single message from the connection. Uses IO effects for reading/writing.

val send_message : Rx.message -> unit

Send a message to the connection using effects

val run_connection : t -> unit

Run the server loop on a connection until it closes
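To show how effects can decouple a server loop from its transport, here is a minimal sketch using the standard `Effect` and `Effect.Deep` modules (OCaml 5.0 or later). The effect names `Read_line` and `Write_line`, the string-based messages, and the in-memory backend are all hypothetical; the library's actual effects are not listed on this page.

```ocaml
open Effect
open Effect.Deep

(* Hypothetical transport effects. *)
type _ Effect.t +=
  | Read_line : string option Effect.t
  | Write_line : string -> unit Effect.t

(* Transport-agnostic loop: echoes every line until input is exhausted.
   It only performs effects and never touches a socket or channel. *)
let rec run_connection () =
  match perform Read_line with
  | None -> ()
  | Some line ->
    perform (Write_line ("echo: " ^ line));
    run_connection ()

(* One possible backend: an in-memory transport, handy for tests.
   It feeds lines from a list and collects everything written. *)
let run_in_memory (input : string list) : string list =
  let pending = ref input and written = ref [] in
  try_with run_connection ()
    { effc = (fun (type a) (eff : a Effect.t) ->
        match eff with
        | Read_line ->
          Some (fun (k : (a, _) continuation) ->
            match !pending with
            | [] -> continue k None
            | x :: rest -> pending := rest; continue k (Some x))
        | Write_line s ->
          Some (fun (k : (a, _) continuation) ->
            written := s :: !written;
            continue k ())
        | _ -> None) };
  List.rev !written
```

Swapping the backend means installing a different handler around the same loop, for example one that reads from and writes to a real connection.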

Convenience Functions

val register_echo : t -> unit

Register an echo method that returns its input

val register_ping : t -> unit

Register a ping method that returns "pong"

val create_with_defaults : unit -> t

Create a server with common methods registered
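The convenience handlers could be modelled as below. This is only a sketch: the method names "echo" and "ping", the choice to return `Null` when echo receives no payload, and the bare method table standing in for the server are assumptions.

```ocaml
(* Stand-in for Value.t with assumed constructors. *)
module Value = struct
  type t = Null | String of string
end

type 'a handler_result = Ok of 'a | Error of Value.t
type method_handler = Value.t option -> Value.t handler_result

(* Echo returns its input; a missing payload is mapped to Null here. *)
let echo : method_handler = function
  | Some v -> Ok v
  | None -> Ok Value.Null

(* Ping ignores its input and answers "pong". *)
let ping : method_handler = fun _ -> Ok (Value.String "pong")

(* Hypothetical model of create_with_defaults: a fresh method table
   with both handlers registered. *)
let create_with_defaults () =
  let methods = Hashtbl.create 16 in
  Hashtbl.replace methods "echo" echo;
  Hashtbl.replace methods "ping" ping;
  methods
```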

Batch Processing

Process multiple messages at once (for testing or batched protocols).

val process_batch : t -> Rx.message list -> Rx.message list

Process a list of messages and collect responses
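Since `handle_message` returns an optional response, batch processing can be pictured as a `List.filter_map` over the input. The `message` constructors and the simplified `handle_message` below are hypothetical stand-ins (the real one also takes the server and a send callback), and preserving input order is an assumption.

```ocaml
(* Hypothetical stand-in for Rx.message, with string payloads. *)
type message =
  | Request of int * string
  | Notification of string
  | Response of int * string

(* Hypothetical per-message handler: requests yield a response,
   everything else yields none. *)
let handle_message = function
  | Request (id, name) -> Some (Response (id, "handled " ^ name))
  | Notification _ -> None
  | Response _ -> None

(* filter_map keeps the responses in input order and drops the None cases. *)
let process_batch msgs = List.filter_map handle_message msgs
```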

Statistics

val method_count : t -> int

Get the number of registered methods

val notification_count : t -> int

Get the number of registered notification handlers

val channel_count : t -> int

Get the number of registered channels

val subscription_count : t -> int

Get the number of active subscriptions

val method_names : t -> string list

Get list of registered method names

val channel_names : t -> string list

Get list of registered channel names
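Given that the server state stores handlers in `Hashtbl.t` tables, the count and name functions plausibly reduce to `Hashtbl.length` and `Hashtbl.fold`. The sketch below works on a bare table with placeholder handlers; sorting the names is an assumption, since the order returned by the real functions is not specified here.

```ocaml
(* Hypothetical method table with placeholder handlers. *)
let methods : (string, unit -> unit) Hashtbl.t = Hashtbl.create 16
let () =
  Hashtbl.replace methods "ping" (fun () -> ());
  Hashtbl.replace methods "echo" (fun () -> ())

(* Number of bindings in the table. *)
let method_count () = Hashtbl.length methods

(* Collect the keys; Hashtbl iteration order is unspecified, so the
   names are sorted to give a stable result. *)
let method_names () =
  Hashtbl.fold (fun name _ acc -> name :: acc) methods []
  |> List.sort String.compare
```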