Repodb.Cqrs
repodb · API reference
Read/write connection routing for CQRS-style deployments.
A CQRS pool owns one primary pool and zero or more replica pools. Writes and transactions always go to the primary. Reads can go to healthy replicas using round-robin, random, or least-connections selection; if no replica is available, reads fall back to the primary.
module Db = Repodb.Cqrs.Make (Repodb_postgresql.Driver)
let config =
{ Cqrs.primary_conninfo = "host=primary dbname=app";
primary_max_size = 10;
replica_conninfos = [ "host=replica-1 dbname=app" ];
replica_max_size_each = 10;
replica_selection = Cqrs.RoundRobin;
validate = None }
let list_users db query ~decode =
Db.with_read db (fun conn -> Repo.all_query conn query ~decode)

Use with_write or transaction for operations that must observe their own writes. Replica lag is a deployment concern; this module only routes connections and tracks health. Mark replicas unhealthy when health checks or query failures show they should be temporarily skipped.
type replica_selection =
| RoundRobin
| Random
| LeastConnections

type 'conn config = {
primary_conninfo : string;
primary_max_size : int;
replica_conninfos : string list;
replica_max_size_each : int;
replica_selection : replica_selection;
validate : ('conn -> bool) option;
}

type conn_origin = [
| `Primary
| `Replica of int
]

type 'conn t = {
primary_pool : 'conn Pool.t;
replica_pools : 'conn Pool.t array option;
replica_healthy : bool Kcas.Loc.t array option;
replica_index : int Kcas.Loc.t;
selection : replica_selection;
conn_origin : ('conn, conn_origin) Kcas_data.Hashtbl.t;
}

type intent =
| Read
| Write

val route_query_type : Query.query_type -> intent
val has_replicas : 'a t -> bool
val count_healthy_replicas : 'a t -> int
val next_replica_round_robin : 'a t -> (int * 'a Pool.t) option
val next_replica_random : 'a t -> (int * 'a Pool.t) option
val next_replica_least_connections : 'a t -> (int * 'a Pool.t) option
val select_replica : 'a t -> (int * 'a Pool.t) option
val mark_replica_unhealthy : 'a t -> int -> unit
val mark_replica_healthy : 'a t -> int -> unit
val acquire_read : 'a t -> ('a, Pool.pool_error) result
val acquire_write : 'a t -> ('a, Pool.pool_error) result
val release_read : 'a t -> 'a -> unit
val release_write : 'a t -> 'a -> unit
val with_read : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_write : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_primary : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_replica : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result

type stats = {
primary : Pool.stats;
replicas : Pool.stats list option;
healthy_replica_count : int;
}

val stats : 'a t -> stats
val shutdown : 'a t -> unit

module Make (D : Driver.S) : sig ... end