Repodb.Pool

repodb · API reference

Lock-free connection pool using kcas for thread-safe operations.

Works with any concurrency model (Eio, Lwt, or plain direct-style code) because kcas provides lock-free data structures that do not depend on any specific scheduler.

let config =
  Pool.
    {
      max_size = 10;
      connect = (fun () -> Repodb_postgresql.connect conninfo);
      close = Repodb_postgresql.close;
      validate = None;
    }
in
let pool = Pool.create config in
Pool.with_connection pool (fun conn -> Repo.all conn ~table:users ~decode)
type pool_error = 
  | Pool_empty
  | Pool_closed
  | Pool_timeout
  | Connection_error of string
type 'conn config = {
  max_size : int;
  connect : unit -> ('conn, string) result;
  close : 'conn -> unit;
  validate : ('conn -> bool) option;
}
type 'conn pooled = {
  conn : 'conn;
  created_at : float;
}
type 'conn t = {
  config : 'conn config;
  available : 'conn pooled Kcas_data.Queue.t;
  total_count : int Kcas.Loc.t;
  in_use_count : int Kcas.Loc.t;
  closed : bool Kcas.Loc.t;
}
val create : 'a config -> 'a t
val size : 'a t -> int
val in_use : 'a t -> int
val available : 'a t -> int
val is_closed : 'a t -> bool
val validate_connection : 'a t -> 'a pooled -> bool
val close_pooled : 'a t -> 'a pooled -> unit
val try_create_connection : 'a t -> ('a pooled, pool_error) result
val acquire_from_pool : 'a t -> ('a, pool_error) result
val acquire : 'a t -> ('a, pool_error) result
val acquire_blocking : ?timeoutf:float -> 'a t -> ('a, pool_error) result
val release : 'a t -> 'a -> unit
val with_connection : 'a t -> ('a -> 'b) -> ('b, pool_error) result
val with_connection_blocking : 
  ?timeoutf:float ->
  'a t ->
  ('a -> 'b) ->
  ('b, pool_error) result
val drain : 'a t -> unit
val shutdown : 'a t -> unit
type stats = {
  total : int;
  available : int;
  in_use : int;
  closed : bool;
}
val stats : 'a t -> stats
val error_to_string : pool_error -> string

Multi-server pool with round-robin load balancing.

Distributes connections across multiple database servers without requiring an external load balancer. A separate connection pool is maintained for each server.

let config =
  Pool.
    {
      servers = [ "host1"; "host2"; "host3" ];
      max_size_per_server = 5;
      connect = (fun conninfo -> Repodb_postgresql.connect conninfo);
      close = Repodb_postgresql.close;
      validate = None;
    }
in
let multi = Pool.create_multi config in
Pool.with_connection_multi multi (fun conn ->
    Repo.all conn ~table:users ~decode)
type 'conn multi_config = {
  servers : string list;
  max_size_per_server : int;
  connect : string -> ('conn, string) result;
  close : 'conn -> unit;
  validate : ('conn -> bool) option;
}
type 'conn multi_t = {
  pools : 'conn t array;
  pool_conninfos : string array;
  conn_to_pool : ('conn, int) Kcas_data.Hashtbl.t;
  healthy : bool Kcas.Loc.t array;
  next_index : int Kcas.Loc.t;
  closed : bool Kcas.Loc.t;
}
val create_multi : 'conn multi_config -> 'conn multi_t
val multi_size : 'a multi_t -> int
val multi_in_use : 'a multi_t -> int
val multi_available : 'a multi_t -> int
val multi_is_closed : 'a multi_t -> bool
val multi_server_count : 'a multi_t -> int
val multi_is_healthy : 'a multi_t -> int -> bool
val mark_unhealthy : 'a multi_t -> int -> unit
val mark_healthy : 'a multi_t -> int -> unit
val count_healthy : 'a multi_t -> int
val next_healthy_index : 'a multi_t -> int option

`next_healthy_index` finds the next healthy server using round-robin selection and returns its index, if any.

val acquire_multi : 'a multi_t -> ('a, pool_error) result
val release_multi : 'a multi_t -> 'a -> unit
val with_connection_multi : 'a multi_t -> ('a -> 'b) -> ('b, pool_error) result
val acquire_multi_blocking : 
  ?timeoutf:float ->
  'a multi_t ->
  ('a, pool_error) result
val with_connection_multi_blocking : 
  ?timeoutf:float ->
  'a multi_t ->
  ('a -> 'b) ->
  ('b, pool_error) result
val drain_multi : 'a multi_t -> unit
val shutdown_multi : 'a multi_t -> unit
type multi_stats = {
  total_servers : int;
  healthy_servers : int;
  per_server : (string * stats) list;
  aggregate : stats;
}
val stats_multi : 'a multi_t -> multi_stats
module Make (D : Driver.S) : sig ... end