repodb — the data layer
repodb is the data layer of the stack: typed schemas and fields, changesets that validate input before it touches the database, a composable query DSL, transactions and Multi, associations with batched preloading, a connection pool, read/write splitting, typed errors, and a raw-SQL escape hatch for the cases the DSL cannot express — all dialect-aware across SQLite, PostgreSQL and MariaDB. Every snippet below is cut from a complete program under the site repository's examples/ directory, and the runnable ones execute in the test suite against an in-memory database.
The shape of the data layer
repodb separates four concerns: a schema describes a table's columns as typed fields; a changeset validates and tracks changes before they persist; the query DSL builds statements as composable values; and a Repo — the driver-bound module you get by applying Repodb.Repo.Make to a backend — runs them. The flow of a write is always the same: parse input into a changeset, validate, then execute a query. Reads go straight through the query DSL.
Choosing the driver is one functor application — Repodb.Repo.Make (Repodb_sqlite) here, Repodb_postgresql or Repodb_mariadb in production. The field, changeset and query code above it is identical across dialects; only the generated SQL differs, and repodb writes that for you. When the DSL cannot express a query, a raw-SQL escape hatch is available — covered near the end of this page — but it is the exception, not the habit.
Drivers and connecting
repodb ships three drivers — Repodb_sqlite, Repodb_postgresql and Repodb_mariadb — and you pick one by applying Repodb.Repo.Make to it. They share the same Repo interface, so the schema, changeset and query code is identical; what differs is how you connect and a few dialect details the driver renders for you. Each exposes connect : string -> (conn, error) result and error_message, so connection handling looks the same across all three.
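The driver split can be modelled in a few lines of plain OCaml. This is a minimal sketch, not repodb's code: the `DRIVER` signature copies only the functions the text names, while `Fake_sqlite`, `Make` and `check` are hypothetical stand-ins invented for illustration.

```ocaml
(* Toy model of the driver/functor split. DRIVER mirrors the functions the
   text names; Fake_sqlite and Make are hypothetical, not repodb code. *)
module type DRIVER = sig
  type conn
  type error

  val connect : string -> (conn, error) result
  val close : conn -> unit
  val error_message : error -> string
end

module Fake_sqlite : DRIVER = struct
  type conn = string
  type error = Empty_path

  let connect path = if path = "" then Error Empty_path else Ok path
  let close _ = ()
  let error_message Empty_path = "empty path"
end

(* Code written against DRIVER works unchanged for any backend. *)
module Make (D : DRIVER) = struct
  let check conninfo =
    match D.connect conninfo with
    | Ok conn ->
        D.close conn;
        "connected"
    | Error e -> "failed: " ^ D.error_message e
end

module Repo = Make (Fake_sqlite)

let () = print_endline (Repo.check ":memory:")
```

Swapping the backend means changing the one functor argument; everything inside `Make` stays as written.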
SQLite — embedded, no server
(* SQLite is embedded: the connection string is a file path, or
":memory:" for a throwaway database. No server, no network, no TLS. *)
let connect_sqlite () =
match Repodb_sqlite.connect ":memory:" with
| Ok conn ->
print_endline "sqlite: connected to :memory:";
Repodb_sqlite.close conn
| Error e -> Printf.printf "sqlite: %s\n" (Repodb_sqlite.error_message e)
SQLite runs in-process against a file; the connection string is just a path, or ":memory:" for an ephemeral database (what this site's tests use). There is no network and therefore no TLS. It is the right default for development, tests, small single-node deployments and desktop apps: one file you can copy. The trade-offs are SQLite's: one writer at a time, and no separate server to scale independently.
PostgreSQL — and TLS
(* PostgreSQL takes a libpq connection string, passed through verbatim —
so every libpq parameter works, TLS included. sslmode=require encrypts
the connection; sslmode=verify-full plus sslrootcert also authenticates
the server against a CA. connect_timeout bounds the attempt. *)
let connect_postgres () =
let conninfo =
"host=db.example.invalid port=5432 dbname=app user=app password=secret \
sslmode=verify-full sslrootcert=/etc/ssl/certs/ca.pem connect_timeout=2"
in
match Repodb_postgresql.connect conninfo with
| Ok conn ->
print_endline "postgres: connected over TLS";
Repodb_postgresql.close conn
| Error e ->
Printf.printf "postgres: no server in this demo (%s)\n"
(Repodb_postgresql.error_message e)
The PostgreSQL driver takes a libpq connection string and passes it through unchanged, so every libpq parameter is available, including transport security. sslmode=require encrypts the connection; sslmode=verify-full together with sslrootcert (and sslcert / sslkey for client certificates) also authenticates the server against a CA, which is what you want across an untrusted network. It is the production default: concurrent writers, rich types, and replicas for read scaling.
$ dune exec examples/repodb_connect/main.exe
MariaDB / MySQL
(* MariaDB/MySQL takes a key=value string parsed by the driver: host,
port, user, password, db, socket, charset. A unix socket avoids TCP
altogether; the driver does not forward SSL options, so for TLS over
TCP terminate it at a proxy in front of the database. *)
let connect_mariadb () =
let conninfo =
"host=127.0.0.1 port=3307 user=app password=secret db=app \
charset=utf8mb4"
in
match Repodb_mariadb.connect conninfo with
| Ok conn ->
print_endline "mariadb: connected";
Repodb_mariadb.close conn
| Error e ->
Printf.printf "mariadb: no server in this demo (%s)\n"
(Repodb_mariadb.error_message e)
The MariaDB driver parses its own key=value string (host, port, user, password, db, socket, charset) and connects over TCP or, with socket=, a local unix socket that skips the network entirely. One caveat to know: this driver does not forward TLS options, so for an encrypted connection over an untrusted network put a TLS-terminating proxy in front of the database (or use the unix socket when the app and database share a host).
Choosing
A common path is SQLite in development and tests and PostgreSQL in production — and because only the driver and the connection string change, the same models move between them unedited. Open the connection (or the pool) once in the application's supervision tree and hand it to your contexts; never connect from a controller.
Schemas and typed fields
(* The record rows map to, and typed fields tying accessors to columns. *)
type user = { id : int; name : string; email : string; age : int }
let users_table = Repodb.Schema.table "users"
let field name ty get set =
Field.make ~table_name:"users" ~name ~ty ~get ~set ()
let name_field =
field "name" Types.string (fun u -> u.name) (fun v u -> { u with name = v })
let email_field =
field "email" Types.string
(fun u -> u.email)
(fun v u -> { u with email = v })
let age_field =
field "age" Types.int (fun u -> u.age) (fun v u -> { u with age = v })
A Field.make value packages everything about one column: its table, name, repodb type, and the getter and setter into your record. Every later layer (changesets, query expressions, inserts) is expressed in terms of these field values, which is how the compiler knows that age compares against an int and email casts from a string. In an araara application the fields live in the model file, one model per table.
Migrations
(* A migration describes the table with portable typed columns. *)
let create_users =
let open Repodb.Migration in
migration ~version:1L ~name:"create_users" ~down:[ drop_table "users" ]
~up:
[
create_table "users"
[
typed_column "id" Types.int ~primary_key:true;
typed_column "name" Types.string ~nullable:false;
typed_column "email" Types.string ~nullable:false ~unique:true;
typed_column "age" Types.int ~nullable:false;
];
]
A migration carries a monotonic version, a name, and up/down operations built from portable typed columns: primary keys, nullability, uniqueness, defaults, foreign keys. Repo.Ddl renders the dialect-specific SQL from these definitions; you never write CREATE TABLE by hand. The down operations make a migration reversible, and the version is how the migration runner knows what is already applied.
Prefer the typed API: build DDL and queries through migrations and the query DSL, not string concatenation — that is what keeps every backend honest and your code injection-safe. The raw-SQL escape hatch exists for the genuine exceptions, and the section below shows how to use it without giving up parameter binding.
Changesets: validate before you persist
A changeset takes untrusted input — form parameters, a decoded JSON body — casts only the fields you name through their types, then layers validations. An invalid changeset carries structured errors your controller renders next to the offending fields; only a valid one proceeds to a write. This is parse-don't-validate applied to persistence: by the time data reaches a query it is already known good.
Cast
(* [create] seeds a changeset with a base record; [cast] copies only the
whitelisted fields out of untrusted params (extra keys are dropped). *)
let cast_params params =
Changeset.create empty
|> Changeset.cast params
~fields:
[
name_field;
email_field;
age_field;
role_field;
password_field;
password_confirmation_field;
]
create starts from a record; cast pulls the named parameters through their fields, ignoring anything you did not list, which makes it a mass-assignment guard by default. get_change and changes inspect what was actually set.
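The whitelisting idea behind cast fits in a one-line filter. The sketch below is a deliberately simplified stand-in that works on string keys; repodb's cast operates on typed fields, and this `cast` function is hypothetical.

```ocaml
(* Toy cast: keep only the parameters whose key is whitelisted.
   A stand-in for the idea only; repodb's cast works on typed fields. *)
let cast ~fields params =
  List.filter (fun (key, _) -> List.mem key fields) params

let () =
  let params = [ ("name", "Ada"); ("is_admin", "true"); ("age", "36") ] in
  let changes = cast ~fields:[ "name"; "age" ] params in
  (* is_admin was never listed, so it never reaches the changes. *)
  List.iter (fun (k, v) -> Printf.printf "%s=%s\n" k v) changes
```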
The validators
(* A pipeline of validators. Each only fires when its field was cast, so
missing optional fields stay silent while required ones are enforced. *)
let validate cs =
cs
|> Changeset.validate_required [ name_field; email_field; age_field ]
|> Changeset.validate_format email_field ~pattern:"^[^@]+@[^@]+$"
|> Changeset.validate_length name_field ~min:2 ~max:40
|> Changeset.validate_number age_field ~greater_than_or_equal:18
~less_than_or_equal:120
|> Changeset.validate_inclusion role_field ~values:[ "admin"; "member" ]
|> Changeset.validate_confirmation password_field
~confirmation_field:password_confirmation_field
repodb ships a full set, each appending to the changeset's error list and composing with |>: validate_required, validate_format, validate_length (min / max / is), validate_number (greater_than, less_than, and the _or_equal bounds), validate_inclusion, validate_exclusion, validate_acceptance, validate_confirmation (a field against its confirmation), and validate_change for an arbitrary predicate. unique_constraint, foreign_key_constraint and check_constraint declare database constraints the changeset maps back to friendly errors when the insert fails.
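How validators accumulate errors while composing with `|>` can be shown with a toy changeset. This is a sketch under simplifying assumptions: the record type, the string-keyed fields and the error messages are all invented here and do not reflect repodb's internal representation.

```ocaml
(* Toy changeset: cast changes plus an error list of (field, message). *)
type changeset = {
  changes : (string * string) list;
  errors : (string * string) list;
}

let add_error field msg cs = { cs with errors = cs.errors @ [ (field, msg) ] }

(* A required field must be present among the changes. *)
let validate_required fields cs =
  List.fold_left
    (fun cs f ->
      if List.mem_assoc f cs.changes then cs else add_error f "is required" cs)
    cs fields

(* Like the validators described above, this only fires when the field
   was cast; a missing field is left to validate_required. *)
let validate_length field ~min ~max cs =
  match List.assoc_opt field cs.changes with
  | Some v when String.length v < min || String.length v > max ->
      add_error field (Printf.sprintf "must be %d to %d characters" min max) cs
  | _ -> cs

let is_valid cs = cs.errors = []

let () =
  let cs =
    { changes = [ ("name", "A") ]; errors = [] }
    |> validate_required [ "name"; "email" ]
    |> validate_length "name" ~min:2 ~max:40
  in
  List.iter (fun (f, m) -> Printf.printf "%s %s\n" f m) cs.errors
```

Each validator takes a changeset and returns one, so a failure never short-circuits the pipeline and every problem is reported in one pass.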
Errors
(* A bad submission accumulates every failure; nothing is persisted.
[is_valid] gates the write, [error_messages] reports field + reason. *)
let bad =
cast_params
[
("name", "A");
("email", "not-an-email");
("age", "9");
("role", "superuser");
("password", "s3cret");
("password_confirmation", "typo");
]
|> validate
in
if Changeset.is_valid bad then die "expected the bad changeset to be invalid"
else begin
Printf.printf "rejected (%d errors):\n" (List.length (Changeset.errors bad));
List.iter (fun m -> Printf.printf " - %s\n" m) (Changeset.error_messages bad)
end;
is_valid gates the write; errors and error_messages give the structured and human-readable failures. A controller turns them into a 422 with the messages beside the form fields; the same changeset backs the HTML and JSON paths.
$ dune exec examples/repodb_changesets/main.exe
Type-safe inserts
(* Insert: the compiler checks each column's type against its value. *)
let insert name email age =
Query.insert_into users_table
|> Repodb.Query_values.values3
(name_field, email_field, age_field)
(Expr.string name, Expr.string email, Expr.int age)
in
List.iter
(fun (name, email, age) ->
match Repo.insert_query conn (insert name email age) with
| Ok () -> Printf.printf "inserted: %s, age %d\n" name age
| Error e -> die ("insert: " ^ Repodb.Error.show_db_error e))
[
(change name_field, change email_field, 34);
("Bob", "bob@example.com", 17);
("Carol", "carol@example.com", 41);
];
Query_values.valuesN pairs N fields with N expressions, and the compiler checks each column's type against its value: Expr.string into a string column, Expr.int into an int column. A mismatch is a build error, not a runtime surprise from the driver. insert_query runs it; insert_returning_query brings back generated columns like the new id.
The query DSL
Queries are values, so fragments compose like any other values and nothing executes until you hand the query to the repo. Expressions (module Expr) reference the typed fields, so a comparison is checked the same way an insert is.
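The "queries are values" idea can be sketched with an ordinary record. Everything here is hypothetical: conditions are plain strings rather than typed expressions, and `to_sql` stands in for the point where a repo would finally look at the query.

```ocaml
(* Toy query value: a table plus predicates. Building one has no side
   effects; only [to_sql] (standing in for the repo) looks inside.
   Conditions are raw strings here, so none of repodb's type checking. *)
type query = { table : string; wheres : string list; limit : int option }

let from table = { table; wheres = []; limit = None }
let where cond q = { q with wheres = q.wheres @ [ cond ] }
let limit n q = { q with limit = Some n }

let to_sql q =
  let w =
    match q.wheres with
    | [] -> ""
    | cs -> " WHERE " ^ String.concat " AND " cs
  in
  let l =
    match q.limit with None -> "" | Some n -> Printf.sprintf " LIMIT %d" n
  in
  "SELECT * FROM " ^ q.table ^ w ^ l

(* A reusable fragment is just a function from query to query. *)
let adults q = where "age >= 18" q

let () = print_endline (from "users" |> adults |> limit 10 |> to_sql)
```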
Filtering
(* [where] starts the predicate; [and_where] conjoins, [or_where]
disjoins. Operators live in Expr and are used under a local open. *)
let adults_in_paris =
Query.from users_table
|> Query.where Expr.(column city_field = string "Paris")
|> Query.and_where Expr.(column age_field >= int 18)
in
run "adults in Paris" (Repo.all_query conn adults_in_paris ~decode:decode_user);
from opens a query; where adds a predicate; and_where / or_where combine them. Expr provides the comparison and logical operators over column / int / string literals.
Ordering, limit and offset
(* [order_by] (or the [asc]/[desc] shorthands) plus [limit]/[offset]
give deterministic paging - here the second page of one, by age. *)
let oldest_page2 =
Query.from users_table
|> Query.desc (Expr.column age_field)
|> Query.limit 1 |> Query.offset 1
in
run "by age desc, page 2 (limit 1 offset 1)"
(Repo.all_query conn oldest_page2 ~decode:decode_user);
order_by with asc / desc sorts; limit and offset page. distinct dedupes; the join family (left_join, inner_join, …) takes an ~on expression for queries that span tables.
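The limit and offset for a given page follow from simple arithmetic. The helper below is a hypothetical convenience, not part of repodb; it assumes 1-based page numbers.

```ocaml
(* Hypothetical helper: turn a 1-based page number into (limit, offset). *)
let page_bounds ~page ~per_page =
  if page < 1 || per_page < 1 then invalid_arg "page_bounds";
  (per_page, (page - 1) * per_page)

let () =
  (* Page 2 with one row per page, as in the example above. *)
  let limit, offset = page_bounds ~page:2 ~per_page:1 in
  Printf.printf "limit %d offset %d\n" limit offset
```

Paging is only deterministic when the query also has a total ordering, which is why the example sorts before it limits.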
Grouping and aggregates
(* [group_by] with an aggregate from Expr ([count] here, typed int64).
[distinct] dedupes a projection. Decode the aggregate columns by hand. *)
let by_city =
Query.from users_table
|> Query.select Expr.[ column city_field; count (column age_field) ]
|> Query.group_by Expr.[ column city_field ]
|> Query.desc (Expr.count (Expr.column age_field))
in
(match
Repo.all_query conn by_city ~decode:(fun row ->
(Repodb.Driver.row_text row 0, Repodb.Driver.row_int64 row 1))
with
| Ok rows ->
Printf.printf "users per city:\n";
List.iter
(fun (city, n) -> Printf.printf " %s: %Ld\n" city n)
rows
| Error e -> die ("aggregate: " ^ Repodb.Error.show_db_error e));
let distinct_cities =
Query.from users_table
|> Query.select Expr.[ column city_field ]
|> Query.distinct
|> Query.asc (Expr.column city_field)
in
(match
Repo.all_query conn distinct_cities ~decode:(fun row ->
Repodb.Driver.row_text row 0)
with
| Ok cities -> Printf.printf "distinct cities: %s\n" (String.concat ", " cities)
| Error e -> die ("distinct: " ^ Repodb.Error.show_db_error e));
group_by plus the aggregate expressions (count, sum, avg, min, max) and having build summary queries. count is typed int64, so its column decodes with Driver.row_int64.
Updates and deletes
(* [update] with [set] (typed column := expr) and a [where]; [delete_from]
with a predicate. Both return [unit result] via the Repo. *)
let promote_bob =
Query.update users_table
|> Query.set age_field (Expr.int 18)
|> Query.where Expr.(column name_field = string "Bob")
in
(match Repo.update_query conn promote_bob with
| Ok () -> print_endline "updated: Bob's age set to 18"
| Error e -> die ("update: " ^ Repodb.Error.show_db_error e));
let drop_minors =
Query.delete_from users_table
|> Query.where Expr.(column age_field < int 18)
in
(match Repo.delete_query conn drop_minors with
| Ok () -> print_endline "deleted: rows with age < 18"
| Error e -> die ("delete: " ^ Repodb.Error.show_db_error e));
update / delete_from with set and where build mutations the same composable way; update_query and delete_query (alongside Repo.update / Repo.delete) run them. on_conflict_do_nothing and on_conflict_do_update express upserts.
$ dune exec examples/repodb_queries/main.exe
Reading rows
(* Decode one row of [SELECT *] (column order: id, name, email, age). *)
let decode_user row =
let open Repodb.Driver in
{
id = row_int row 0;
name = row_text row 1;
email = row_text row 2;
age = row_int row 3;
}
A decode function maps a driver row into your record by column position, using Driver.row_int / row_text / row_int64 / row_float and their nullable variants. all_query runs a query and decodes every row; one_query and one_query_opt expect a single row; for a table by primary key, Repo.get and Repo.all skip the query DSL entirely. Large result sets stream rather than materialize; see the streaming support in repodb's Stream module.
$ dune exec examples/repodb_quickstart/main.exe
Transactions
Repo.transaction conn f runs f inside a database transaction: it commits when f returns Ok and rolls back when f returns Error or raises. Either every write lands or none does.
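The commit and rollback rule can be modelled without a database. In this sketch the "database" is just a list reference and `transaction` restores a snapshot on failure; it illustrates the contract only and says nothing about how repodb implements it.

```ocaml
(* Toy model: the "database" is a list ref. [transaction] snapshots it,
   runs [f], and restores the snapshot on Error or on an exception. *)
let transaction (db : string list ref) f =
  let snapshot = !db in
  match f db with
  | Ok _ as ok -> ok
  | Error _ as err ->
      db := snapshot;
      err
  | exception exn ->
      db := snapshot;
      raise exn

let insert db row =
  db := row :: !db;
  Ok ()

let () =
  let db = ref [] in
  (* Both writes succeed, so both stay. *)
  let _ =
    transaction db (fun db ->
        Result.bind (insert db "Alice") (fun () -> insert db "Bob"))
  in
  (* The write succeeds but the function returns Error, so it is undone. *)
  let _ =
    transaction db (fun db ->
        Result.bind (insert db "Carol") (fun () -> Error "deliberate abort"))
  in
  Printf.printf "rows=%d\n" (List.length !db)
```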
Commit
(* A transaction with two writes: both land atomically on COMMIT. *)
let committed =
Repo.transaction conn (fun conn ->
match insert conn "Alice" 100 with
| Error e -> Error e
| Ok () -> insert conn "Bob" 250)
in
(match committed with
| Ok () -> Printf.printf "commit: ok, rows=%d (expect 2)\n" (count conn)
| Error e -> die ("commit: " ^ Repodb.Error.show_db_error e));
Rollback
(* This transaction writes a row, then returns [Error]; repodb rolls the
whole unit back, so the write is NOT visible afterwards. *)
let before = count conn in
let rolled_back =
Repo.transaction conn (fun conn ->
match insert conn "Carol" 999 with
| Error e -> Error e
| Ok () -> Error (Repodb.Error.Query_failed "deliberate abort"))
in
(match rolled_back with
| Ok () -> die "rollback: transaction unexpectedly committed"
| Error e ->
Printf.printf "rollback: aborted (%s)\n" (Repodb.Error.show_db_error e);
let after = count conn in
Printf.printf "rollback: rows before=%d after=%d (expect equal)\n" before
after;
if before <> after then die "rollback: row leaked past abort");
An aborting transaction leaves the database exactly as it was; the example proves it by comparing row counts before and after. In an araara context, the one-transaction-per-write rule means a public context function owns its transaction, and post-commit effects (pubsub, jobs) fire after it returns.
$ dune exec examples/repodb_transactions/main.exe
Multi: composing operations
When several writes belong together, Multi composes them into one named, atomic pipeline executed in a single transaction — clearer than nesting calls inside Repo.transaction, and the results are addressable by name.
Building the pipeline
(* Compose three named steps; nothing runs until [execute]. The first
step keeps its inserted row (RETURNING) so we can read it back. *)
let pipeline =
Multi.empty
|> Multi.insert_returning "alice" ~table:accounts_table
~columns:[ "owner"; "balance" ]
~values:Driver.Value.[ text "Alice"; int 100 ]
|> Multi.insert "bob" ~table:accounts_table
~columns:[ "owner"; "balance" ]
~values:Driver.Value.[ text "Bob"; int 250 ]
|> Multi.update "raise_alice" ~table:accounts_table ~columns:[ "balance" ]
~values:Driver.Value.[ int 175 ]
~where_column:"owner" ~where_value:(Driver.Value.text "Alice")
in
The builders live on Repodb.Multi (empty, insert, insert_returning, update, delete, and run / run_unit / run_row for an arbitrary step) and chain with |>. Each step is named so its result can be read back.
Executing it
(* Run the whole pipeline atomically. On failure [multi_error] names the
offending step and carries the results completed before it. *)
let results =
match Multi_run.execute conn pipeline with
| Ok results ->
Printf.printf "exec: committed %d steps\n"
(List.length (Multi.names pipeline));
results
| Error { failed_operation; error; _ } ->
die
(Printf.sprintf "exec: step %S failed: %s" failed_operation
(Repodb.Error.show_db_error error))
in
The runner is the functor Repodb.Multi.Make (Repodb_sqlite); its execute conn pipeline runs the whole thing atomically and returns the collected results or the failed operation. (The builders are top-level; only execute comes from the functor.)
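The shape of a named pipeline, where a failure reports which step broke and what had completed, can be sketched as a list of named thunks. The `step` type and this `execute` are hypothetical and model only the naming and error reporting; the atomic rollback that the real runner provides is left out.

```ocaml
(* Toy pipeline: named steps run in order; execution stops at the first
   failure and reports its name plus the results completed so far.
   No transaction here, so this does NOT model the atomic rollback. *)
type 'a step = { name : string; run : unit -> ('a, string) result }

let execute steps =
  let rec go done_ = function
    | [] -> Ok (List.rev done_)
    | s :: rest -> (
        match s.run () with
        | Ok v -> go ((s.name, v) :: done_) rest
        | Error e -> Error (s.name, e, List.rev done_))
  in
  go [] steps

let () =
  let steps =
    [
      { name = "alice"; run = (fun () -> Ok 100) };
      { name = "bob"; run = (fun () -> Error "duplicate owner") };
      { name = "raise_alice"; run = (fun () -> Ok 175) };
    ]
  in
  match execute steps with
  | Ok results -> Printf.printf "committed %d steps\n" (List.length results)
  | Error (name, msg, done_) ->
      Printf.printf "step %S failed: %s (%d completed)\n" name msg
        (List.length done_)
```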
Reading results back
(* Read named outputs back from the map: a decoded RETURNING row, and a
unit step that simply confirms it ran. The update is visible too. *)
let alice = Multi.get_row_exn results "alice" ~decode:decode_account in
Multi.get_unit_exn results "bob";
Printf.printf "results: alice #%d inserted as %s with balance %d\n" alice.id
alice.owner alice.balance;
(match Repo.get conn ~table:accounts_table ~id:alice.id ~decode:decode_account
with
| Ok a -> Printf.printf "results: alice balance now %d (expect 175)\n" a.balance
| Error e -> die ("reload: " ^ Repodb.Error.show_db_error e));
get_row / get_rows / get_unit (and their _exn and _raw variants) pull a named step's result, decoding a RETURNING row with your own decoder.
$ dune exec examples/repodb_multi/main.exe
Associations and preloading
Associations describe how tables relate; preloading loads the related rows for a set of owners in one batched query instead of one query per owner — the cure for N+1.
Declaring associations
(* An author has_many books; the foreign key lives on the books table.
The inverse: a book belongs_to one author via that same column. *)
let books_assoc =
Assoc.has_many "books" ~related_table:"books" ~foreign_key:"author_id" ()
let _author_assoc =
Assoc.belongs_to "author" ~related_table:"authors" ~foreign_key:"author_id" ()
belongs_to, has_one, has_many and many_to_many name the relationship, the related table, and the foreign key (plus the join table and keys for many-to-many).
Preloading in one query
(* Load the owners, then preload all their books in ONE batched query
(SELECT * FROM books WHERE author_id IN (...)) - no per-author query. *)
let authors =
match Repo.all_query conn (Query.from authors_table) ~decode:decode_author with
| Ok authors -> authors
| Error e -> die ("load authors: " ^ Repodb.Error.show_db_error e)
in
let authors =
match
Repo.preload_has_many conn authors ~assoc:books_assoc
~get_owner_id:(fun a -> a.id)
~decode_related:decode_book
~get_fk:(fun b -> b.author_id)
~set_assoc:(fun books a -> { a with books = Assoc.Loaded books })
with
| Ok authors -> authors
| Error e -> die ("preload books: " ^ Repodb.Error.show_db_error e)
in
Repo.preload_has_many (and preload_has_one, preload_belongs_to, preload_many_to_many) takes the owners, the association, and functions to read the owner id, decode a related row and attach the result. It issues a single SELECT … WHERE fk IN (…) for all owners at once.
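The in-memory half of a batched preload, attaching rows from one query to their owners, is a group-by on the foreign key. The sketch below assumes the related rows have already been fetched; the record types and this `preload_has_many` are simplified stand-ins (a plain list replaces the Loaded / NotLoaded wrapper), not repodb's implementation.

```ocaml
type book = { title : string; author_id : int }
type author = { id : int; name : string; books : book list }

(* Toy preload: given ALL related rows from one query, group them by
   foreign key once, then attach each group to its owner. *)
let preload_has_many owners related ~get_owner_id ~get_fk ~set_assoc =
  let by_fk = Hashtbl.create 16 in
  List.iter
    (fun r ->
      let k = get_fk r in
      let existing = Option.value (Hashtbl.find_opt by_fk k) ~default:[] in
      Hashtbl.replace by_fk k (r :: existing))
    related;
  List.map
    (fun o ->
      let rs =
        Option.value (Hashtbl.find_opt by_fk (get_owner_id o)) ~default:[]
      in
      (* Groups were built by prepending, so reverse to keep row order. *)
      set_assoc (List.rev rs) o)
    owners

let () =
  let authors =
    [ { id = 1; name = "Ada"; books = [] }; { id = 2; name = "Bob"; books = [] } ]
  in
  let books =
    [ { title = "Notes"; author_id = 1 }; { title = "Engines"; author_id = 1 } ]
  in
  preload_has_many authors books
    ~get_owner_id:(fun a -> a.id)
    ~get_fk:(fun b -> b.author_id)
    ~set_assoc:(fun books a -> { a with books })
  |> List.iter (fun a ->
         Printf.printf "%s (%d books)\n" a.name (List.length a.books))
```

The cost is one pass over the related rows and one over the owners, regardless of how many owners there are.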
Using the result
(* Each owner now carries its preloaded children. *)
List.iter
(fun a ->
let books =
match a.books with Assoc.Loaded bs -> bs | Assoc.NotLoaded -> []
in
Printf.printf "%s (%d books)\n" a.name (List.length books);
List.iter (fun b -> Printf.printf " - %s\n" b.title) books)
authors;
Each owner comes back with its related rows attached, so rendering a list of authors with their books is one preload and a fold, with no query inside the loop. preload_chunked handles owner sets too large for a single IN clause.
$ dune exec examples/repodb_assoc/main.exe
The connection pool
A pool hands connections to fibers without opening one per request. It is lock-free and scheduler-agnostic, so it fits Eio's structured concurrency without blocking the event loop.
Creating a pool
(* A temp-file DB so every pooled connection sees the same data
(a pool over ":memory:" would give each connection its own DB). *)
let db_path = Filename.temp_file "repodb_pool" ".sqlite3" in
let pool = Pool.create ~max_size:4 ~conninfo:db_path () in
Pool.Make (Repodb_sqlite).create ~max_size ~conninfo () opens the pool against a connection string. (A pooled SQLite demo uses a file path rather than :memory:, so the connections share data.)
Borrowing a connection
(* Borrow a connection to write, and another to read. with_connection
returns the connection to the pool automatically. *)
let insert name =
Query.insert_into widgets_table
|> Repodb.Query_values.values1 widget_name (Expr.string name)
in
let write_result =
Pool.with_connection pool (fun conn ->
List.fold_left
(fun acc name ->
match acc with
| Error _ -> acc
| Ok () -> Repo.insert_query conn (insert name))
(Ok ()) [ "sprocket"; "flange"; "grommet" ])
in
(match write_result with
| Ok (Ok ()) -> print_endline "inserted: 3 widgets"
| Ok (Error e) -> die ("insert: " ^ Repodb.Error.show_db_error e)
| Error e -> die ("pool: " ^ pool_error e));
let read_result =
Pool.with_connection pool (fun conn ->
Repo.all_query conn (Query.from widgets_table) ~decode:decode_widget)
in
(match read_result with
| Ok (Ok widgets) ->
Printf.printf "read back %d widgets:\n" (List.length widgets);
List.iter (fun w -> Printf.printf " #%d %s\n" w.id w.name) widgets
| Ok (Error e) -> die ("query: " ^ Repodb.Error.show_db_error e)
| Error e -> die ("pool: " ^ pool_error e));
with_connection borrows a connection for the duration of the callback and returns it to the pool afterwards, even on error; with_connection_blocking waits with an optional timeout when the pool is exhausted.
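The "returned even on error" guarantee is the classic bracket pattern, which the standard library's Fun.protect provides. The pool below is a hypothetical toy: connections are plain integers and it is neither lock-free nor safe for concurrent use, unlike the pool described above.

```ocaml
(* Toy pool over a stack of idle "connections" (ints here). The nested
   result mirrors the example above: the outer Error is a pool failure,
   the inner value is whatever the callback returned. *)
type pool = { mutable idle : int list }

let with_connection pool f =
  match pool.idle with
  | [] -> Error "pool exhausted"
  | conn :: rest ->
      pool.idle <- rest;
      (* Fun.protect runs ~finally whether [f] returns or raises. *)
      Ok
        (Fun.protect
           ~finally:(fun () -> pool.idle <- conn :: pool.idle)
           (fun () -> f conn))

let () =
  let pool = { idle = [ 1; 2 ] } in
  (match
     with_connection pool (fun conn ->
         if conn > 0 then Ok conn else Error "bad")
   with
  | Ok (Ok c) -> Printf.printf "used connection %d\n" c
  | Ok (Error e) -> Printf.printf "query error: %s\n" e
  | Error e -> Printf.printf "pool error: %s\n" e);
  Printf.printf "idle after: %d\n" (List.length pool.idle)
```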
Stats and shutdown
(* Both borrowed connections were returned, so they sit idle and
available; nothing is in use between calls. *)
let s = Pool.stats pool in
Printf.printf "pool stats: total=%d available=%d in_use=%d closed=%b\n"
s.total s.available s.in_use s.closed;
Printf.printf " size=%d available=%d in_use=%d\n" (Pool.size pool)
(Pool.available pool) (Pool.in_use pool);
(* Gracefully close the pool and all its connections. *)
Pool.shutdown pool;
Printf.printf "after shutdown: closed=%b\n" (Pool.is_closed pool);
stats, size, available and in_use report the pool's state for health checks; shutdown drains and closes it. In a full application the pool is opened once in the supervision tree and every context borrows from it.
$ dune exec examples/repodb_pool/main.exe
Raw SQL: the escape hatch
The query DSL is the default because it is type-checked and dialect-portable. But some SQL it cannot express — a window function, a recursive CTE, a database-specific operator, a hand-tuned query you want verbatim. For those cases the Repo exposes three functions that run a SQL string directly: exec_sql for statements that return nothing, query_sql for many rows, and query_one_sql for at most one. They are an escape hatch, not a backdoor: parameters still bind positionally through Driver.Value, so you never interpolate user input into the string.
Running a statement
(* exec_sql runs a statement that returns no rows. Parameters are
Driver.Value.t, bound positionally; SQLite uses ? placeholders
(the placeholder syntax is dialect-specific — one reason raw SQL
ties you to a backend and is an escape hatch, not the default). *)
let exec sql params =
match Repo.exec_sql conn sql ~params with
| Ok () -> ()
| Error e -> die (Repodb.Error.show_db_error e)
in
exec "CREATE TABLE metrics (name TEXT NOT NULL, hits INTEGER NOT NULL)" [||];
exec "INSERT INTO metrics (name, hits) VALUES (?, ?)"
[| Value.text "home"; Value.int 10 |];
exec "INSERT INTO metrics (name, hits) VALUES (?, ?)"
[| Value.text "docs"; Value.int 25 |];
exec_sql takes the SQL and a Driver.Value.t array bound to the statement's placeholders: Value.text, Value.int and friends. The placeholder syntax is the database's own (? for SQLite and MariaDB, $1 for PostgreSQL), which is the catch: a raw query is written for one dialect, where the typed DSL would render all three. That is the trade you are making each time you reach for it.
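The placeholder difference is easy to see by rendering the same insert for each dialect. This is a minimal sketch: the `dialect` type and both functions are hypothetical helpers, and the table and column names are interpolated directly, so they must be trusted constants and never user input.

```ocaml
type dialect = Sqlite | Postgresql | Mariadb

(* 1-based positional placeholder for each dialect, per the text above. *)
let placeholder dialect i =
  match dialect with
  | Sqlite | Mariadb -> "?"
  | Postgresql -> "$" ^ string_of_int i

(* Identifiers are interpolated as-is: pass only trusted constants. *)
let insert_sql dialect table columns =
  let marks = List.mapi (fun i _ -> placeholder dialect (i + 1)) columns in
  Printf.sprintf "INSERT INTO %s (%s) VALUES (%s)" table
    (String.concat ", " columns)
    (String.concat ", " marks)

let () =
  print_endline (insert_sql Sqlite "metrics" [ "name"; "hits" ]);
  print_endline (insert_sql Postgresql "metrics" [ "name"; "hits" ])
```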
Reading rows back
(* query_sql returns raw rows; decode them by position with the
Driver.row_* accessors, exactly as a query-DSL decoder would. *)
let sql = "SELECT name, hits FROM metrics WHERE hits > ? ORDER BY hits DESC" in
(match Repo.query_sql conn sql ~params:[| Value.int 5 |] with
| Ok rows ->
Printf.printf "%d busy page(s):\n" (List.length rows);
List.iter
(fun row ->
Printf.printf " %s: %d hits\n"
(Repodb.Driver.row_text row 0)
(Repodb.Driver.row_int row 1))
rows
| Error e -> die (Repodb.Error.show_db_error e));
query_sql returns the driver's rows; you decode them by column position with the same Driver.row_* accessors a DSL decoder uses, so the read side looks identical whether the query came from the DSL or a string. Keep the SQL in the model alongside the typed queries, parameterize every value, and treat each raw query as a small, documented exception.
$ dune exec examples/repodb_raw/main.exe
Dialects: one model, three databases
The same migration and the same queries render to backend-specific SQL because the dialect lives in the driver, not in your code. Repo.Ddl.generate_up_sql is a pure function of a migration — no connection needed — so you can render a migration for every backend and read exactly what each one will run.
One migration
let create_users =
let open Repodb.Migration in
migration ~version:1L ~name:"create_users" ~down:[ drop_table "users" ]
~up:
[
create_table "users"
[
typed_column "id" Types.int ~primary_key:true;
typed_column "email" Types.string ~nullable:false ~unique:true;
typed_column "active" Types.bool ~nullable:false;
typed_column "score" Types.float ~nullable:false;
typed_column "created_at" Types.ptime ~nullable:false;
typed_column "metadata" Types.json ~nullable:true;
];
]
The migration is written once, in portable typed columns. Nothing in it names a SQL type or a dialect.
Three renderings
(* generate_up_sql is a pure function of the migration; each driver's Ddl
renders its own dialect. Note how the portable column types map to
different SQL: bool is INTEGER on SQLite but BOOLEAN elsewhere, float
is REAL / DOUBLE PRECISION / DOUBLE, a timestamp is TEXT / TIMESTAMPTZ /
DATETIME, and json is TEXT / JSONB / JSON. *)
let show label sqls =
Printf.printf "-- %s --\n" label;
List.iter (fun s -> Printf.printf "%s\n" s) sqls;
print_newline ()
let () =
show "sqlite" (Sqlite.Ddl.generate_up_sql create_users);
show "postgresql" (Postgres.Ddl.generate_up_sql create_users);
show "mariadb" (Mariadb.Ddl.generate_up_sql create_users)
Each driver's Ddl maps the portable types to its own SQL: a bool is INTEGER on SQLite but BOOLEAN on PostgreSQL and MariaDB; a float is REAL, DOUBLE PRECISION and DOUBLE respectively; a timestamp is TEXT, TIMESTAMPTZ and DATETIME; json is TEXT, JSONB and JSON. You never see these tables (you declare Types.bool and the driver picks the column type), but generate_up_sql lets you confirm what ships. The query DSL renders the same way at runtime, which is why a model moves between SQLite in development and PostgreSQL in production unedited.
$ dune exec examples/repodb_dialects/main.exe
Errors are values
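The mapping described here can be written down as a single exhaustive match. The types and the `sql_type` function are hypothetical; the SQL type names are exactly the ones the text lists, and only these four portable types are covered.

```ocaml
type dialect = Sqlite | Postgresql | Mariadb
type column_type = Bool | Float | Timestamp | Json

(* Portable type -> dialect SQL type, as listed in the text above. *)
let sql_type dialect ty =
  match (ty, dialect) with
  | Bool, Sqlite -> "INTEGER"
  | Bool, (Postgresql | Mariadb) -> "BOOLEAN"
  | Float, Sqlite -> "REAL"
  | Float, Postgresql -> "DOUBLE PRECISION"
  | Float, Mariadb -> "DOUBLE"
  | Timestamp, Sqlite -> "TEXT"
  | Timestamp, Postgresql -> "TIMESTAMPTZ"
  | Timestamp, Mariadb -> "DATETIME"
  | Json, Sqlite -> "TEXT"
  | Json, Postgresql -> "JSONB"
  | Json, Mariadb -> "JSON"

let () =
  List.iter
    (fun (label, d) -> Printf.printf "%s: json is %s\n" label (sql_type d Json))
    [ ("sqlite", Sqlite); ("postgresql", Postgresql); ("mariadb", Mariadb) ]
```

Because the match is exhaustive, adding a dialect or a portable type without deciding its SQL type is flagged by the compiler.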
repodb never raises for an expected database failure. Every operation returns a result whose error side is a Repodb.Error.db_error — a closed set of variants (Constraint_violation, Not_found, Query_failed, Transaction_failed, Validation_failed, Timeout, Pool_exhausted, and a few more) you match on. The absence of a row, a violated constraint, a dead connection: each is a value the type system makes you handle.
A violated constraint
(* Inserting a duplicate email violates the UNIQUE constraint. The error
comes back as a typed db_error variant you match on — no grepping a
message string. How precise the variant is depends on the driver:
PostgreSQL surfaces a detailed message that repodb parses into
Constraint_violation; SQLite reports only "CONSTRAINT", so it arrives
as Query_failed. Matching both arms keeps the code portable. *)
let insert email =
Repo.exec_sql conn "INSERT INTO users (email) VALUES (?)"
~params:[| Value.text email |]
in
(match insert "ada@example.com" with
| Ok () -> die "expected the duplicate to be rejected"
| Error (Repodb.Error.Constraint_violation { constraint_name; _ }) ->
Printf.printf "rejected duplicate (Constraint_violation: %s)\n"
constraint_name
| Error (Repodb.Error.Query_failed msg) ->
Printf.printf "rejected duplicate (Query_failed: %s)\n" msg
| Error e -> die ("unexpected: " ^ Repodb.Error.show_db_error e));
How precise the variant is depends on the driver. PostgreSQL reports a detailed message that repodb parses into Constraint_violation with the constraint name; SQLite reports only "CONSTRAINT", so the same failure arrives as Query_failed. Matching both arms keeps the code portable, and this is exactly what a changeset's unique_constraint / foreign_key_constraint declarations build on to turn a failed insert into a friendly field error.
A missing row
(* A read for a row that does not exist is Error Not_found, distinct
from a row that decodes to None — the absence is in the type. *)
let users = Repodb.Schema.table "users" in
let decode row = Repodb.Driver.row_text row 1 in
(match Repo.get conn ~table:users ~id:999 ~decode with
| Ok email -> Printf.printf "found %s\n" email
| Error Repodb.Error.Not_found -> print_endline "no user #999 (Not_found)"
| Error e -> die (Repodb.Error.show_db_error e));
Repo.get returns Error Not_found when the id matches nothing, which is distinct from a row that decodes to None. The absence is in the type, so a controller can map Not_found to a 404 without inspecting any message string. (Use Repo.get_opt when "no row" is an ordinary Ok None rather than an error.)
$ dune exec examples/repodb_errors/main.exe
Read/write splitting (CQRS)
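Mapping typed errors to HTTP statuses is then a plain pattern match. The variant below is a cut-down stand-in with invented payloads, not the real Repodb.Error.db_error; the 404 and 422 follow the text, while 409, 503 and 500 are assumptions chosen for illustration.

```ocaml
(* A cut-down stand-in for the error variants named above; the payloads
   and most status codes here are assumptions for illustration. *)
type db_error =
  | Not_found
  | Constraint_violation of string
  | Validation_failed of string list
  | Query_failed of string
  | Timeout

let status_of_error = function
  | Not_found -> 404
  | Constraint_violation _ -> 409 (* assumed: conflict *)
  | Validation_failed _ -> 422
  | Timeout -> 503 (* assumed: temporarily unavailable *)
  | Query_failed _ -> 500 (* assumed: generic server error *)

let () =
  Printf.printf "missing row -> %d\n" (status_of_error Not_found);
  Printf.printf "bad input -> %d\n"
    (status_of_error (Validation_failed [ "email is required" ]))
```

No message string is inspected anywhere; the decision rests entirely on the constructor.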
When one database server is not enough, repodb's Cqrs module splits traffic: writes go to a primary, reads fan out across replicas with round-robin, random or least-connections selection and per-replica health tracking. The query code above it does not change — only which pool hands out the connection — so you adopt it by swapping a Repo for a Cqrs pool, not by rewriting models.
Describing the topology
(* A CQRS pool is described by one config: where the primary lives, where the
replicas live, and how reads pick a replica. In production these are
distinct servers kept in sync by the database's own replication; here a
single temp file stands in for all of them so the demo can run — the point
is the routing, not the topology. *)
let () =
let db = Filename.temp_file "repodb_cqrs" ".sqlite3" in
let cqrs =
C.create
{
primary_conninfo = db;
primary_max_size = 2;
replica_conninfos = [ db; db ];
(* two replicas *)
replica_max_size_each = 2;
replica_selection = Repodb.Cqrs.RoundRobin;
validate = None;
}
in
One config names the primary, the replicas, the pool sizes and the replica-selection strategy. Repodb.Cqrs.Make (Driver) gives a pool bound to that driver, just like Repo.Make.
Routing reads and writes
(* with_write borrows from the primary pool. *)
run_write "create" (fun conn ->
Repo.exec_sql conn
"CREATE TABLE pages (slug TEXT PRIMARY KEY, hits INTEGER NOT NULL)"
~params:[||]);
let bump slug hits conn =
Repo.exec_sql conn "INSERT INTO pages (slug, hits) VALUES (?, ?)"
~params:Repodb.Driver.Value.[| text slug; int hits |]
in
run_write "insert docs" (bump "docs" 25);
run_write "insert home" (bump "home" 10);
(* with_read routes to a healthy replica, round-robin; with no replicas it
falls back to the primary, so read code is identical either way. *)
let total =
run_read "sum hits" (fun conn ->
match Repo.query_one_sql conn "SELECT SUM(hits) FROM pages" ~params:[||] with
| Ok (Some row) -> Ok (Repodb.Driver.row_int row 0)
| Ok None -> Ok 0
| Error e -> Error e)
in
Printf.printf "total hits (read from a replica): %d\n" total;
with_write borrows a connection from the primary pool; with_read routes to a healthy replica, falling back to the primary when there are none, so read code is identical whether or not replicas are configured. Each wraps the connection's lifetime, returning it to the pool even on error.
Health and stats
(* stats reports the primary and every replica pool, plus how many replicas
are currently healthy — what a health-check endpoint would surface. *)
let s = C.stats cqrs in
Printf.printf "primary in_use=%d healthy replicas=%d\n" s.primary.in_use
s.healthy_replica_count;
(* mark_replica_unhealthy pulls a replica out of rotation (e.g. after a
failed health probe); reads then skip it until it is marked healthy. *)
C.mark_replica_unhealthy cqrs 0;
Printf.printf "after marking replica 0 unhealthy: healthy replicas=%d\n"
(C.count_healthy_replicas cqrs);
stats reports the primary and every replica pool plus the count of healthy replicas, which is what a health-check endpoint surfaces. mark_replica_unhealthy pulls a replica out of rotation after a failed probe; reads skip it until it is marked healthy again.
$ dune exec examples/repodb_cqrs/main.exe
Where this fits in a full application: every repodb call lives in a model or a context; controllers and views never touch the database. The conventions page shows the layering, and the API reference linked above documents every module and function in full.
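Round-robin selection that skips unhealthy replicas and falls back to the primary can be sketched in a few lines. The `replicas` record and `pick_read` are hypothetical and model only the routing decision described above, using names in place of connection pools.

```ocaml
type replicas = {
  names : string array;
  healthy : bool array;
  mutable next : int; (* index of the next replica to try *)
}

(* Pick the next healthy replica in round-robin order, or fall back to
   the primary when none is healthy (or none is configured). *)
let pick_read t ~primary =
  let n = Array.length t.names in
  let rec scan tried =
    if tried >= n then primary
    else begin
      let i = t.next mod n in
      t.next <- (t.next + 1) mod n;
      if t.healthy.(i) then t.names.(i) else scan (tried + 1)
    end
  in
  scan 0

let () =
  let t =
    { names = [| "r0"; "r1" |]; healthy = [| true; true |]; next = 0 }
  in
  let a = pick_read t ~primary:"primary" in
  let b = pick_read t ~primary:"primary" in
  (* Take replica 0 out of rotation; the next read skips it. *)
  t.healthy.(0) <- false;
  let c = pick_read t ~primary:"primary" in
  Printf.printf "%s %s %s\n" a b c
```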