SQLx and PostgreSQL
Most database libraries in other languages are either full ORMs (which abstract SQL away at the cost of control and performance) or raw query builders with runtime type errors. SQLx takes a different approach: you write plain SQL, and during compilation SQLx runs your queries against a real database schema and verifies that column types match your Rust types. If you rename a column in a migration and forget to update the query, the code will not compile.
The sqlx::query_as! macro
During cargo build, SQLx connects to the database specified in DATABASE_URL, sends your SQL with PREPARE, receives the inferred parameter and result types from PostgreSQL, and verifies that your Rust struct fields match. The verification result is cached in a .sqlx/ directory — you can commit this for offline builds.
Three modes:
• query! — verifies the SQL, returns an anonymous struct per row. Good for one-off queries.
• query_as!(YourStruct, sql) — verifies SQL and maps rows directly into your named struct. This is the one you will use most.
• query_file_as!(YourStruct, "path.sql") — reads SQL from a file. Good for complex queries.
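As a sketch of the first mode, here is a hypothetical helper using query! against the alerts table defined later in this chapter. This is illustrative only: the macro expands by checking the SQL against the database in DATABASE_URL, so it will not even compile without one.

```rust
use sqlx::PgPool;

// Hypothetical helper, assuming the alerts schema from this chapter's
// migrations and a reachable DATABASE_URL at build time.
pub async fn count_unresolved(pool: &PgPool) -> sqlx::Result<i64> {
    let row = sqlx::query!(
        "SELECT COUNT(*) AS n FROM alerts WHERE resolved = FALSE"
    )
    .fetch_one(pool)
    .await?;
    // `row` is an anonymous record with one field per selected column.
    // PostgreSQL reports COUNT(*) as nullable, so SQLx exposes Option<i64>.
    Ok(row.n.unwrap_or(0))
}
```

If you know a column can never be null despite what PostgreSQL reports, SQLx lets you override the inference with an alias like `AS "n!"` to get a plain i64.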
A connection pool is essential for a web server. Each HTTP request should not open a fresh TCP connection to PostgreSQL: connection setup, TLS negotiation, and authentication can add tens of milliseconds or more per request. PgPool maintains a set of ready connections (10 by default, configurable via PgPoolOptions) and hands them out to concurrent handlers. When a handler completes, its connection returns to the pool.
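A minimal sketch of tuning the pool with PgPoolOptions, assuming sqlx 0.7 with the features listed below; the specific limits are illustrative, not recommendations:

```rust
use std::time::Duration;

use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;

pub async fn create_tuned_pool(database_url: &str) -> sqlx::Result<PgPool> {
    PgPoolOptions::new()
        .max_connections(16)                     // upper bound on open connections
        .min_connections(2)                      // keep a couple warm for bursts
        .acquire_timeout(Duration::from_secs(3)) // fail fast if pool is exhausted
        .connect(database_url)                   // opens connections up to the limits
        .await
}
```

Sizing the pool is a trade-off: too few connections and handlers queue on acquire; too many and you exhaust PostgreSQL's own max_connections.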
sqlx = { version = "0.7", features = [
"runtime-tokio", # use Tokio for async I/O
"postgres", # PostgreSQL dialect
"uuid", # UUID type support
"chrono", # DateTime type support
"macros", # query!, query_as! macros
"migrate", # sqlx::migrate!() for running migrations
] }

-- SQLx runs migrations in filename order.
-- Each file runs exactly once; sqlx tracks them in the _sqlx_migrations table.
CREATE TABLE IF NOT EXISTS alerts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    site TEXT NOT NULL,       -- 'Kilimanjaro', 'Serengeti', etc.
    severity TEXT NOT NULL,   -- 'info', 'warning', 'critical'
    message TEXT NOT NULL,
    source TEXT NOT NULL,     -- 'zabbix', 'manual', 'api'
    resolved BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMPTZ
);

CREATE INDEX idx_alerts_site ON alerts(site);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);

CREATE TABLE IF NOT EXISTS sites (
    name TEXT PRIMARY KEY,
    city TEXT NOT NULL,
    country TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'operational',
    latency_ms INTEGER,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

INSERT INTO sites (name, city, country) VALUES
    ('Kilimanjaro', 'Dar es Salaam', 'Tanzania'),
    ('Serengeti', 'Arusha', 'Tanzania'),
    ('Drakensberg', 'Johannesburg', 'South Africa'),
    ('Rwenzori', 'Kampala', 'Uganda')
ON CONFLICT (name) DO NOTHING;
use sqlx::PgPool;

pub async fn create_pool(database_url: &str) -> anyhow::Result<PgPool> {
    let pool = PgPool::connect(database_url).await?;

    // Run pending migrations from the ./migrations/ directory.
    // Checked against the _sqlx_migrations table — idempotent.
    sqlx::migrate!("./migrations").run(&pool).await?;

    tracing::info!("Database connected and migrations applied");
    Ok(pool)
}

// In main.rs:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");
    let pool = create_pool(&db_url).await?;
    let state = AppState::new(pool);
    // build router, serve ...
    Ok(())
}
The query_as! macro takes your Rust struct and a SQL string. At compile time, PostgreSQL infers column types from the query. SQLx maps them to Rust types. If there is a mismatch — a nullable column bound to a non-Option type, a TEXT column bound to an i32 — the compiler tells you. Your struct fields must match column names exactly (or you alias in SQL).
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Serialize, sqlx::FromRow)]
pub struct Alert {
    pub id: Uuid,
    pub site: String,
    pub severity: String,
    pub message: String,
    pub source: String,
    pub resolved: bool,
    pub created_at: DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>, // nullable → Option
}

#[derive(Debug, Deserialize)]
pub struct CreateAlert {
    pub site: String,
    pub severity: String,
    pub message: String,
    pub source: String,
}

#[derive(Debug, Serialize, sqlx::FromRow)]
pub struct Site {
    pub name: String,
    pub city: String,
    pub country: String,
    pub status: String,
    pub latency_ms: Option<i32>,
    pub updated_at: DateTime<Utc>,
}
use sqlx::PgPool;
use uuid::Uuid;

use crate::models::{Alert, CreateAlert, Site};

pub async fn list_alerts(
    pool: &PgPool,
    site: Option<String>,
    limit: i64,
) -> sqlx::Result<Vec<Alert>> {
    // query_as! verifies this SQL against your DB schema at compile time
    sqlx::query_as!(
        Alert,
        r#"
        SELECT id, site, severity, message, source,
               resolved, created_at, resolved_at
        FROM alerts
        WHERE ($1::text IS NULL OR site = $1)
        ORDER BY created_at DESC
        LIMIT $2
        "#,
        site,  // $1 — nullable filter
        limit  // $2
    )
    .fetch_all(pool)
    .await
}

pub async fn create_alert(
    pool: &PgPool,
    input: CreateAlert,
) -> sqlx::Result<Alert> {
    sqlx::query_as!(
        Alert,
        r#"
        INSERT INTO alerts (site, severity, message, source)
        VALUES ($1, $2, $3, $4)
        RETURNING id, site, severity, message, source,
                  resolved, created_at, resolved_at
        "#,
        input.site,
        input.severity,
        input.message,
        input.source,
    )
    .fetch_one(pool)
    .await
}

pub async fn resolve_alert(
    pool: &PgPool,
    id: Uuid,
) -> sqlx::Result<Option<Alert>> {
    sqlx::query_as!(
        Alert,
        r#"
        UPDATE alerts
        SET resolved = TRUE, resolved_at = NOW()
        WHERE id = $1 AND resolved = FALSE
        RETURNING id, site, severity, message, source,
                  resolved, created_at, resolved_at
        "#,
        id
    )
    .fetch_optional(pool) // returns None if id not found or already resolved
    .await
}

pub async fn list_sites(pool: &PgPool) -> sqlx::Result<Vec<Site>> {
    sqlx::query_as!(Site, "SELECT * FROM sites ORDER BY name")
        .fetch_all(pool)
        .await
}
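To connect these functions to the HTTP layer, here is a hedged sketch assuming the axum framework (which the router/AppState setup earlier suggests, though the chapter does not name it) and a hypothetical ListParams query type; the module paths are assumptions:

```rust
use axum::{
    extract::{Query, State},
    http::StatusCode,
    Json,
};
use serde::Deserialize;
use sqlx::PgPool;

use crate::models::Alert;

// Hypothetical query-string parameters for GET /api/v1/alerts
#[derive(Deserialize)]
pub struct ListParams {
    pub site: Option<String>,
    pub limit: Option<i64>,
}

// GET /api/v1/alerts?site=Kilimanjaro&limit=20
pub async fn list_alerts_handler(
    State(pool): State<PgPool>,
    Query(params): Query<ListParams>,
) -> Result<Json<Vec<Alert>>, StatusCode> {
    // Clamp the limit so a client cannot request an unbounded page
    let limit = params.limit.unwrap_or(50).clamp(1, 500);
    crate::db::list_alerts(&pool, params.site, limit) // assumed module path
        .await
        .map(Json)
        .map_err(|e| {
            tracing::error!("list_alerts failed: {e}");
            StatusCode::INTERNAL_SERVER_ERROR
        })
}
```

Keeping the query functions free of HTTP types, as above, means the same db module is reusable from tests, background jobs, and handlers alike.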
When creating an alert must also update the site's status atomically, use a transaction. If any step fails, the entire operation rolls back. SQLx transactions integrate cleanly with Rust's ownership model: the transaction commits when you call .commit() and rolls back automatically if the transaction variable is dropped without being committed.
pub async fn create_critical_alert(
    pool: &PgPool,
    input: CreateAlert,
) -> sqlx::Result<Alert> {
    // Begin transaction — auto-rolled-back if txn is dropped without commit
    let mut txn = pool.begin().await?;

    let alert = sqlx::query_as!(
        Alert,
        r#"INSERT INTO alerts (site, severity, message, source)
           VALUES ($1, $2, $3, $4)
           RETURNING id, site, severity, message, source,
                     resolved, created_at, resolved_at"#,
        input.site,
        input.severity,
        input.message,
        input.source,
    )
    .fetch_one(&mut *txn) // note: query against txn, not pool
    .await?;

    // Also update site status if this is a critical alert
    if input.severity == "critical" {
        sqlx::query!(
            "UPDATE sites SET status = 'degraded', updated_at = NOW() WHERE name = $1",
            input.site
        )
        .execute(&mut *txn)
        .await?;
    }

    txn.commit().await?; // both writes committed atomically
    Ok(alert)
}
SQLx provides #[sqlx::test] — an attribute macro that creates a dedicated temporary database for each test function, runs your migrations against it, and cleans it up afterwards. This gives you real PostgreSQL behaviour in your tests without mocking: no fixtures, no in-memory SQLite approximations. The test databases are created on the same server as DATABASE_URL, each with a unique name per test.
#[sqlx::test(migrations = "./migrations")]
async fn test_create_and_resolve_alert(pool: PgPool) {
    // pool is a fresh DB with migrations applied — provided by #[sqlx::test]
    let alert = create_alert(&pool, CreateAlert {
        site: "Kilimanjaro".into(),
        severity: "warning".into(),
        message: "BGP session flap detected".into(),
        source: "zabbix".into(),
    }).await.expect("create failed");

    assert!(!alert.resolved);
    assert_eq!(alert.site, "Kilimanjaro");

    let resolved = resolve_alert(&pool, alert.id)
        .await.expect("resolve failed")
        .expect("alert not found");

    assert!(resolved.resolved);
    assert!(resolved.resolved_at.is_some());
}

#[sqlx::test(migrations = "./migrations")]
async fn test_list_alerts_by_site(pool: PgPool) {
    // Insert two alerts for different sites
    for (site, sev) in [("Kilimanjaro", "info"), ("Serengeti", "critical")] {
        create_alert(&pool, CreateAlert {
            site: site.into(),
            severity: sev.into(),
            message: "test".into(),
            source: "test".into(),
        }).await.unwrap();
    }

    let kilimanjaro_alerts = list_alerts(&pool, Some("Kilimanjaro".into()), 10)
        .await.unwrap();

    assert_eq!(kilimanjaro_alerts.len(), 1);
    assert_eq!(kilimanjaro_alerts[0].severity, "info");
}
Compile-time SQL verification catches the entire class of runtime bugs that come from SQL typos, schema drift, and type mismatches. Combined with PostgreSQL's ACID transactions, you have a database layer that is both safe and correct. For the Sprint NOC API, this means Abraham Mwapongo's team in Dar es Salaam can trust that the alert data they see on the NOC wall screens is accurate and consistent — even during concurrent alert floods from Zabbix.
Add a Sites Health Endpoint
Create a GET /api/v1/sites/health endpoint that queries all sites and returns a JSON object with counts by status: { "operational": 3, "degraded": 1, "down": 0 }. Write this as a single SQL GROUP BY query using query! (not query_as!). Handle the case where a status has zero sites — use COALESCE or application-level defaulting.
Alert Pagination
The current list_alerts uses LIMIT. Implement cursor-based pagination: the client sends a before query parameter (a UUID of the last seen alert) and the server returns the next page of results before that alert's created_at timestamp. Use Query<PaginationParams> as the extractor and write a SQLx query using a CTE or subquery to find the cursor row's timestamp.