Crdt.Rx_server
crdt · API reference
JSON-Rx server router/dispatcher
JSON-Rx server router/dispatcher.
This module implements server-side RPC infrastructure for JSON-Rx protocol:
- Method registration and routing
- Request/response handling
- Subscription management
- Error handling The server uses OCaml 5.4 effects for transport abstraction, allowing it to work with any IO backend (Eio, Lwt, Unix blocking).
see https://jsonjoy.com/specs/json-rx JSON-Rx specification
Handler Types
type 'a handler_result =
| Ok of 'a
| Error of Value.tResult type for method handlers
type method_handler = Value.t option -> Value.t handler_resultMethod handler function type. Takes request data and returns a result or error.
type notification_handler = Value.t option -> unitNotification handler function type. Takes notification data and returns unit (no response).
type subscription_handler = (Value.t -> unit) -> unit -> unitSubscription handler function type. Takes subscription callback and returns a cleanup function. The callback is called for each data item to push to the client. Returns a function to call when the subscription ends.
Server State
type subscription = {
id : int;
channel : string;
cleanup : unit -> unit;
}Active subscription
type t = {
mutable methods : (string, method_handler) Hashtbl.t;
mutable notifications : (string, notification_handler) Hashtbl.t;
mutable channels : (string, subscription_handler) Hashtbl.t;
mutable subscriptions : (int, subscription) Hashtbl.t;
mutable next_sub_id : int;
}Server state
Server Creation
val create : unit -> tCreate a new server
Method Registration
val register_method : t -> string -> method_handler -> unitRegister a method handler
val register_notification : t -> string -> notification_handler -> unitRegister a notification handler
val register_channel : t -> string -> subscription_handler -> unitRegister a subscription channel
val unregister_method : t -> string -> unitUnregister a method
val unregister_notification : t -> string -> unitUnregister a notification handler
val unregister_channel : t -> string -> unitUnregister a channel
Message Handling
val handle_request : t -> int -> string -> Value.t option -> Rx.messageHandle a request message
val handle_notification : t -> string -> Value.t option -> unitHandle a notification message (no response)
type send_fn = Rx.message -> unitSend callback for subscriptions
val handle_subscribe : t -> send_fn -> int -> string -> unitHandle a subscribe message
val handle_unsubscribe : t -> int -> unitHandle an unsubscribe message
val handle_message : t -> send_fn -> Rx.message -> Rx.message optionHandle an incoming message and return optional response
Server Loop
Main server loop that processes messages from a connection.
val process_one : t -> send_fn -> boolProcess a single message from the connection. Uses IO effects for reading/writing.
val send_message : Rx.message -> unitSend a message to the connection using effects
val run_connection : t -> unitRun the server loop on a connection until it closes
Convenience Functions
val register_echo : t -> unitRegister an echo method that returns its input
val register_ping : t -> unitRegister a ping method that returns "pong"
val create_with_defaults : unit -> tCreate a server with common methods registered
Batch Processing
Process multiple messages at once (for testing or batched protocols).
val process_batch : t -> Rx.message list -> Rx.message listProcess a list of messages and collect responses
Statistics
val method_count : t -> intGet the number of registered methods
val notification_count : t -> intGet the number of registered notification handlers
val channel_count : t -> intGet the number of registered channels
val subscription_count : t -> intGet the number of active subscriptions
val method_names : t -> string listGet list of registered method names
val channel_names : t -> string listGet list of registered channel names