
KMQ: four scenarios, four passes, and the block policy that closes the gap

Published: May 06, 2026
Tags: awk, message-broker, event-driven, kubernetes, fifo, backpressure, dead-letter-queue, homelab, epistemic


The previous post documented four scenarios run against KMQ, a message broker built from FIFOs, AWK, and Kubernetes. Two passed; two failed. The failures traced to two boundaries: TCP ingress does not propagate backpressure, and a pod restart opens a window in which messages are dropped.

This post documents the updated system. The topology is simpler. The ring‑buffer overflow policy is explicit. All four scenarios pass, and a fifth scenario, zero‑loss backpressure with a blocked consumer, passes as well.


Simpler topology, direct path

The original architecture placed a NodePort, an ingress proxy, and a ClusterIP service between the producer and the broker. That layering made the boundary issues visible, but it also masked the core pipeline’s behaviour. For the updated tests, the producer connects directly to the broker’s internal‑ingress port inside the cluster, using the pod IP obtained from kubectl. No NodePort, no ingress Deployment, no firewall rules. The path is a single TCP connection to the broker pod on port 5673.
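
A minimal sketch of that direct path, assuming a pod label such as app=kmq-broker (the selector is illustrative; the port and record shape come from this post):

# Resolve the broker pod IP and send one message over raw TCP.
# Wire format mirrors the key|timestamp|n|payload records shown below.
POD_IP=$(kubectl get pod -l app=kmq-broker \
          -o jsonpath='{.items[0].status.podIP}')
printf 'test.probe|%s|1|batch1\n' "$(date +%s)" \
  | socat - TCP:"$POD_IP":5673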

The broker pod itself runs four containers: internal‑ingress, framer, durability, and router. They communicate through a named FIFO (/pipes/raw) and two ring buffers in /dev/shm. The only persistent state is append.log and the per‑queue log files, stored on a hostPath volume bound to a PVC provisioned by local-path on the worker node.


Ring‑buffer policy: block vs. drop_oldest

The ring‑buffer library (rb.awk) now exposes a policy switch through the environment variable RB_OVERFLOW_POLICY. When set to drop_oldest, a full buffer advances the tail pointer, discarding the oldest message. When unset or set to block, the writer enters a busy‑wait loop, calling sleep for the value of SLEEP_SHORT (0.001 seconds) until a slot opens.

The relevant AWK logic inside rb_write:

while (1) {
    # Read both cursors; the buffer is full when advancing the head
    # would collide with the tail.
    head      = rb_cursor_read(hfile)
    tail      = rb_cursor_read(tfile)
    next_head = (head + 1) % size
    if (next_head != tail) break

    if (ENVIRON["RB_OVERFLOW_POLICY"] == "drop_oldest") {
        # Full buffer: discard the oldest message by advancing the tail.
        tail = (tail + 1) % size
        print tail > tfile
        fflush(tfile)
        close(tfile)
        break
    }
    # block policy: busy-wait until the reader frees a slot.
    system("sleep " ENVIRON["SLEEP_SHORT"])
}

The drop_oldest policy gives maximum throughput but loses messages when a downstream stage stalls. The block policy propagates backpressure: the writer sleeps, the FIFO upstream fills, the TCP sender blocks, and no messages are lost.
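
Selecting a policy is nothing more than an environment assignment on the writing stage. A minimal entrypoint sketch, with the stage and path assumed:

# Lossy, maximum-throughput mode for a writer stage:
export RB_OVERFLOW_POLICY=drop_oldest
# For lossless backpressure, leave the variable unset or set it to "block".
export SLEEP_SHORT=0.001              # busy-wait interval used by the block policy
exec gawk -f /opt/kmq/framer.awk      # stage and path assumed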


Scenario 1: sequence resume across broker restart

Pattern: write‑ahead log recovery, Kafka offset replay.

Test: send 500 messages. Kill the broker pod. Wait for a replacement. Send another 500. Verify that the sequence in append.log is contiguous and that the framer logs seq_resume=500 on startup.

Result: pass. The new framer reads the existing append.log, finds the last sequence number, and resumes from 501. The transition window around the restart shows the payload changing from batch1 to batch2 with no sequence gap:

499|...|test.probe|...|499|batch1
500|...|test.probe|...|500|batch1
501|...|test.probe|...|1|batch2
502|...|test.probe|...|2|batch2
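
Contiguity is cheap to verify because the sequence number is the first field of every record. A one-liner in the spirit of check-gap.awk (the actual script may differ):

awk -F'|' 'NR > 1 && $1 != prev + 1 { print "gap after seq " prev } { prev = $1 }' append.log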

Scenario 2: routing and dead‑letter cardinality

Pattern: exchange‑to‑queue binding, AMQP routing.

Test: send 100 messages with five routing‑key prefixes (test.probe, jobs.process.*, jobs.notify.*, random.unknown_*, bar.baz_*). Verify counts in each destination file.

Result: pass. test.log receives 40 messages, jobs.log 40, random.log 10, bar.log 10. No dead‑letter file because the router creates one file per prefix, and every key matches a prefix. The routing logic is a five‑line if/else if chain in router.awk.
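
A hedged sketch of what that chain plausibly looks like, with key holding the routing key and dest the destination file (router.awk may name things differently):

if      (key ~ /^test\./)   dest = "test.log"
else if (key ~ /^jobs\./)   dest = "jobs.log"
else if (key ~ /^random\./) dest = "random.log"
else if (key ~ /^bar\./)    dest = "bar.log"
else                        dest = "dead.log"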


Scenario 3: backpressure boundary measurement (drop_oldest)

Pattern: kernel‑enforced flow control.

Test: pause the router (kill -STOP). Send 20 000 messages with drop_oldest policy. Measure how many survive.

Result: measured, not pass/fail. With drop_oldest, about 54 % of messages are lost when the consumer is stalled. The sender does not block; the ring buffer fills, and the oldest messages are discarded. This confirms the boundary where backpressure stops: the TCP ingress does not propagate the ring‑buffer block back to the client.
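
Stalling and resuming the consumer from outside the pod is a pair of kubectl exec calls; a sketch that assumes the earlier pod label and that pkill is available in the router image:

POD=$(kubectl get pod -l app=kmq-broker -o name)
kubectl exec "$POD" -c router -- pkill -STOP -f router.awk   # stall the consumer
# ...send the burst, then count survivors in append.log...
kubectl exec "$POD" -c router -- pkill -CONT -f router.awk   # resume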


Scenario 3‑block: zero‑loss backpressure (block policy)

Test: same as Scenario 3, but with RB_OVERFLOW_POLICY unset (defaulting to block). Burst size 10 000 messages.

Result: pass. The sender blocks after the ring buffer fills. After the router resumes, all 10 000 messages drain to append.log. Zero loss. The protocol‑level acknowledgement loop is absent, but the kernel‑level blocking at the FIFO and the busy‑wait in the ring‑buffer writer together prevent any message from being dropped during the pipeline stall.


Scenario 4: dead‑letter queue replay

Pattern: SQS DLQ redrive, poison message recovery.

Test: send 70 known‑key messages and 30 with a dead.letter_* prefix. Verify routing. Capture the last 30 lines of dead.log (the newly arrived dead letters), rewrite the routing keys to test.recovered_*, and reinject through the direct broker connection. Verify that test.log grew by 100 (70 original + 30 recovered) and that dead.log is untouched (audit trail preserved).

Result: pass. The delta‑based measurement avoids interference from previous runs. The awk one‑liner that rewrites the keys:

awk -F'|' -v now="$(date +%s)" '{ n=$3; sub(/^dead\.letter_/,"",n); print "test.recovered_" n "|" now "|" n "|recovered" }'

This recovery operation reads the dead‑letter file, maps each record to a new routing key, and pushes it back through the broker. The original dead‑letter file stays unchanged on disk, serving as an immutable audit trail.
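
Put together, the redrive is a single pipeline. A sketch using the direct connection from earlier (POD_IP resolved as in the topology section; file locations assumed):

# Reinject the 30 newest dead letters under recovered keys.
tail -n 30 dead.log \
  | awk -F'|' -v now="$(date +%s)" \
      '{ n = $3; sub(/^dead\.letter_/, "", n)
         print "test.recovered_" n "|" now "|" n "|recovered" }' \
  | socat - TCP:"$POD_IP":5673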


Throughput on modest hardware

The worker node is an Intel N97 mini PC: four Alder Lake efficiency cores at 1.5 GHz, 16 GB RAM, consumer‑grade NVMe storage.

Throughput measured via throughput-test.sh, sending directly to the broker pod and polling test.log for completion:

  • 1 000 messages: send + flush in ~1 ms → ~1 M msg/s
  • 10 000 messages: send + flush in ~6 ms → ~1.4 M msg/s
  • 50 000 messages: send in 30 ms, flush in 30 ms → ~1.7 M msg/s

These are burst numbers; the sustained rate under continuous load would be lower. The measurement counts the time from the first message sent until the last message appears in test.log, which includes the full pipeline traversal: TCP receive, FIFO, framing, durability write to append.log, durable ring buffer, router write to queue file. No message reordering occurs; check-gap.awk reports zero gaps in every run.
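
The shape of that measurement, sketched under the same assumptions as the direct-path example and with test.log readable from the measuring host (throughput-test.sh itself may differ):

before=$(wc -l < test.log)
start=$(date +%s%N)
seq 1 10000 \
  | awk -v now="$(date +%s)" '{ print "test.burst_" $1 "|" now "|" $1 "|burst" }' \
  | socat - TCP:"$POD_IP":5673
# Poll until the whole burst has traversed the pipeline into test.log.
while [ $(( $(wc -l < test.log) - before )) -lt 10000 ]; do sleep 0.001; done
end=$(date +%s%N)
echo "elapsed_ms=$(( (end - start) / 1000000 ))"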

The entire broker runs as four gawk processes inside a single Kubernetes pod. Total resident memory during a 10 k burst stays under 10 MB. The only writes to disk are the sequential appends to append.log and the queue files: no compaction, no indexing, no background threads.


What changed, structurally

  • The test harness now connects directly to the broker pod IP, removing the ingress layer and its associated TCP‑boundary artifacts.
  • The ring‑buffer overflow policy is explicit and switchable via an environment variable, allowing the same binary to demonstrate both lossy high‑throughput and lossless blocked behaviour.
  • The dead‑letter replay scenario uses a delta‑count approach that is idempotent across multiple runs, making the tests repeatable without manual cleanup.

The updated topology, end to end:

Producer (any host with socat)
  │
  │ TCP to pod IP, port 5673
  ▼
┌─────────────────────────────────────────────────────────────┐
│ Broker Pod (Dedicated worker node, 4 containers)            │
│                                                             │
│  ┌─────────────┐  FIFO /pipes/raw   ┌────────┐              │
│  │internal-    │───────────────────▶│ framer │              │
│  │ingress      │                    │ .awk   │              │
│  │(socat+gawk) │                    └───┬────┘              │
│  └─────────────┘                        │ ring buffer       │
│                                    /dev/shm/kmq/framed      │
│                                         │ (8k slots, block) │
│                                         ▼                   │
│                                    ┌──────────┐             │
│                                    │durability│             │
│                                    │ .awk     │             │
│                                    └─┬─────┬──┘             │
│                                      │     │                │
│                       ring buffer    │     │ append.log     │
│                  /dev/shm/kmq/durable│     │ (WAL)          │
│                          (block)     │     ▼                │
│                                      │  ┌──────────────┐    │
│                                      │  │ PVC hostPath │    │
│                                      │  │ /opt/…/logs  │    │
│                                      │  └──────────────┘    │
│                                      ▼                      │
│                                 ┌─────────┐                 │
│                                 │ router  │                 │
│                                 │ .awk    │                 │
│                                 └────┬────┘                 │
│                                      │                      │
│                              test.log, jobs.log,            │
│                              dead.log, …                    │
└─────────────────────────────────────────────────────────────┘
  │
  │ TCP from egress pods (not shown) or direct read via host
  ▼
Consumer / inspection

KMQ remains a single‑binary, single‑pod broker with no external dependencies. Every message in the system is a line in a file. Every file is readable with cat, tail, and awk. Every scenario is a shell script that produces log‑fmt output and an exit code.

This round of scenarios moves the project from “two passes, two failures” to “five passes”. The passes do not come from a new protocol or a rewrite. They come from a simpler test path that exercises the core pipeline directly, and from a single if statement that decides whether a full buffer advances the tail pointer or sleeps for a millisecond.