Hive.Server.Make

hive · API reference

Parameters

module Msg : MSG

Signature

module Action : sig ... end
type ('state, 'reply) reaction = {
  state : 'state; (* New server state after handling the message. *)
  reply : 'reply; (* Reply delivered to the caller. For casts and infos this is (). *)
  actions : Action.t list; (* Runtime effects to perform after the reply is delivered. *)
}

A pure callback result: state, reply and effects-as-data.

type 'state spec = {
  init : unit -> 'state; (* Build the initial state. Runs synchronously in start. *)
  handle : 'r. 'r Msg.t -> 'state -> ('state, 'r) reaction; (* Handle calls and casts. *)
  handle_info : Info.t -> 'state -> ('state, unit) reaction; (* Handle timer and pubsub messages. *)
  terminate : reason -> 'state -> unit; (* Cleanup callback run when the server stops. *)
}

Server callbacks and initial state factory.

Example message GADT and server:

```ocaml
module Msg = struct
  type _ t =
    | Incr : unit t
    | Get : int t
end

module Counter = Hive.Server.Make (Msg)

let spec = {
  Counter.init = (fun () -> 0);
  handle = (fun (type r) (msg : r Msg.t) state ->
    match msg with
    | Incr -> { state = state + 1; reply = (); actions = [] }
    | Get -> { state; reply = state; actions = [] });
  handle_info = Counter.ignore_info;
  terminate = Counter.no_terminate;
}
```

val continue : 'state -> 'reply -> ('state, 'reply) reaction

continue state reply is the no-action reaction.

val ignore_info : Info.t -> 'state -> ('state, unit) reaction

Default handle_info: keep the state, do nothing.

val no_terminate : reason -> 'state -> unit

Default terminate: do nothing.

type t

A running server.

val start : 
  sw:Eio.Switch.t ->
  clock:_ Eio.Time.clock ->
  ?pubsub:Pubsub.t ->
  ?capacity:int ->
  'state spec ->
  t

Runs spec.init in the calling fiber (a raised exception fails start), then forks the server fiber under sw.

val cast : t -> unit Msg.t -> unit

Asynchronous send. Blocks while the mailbox is full (backpressure); silently dropped when the server has stopped.

val call : 
  ?timeout:float ->
  t ->
  'r Msg.t ->
  ('r, [ `Stopped | `Timeout ]) result

Synchronous request/reply. timeout defaults to default_call_timeout seconds.

val send_info : t -> Info.t -> unit

Deliver an out-of-band message (timer/pubsub bridge). Dropped when the server has stopped.

val subscribe : t -> pubsub:Pubsub.t -> topic:string -> Pubsub.unsubscribe

Bridge a pubsub topic into the server: each delivery arrives as Info.Pubsub { topic; payload } in handle_info. Deliveries after the server stops are dropped; call the returned function to release the subscription.

val stop : t -> unit

Graceful stop: enqueued behind already-queued messages; runs terminate Shutdown, then resolves pending calls with Error `Stopped.

val join : t -> unit Outcome.t Eio.Promise.t

The underlying worker's outcome, for supervision.