swim — cluster membership
swim implements the SWIM protocol — scalable, weakly-consistent, infection-style process group membership — on Eio. Nodes discover each other, detect failures with probes and indirect checks, and gossip Alive/Suspect/Dead state, with optional AES-GCM on the wire and user messaging on top. Every snippet below is cut from a program under examples/ that starts a real cluster on loopback and runs in the test suite.
Starting a node
A node is a Swim.Cluster.t: a UDP socket for probes and gossip, a TCP listener for state exchange, and the protocol loops running as fibers under your switch.
The capability record
(* The library reads its clocks, network and entropy from a [Types.env]
capability record; build one from the standard Eio environment. *)
let swim_env ~sw env =
{ Types.stdenv =
object
method clock = Eio.Stdenv.clock env
method mono_clock = Eio.Stdenv.mono_clock env
method net =
(Eio.Stdenv.net env :> [ `Generic ] Eio.Net.ty Eio.Resource.t)
method secure_random = Eio.Stdenv.secure_random env
end;
sw }
swim reaches for no globals: you hand it the clock, monotonic clock, network and entropy explicitly. That makes a node testable against fakes and keeps capability use auditable.
Configuration
(* Start from [default_config] and override what this node needs. *)
let config ~name ~port =
{ Types.default_config with
bind_addr = loopback;
bind_port = port;
node_name = Some name;
cluster_name = "demo" }
Start from default_config and override what a node needs: bind_addr (raw 4-byte form, see the note), bind_port, node_name and cluster_name. The failure detector's dials — protocol_interval, probe_timeout, indirect_checks, suspicion_mult, retransmit_mult — also live here; the messaging example tunes them.
Addresses are raw bytes as Eio.Net.Ipaddr.of_raw accepts them — "\127\000\000\001" is 127.0.0.1, not the dotted string. Seed strings use the same form: "<addr>:<port>".
create and start
let start_node ~sw ~env ~name ~port =
match Cluster.create ~sw ~env ~config:(config ~name ~port) with
| Error `Invalid_key ->
prerr_endline "error: secret key must be exactly 16 bytes";
exit 1
| Ok cluster ->
(* Forks the protocol, UDP and TCP loops as daemon fibers; they
are cancelled automatically when the switch finishes. *)
Cluster.start cluster;
cluster
create validates the configuration (a bad key is Error `Invalid_key before any socket opens); start forks the protocol, UDP and TCP loops as daemon fibers, cancelled automatically when the switch ends.
Joining and membership events
Nodes find each other by joining a seed; thereafter the membership view updates through an event stream.
Joining a seed
(* The seed string is "<addr>:<port>" with the same raw-byte address
form as [bind_addr]. Joining does a TCP push/pull state exchange,
so both sides learn about each other immediately rather than
waiting for gossip. *)
let seed = Printf.sprintf "%s:%d" loopback alpha_port in
(match Cluster.join beta ~seed_nodes:[ seed ] with
| Ok () -> Printf.printf "[beta] joined seed 127.0.0.1:%d\n%!" alpha_port
| Error `No_seeds_reachable ->
prerr_endline "error: no seed nodes reachable";
exit 1);
(* Wait for both membership views to converge. *)
Eio.Fiber.both
(fun () -> await_join ~label:"alpha" alpha)
(fun () -> await_join ~label:"beta" beta);
Cluster.shutdown alpha;
Cluster.shutdown beta;
print_endline "both nodes converged; shut down cleanly"
join performs a TCP push/pull state exchange with the seed, so both sides learn about each other immediately rather than waiting for a gossip round. `No_seeds_reachable is the error returned when no seed answers.
The event stream
(* Block on the node's event stream until it reports a [Join]. The
stream also carries [Leave], [Suspect_event] (a peer missed a probe),
[Alive_event] (a suspect refuted the suspicion) and [Update]. *)
let await_join ~label cluster =
let events = Cluster.events cluster in
let rec loop () =
match Eio.Stream.take events with
| Types.Join node ->
Printf.printf "[%s] saw Join: %s\n%!" label
(Types.node_id_to_string node.Types.id)
| Types.Leave _ | Types.Update _ | Types.Suspect_event _
| Types.Alive_event _ ->
loop ()
in
loop ()
Membership changes arrive on an Eio.Stream you drain at your own pace. Join and Leave mark membership; Suspect_event fires when a peer misses a probe; Alive_event when a suspect refutes the suspicion via indirect probes; Update on metadata change. The suspect-then-dead progression is SWIM's accuracy mechanism — a slow node gets a chance to clear its name before being declared dead.
$ dune exec examples/swim_membership/main.exe
Inspecting the cluster
(* The membership view is queryable at any time: [members] lists every
known node, [find_node] and [is_alive] look one up by id, and
[stats] counts alive/suspect/dead plus message and queue gauges. *)
let print_view ~label cluster =
Printf.printf "[%s] sees %d members:\n%!" label
(Cluster.member_count cluster);
List.iter
(fun (node : Types.node_info) ->
Printf.printf " - %s\n%!" (Types.node_id_to_string node.Types.id))
(Cluster.members cluster);
let stats = Cluster.stats cluster in
Printf.printf "[%s] alive=%d suspect=%d dead=%d\n%!" label
stats.Types.nodes_alive stats.Types.nodes_suspect
stats.Types.nodes_dead
The membership view is queryable at any moment, not just through events: member_count and members list the known nodes, find_node and is_alive look one up by id, and stats reports alive/suspect/dead counts alongside message and queue gauges — enough to drive a health endpoint or a dashboard.
User messaging
On top of membership, a cluster carries application messages two ways.
Direct, point-to-point
(* A direct send is a single UDP packet to one member, addressed by
node id; [`Unknown_node] means the target is not in the local
membership table. *)
let direct = expect_message ~label:"alpha" alpha ~topic:"task" in
let alpha_id = (Cluster.local_node alpha).Types.id in
(match Cluster.send beta ~target:alpha_id ~topic:"task" ~payload:"resize" with
| Ok () -> ()
| Error `Unknown_node ->
prerr_endline "error: alpha is not in beta's membership table";
exit 1);
ignore (Eio.Promise.await direct);
send delivers a single UDP packet to one member, addressed by node id; `Unknown_node means the target is not in the local membership table.
Broadcast over gossip
(* A broadcast is queued and piggybacked on the protocol's regular
gossip packets: bandwidth-efficient and eventually delivered to
every node, with no delivery order or latency guarantee. *)
let news = expect_message ~label:"beta" beta ~topic:"news" in
Cluster.broadcast alpha ~topic:"news" ~payload:"deploy finished";
ignore (Eio.Promise.await news);
broadcast queues a message that piggybacks on the protocol's regular gossip packets: bandwidth-efficient and eventually delivered to every node, with no ordering or latency guarantee.
Receiving
(* [on_message] registers a handler called with the sender's node_info,
the topic and the payload, for every user message — direct or
broadcast. Senders unknown to the membership table are ignored, so
handlers only ever see cluster members. *)
let expect_message ~label cluster ~topic =
let promise, resolver = Eio.Promise.create () in
Cluster.on_message cluster (fun sender t payload ->
match String.equal t topic && not (Eio.Promise.is_resolved promise) with
| true ->
Printf.printf "[%s] %s from %s: %s\n%!" label topic
(Types.node_id_to_string sender.Types.id)
payload;
Eio.Promise.resolve resolver payload
| false -> ());
promise
on_message registers a handler invoked with the sender's node_info, the topic and the payload, for every user message — direct or broadcast. Senders unknown to the membership table are dropped, so a handler only ever sees real cluster members.
$ dune exec examples/swim_messaging/main.exe
Tuning the detector
(* The failure-detector dials. A suspect is declared dead after
suspicion_mult * log(n+1) * protocol_interval seconds (capped by
suspicion_max_timeout); each broadcast is retransmitted
retransmit_mult * log(n+1) times before leaving the gossip queue. *)
let config ~name ~port =
{ Types.default_config with
bind_addr = loopback;
bind_port = port;
node_name = Some name;
cluster_name = "messaging-demo";
protocol_interval = 0.2; (* seconds between probe rounds *)
probe_timeout = 0.1; (* direct-ack wait before indirect probes *)
indirect_checks = 3; (* peers asked to probe on our behalf *)
suspicion_mult = 4;
retransmit_mult = 4 }
The same config record tunes detection latency against false positives: a suspect becomes dead after roughly suspicion_mult · log(n+1) · protocol_interval seconds, and each broadcast is retransmitted retransmit_mult · log(n+1) times before leaving the queue.
Encryption
(* In swim 0.2.0 the key must be exactly 16 bytes (AES-128-GCM); every
node in the cluster shares it. The verify flags additionally reject
any unencrypted gossip from a misconfigured peer. *)
let shared_key = "araara-demo-key!"
let config ~name ~port =
{ Types.default_config with
bind_addr = loopback;
bind_port = port;
node_name = Some name;
cluster_name = "encrypted-demo";
secret_key = shared_key;
encryption_enabled = true;
gossip_verify_incoming = true;
gossip_verify_outgoing = true }
Set a shared secret_key and encryption_enabled and every UDP packet and the TCP state exchange are sealed with AES-GCM; gossip_verify_incoming and gossip_verify_outgoing additionally reject any unencrypted traffic from a misconfigured peer.
(* [Cluster.create] validates the key up front: a secret of the wrong
length is an [Error `Invalid_key] before any socket is touched. *)
let start_node ~sw ~env ~name ~port =
match Cluster.create ~sw ~env ~config:(config ~name ~port) with
| Error `Invalid_key ->
prerr_endline "error: secret key must be exactly 16 bytes";
exit 1
| Ok cluster ->
Cluster.start cluster;
cluster
In swim 0.2.0 the key must be exactly 16 bytes (AES-128-GCM), shared by every node; create returns Error `Invalid_key for any other length before a socket is opened. swim is the membership and transport layer under hive's cluster mode.
$ dune exec examples/swim_encrypted/main.exe