Repodb.Cqrs

repodb · API reference

Read/write connection routing for CQRS-style deployments.

A CQRS pool owns one primary pool and zero or more replica pools. Writes and transactions always go to the primary. Reads can go to healthy replicas using round-robin, random, or least-connections selection; if no replica is available, reads fall back to the primary.

module Db = Repodb.Cqrs.Make (Repodb_postgresql.Driver)

  let config =
    { Cqrs.primary_conninfo = "host=primary dbname=app";
      primary_max_size = 10;
      replica_conninfos = [ "host=replica-1 dbname=app" ];
      replica_max_size_each = 10;
      replica_selection = Cqrs.RoundRobin;
      validate = None }

  let list_users db query ~decode =
    Db.with_read db (fun conn -> Repo.all_query conn query ~decode)

Use with_write or transaction for operations that must observe their own writes. Replica lag is a deployment concern; this module only routes connections and tracks health. Mark replicas unhealthy when health checks or query failures show they should be temporarily skipped.

type replica_selection = 
  | RoundRobin
  | Random
  | LeastConnections
type 'conn config = {
  primary_conninfo : string;
  primary_max_size : int;
  replica_conninfos : string list;
  replica_max_size_each : int;
  replica_selection : replica_selection;
  validate : ('conn -> bool) option;
}
type conn_origin = [ 
  | `Primary
  | `Replica of int
 ]
type 'conn t = {
  primary_pool : 'conn Pool.t;
  replica_pools : 'conn Pool.t array option;
  replica_healthy : bool Kcas.Loc.t array option;
  replica_index : int Kcas.Loc.t;
  selection : replica_selection;
  conn_origin : ('conn, conn_origin) Kcas_data.Hashtbl.t;
}
type intent = 
  | Read
  | Write
val route_query_type : Query.query_type -> intent
val has_replicas : 'a t -> bool
val count_healthy_replicas : 'a t -> int
val next_replica_round_robin : 'a t -> (int * 'a Pool.t) option
val next_replica_random : 'a t -> (int * 'a Pool.t) option
val next_replica_least_connections : 'a t -> (int * 'a Pool.t) option
val select_replica : 'a t -> (int * 'a Pool.t) option
val mark_replica_unhealthy : 'a t -> int -> unit
val mark_replica_healthy : 'a t -> int -> unit
val acquire_read : 'a t -> ('a, Pool.pool_error) result
val acquire_write : 'a t -> ('a, Pool.pool_error) result
val release_read : 'a t -> 'a -> unit
val release_write : 'a t -> 'a -> unit
val with_read : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_write : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_primary : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
val with_replica : 'a t -> ('a -> 'b) -> ('b, Pool.pool_error) result
type stats = {
  primary : Pool.stats;
  replicas : Pool.stats list option;
  healthy_replica_count : int;
}
val stats : 'a t -> stats
val shutdown : 'a t -> unit
module Make (D : Driver.S) : sig ... end