hive — workers & supervision

hive is the worker runtime of the stack. Workers are Eio fibers with GADT-typed mailboxes and pure message handlers, organised under supervisors with restart strategies; around them hive provides timers, one-shot tasks, a typed registry, groups and an injectable pubsub capability. Everything is local by default and cluster-aware when you opt in. Every snippet below is cut from a complete program under the site repository's examples/ directory, and each of those programs runs in this site's test suite.

API reference

The shape of a hive worker

A worker is a fiber plus a bounded, typed mailbox, started under an Eio switch. What you write is the decision logic: a pure handle function from a message and the current state to a reaction — the next state, the reply, and a list of actions for the runtime to perform. The fiber, the mailbox, ordered delivery and backpressure live in the library, not in your code.

Because OCaml has no higher-kinded type parameters, the server layer is a functor: each worker module instantiates Hive.Server.Make with its own message GADT, the way you instantiate Map.Make with a key type. The GADT's type parameter is the reply type, so the worker's protocol is checked by the compiler, not by discipline.

Everything composes with Eio's structured concurrency: handles are tied to a switch, stopping is a value-level operation, and when the switch ends, everything under it is cleaned up.

one worker: mailbox → pure handler → new state

        casts & calls
   producers ──────────────▶ ┌─────────────────┐
                             │ bounded mailbox │  (typed, ordered)
                             └───────┬─────────┘
                                     │ one message at a time
                                     ▼
                          ┌───────────────────────┐
                          │  handle msg state     │  pure: no fibers, no IO
                          │   → continue/reply +  │
                          │     effects as data   │
                          └──────────┬────────────┘
                                     │ new state
                                     ▼
                              (loop, in one fiber)

A typed counter worker

The smallest complete worker: two messages, a pure handle, a spec, and a main that exercises the whole lifecycle.

The message GADT is the protocol

examples/hive_counter/main.ml
(* The message GADT: the type parameter is the reply type.
   Casts (fire-and-forget) are [unit msg]; calls carry their reply
   type, so an ill-typed send does not compile. *)
type _ msg =
  | Incr : int -> unit msg (* cast: add [n] to the counter *)
  | Total : int msg (* call: read the current total *)

(* One functor application per worker module, like [Map.Make]. *)
module Counter = Hive.Server.Make (struct
  type 'r t = 'r msg
end)

Read the message type first: Incr is a cast (fire-and-forget, reply type unit), while Total is a call that replies with an int. Because the reply type rides on the constructor, Counter.call counter Total produces an int on success (Ok total), and a handler that answers a call with the wrong type does not compile.
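How can one mailbox hold messages whose replies have different types? The stdlib-only sketch below shows one common answer, assuming (this page does not say) that it resembles what the library does internally: each call is packed with a reply callback, and the GADT index forces that callback to accept exactly the constructor's reply type. envelope, Cast, Call and run are hypothetical names, and there are no fibers; the mailbox is a plain Queue drained in order.

```ocaml
(* Sketch only: a single-threaded model of a GADT-typed mailbox.
   [envelope], [Cast], [Call] and [run] are hypothetical names. *)
type _ msg =
  | Incr : int -> unit msg (* cast: add [n] to the counter *)
  | Total : int msg (* call: read the current total *)

(* The existential ['r] in [Call] hides each reply type, so casts and
   calls share one queue; the GADT index ties the callback to the reply. *)
type envelope =
  | Cast : unit msg -> envelope
  | Call : 'r msg * ('r -> unit) -> envelope

(* Pure decision logic: message and state in, (new state, reply) out. *)
let handle : type r. r msg -> int -> int * r =
 fun msg state ->
  match msg with
  | Incr n -> (state + n, ())
  | Total -> (state, state)

(* Drain the mailbox in FIFO order, one message at a time. *)
let run (mailbox : envelope Queue.t) (init : int) : int =
  let rec loop state =
    match Queue.take_opt mailbox with
    | None -> state
    | Some (Cast m) -> loop (fst (handle m state))
    | Some (Call (m, reply)) ->
        let state', r = handle m state in
        reply r;
        loop state'
  in
  loop init

let () =
  let mailbox = Queue.create () in
  List.iter (fun n -> Queue.add (Cast (Incr n)) mailbox) [ 1; 2; 3; 4 ];
  let answer = ref None in
  Queue.add (Call (Total, fun total -> answer := Some total)) mailbox;
  let final = run mailbox 0 in
  Printf.printf "total = %s, final state = %d\n"
    (match !answer with Some t -> string_of_int t | None -> "none")
    final
```

Because Call hides the reply type, the queue has a single element type, yet Call (Total, f) only type-checks when f accepts an int.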

Pure decision logic

examples/hive_counter/main.ml
(* Pure decision logic: state in, reaction out. No fibers, no IO —
   unit-testable by calling it directly with a message and a state. *)
let handle : type r. r msg -> int -> (int, r) Counter.reaction =
 fun msg state ->
  match msg with
  | Incr n -> Counter.continue (state + n) ()
  | Total -> Counter.continue state state

(* The spec bundles the callbacks; defaults cover the rest. *)
let spec =
  {
    Counter.init = (fun () -> 0);
    handle;
    handle_info = Counter.ignore_info;
    terminate = Counter.no_terminate;
  }

handle does no I/O and spawns no fibers, so testing it is calling it: assert that handle (Incr 2) 3 continues with state 5. continue is the no-action reaction; ignore_info and no_terminate are the defaults for the two callbacks this worker does not need.
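A unit test is then ordinary function application. This sketch runs without hive, so the reaction record and continue below are hypothetical stand-ins; their shape (state, reply and actions fields) is taken from the Audit example later on this page, and actions are plain strings here for brevity.

```ocaml
(* Hypothetical stand-ins for Counter.reaction and Counter.continue:
   a reaction is the next state, the reply, and a list of actions. *)
type ('s, 'r) reaction = { state : 's; reply : 'r; actions : string list }

(* The no-action reaction. *)
let continue state reply = { state; reply; actions = [] }

type _ msg = Incr : int -> unit msg | Total : int msg

let handle : type r. r msg -> int -> (int, r) reaction =
 fun msg state ->
  match msg with
  | Incr n -> continue (state + n) ()
  | Total -> continue state state

(* Tests are plain calls: no fibers, no clock, no IO. *)
let () =
  let r = handle (Incr 2) 3 in
  assert (r.state = 5);
  assert (r.actions = []);
  let t = handle Total 7 in
  assert (t.reply = 7 && t.state = 7);
  print_endline "handler tests passed"
```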

Casts, calls, and errors as values

examples/hive_counter/main.ml
(* Start the server: a fiber plus a typed mailbox under [sw]. *)
let counter = Counter.start ~sw ~clock spec in
(* Casts are asynchronous; a full mailbox applies backpressure. *)
List.iter (fun n -> Counter.cast counter (Incr n)) [ 1; 2; 3; 4 ];
(* A call is synchronous request/reply, with errors as values.
   The mailbox is ordered, so this call sees all the casts above. *)
(match Counter.call counter Total with
| Ok total -> Printf.printf "total after increments 1+2+3+4: %d\n" total
| Error `Timeout | Error `Stopped ->
    prerr_endline "error: counter unavailable";
    exit 1);

start runs init in the calling fiber (so a failing init fails start, not the worker later) and forks the server fiber under the switch. A cast enqueues and returns; when the mailbox is full it blocks the producer — backpressure, not an unbounded queue. A call is synchronous request/reply and returns a result: Timeout and Stopped are variants to match on, never exceptions to forget. The default call timeout is five seconds.

Stop, join, outcomes

examples/hive_counter/main.ml
(* Orderly shutdown: stop, then await the worker's outcome. *)
Counter.stop counter;
match Eio.Promise.await (Counter.join counter) with
| Hive.Outcome.Completed () -> print_endline "counter stopped cleanly"
| Hive.Outcome.Failed exn ->
    Printf.eprintf "error: counter crashed: %s\n" (Printexc.to_string exn);
    exit 1
| Hive.Outcome.Cancelled ->
    prerr_endline "error: counter was cancelled";
    exit 1

stop is graceful: it queues behind already-sent messages, runs the terminate callback, then resolves any pending calls with Error `Stopped. join returns a promise of the worker's Hive.Outcome — Completed, Failed with the exception, or Cancelled — the same value a supervisor consults to decide restarts.

$ dune exec examples/hive_counter/main.exe

Supervision

Under real load you rarely start workers by hand: a supervisor owns a sub-switch, starts its children in order under it, and restarts them according to a policy when they fail. Because children live under the supervisor's switch, cancelling the switch is structured shutdown for free.

Child specs and restart policies

examples/hive_supervision/main.ml
(* A transient child is restarted only when it fails. The first run
   crashes; the supervisor backs off, restarts it; the second run
   completes and stays down. *)
let attempts = ref 0 in
let flaky =
  Supervisor.child_spec ~id:"flaky" ~restart:Supervisor.Transient
    ~backoff:{ Supervisor.initial = 0.01; max_delay = 0.1; multiplier = 2.0 }
    (fun child_sw ->
      Supervisor.child_of_fn child_sw (fun () ->
          incr attempts;
          Printf.printf "flaky child: attempt %d\n" !attempts;
          match !attempts with 1 -> failwith "first run crashes" | _ -> ()))
in
let sup =
  Supervisor.start ~sw ~clock ~strategy:Supervisor.One_for_one [ flaky ]
in

A child spec is an id plus a start function that receives the supervisor's sub-switch and is re-run on every restart. The restart policy is per child: Permanent children are always restarted, Transient ones only after a Failed outcome (this flaky child completes on its second run and stays down), and Temporary ones never. Restart delays back off exponentially up to a cap (max_delay) and reset once a child has stayed up longer than that cap. child_of_fn adapts a plain thunk; anything exposing join and shutdown (a server, a task, another supervisor) can be a child.
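The restart rules above fit in two small pure functions. This is a sketch of the policy as this page states it, not hive's source: the outcome, restart and backoff types mirror the snippets, while should_restart, delay and next_attempt are hypothetical, and "the cap" is read as max_delay.

```ocaml
(* Local mirrors of the types used above; hive's definitions may differ. *)
type 'a outcome = Completed of 'a | Failed of exn | Cancelled
type restart = Permanent | Transient | Temporary
type backoff = { initial : float; max_delay : float; multiplier : float }

(* Permanent: always; Transient: only after a failure; Temporary: never. *)
let should_restart restart outcome =
  match (restart, outcome) with
  | Permanent, _ -> true
  | Transient, Failed _ -> true
  | Transient, (Completed _ | Cancelled) -> false
  | Temporary, _ -> false

(* Delay before restart number [attempt] (0-based): grows geometrically
   from [initial] and is clamped at [max_delay]. *)
let delay b attempt =
  Float.min b.max_delay (b.initial *. (b.multiplier ** float_of_int attempt))

(* A child that stayed up longer than the cap starts over at attempt 0. *)
let next_attempt b ~uptime attempt =
  if uptime > b.max_delay then 0 else attempt + 1

let () =
  let b = { initial = 0.01; max_delay = 0.1; multiplier = 2.0 } in
  List.iter
    (fun n -> Printf.printf "restart %d waits %.2fs\n" n (delay b n))
    [ 0; 1; 2; 3; 4 ]
```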

Strategies and shutdown

examples/hive_supervision/main.ml
(* A supervisor is itself a joinable unit of work. *)
Supervisor.stop sup;
match Eio.Promise.await (Supervisor.join sup) with
| Outcome.Completed () -> print_endline "supervisor stopped cleanly"
| Outcome.Failed _ | Outcome.Cancelled ->
    prerr_endline "error: supervisor did not stop cleanly";
    exit 1

The strategy decides the blast radius of a failure: One_for_one restarts only the failed child, One_for_all restarts all of them, Rest_for_one restarts the failed child and those started after it. The await flag picks the stop semantics — ask each child to shut down (the default) or wait for running children to finish on their own. The supervisor is itself joinable, so trees of supervisors compose. In an araara application this is what application.ml starts under its root switch.

$ dune exec examples/hive_supervision/main.exe
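The three strategies differ only in which children one failure touches. A minimal sketch over child ids kept in start order; affected is a hypothetical helper, and it only computes the set, leaving out how a real supervisor stops and restarts those children.

```ocaml
type strategy = One_for_one | One_for_all | Rest_for_one

(* Given the children in start order and the id that failed, return the
   ids to restart, still in start order. *)
let affected strategy children ~failed =
  match strategy with
  | One_for_one -> List.filter (String.equal failed) children
  | One_for_all -> children
  | Rest_for_one ->
      (* Skip everything started before [failed]; keep it and the rest. *)
      let rec from = function
        | [] -> []
        | id :: _ as rest when String.equal id failed -> rest
        | _ :: tl -> from tl
      in
      from children

let () =
  let children = [ "db"; "cache"; "http" ] in
  List.iter
    (fun (name, s) ->
      Printf.printf "%s: %s\n" name
        (String.concat ", " (affected s children ~failed:"cache")))
    [ ("one_for_one", One_for_one); ("one_for_all", One_for_all);
      ("rest_for_one", Rest_for_one) ]
```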

Timers and one-shot tasks

Two small companions cover the work that is not a long-lived worker: scheduled callbacks and background one-shots.

Scheduler: after and every

examples/hive_supervision/main.ml
(* Timers run on the Eio clock: periodic [every], one-shot [after],
   both cancellable. *)
let ticks = ref 0 in
let ticker =
  Scheduler.every ~sw ~clock ~immediate:true 0.01 (fun () -> incr ticks)
in
let fired, resolve = Eio.Promise.create () in
let _once : Scheduler.timer =
  Scheduler.after ~sw ~clock 0.05 (fun () -> Eio.Promise.resolve resolve ())
in
Eio.Promise.await fired;
Scheduler.cancel ticker;
Printf.printf "periodic ticks before the one-shot fired: at least %d\n"
  (min !ticks 2);

Timers run on the Eio clock under your switch: after fires once, every fires periodically (immediate:true adds a tick at t=0), and cancel stops either — idempotent, and a no-op after a one-shot has fired.
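These timer semantics can be checked deterministically against a virtual clock. The following is a hypothetical single-threaded model, not hive's Scheduler: after, every, cancel and advance are local names, and a periodic timer without immediate is assumed to fire first one period after it is created.

```ocaml
(* Hypothetical model of after/every/cancel on a virtual clock. *)
type timer = { mutable cancelled : bool }

type entry = { due : float; period : float option; timer : timer; f : unit -> unit }

type sched = { mutable now : float; mutable pending : entry list }

let create () = { now = 0.0; pending = [] }

let schedule s ~due ~period f =
  let timer = { cancelled = false } in
  s.pending <- { due; period; timer; f } :: s.pending;
  timer

(* One-shot: fires once, [d] seconds from now. *)
let after s d f = schedule s ~due:(s.now +. d) ~period:None f

(* Periodic: every [p] seconds; [~immediate:true] adds a tick right away. *)
let every s ?(immediate = false) p f =
  let due = if immediate then s.now else s.now +. p in
  schedule s ~due ~period:(Some p) f

(* Idempotent; on a one-shot that already fired there is nothing left to stop. *)
let cancel t = t.cancelled <- true

(* Advance the clock to [until], running due entries in time order.
   Assumes every period is positive, so the recursion terminates. *)
let rec advance s until =
  let due, later = List.partition (fun e -> e.due <= until) s.pending in
  match List.stable_sort (fun a b -> compare a.due b.due) due with
  | [] -> s.now <- until
  | e :: rest ->
      s.pending <- rest @ later;
      s.now <- e.due;
      if not e.timer.cancelled then begin
        e.f ();
        match e.period with
        | Some p -> s.pending <- { e with due = e.due +. p } :: s.pending
        | None -> ()
      end;
      advance s until
```

Cancelled entries are simply dropped when they come due, which is why a second cancel, or a cancel after a one-shot has fired, has no further effect.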

Task: don't block the request

examples/hive_supervision/main.ml
(* One-shot background work: each task resolves to an outcome. *)
let jobs = List.map (fun n -> Task.async ~sw (fun () -> n * n)) [ 2; 3; 4 ] in
let squares =
  List.filter_map (fun o -> Outcome.value o) (Task.await_all jobs)
in
Printf.printf "squares: %s\n"
  (String.concat ", " (List.map string_of_int squares));
(* An exception does not escape: it becomes a [Failed] outcome. *)
let risky = Task.async ~sw (fun () -> failwith "out of cheese") in
(match Task.await risky with
| Outcome.Failed _ -> print_endline "risky task failed, as a value"
| Outcome.Completed () | Outcome.Cancelled ->
    prerr_endline "error: expected the task to fail";
    exit 1);

Task.async forks a fiber and gives back a typed handle; await (or await_all) blocks for the Outcome. An exception raised inside the task becomes Failed — a value the caller matches on, not a crash that takes the switch down. cancel resolves the outcome to Cancelled. Tasks plug directly into supervisors via child_of_fn.
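The essential step in Task is turning a raised exception into data. A stdlib-only sketch of just that step, with hypothetical names run_captured and value; the fiber that Task.async forks is left out, and a real Eio-based version would presumably also map cancellation to Cancelled rather than Failed.

```ocaml
type 'a outcome = Completed of 'a | Failed of exn | Cancelled

(* Run a thunk and reify how it ended; [match ... with exception]
   catches only exceptions raised while evaluating [f ()]. *)
let run_captured f =
  match f () with v -> Completed v | exception e -> Failed e

(* Keep only successful results, like the [Outcome.value] filter above. *)
let value = function Completed v -> Some v | Failed _ | Cancelled -> None

let () =
  let outcomes = List.map (fun n -> run_captured (fun () -> n * n)) [ 2; 3; 4 ] in
  let squares = List.filter_map value outcomes in
  print_endline (String.concat ", " (List.map string_of_int squares));
  match run_captured (fun () -> failwith "out of cheese") with
  | Failed e -> Printf.printf "failed as a value: %s\n" (Printexc.to_string e)
  | Completed () | Cancelled -> print_endline "unexpected"
```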

Actions and out-of-band messages

A handler never performs effects; it returns them. The reaction's action list is interpreted by the runtime after the reply is delivered: Send enqueues on any typed address, Send_after schedules a named timer, Publish broadcasts through the injected pubsub, and Stop terminates after the current message. This keeps every callback pure and unit-testable.

Actions as data

examples/hive_registry/main.ml
(* An audit worker records every event it hears about. [Remind] shows
   actions-as-data: the handler returns a [Send_after], the runtime
   performs it and later delivers [Info.Timer] back to the worker. *)
type _ msg =
  | Remind : string -> unit msg (* cast: schedule a named timer *)
  | Seen : string list msg (* call: everything recorded so far *)

module Audit = Hive.Server.Make (struct
  type 'r t = 'r msg
end)

let handle : type r. r msg -> string list -> (string list, r) Audit.reaction =
 fun msg state ->
  match msg with
  | Remind name ->
      let actions = [ Audit.Action.Send_after { delay = 0.01; timer = name } ] in
      { Audit.state; reply = (); actions }
  | Seen -> Audit.continue state (List.rev state)

The Remind handler does not sleep and does not fork: it returns a Send_after action and the runtime does the scheduling. A test asserts on the returned action list — no clock required.
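The same split can be reproduced without the runtime. In this stdlib-only sketch, the action type, the reaction record and the perform interpreter are hypothetical stand-ins for Audit.Action, Audit.reaction and hive's runtime: the handler describes a timer, a test compares that description as plain data, and only the interpreter would touch a clock (here it merely records what it would schedule).

```ocaml
(* Hypothetical mirror of the action vocabulary (two cases only). *)
type action =
  | Send_after of { delay : float; timer : string }
  | Stop

type _ msg = Remind : string -> unit msg | Seen : string list msg

type ('s, 'r) reaction = { state : 's; reply : 'r; actions : action list }

(* Pure: scheduling is described, not performed. *)
let handle : type r. r msg -> string list -> (string list, r) reaction =
 fun msg state ->
  match msg with
  | Remind name ->
      { state; reply = (); actions = [ Send_after { delay = 0.01; timer = name } ] }
  | Seen -> { state; reply = List.rev state; actions = [] }

(* A toy interpreter: instead of arming a real timer, it records the
   (delay, timer name) pairs a runtime would schedule. *)
let perform scheduled = function
  | Send_after { delay; timer } -> scheduled := (delay, timer) :: !scheduled
  | Stop -> ()

let () =
  let r = handle (Remind "rotate-logs") [] in
  assert (r.actions = [ Send_after { delay = 0.01; timer = "rotate-logs" } ]);
  let scheduled = ref [] in
  List.iter (perform scheduled) r.actions;
  List.iter
    (fun (d, name) -> Printf.printf "would fire %S after %.2fs\n" name d)
    (List.rev !scheduled)
```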

handle_info: timers and bridged deliveries

examples/hive_registry/main.ml
(* Out-of-band messages — fired timers, bridged pubsub deliveries —
   arrive in [handle_info], also pure. *)
let handle_info info state =
  match info with
  | Hive.Server.Info.Timer name -> Audit.continue (("timer " ^ name) :: state) ()
  | Hive.Server.Info.Pubsub { topic; payload } ->
      Audit.continue ((topic ^ ": " ^ payload) :: state) ()

let spec =
  { Audit.init = (fun () -> []); handle; handle_info;
    terminate = Audit.no_terminate }

Messages that are not part of the worker's GADT — a named timer firing, a pubsub delivery — arrive as Info values in handle_info, which is just as pure as handle. Workers that need neither use the ignore_info default.

Naming, fan-out, and pubsub

Three small capabilities connect workers to each other. All are plain values you create and pass — nothing global, nothing magic.

The pubsub capability

Pubsub is a topic broker you hold as a plain value: subscribe a callback to a topic, publish a payload to every subscriber, and call the returned function to unsubscribe. In the snippets below, bus is such a value.

examples/hive_pubsub/main.ml
(* A subscriber is a topic and a callback. The callback runs once per
   payload published to that topic, synchronously in the publisher's
   fiber. subscribe returns a function that releases the subscription. *)
let received_a = ref [] in
let received_b = ref [] in
let unsubscribe_a =
  bus.Pubsub.subscribe ~topic:"news" (fun ~payload ->
      received_a := payload :: !received_a)
in
let _unsubscribe_b =
  bus.Pubsub.subscribe ~topic:"news" (fun ~payload ->
      received_b := payload :: !received_b)
in

examples/hive_pubsub/main.ml
(* publish fans a payload out to every subscriber of the topic; a topic
   with no subscribers simply drops it. *)
bus.Pubsub.publish ~topic:"news" ~payload:"headline: pubsub now lives in hive";
bus.Pubsub.publish ~topic:"news" ~payload:"headline: examples build clean";
bus.Pubsub.publish ~topic:"weather" ~payload:"ignored: nobody is listening";

Hive.Pubsub.local is the single-process implementation — publish invokes each subscriber synchronously in the publisher's fiber, so a callback that blocks (enqueuing into a full mailbox) backpressures the publisher. hive itself never picks an implementation: the application injects one at composition time, and a clustered app supplies a node-spanning broker in its place.

That injected capability is what bridges pubsub to workers. Server.subscribe turns a topic into Info.Pubsub messages delivered to the worker's mailbox, and the returned function releases the subscription:

examples/hive_registry/main.ml
(* The pubsub capability is injected — hive never picks a transport.
   [subscribe] bridges a topic straight into the worker's mailbox. *)
let pubsub = Pubsub.local () in
let audit = Audit.start ~sw ~clock ~pubsub spec in
let unsubscribe = Audit.subscribe audit ~pubsub ~topic:"deploys" in
pubsub.Pubsub.publish ~topic:"deploys" ~payload:"api v42 rolled out";
Audit.cast audit (Remind "rotate-logs");
Eio.Time.sleep clock 0.05;
(match Audit.call audit Seen with
| Ok events -> List.iter print_endline events
| Error `Timeout | Error `Stopped ->
    prerr_endline "error: audit worker unavailable";
    exit 1);
unsubscribe ();

$ dune exec examples/hive_pubsub/main.exe
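A single-process broker with the shape used above takes only a few lines. This is a sketch, not Hive.Pubsub.local: the record type t merely mirrors the subscribe and publish calls in the snippets, delivery is synchronous in the publisher's call as described, and subscription ids are a choice made here so that unsubscribing removes exactly one callback.

```ocaml
type t = {
  subscribe : topic:string -> (payload:string -> unit) -> unit -> unit;
  publish : topic:string -> payload:string -> unit;
}

let local () : t =
  (* topic -> (subscription id, callback), newest binding first. *)
  let subs : (string, int * (payload:string -> unit)) Hashtbl.t = Hashtbl.create 16 in
  let next_id = ref 0 in
  let subscribe ~topic callback =
    incr next_id;
    let id = !next_id in
    Hashtbl.add subs topic (id, callback);
    fun () ->
      (* Remove only this subscription; others on the topic stay. *)
      let keep = List.filter (fun (i, _) -> i <> id) (Hashtbl.find_all subs topic) in
      while Hashtbl.mem subs topic do Hashtbl.remove subs topic done;
      List.iter (fun s -> Hashtbl.add subs topic s) (List.rev keep)
  in
  let publish ~topic ~payload =
    (* Snapshot first, oldest subscriber first; a topic with no
       subscribers yields [] and the payload is dropped. *)
    List.iter (fun (_, cb) -> cb ~payload) (List.rev (Hashtbl.find_all subs topic))
  in
  { subscribe; publish }

let () =
  let bus = local () in
  let got = ref [] in
  let unsubscribe = bus.subscribe ~topic:"news" (fun ~payload -> got := payload :: !got) in
  bus.publish ~topic:"news" ~payload:"one";
  bus.publish ~topic:"weather" ~payload:"dropped";
  unsubscribe ();
  bus.publish ~topic:"news" ~payload:"two";
  List.iter print_endline (List.rev !got)
```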

The typed registry

examples/hive_registry/main.ml
(* The registry maps a name to a capability, type-safely: a lookup
   succeeds only with the same key that registered the value. *)
let registry = Registry.create () in
let audit_key : Audit.t Registry.key = Registry.key "audit" in
(match Registry.register registry audit_key audit with
| Ok () -> ()
| Error `Already_registered ->
    prerr_endline "error: the audit name is already taken";
    exit 1);
(match Registry.whereis registry audit_key with
| Some _server -> print_endline "audit worker found under its name"
| None ->
    prerr_endline "error: audit worker not registered";
    exit 1);

The registry maps names to capabilities with no unsafe casts: keys carry a type witness, so a lookup returns the value only when made with the same key that registered it. Registration is atomic — first one wins — and the backing table is lock-free and safe across domains. Create the key once, next to the worker module that owns the name, and share it.
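A standard way to get type-safe keys with no casts is an extensible variant: each key mints its own constructor, and only that constructor can unwrap the stored value. The sketch below shows the idea with a plain Hashtbl; it is not hive's implementation, it is not safe across domains (the real table is described as lock-free), and key, register and whereis are local definitions that only mirror the names above.

```ocaml
(* A universal type: a value of any type can be stored as [univ]. *)
type univ = ..

module type WITNESS = sig
  type a
  type univ += V of a
end

type 'a key = { name : string; witness : (module WITNESS with type a = 'a) }

(* Each call mints a fresh constructor, so no two keys share a witness. *)
let key (type x) name : x key =
  let module W = struct
    type a = x
    type univ += V of a
  end in
  { name; witness = (module W : WITNESS with type a = x) }

type t = (string, univ) Hashtbl.t

let create () : t = Hashtbl.create 16

(* First registration wins. *)
let register (type x) (reg : t) (k : x key) (v : x) =
  let module W = (val k.witness : WITNESS with type a = x) in
  if Hashtbl.mem reg k.name then Error `Already_registered
  else (Hashtbl.replace reg k.name (W.V v); Ok ())

(* Only the registering key's constructor matches the stored value. *)
let whereis (type x) (reg : t) (k : x key) : x option =
  let module W = (val k.witness : WITNESS with type a = x) in
  match Hashtbl.find_opt reg k.name with
  | Some (W.V v) -> Some v
  | Some _ | None -> None
```

A second key created with the same name and type is still a different witness, so it cannot read the first key's value.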

Groups

examples/hive_registry/main.ml
(* A group is a named set with no uniqueness — the unit of fan-out. *)
let groups = Group.create () in
let auditors : Audit.t Group.key = Group.key "auditors" in
let second = Audit.start ~sw ~clock spec in
let joined =
  List.filter_map
    (fun worker ->
      match Group.join groups auditors worker with
      | Ok membership -> Some membership
      | Error `Name_in_use -> None)
    [ audit; second ]
in
Printf.printf "group members: %d\n" (List.length (Group.members groups auditors));
List.iter (fun m -> Group.leave groups m) joined;
Printf.printf "after leaving: %d\n" (List.length (Group.members groups auditors));

Where the registry enforces uniqueness, a group is a named set: many members per name, used to fan a message out to all of them. join returns a membership token, the only way to leave — so two joins are two memberships, and leaving one cannot accidentally evict the other.

$ dune exec examples/hive_registry/main.exe
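Membership tokens are what make leave exact when the same worker joins twice. A minimal, hypothetical model with string group names and no typed keys:

```ocaml
(* A membership token: which group, and which join it was. *)
type membership = { group : string; id : int }

type 'a t = { mutable next : int; table : (string, int * 'a) Hashtbl.t }

let create () = { next = 0; table = Hashtbl.create 16 }

(* No uniqueness: each join adds an entry and returns a fresh token. *)
let join (g : 'a t) group (member : 'a) : membership =
  g.next <- g.next + 1;
  Hashtbl.add g.table group (g.next, member);
  { group; id = g.next }

let members (g : 'a t) group : 'a list =
  List.map snd (Hashtbl.find_all g.table group)

(* Leaving removes exactly the entry this token names, nothing else. *)
let leave (g : 'a t) m =
  let keep =
    List.filter (fun (id, _) -> id <> m.id) (Hashtbl.find_all g.table m.group)
  in
  while Hashtbl.mem g.table m.group do Hashtbl.remove g.table m.group done;
  List.iter (Hashtbl.add g.table m.group) (List.rev keep)
```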

Going multi-node: hive.cluster

When one node stops being enough, the hive.cluster sub-library extends the same worker model across machines: a cluster-wide name registry and groups, remote messaging to a worker on another node, replicated state, and cluster singletons. It is opt-in — single-node apps never link it — and it is the deepest part of hive, so the rest of this page walks through how it is built, how nodes find each other, and what each piece does.

The layered architecture

hive.cluster is functored over three capabilities

  your workers · Dist_registry · Dist_group · Placement · Remote
  ───────────────────────────────────────────────────────────────
                          hive.cluster
       (functor over 3 capabilities — swappable, testable)
  ┌──────────────┬───────────────┬────────────────────────────┐
  │  MEMBERSHIP  │   TRANSPORT   │           REPLICA          │
  │ who is here  │  ship bytes   │   converging state (CRDT)  │
  └──────┬───────┴───────┬───────┴──────────────┬─────────────┘
         │               │                      │
   hive.cluster.swim (one adapter)        hive.cluster.crdt
      swim — SWIM gossip over UDP/TCP        crdt — json-joy docs

Every cluster feature is a functor over three module types: MEMBERSHIP (who is in the cluster and which nodes are reachable), TRANSPORT (ship bytes to a named node), and REPLICA (a converging state type). hive.cluster itself depends on none of swim, crdt or the network — it is pure logic over those interfaces, which is why it can be tested against in-memory fakes that simulate partitions and message loss.
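To make "a functor over three capabilities" concrete, here is a deliberately tiny, hypothetical version. The module types below are not hive.cluster's real signatures; they only show how a feature written against MEMBERSHIP, TRANSPORT and REPLICA can be exercised with in-memory fakes and a trivial state-based CRDT (a max-register).

```ocaml
module type MEMBERSHIP = sig
  type t
  val self : t -> string
  val reachable : t -> string list (* peers currently believed alive *)
end

module type TRANSPORT = sig
  type t
  val send : t -> dst:string -> string -> unit
end

module type REPLICA = sig
  type t
  val merge : t -> t -> t (* commutative, associative, idempotent *)
  val encode : t -> string
  val decode : string -> t
end

(* A feature: push local state to every reachable peer; merge what arrives. *)
module Gossip (M : MEMBERSHIP) (T : TRANSPORT) (R : REPLICA) = struct
  let push m t state =
    List.iter
      (fun peer -> if peer <> M.self m then T.send t ~dst:peer (R.encode state))
      (M.reachable m)

  let receive state bytes = R.merge state (R.decode bytes)
end

(* In-memory fakes: a fixed member list and a transport that records sends. *)
module Fake_membership = struct
  type t = { me : string; alive : string list }
  let self m = m.me
  let reachable m = m.alive
end

module Fake_transport = struct
  type t = (string * string) list ref (* (dst, bytes), newest first *)
  let send t ~dst bytes = t := (dst, bytes) :: !t
end

(* A max-register: about the simplest state-based CRDT. *)
module Max_int = struct
  type t = int
  let merge = max
  let encode = string_of_int
  let decode = int_of_string
end

module G = Gossip (Fake_membership) (Fake_transport) (Max_int)
```

Swapping the fakes for real adapters changes nothing inside Gossip, which is the property that lets partitions and message loss be simulated in tests.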

In production you plug in two adapters: hive.cluster.swim supplies MEMBERSHIP and TRANSPORT over the swim gossip protocol, and hive.cluster.crdt supplies REPLICA as json-joy CRDT documents. A node's identity is an id plus an incarnation number, so a message addressed to an earlier incarnation of a node that has since restarted is fenced out.

SWIM gossip and ports

two nodes on loopback, one port each

        node A                                 node B
   127.0.0.1:9471                          127.0.0.1:9472
   ┌──────────────┐   UDP :port  probe ───▶ ┌──────────────┐
   │   swim node  │   ◀─── ack   UDP :port  │   swim node  │
   │              │   gossip piggybacked    │              │
   │              │   on every packet ────▶ │              │
   │              │   TCP :port  join /     │              │
   │              │   ◀── push-pull state ─▶│              │
   └──────────────┘                         └──────────────┘
   one port (default 7946) carries both: UDP for probes + gossip,
   TCP for the join-time state exchange.

Membership runs the SWIM protocol. Each node periodically picks a random peer and sends it a UDP probe; a missed ack triggers indirect probes routed through other peers, and only if those also go unanswered is the target marked Suspect and, if it stays silent, Dead. Every packet piggybacks a few membership updates (Alive / Suspect / Dead, Join / Leave), so news of a change spreads exponentially across the cluster without a central coordinator: gossip, not broadcast.

All of this rides one configurable port, bind_port, which defaults to 7946. UDP on that port carries the probes, acks and gossip; TCP on the same port carries the join-time push/pull state exchange, where a joining node and a seed trade their full membership views so a newcomer converges immediately instead of waiting for gossip to trickle in. Open that one UDP+TCP port between nodes; optional AES-GCM with a shared key encrypts everything on it.
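When gossip carries conflicting updates about the same member, an incarnation number decides which one wins, and a member refutes a suspicion by gossiping Alive with a higher incarnation. The sketch below encodes the precedence rules from the original SWIM paper; whether the swim library applies exactly these rules is an assumption, and the names are hypothetical.

```ocaml
type status = Alive | Suspect | Dead

type update = { status : status; incarnation : int }

(* Does [incoming] override what we currently believe ([current])?
   - Dead overrides everything and is never overridden;
   - Alive overrides Alive or Suspect only with a strictly higher incarnation;
   - Suspect overrides Suspect with a higher incarnation, and Alive with
     an equal or higher one. *)
let overrides ~current ~incoming =
  match (current.status, incoming.status) with
  | Dead, _ -> false
  | _, Dead -> true
  | (Alive | Suspect), Alive -> incoming.incarnation > current.incarnation
  | Suspect, Suspect -> incoming.incarnation > current.incarnation
  | Alive, Suspect -> incoming.incarnation >= current.incarnation

let apply current incoming =
  if overrides ~current ~incoming then incoming else current
```

Because every node applies the same total precedence, updates can arrive in any order over gossip and still settle on the same view.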

Starting a node

examples/hive_cluster/main.ml
(* The adapter's [config] takes a dotted-quad [bind_addr] (it converts to
   swim's raw-byte form internally) and a [bind_port]. We start from
   [Swim.default_config] and tighten the gossip interval so the two nodes
   converge quickly in this demo. *)
let config ~port =
  {
    (SwimA.config ~bind_addr:host ~bind_port:port ()) with
    Swim.Types.protocol_interval = 0.1;
    cluster_name = "hive-cluster-example";
  }

Hive_cluster_swim.config takes a dotted bind address and the bind_port (here distinct loopback ports so two nodes share one machine); it also carries the SWIM timing dials and the optional cluster_name and secret_key. Hive_cluster_swim.create then opens the sockets and starts the protocol fibers under the switch — the resulting value is both the MEMBERSHIP and the TRANSPORT for the functors below.

examples/hive_cluster/main.ml
(* Boot a node: create the swim endpoint, then wire a registry on top of
   it BEFORE joining, so nothing the cluster sends can race a missing
   subscription. *)
let boot ~sw ~env ~port =
  match SwimA.create ~sw ~env ~config:(config ~port) with
  | Error `Invalid_key ->
      prerr_endline "error: swim secret key must be exactly 16 bytes";
      exit 1
  | Ok node ->
      let reg = Reg.create ~membership:node ~transport:node ~name:"shared" () in
      (node, reg)

let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  let clock = Eio.Stdenv.clock env in
  let env = swim_env ~sw env in
  let node_a, reg_a = boot ~sw ~env ~port:port_a in
  let node_b, reg_b = boot ~sw ~env ~port:port_b in
  Printf.printf "node A = %s (swim :%d)\n%!"
    (Node.to_string (SwimA.Membership.self node_a))
    port_a;
  Printf.printf "node B = %s (swim :%d)\n%!"
    (Node.to_string (SwimA.Membership.self node_b))
    port_b;
  (* Join B to A's seed. The seed string raw-encodes the dotted address;
     joining runs a TCP push/pull state exchange, so both membership views
     converge immediately. *)
  let seed = SwimA.seed ~host ~port:port_a in
  (match SwimA.join node_b ~seed_nodes:[ seed ] with
  | Ok () -> Printf.printf "node B joined seed %s:%d\n%!" host port_a
  | Error `No_seeds_reachable ->
      prerr_endline "error: no seed nodes reachable";
      exit 1);

A node is wired before it joins: build the registry, groups, placement and remote components over the node, then join a seed. join performs the TCP push/pull exchange with the seed's address (`No_seeds_reachable if none answer), after which gossip keeps every node's view current.

Replicated state, registry and groups

Replica is the one replication mechanism, and the distributed registry and groups are instances of it. A local update applies immediately and broadcasts a sequence-numbered delta to reachable members; a receiver that spots a gap requests a sync, which replies with the missing tail or, past the bounded buffer, a full snapshot. Reads are always local — there is no quorum on the read path.
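The delta, gap and sync steps can be modelled for one sender and one receiver. Everything here is hypothetical (a replicated set of ints, a buffer holding the last few deltas, the names update, on_delta, on_sync_request and on_sync_reply); it follows the behaviour described above rather than hive's code.

```ocaml
module IntSet = Set.Make (Int)

(* Sender: current state plus a bounded buffer of recent numbered deltas. *)
type sender = {
  mutable seq : int; (* last sequence number issued *)
  mutable state : IntSet.t;
  mutable buffer : (int * int) list; (* (seq, added element), newest first *)
  capacity : int;
}

type sync_reply = Tail of (int * int) list | Snapshot of int * IntSet.t

(* Apply locally at once and return the delta to broadcast. *)
let update s x =
  s.seq <- s.seq + 1;
  s.state <- IntSet.add x s.state;
  s.buffer <- List.filteri (fun i _ -> i < s.capacity) ((s.seq, x) :: s.buffer);
  (s.seq, x)

(* "I have everything up to [have]": reply with the missing tail if it is
   still buffered, otherwise with a full snapshot. *)
let on_sync_request s ~have =
  let missing = List.rev (List.filter (fun (n, _) -> n > have) s.buffer) in
  match missing with
  | (first, _) :: _ when first = have + 1 -> Tail missing
  | [] when have = s.seq -> Tail []
  | _ -> Snapshot (s.seq, s.state)

(* Receiver: apply in-order deltas, ignore duplicates, report gaps. *)
type receiver = { mutable applied : int; mutable view : IntSet.t }

let on_delta r (n, x) =
  if n <= r.applied then `Duplicate
  else if n = r.applied + 1 then (
    r.applied <- n;
    r.view <- IntSet.add x r.view;
    `Applied)
  else `Gap (* the caller sends a sync request with ~have:r.applied *)

let on_sync_reply r = function
  | Tail deltas -> List.iter (fun d -> ignore (on_delta r d)) deltas
  | Snapshot (n, st) ->
      r.applied <- n;
      r.view <- st
```

Reads in this model are just r.view, with no round trip, matching the "reads are always local" rule.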

examples/hive_cluster/main.ml
(* Register a name on node A. This mutates A's local replica; the change
   then propagates to B over the registry's delta gossip. *)
(match Reg.register reg_a "spawner" ~meta:"worker-7" with
| Ok () -> Printf.printf "node A registered \"spawner\" -> worker-7\n%!"
| Error `Already_registered ->
    prerr_endline "error: name already registered";
    exit 1);

examples/hive_cluster/main.ml
(* Gossip is asynchronous, so poll node B's view until it sees A's
   binding — bounded to ~5s (50 polls of 0.1s) so the demo always
   terminates deterministically. *)
let rec converge n =
  match Reg.whereis reg_b "spawner" with
  | Some (owner, meta) when Node.same_id owner (SwimA.Membership.self node_a) ->
      Printf.printf
        "node B sees \"spawner\" held by %s with meta %S after %d poll(s)\n%!"
        (Node.to_string owner) meta n;
      true
  | _ when n >= 50 -> false
  | _ ->
      Eio.Time.sleep clock 0.1;
      converge (n + 1)
in
let converged = converge 0 in

Dist_registry is a cluster-wide name → (node, metadata) map (last-writer-wins on conflict); Dist_group is a cluster-wide set with add-wins semantics. The example registers a name on one node and reads it back on the other once gossip has carried it across — convergence is asynchronous, so real code waits on an on_update callback rather than polling.

$ dune exec examples/hive_cluster/main.exe
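Both conflict rules named above are standard CRDT merges. A compact sketch under stated assumptions: a last-writer-wins entry ordered by a (timestamp, node) pair so that ties resolve deterministically, and an observed-remove set in which every add carries a unique tag, so a remove deletes only the adds it has seen and a concurrent add survives the merge. Neither is taken from hive's source; all names are hypothetical.

```ocaml
(* Last-writer-wins: the larger (timestamp, node) pair wins; comparing
   the node id as well makes the merge deterministic on ties. *)
type entry = { ts : int; node : string; meta : string }

let lww a b = if compare (a.ts, a.node) (b.ts, b.node) >= 0 then a else b

(* Add-wins (observed-remove) set: live elements are (element, tag) pairs;
   a remove records the tags it observed as tombstones. *)
module Tagged = Set.Make (struct
  type t = string * string (* (element, unique tag) *)
  let compare = compare
end)

type orset = { adds : Tagged.t; removed : Tagged.t }

let empty = { adds = Tagged.empty; removed = Tagged.empty }

let add s x ~tag = { s with adds = Tagged.add (x, tag) s.adds }

(* Remove only the tags this replica has seen for [x]. *)
let remove s x =
  let seen = Tagged.filter (fun (y, _) -> y = x) s.adds in
  { s with removed = Tagged.union s.removed seen }

let merge a b =
  { adds = Tagged.union a.adds b.adds; removed = Tagged.union a.removed b.removed }

let mem s x =
  Tagged.exists (fun ((y, _) as p) -> y = x && not (Tagged.mem p s.removed)) s.adds
```

Both merges are unions or maxima, so replicas converge whatever order deltas arrive in.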

Remote messaging and placement

Remote makes a local worker reachable by name from another node: the receiving node exposes a decoder, the sender holds a typed address (node, name, encoder), and send_to ships a best-effort, cast-style message (`Unreachable when the node has left). It is the cross-node equivalent of a cast.

Placement runs cluster singletons. Every node declares the same named spec; a consistent-hash ring (with virtual nodes for balance) picks one owner, and ownership follows membership — when the owner leaves, the ring reassigns and the worker starts elsewhere with a graceful state handoff. Placement is AP; Placement_cp is a CP variant that uses majority-quorum leases to run at most one instance, trading availability (nothing runs without a majority) for that guarantee.
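The owner-picking ring can be sketched in a few dozen lines. This hypothetical version places each node at several virtual points using 32-bit FNV-1a (a hash chosen here, not necessarily hive's) and hands a key to the first point at or after the key's hash, wrapping around; majority shows the quorum size a majority-based scheme such as Placement_cp implies.

```ocaml
(* 32-bit FNV-1a over the bytes of a string. *)
let fnv1a s =
  let h = ref 0x811c9dc5 in
  String.iter (fun c -> h := ((!h lxor Char.code c) * 0x01000193) land 0xffffffff) s;
  !h

(* Each node appears [vnodes] times on the ring for a more even spread. *)
let build ~vnodes nodes =
  let points =
    List.concat_map
      (fun node -> List.init vnodes (fun i -> (fnv1a (node ^ "#" ^ string_of_int i), node)))
      nodes
  in
  Array.of_list (List.sort compare points)

(* Owner of [key]: the first point at or after its hash, wrapping around. *)
let owner ring key =
  let h = fnv1a key in
  let n = Array.length ring in
  let rec find i =
    if i = n then snd ring.(0)
    else if fst ring.(i) >= h then snd ring.(i)
    else find (i + 1)
  in
  if n = 0 then None else Some (find 0)

(* A majority lease needs strictly more than half of the members. *)
let majority n = (n / 2) + 1
```

A linear scan keeps the sketch short; a real ring would binary-search the sorted array. Removing a node that does not own a key leaves that key's owner unchanged, because the owner's point is still the first one at or after the key's hash.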

Consistency: what AP costs you

Replication, the registry, groups and Placement are AP: during a network partition both sides stay available, so a name can briefly have two holders and a placed singleton can briefly run on two nodes — the duplicate is detected and stopped when the partition heals. Workers placed under AP must therefore tolerate running twice (be idempotent, or fence their own side effects); that is the documented contract. There is no consensus log and no persistence — state lives in memory and converges after heals. When you need single-writer safety, use Placement_cp or run an external consensus system behind a worker. The swim and crdt pages document the layers underneath.
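Fencing side effects usually means a fencing token: each new holder of the singleton gets a strictly larger number, and the resource it writes to refuses anything older than the newest number it has accepted. A minimal sketch; guard, create and write are hypothetical, and where the token comes from (a lease epoch, for example) is left open.

```ocaml
(* A side-effect sink that remembers the highest token it has accepted. *)
type guard = { mutable highest : int; mutable log : string list }

let create () = { highest = min_int; log = [] }

(* Accept a write only from the newest (or the same) holder; a stale copy
   still running on the other side of a healed partition is refused. *)
let write g ~token payload =
  if token < g.highest then Error `Stale_token
  else begin
    g.highest <- token;
    g.log <- payload :: g.log;
    Ok ()
  end
```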