Part IV — The Web·Chapter 12

SQLx and PostgreSQL

SQLx is the Rust database library that verifies your SQL queries at compile time against a real database. Not an ORM — your SQL remains SQL, readable and direct. But the compiler checks every column name, every type binding, every nullable field before the binary is built. No more production crashes from a mistyped column name at 3am during a Sprint NOC incident.
§ 12.1
Why SQLx — Compile-Time Query Verification

Most database libraries in other languages are either full ORMs (which abstract SQL away at the cost of control and performance) or raw query builders whose type errors only surface at runtime. SQLx takes a different approach: you write plain SQL, and during compilation SQLx asks a real database to parse and describe each query (without executing it) and verifies that the column types match your Rust types. If you rename a column in a migration and forget to update the query, the code will not compile once the database you build against has that migration applied.

How Compile-Time Verification Works

The sqlx::query_as! macro

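During cargo build, the macro connects to the database specified in DATABASE_URL, asks PostgreSQL to prepare the statement (without executing it), receives the inferred parameter and result types, and verifies that your Rust struct fields match. For builds without database access, run cargo sqlx prepare (from the sqlx-cli tool) to save the query metadata in a .sqlx/ directory. Commit that directory and the macros will use it when DATABASE_URL is unset or SQLX_OFFLINE=true is set.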

Three modes:

query! — verifies the SQL, returns an anonymous struct per row. Good for one-off queries.

query_as!(YourStruct, sql) — verifies SQL and maps rows directly into your named struct. This is the one you will use most.

query_file_as!(YourStruct, "path.sql") — reads SQL from a file. Good for complex queries.
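To make the check concrete, here is a toy model in plain Rust of the comparison the macros perform. It is not SQLx's implementation: the Described and Field types and the check function are hypothetical names invented for illustration, and the real macros do this work inside the compiler, so a mismatch is a build error and not a runtime value.

```rust
/// What the database reports for one result column (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct Described {
    pub name: &'static str,
    pub pg_type: &'static str, // e.g. "TEXT", "UUID", "TIMESTAMPTZ"
    pub nullable: bool,
}

/// What the Rust struct declares for one field (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: &'static str,
    pub pg_type: &'static str, // the PostgreSQL type the Rust type maps to
    pub optional: bool,        // true when the field is Option<T>
}

/// Compare described columns with struct fields and collect every mismatch.
pub fn check(columns: &[Described], fields: &[Field]) -> Vec<String> {
    let mut errors = Vec::new();
    for field in fields {
        match columns.iter().find(|c| c.name == field.name) {
            None => errors.push(format!("no column named `{}`", field.name)),
            Some(col) => {
                if col.pg_type != field.pg_type {
                    errors.push(format!(
                        "`{}`: column is {}, field expects {}",
                        field.name, col.pg_type, field.pg_type
                    ));
                }
                if col.nullable && !field.optional {
                    errors.push(format!(
                        "`{}`: column is nullable, field must be Option<T>",
                        field.name
                    ));
                }
            }
        }
    }
    errors
}
```

Calling check with a field named site_name against a column named site, or with a non-optional field against a nullable column, yields one error message per problem. With SQLx the same problems stop the build.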

§ 12.2
Setup — PgPool and Migrations

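A connection pool is essential for a web server. Each HTTP request should not open a fresh connection to PostgreSQL: every new connection costs a TCP handshake, authentication, and a new backend process on the server, which typically adds milliseconds to tens of milliseconds, and more over TLS or a long network path. PgPool keeps a set of open connections (up to 10 by default, configurable through PgPoolOptions::max_connections) and hands them out to concurrent handlers. When a handler finishes with a connection, it returns to the pool.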
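The check-out and return cycle can be sketched with a toy pool in plain Rust. This is an illustration of the idea only, not how PgPool is built: ToyPool, Pooled and Conn are hypothetical names, the connections are just numbered values, and the toy opens everything up front and returns None when empty, where a real pool would open connections on demand and wait with a timeout.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a database connection (hypothetical type).
#[derive(Debug)]
pub struct Conn {
    pub id: u32,
}

/// A toy pool: a shared stack of idle connections.
#[derive(Clone)]
pub struct ToyPool {
    idle: Arc<Mutex<Vec<Conn>>>,
}

/// Guard that hands its connection back to the pool when dropped.
pub struct Pooled {
    conn: Option<Conn>,
    idle: Arc<Mutex<Vec<Conn>>>,
}

impl ToyPool {
    /// Create `size` connections up front.
    pub fn new(size: u32) -> Self {
        let conns = (0..size).map(|id| Conn { id }).collect();
        ToyPool { idle: Arc::new(Mutex::new(conns)) }
    }

    /// Take an idle connection, or None when all are checked out.
    pub fn try_acquire(&self) -> Option<Pooled> {
        let conn = self.idle.lock().unwrap().pop()?;
        Some(Pooled { conn: Some(conn), idle: Arc::clone(&self.idle) })
    }

    /// Number of connections currently idle.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Pooled {
    /// Identifier of the connection held by this guard.
    pub fn id(&self) -> u32 {
        self.conn.as_ref().expect("present until drop").id
    }
}

impl Drop for Pooled {
    fn drop(&mut self) {
        // Return the connection so the next handler can reuse it.
        if let Some(conn) = self.conn.take() {
            self.idle.lock().unwrap().push(conn);
        }
    }
}
```

A handler that calls try_acquire holds the guard for the duration of its work. When the guard goes out of scope, the connection is back in the idle list without any explicit release call, which is the behaviour the paragraph above describes.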

Cargo.toml additions
sqlx = { version = "0.7", features = [
    "runtime-tokio",      # use Tokio for async I/O
    "postgres",           # PostgreSQL dialect
    "uuid",               # UUID type support
    "chrono",             # DateTime type support
    "macros",             # query!, query_as! macros
    "migrate",            # sqlx::migrate!() for running migrations
] }
migrations/001_create_alerts.sql
-- SQLx applies migrations in version order (the numeric filename prefix).
-- Each file runs exactly once; SQLx records applied versions in the _sqlx_migrations table.

CREATE TABLE IF NOT EXISTS alerts (
    id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    site        TEXT        NOT NULL,         -- 'Kilimanjaro', 'Serengeti', etc.
    severity    TEXT        NOT NULL,         -- 'info', 'warning', 'critical'
    message     TEXT        NOT NULL,
    source      TEXT        NOT NULL,         -- 'zabbix', 'manual', 'api'
    resolved    BOOLEAN     NOT NULL DEFAULT FALSE,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMPTZ
);

CREATE INDEX idx_alerts_site     ON alerts(site);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_created  ON alerts(created_at DESC);

CREATE TABLE IF NOT EXISTS sites (
    name        TEXT        PRIMARY KEY,
    city        TEXT        NOT NULL,
    country     TEXT        NOT NULL,
    status      TEXT        NOT NULL DEFAULT 'operational',
    latency_ms  INTEGER,
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

INSERT INTO sites (name, city, country) VALUES
    ('Kilimanjaro', 'Dar es Salaam', 'Tanzania'),
    ('Serengeti',   'Arusha',        'Tanzania'),
    ('Drakensberg', 'Johannesburg',  'South Africa'),
    ('Rwenzori',    'Kampala',       'Uganda')
ON CONFLICT (name) DO NOTHING;
src/db.rs — pool initialisation and migrations
use sqlx::PgPool;

pub async fn create_pool(database_url: &str) -> anyhow::Result<PgPool> {
    let pool = PgPool::connect(database_url).await?;

    // Run pending migrations from ./migrations/ directory
    // Checked against _sqlx_migrations table — idempotent
    sqlx::migrate!("./migrations").run(&pool).await?;

    tracing::info!("Database connected and migrations applied");
    Ok(pool)
}

// In main.rs:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");
    let pool = create_pool(&db_url).await?;
    let state = AppState::new(pool);
    // build router, serve ...
    Ok(())
}
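Why calling the migration runner on every startup is safe can be shown with a small model in plain Rust. This is a sketch under assumptions, not SQLx's code: Migration, Applied and run are hypothetical names, the tracking table is a map in memory, and the second migration in the usage note is invented for the example.

```rust
use std::collections::BTreeMap;

/// One migration file: version parsed from the filename prefix, plus its SQL.
pub struct Migration {
    pub version: i64,
    pub sql: &'static str,
}

/// Stand-in for the tracking table: version -> SQL recorded when it was applied.
#[derive(Default)]
pub struct Applied {
    pub rows: BTreeMap<i64, &'static str>,
}

/// Apply every migration not yet recorded, in ascending version order.
/// Returns the versions that ran during this call.
pub fn run(migrations: &[Migration], applied: &mut Applied) -> Vec<i64> {
    let mut pending: Vec<&Migration> = migrations
        .iter()
        .filter(|m| !applied.rows.contains_key(&m.version))
        .collect();
    pending.sort_by_key(|m| m.version);

    let mut ran = Vec::new();
    for m in pending {
        // A real runner would execute m.sql against the database here.
        applied.rows.insert(m.version, m.sql);
        ran.push(m.version);
    }
    ran
}
```

Given migrations with versions 2 and 1 in that order, the first call to run applies 1 and then 2, and a second call applies nothing because both versions are already recorded.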
§ 12.3
Writing Queries — query_as!

The query_as! macro takes your Rust struct and a SQL string. At compile time, PostgreSQL infers column types from the query. SQLx maps them to Rust types. If there is a mismatch — a nullable column bound to a non-Option type, a TEXT column bound to an i32 — the compiler tells you. Your struct fields must match column names exactly (or you alias in SQL).

src/models.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Serialize, sqlx::FromRow)]
pub struct Alert {
    pub id:          Uuid,
    pub site:        String,
    pub severity:    String,
    pub message:     String,
    pub source:      String,
    pub resolved:    bool,
    pub created_at:  DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>,  // nullable → Option
}

#[derive(Debug, Deserialize)]
pub struct CreateAlert {
    pub site:     String,
    pub severity: String,
    pub message:  String,
    pub source:   String,
}

#[derive(Debug, Serialize, sqlx::FromRow)]
pub struct Site {
    pub name:       String,
    pub city:       String,
    pub country:    String,
    pub status:     String,
    pub latency_ms: Option<i32>,
    pub updated_at: DateTime<Utc>,
}
src/db.rs — repository functions
use sqlx::PgPool;
use crate::models::{Alert, CreateAlert, Site};
use uuid::Uuid;

pub async fn list_alerts(
    pool: &PgPool,
    site: Option<String>,
    limit: i64,
) -> sqlx::Result<Vec<Alert>> {
    // query_as! verifies this SQL against your DB schema at compile time
    sqlx::query_as!(
        Alert,
        r#"
            SELECT id, site, severity, message, source,
                   resolved, created_at, resolved_at
            FROM   alerts
            WHERE  ($1::text IS NULL OR site = $1)
            ORDER  BY created_at DESC
            LIMIT  $2
        "#,
        site,   // $1 — nullable filter
        limit   // $2
    )
    .fetch_all(pool)
    .await
}

pub async fn create_alert(
    pool: &PgPool,
    input: CreateAlert,
) -> sqlx::Result<Alert> {
    sqlx::query_as!(
        Alert,
        r#"
            INSERT INTO alerts (site, severity, message, source)
            VALUES ($1, $2, $3, $4)
            RETURNING id, site, severity, message, source,
                      resolved, created_at, resolved_at
        "#,
        input.site,
        input.severity,
        input.message,
        input.source,
    )
    .fetch_one(pool)
    .await
}

pub async fn resolve_alert(
    pool: &PgPool,
    id: Uuid,
) -> sqlx::Result<Option<Alert>> {
    sqlx::query_as!(
        Alert,
        r#"
            UPDATE alerts
            SET    resolved = TRUE, resolved_at = NOW()
            WHERE  id = $1 AND resolved = FALSE
            RETURNING id, site, severity, message, source,
                      resolved, created_at, resolved_at
        "#,
        id
    )
    .fetch_optional(pool)  // returns None if id not found or already resolved
    .await
}

pub async fn list_sites(pool: &PgPool) -> sqlx::Result<Vec<Site>> {
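    // Columns are listed explicitly so a later schema change cannot silently alter the result shape
    sqlx::query_as!(
        Site,
        "SELECT name, city, country, status, latency_ms, updated_at FROM sites ORDER BY name"
    )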
        .fetch_all(pool)
        .await
}
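The optional filter in list_alerts, ($1::text IS NULL OR site = $1), is the least obvious piece of SQL in this listing. The sketch below restates the query's WHERE, ORDER BY and LIMIT clauses as plain Rust over an in-memory slice. Row and list are hypothetical names, and the timestamp is reduced to an integer to keep the example self-contained.

```rust
/// Minimal stand-in for an alerts row (hypothetical; the real struct has more fields).
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub site: &'static str,
    pub created_at: u64, // larger means newer
}

/// In-memory equivalent of:
///   WHERE ($1::text IS NULL OR site = $1) ORDER BY created_at DESC LIMIT $2
pub fn list(rows: &[Row], site: Option<&str>, limit: usize) -> Vec<Row> {
    let mut out: Vec<Row> = rows
        .iter()
        // None keeps every row; Some(s) keeps only rows for that site.
        .filter(|r| site.map_or(true, |s| r.site == s))
        .cloned()
        .collect();
    out.sort_by(|a, b| b.created_at.cmp(&a.created_at)); // newest first
    out.truncate(limit);
    out
}
```

Passing None for site behaves like binding NULL to $1: the first half of the OR is true for every row, so no filtering happens. Passing Some("Kilimanjaro") makes the first half false, so only the equality test decides.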
§ 12.4
Transactions — ACID Guarantees

When creating an alert also needs to update the site status atomically, use a transaction. If any step fails, the entire operation rolls back. SQLx transactions integrate cleanly with Rust's ownership model: the transaction is committed when you call .commit() and rolled back automatically when the transaction variable drops without being committed.

transaction example
pub async fn create_critical_alert(
    pool: &PgPool,
    input: CreateAlert,
) -> sqlx::Result<Alert> {
    // Begin transaction — auto-rolled-back if txn is dropped without commit
    let mut txn = pool.begin().await?;

    let alert = sqlx::query_as!(
        Alert,
        r#"INSERT INTO alerts (site, severity, message, source)
           VALUES ($1, $2, $3, $4)
           RETURNING id, site, severity, message, source,
                     resolved, created_at, resolved_at"#,
        input.site, input.severity, input.message, input.source,
    )
    .fetch_one(&mut *txn)  // note: query against txn, not pool
    .await?;

    // Also update site status if this is a critical alert
    if input.severity == "critical" {
        sqlx::query!(
            "UPDATE sites SET status = 'degraded', updated_at = NOW() WHERE name = $1",
            input.site
        )
        .execute(&mut *txn)
        .await?;
    }

    txn.commit().await?;  // both writes committed atomically
    Ok(alert)
}
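The rollback-on-drop behaviour is what makes the ? operator safe inside a transaction: an early return drops the transaction value, and dropping it undoes the work. The model below shows that mechanism in plain Rust. It is a sketch only: Db, Txn and write_pair are hypothetical names, and where SQLx rolls the transaction back on the database connection, this toy just discards a buffer of pending rows.

```rust
/// Toy in-memory "database": committed rows only.
#[derive(Debug, Default)]
pub struct Db {
    pub rows: Vec<String>,
}

/// Toy transaction: buffers writes and applies them only on commit.
pub struct Txn<'a> {
    db: &'a mut Db,
    pending: Vec<String>,
}

impl Db {
    pub fn begin(&mut self) -> Txn<'_> {
        Txn { db: self, pending: Vec::new() }
    }
}

impl<'a> Txn<'a> {
    /// Buffer one write; an empty row stands in for a failing statement.
    pub fn insert(&mut self, row: &str) -> Result<(), String> {
        if row.is_empty() {
            return Err("empty row rejected".to_string());
        }
        self.pending.push(row.to_string());
        Ok(())
    }

    /// Consumes the transaction and makes the buffered writes visible.
    pub fn commit(mut self) {
        self.db.rows.append(&mut self.pending);
    }
}

impl<'a> Drop for Txn<'a> {
    fn drop(&mut self) {
        // Reached on every exit path. After commit() the buffer is already
        // empty; otherwise the pending writes are discarded (the "rollback").
        self.pending.clear();
    }
}

/// Two writes that must land together; `?` exits early and drops the Txn.
pub fn write_pair(db: &mut Db, a: &str, b: &str) -> Result<(), String> {
    let mut txn = db.begin();
    txn.insert(a)?;
    txn.insert(b)?;
    txn.commit();
    Ok(())
}
```

If the second insert fails, write_pair returns the error and the first insert never reaches db.rows. This mirrors create_critical_alert: a failure in the UPDATE leaves no orphaned alert row behind.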
§ 12.5
Testing Database Code

SQLx provides #[sqlx::test], a macro that creates a temporary database for each test function, runs migrations, and tears it down after. This gives you real PostgreSQL behaviour in your tests without mocking. No fixtures required, no in-memory SQLite approximations. SQLx connects to the server named in DATABASE_URL and creates a separate, uniquely named database for each test, so tests can run in parallel without seeing each other's rows.

tests/db_tests.rs
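use sqlx::PgPool;
// Also bring create_alert, resolve_alert, list_alerts and CreateAlert into
// scope from your own crate; the exact path depends on your crate name.

#[sqlx::test(migrations = "./migrations")]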
async fn test_create_and_resolve_alert(pool: PgPool) {
    // pool is a fresh DB with migrations applied — provided by #[sqlx::test]
    let alert = create_alert(&pool, CreateAlert {
        site:     "Kilimanjaro".into(),
        severity: "warning".into(),
        message:  "BGP session flap detected".into(),
        source:   "zabbix".into(),
    }).await.expect("create failed");

    assert!(!alert.resolved);
    assert_eq!(alert.site, "Kilimanjaro");

    let resolved = resolve_alert(&pool, alert.id)
        .await.expect("resolve failed")
        .expect("alert not found");

    assert!(resolved.resolved);
    assert!(resolved.resolved_at.is_some());
}

#[sqlx::test(migrations = "./migrations")]
async fn test_list_alerts_by_site(pool: PgPool) {
    // Insert two alerts for different sites
    for (site, sev) in [("Kilimanjaro", "info"), ("Serengeti", "critical")] {
        create_alert(&pool, CreateAlert {
            site: site.into(), severity: sev.into(),
            message: "test".into(), source: "test".into(),
        }).await.unwrap();
    }

    let kilimanjaro_alerts = list_alerts(&pool, Some("Kilimanjaro".into()), 10)
        .await.unwrap();
    assert_eq!(kilimanjaro_alerts.len(), 1);
    assert_eq!(kilimanjaro_alerts[0].severity, "info");
}
✓ What SQLx Gives You

Compile-time SQL verification catches the class of runtime bugs that come from SQL typos and type mismatches, and it catches schema drift as long as the schema you build against matches the one in production. Combined with PostgreSQL's ACID transactions, this gives you a database layer whose queries are type-checked and whose multi-step writes are atomic. For the Sprint NOC API, this means Abraham Mwapongo's team in Dar es Salaam can trust that the alert data they see on the NOC wall screens is accurate and consistent, even during concurrent alert floods from Zabbix.

Exercise 12-A

Add a Sites Health Endpoint

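Create a GET /api/v1/sites/health endpoint that queries all sites and returns a JSON object with counts by status: { "operational": 3, "degraded": 1, "down": 0 }. Write this as a single SQL GROUP BY query using query! (not query_as!). Handle the case where a status has zero sites: a plain GROUP BY returns no row for a status that no site currently has, so either LEFT JOIN from a list of the known statuses and wrap the count in COALESCE, or default the missing keys to 0 in application code.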

Exercise 12-B

Alert Pagination

The current list_alerts uses LIMIT. Implement cursor-based pagination: the client sends a before query parameter (a UUID of the last seen alert) and the server returns the next page of results before that alert's created_at timestamp. Use Query<PaginationParams> as the extractor and write a SQLx query using a CTE or subquery to find the cursor row's timestamp.