Repodb.Pool
repodb · API reference
Lock-free connection pool using kcas for thread-safe operations.
Works with any concurrency library (Eio, Lwt, direct-style) because kcas provides lock-free data structures independent of any specific scheduler.
let config =
Pool.
{
max_size = 10;
connect = (fun () -> Repodb_postgresql.connect conninfo);
close = Repodb_postgresql.close;
validate = None;
}
in
let pool = Pool.create config in
Pool.with_connection pool (fun conn -> Repo.all conn ~table:users ~decode)type pool_error =
| Pool_empty
| Pool_closed
| Pool_timeout
| Connection_error of stringtype 'conn config = {
max_size : int;
connect : unit -> ('conn, string) result;
close : 'conn -> unit;
validate : ('conn -> bool) option;
}type 'conn pooled = {
conn : 'conn;
created_at : float;
}type 'conn t = {
config : 'conn config;
available : 'conn pooled Kcas_data.Queue.t;
total_count : int Kcas.Loc.t;
in_use_count : int Kcas.Loc.t;
closed : bool Kcas.Loc.t;
}val create : 'a config -> 'a tval size : 'a t -> intval in_use : 'a t -> intval available : 'a t -> intval is_closed : 'a t -> boolval validate_connection : 'a t -> 'a pooled -> boolval close_pooled : 'a t -> 'a pooled -> unitval try_create_connection : 'a t -> ('a pooled, pool_error) resultval acquire_from_pool : 'a t -> ('a, pool_error) resultval acquire : 'a t -> ('a, pool_error) resultval acquire_blocking : ?timeoutf:float -> 'a t -> ('a, pool_error) resultval release : 'a t -> 'a -> unitval with_connection : 'a t -> ('a -> 'b) -> ('b, pool_error) resultval with_connection_blocking :
?timeoutf:float ->
'a t ->
('a -> 'b) ->
('b, pool_error) resultval drain : 'a t -> unitval shutdown : 'a t -> unittype stats = {
total : int;
available : int;
in_use : int;
closed : bool;
}val stats : 'a t -> statsval error_to_string : pool_error -> stringMulti-server pool with round-robin load balancing.
Distributes connections across multiple database servers without requiring an external load balancer. Each server maintains its own connection pool.
let config =
Pool.
{
servers = [ "host1"; "host2"; "host3" ];
max_size_per_server = 5;
connect = (fun conninfo -> Repodb_postgresql.connect conninfo);
close = Repodb_postgresql.close;
validate = None;
}
in
let multi = Pool.create_multi config in
Pool.with_connection_multi multi (fun conn ->
Repo.all conn ~table:users ~decode)type 'conn multi_config = {
servers : string list;
max_size_per_server : int;
connect : string -> ('conn, string) result;
close : 'conn -> unit;
validate : ('conn -> bool) option;
}type 'conn multi_t = {
pools : 'conn t array;
pool_conninfos : string array;
conn_to_pool : ('conn, int) Kcas_data.Hashtbl.t;
healthy : bool Kcas.Loc.t array;
next_index : int Kcas.Loc.t;
closed : bool Kcas.Loc.t;
}val create_multi : 'conn multi_config -> 'conn multi_tval multi_size : 'a multi_t -> intval multi_in_use : 'a multi_t -> intval multi_available : 'a multi_t -> intval multi_is_closed : 'a multi_t -> boolval multi_server_count : 'a multi_t -> intval multi_is_healthy : 'a multi_t -> int -> boolval mark_unhealthy : 'a multi_t -> int -> unitval mark_healthy : 'a multi_t -> int -> unitval count_healthy : 'a multi_t -> intval next_healthy_index : 'a multi_t -> int optionFind next healthy server using round-robin
val acquire_multi : 'a multi_t -> ('a, pool_error) resultval release_multi : 'a multi_t -> 'a -> unitval with_connection_multi : 'a multi_t -> ('a -> 'b) -> ('b, pool_error) resultval acquire_multi_blocking :
?timeoutf:float ->
'a multi_t ->
('a, pool_error) resultval with_connection_multi_blocking :
?timeoutf:float ->
'a multi_t ->
('a -> 'b) ->
('b, pool_error) resultval drain_multi : 'a multi_t -> unitval shutdown_multi : 'a multi_t -> unittype multi_stats = {
total_servers : int;
healthy_servers : int;
per_server : (string * stats) list;
aggregate : stats;
}val stats_multi : 'a multi_t -> multi_statsmodule Make (D : Driver.S) : sig ... end