| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Ecluse.Queue
Description
The mirror-queue handle: the durable hand-off from the request path to the mirror worker.
Mirroring is demand-driven: when a client fetches an artifact whose version
passes the rules, the proxy enqueues a MirrorJob and serves the artifact
immediately, never blocking on the mirror. A separate worker receives jobs,
fetches and verifies the artifact, publishes it to the mirror target, and acks
the job (see docs/architecture/cloud-backends.md → "Mirror Queue").
The queue is the one cloud surface with materially different APIs per provider
(AWS SQS SendMessage/ReceiveMessage+visibility-timeout/DeleteMessage; GCP
Pub/Sub Publish/Pull+ack-deadline/Acknowledge), so it is its own handle —
a record of functions (the Handle pattern). Both providers fit the same
receive → process → ack shape; their differences (visibility timeout vs ack
deadline, batch limits, dead-letter wiring) stay behind the handle, and
ReceiptHandle is opaque so neither leaks.
Like the other handles, the effectful fields return IO, not App, so an
adapter stays decoupled from the proxy's Env/App (see
docs/architecture/technology-stack.md → "Key Decisions").
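As an illustration of the Handle pattern described above, here is a minimal sketch of a record of `IO` functions whose closures capture private backend state. The `Job` and `Queue` types and the `newListQueue` backend are simplified, hypothetical stand-ins, not the module's real definitions.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- Hypothetical, simplified stand-in for the real payload type.
newtype Job = Job String deriving (Show, Eq)

-- A handle is a record of IO functions; the backend's state is not a field.
data Queue = Queue
  { enqueue :: Job -> IO ()
  , receive :: IO [Job]
  }

-- One possible backend: the closures capture a private IORef, so callers
-- only ever see the record, never the state behind it.
newListQueue :: IO Queue
newListQueue = do
  ref <- newIORef []
  pure Queue
    { enqueue = \job -> atomicModifyIORef' ref (\js -> (js ++ [job], ()))
    , receive = atomicModifyIORef' ref (\js -> ([], js))
    }

-- Usage: enqueue two jobs, then take everything that is waiting.
demoHandle :: IO [Job]
demoHandle = do
  q <- newListQueue
  enqueue q (Job "left-pad@1.3.0")
  enqueue q (Job "lodash@4.17.21")
  receive q
```

Because every field is plain `IO`, a different backend can be swapped in by building the same record over different captured state.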
Conventions
The two cloud backends both give at-least-once delivery, which is safe here because publishing is idempotent (a registry treats versions as immutable). The handle's contract reflects that:
- enqueue is best-effort. It runs on the request hot path (enqueue, then serve immediately), so a failure must be logged/metered and __never fail the client response__: the artifact is already served, and a later pull re-enqueues.
- Retry is "don't ack". A job that fails processing is simply not acked; the visibility timeout / ack deadline redelivers it, and the backend's native dead-letter path catches the persistently failing ones. There is deliberately no nack.
- extendVisibility lets the worker hold a long publish (a large artifact) past the visibility window. It is an optimization, not correctness-critical, since idempotency already makes redelivery harmless.
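The first two conventions can be sketched as follows. This is a hedged illustration over simplified, hypothetical types (`Receipt`, `Message`, `Queue`, `enqueueBestEffort`, `workerPass`), not the proxy's or the worker's actual code. Catching `SomeException` is a simplification for brevity; production code would normally let asynchronous exceptions propagate.

```haskell
import Control.Exception (SomeException, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical, simplified stand-ins for the module's types.
newtype Receipt = Receipt Int deriving (Show, Eq)
data Message = Message { msgReceipt :: Receipt, msgJob :: String }

data Queue = Queue
  { enqueue :: String -> IO ()
  , receive :: IO [Message]
  , ack     :: Receipt -> IO ()
  }

-- Best-effort enqueue for the request path: report a failure, never rethrow.
enqueueBestEffort :: (String -> IO ()) -> Queue -> String -> IO ()
enqueueBestEffort report q job = do
  r <- try (enqueue q job) :: IO (Either SomeException ())
  case r of
    Left e   -> report ("enqueue failed: " ++ show e)
    Right () -> pure ()

-- One worker pass: ack on success; on failure do nothing, so the backend
-- redelivers the job after its visibility timeout / ack deadline.
workerPass :: (String -> IO ()) -> Queue -> IO ()
workerPass process q = do
  msgs <- receive q
  mapM_ step msgs
  where
    step m = do
      r <- try (process (msgJob m)) :: IO (Either SomeException ())
      case r of
        Right () -> ack q (msgReceipt m)
        Left _   -> pure ()  -- no nack: leaving it unacked is the retry

-- Usage with a stub backend whose enqueue always fails and whose receive
-- returns one good and one failing job.
demoWorker :: IO ([Receipt], [String])
demoWorker = do
  acked <- newIORef []
  logged <- newIORef []
  let q = Queue
        { enqueue = \_ -> ioError (userError "queue unavailable")
        , receive = pure [Message (Receipt 1) "ok", Message (Receipt 2) "bad"]
        , ack = \r -> modifyIORef' acked (r :)
        }
      process job =
        if job == "bad" then ioError (userError "publish failed") else pure ()
  enqueueBestEffort (\l -> modifyIORef' logged (l :)) q "pkg@1.0.0"
  workerPass process q
  (,) <$> readIORef acked <*> readIORef logged
```

In the stub run only the job that processed successfully is acked, and the failed enqueue surfaces as a single report rather than as an exception.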
This module provides the handle and its payload types, plus two STM-backed in-memory implementations:
- newInMemoryQueue: the test double that models the cloud backends' visibility-timeout semantics (receive → ack / redeliver-on-no-ack), used to exercise the worker's retry path without a cloud queue.
- newBoundedInMemoryQueue: the bounded, best-effort production backend selected by MIRROR_QUEUE_PROVIDER=memory. See its own Haddock for why it is correctness-safe (a dropped job is re-enqueued on the next demand) and why it deliberately does not redeliver.
Synopsis
- data MirrorQueue = MirrorQueue {
- enqueue :: MirrorJob -> IO ()
- receive :: IO [QueueMessage]
- ack :: ReceiptHandle -> IO ()
- extendVisibility :: ReceiptHandle -> Seconds -> IO ()
- data MirrorJob = MirrorJob {}
- data MirrorArtifact = MirrorArtifact {}
- data QueueMessage = QueueMessage {}
- data ReceiptHandle
- mkReceiptHandle :: Text -> ReceiptHandle
- unReceiptHandle :: ReceiptHandle -> Text
- newtype Seconds = Seconds Int
- newInMemoryQueue :: IO MirrorQueue
- data MemoryQueueConfig = MemoryQueueConfig {}
- defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
- newBoundedInMemoryQueue :: MemoryQueueConfig -> (Int -> IO ()) -> IO MirrorQueue
- memoryQueueBatchSize :: Int
- memoryQueueDropReportInterval :: Int
Queue handle
data MirrorQueue Source #
The mirror-queue handle — a record of functions over a backend whose private
state the closures capture. See the module header for the enqueue /
don't-ack-to-retry / no-nack conventions; all fields are IO.
Constructors
| MirrorQueue | |
Fields
| |
Payloads
data MirrorJob Source #
A mirror job: everything the worker needs to back-fill one artifact into the mirror target. The version was already gated by the rules at serve time (when the job was enqueued), so the worker does not re-run the rules; it fetches the bytes, verifies them against the serve-time-admitted integrity digest the job carries, and publishes.
The integrity digest and the artifact descriptor are captured at enqueue time
(jobArtifact), not re-fetched: the worker mirrors exactly what the rules
admitted, so an upstream packument mutated in the enqueue → process window cannot
substitute a different artifact for the one that was gated. The descriptor also
carries the filename and declared size the worker needs to assemble the publish
document.
Constructors
| MirrorJob | |
Fields
| |
Instances
data MirrorArtifact Source #
The serve-time-admitted artifact descriptor carried on a MirrorJob: exactly
the fields the worker needs to verify the fetched bytes and assemble the publish
document, captured when the version was gated.
maHashes is a NonEmpty because the serve path admits a public version only when
it carries at least one integrity digest (the integrity-presence admission policy),
so a job with no digest to verify against is unrepresentable — the worker always
has a fingerprint to check the bytes against before they reach the private upstream.
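A small sketch of why a `NonEmpty` field makes the digest-less job unrepresentable. The `Digest` and `Artifact` types and the `admit` and `firstDigest` helpers are hypothetical simplifications; only the idea of a non-empty digest list comes from the description above.

```haskell
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import qualified Data.List.NonEmpty as NE

-- Hypothetical, simplified stand-ins for the real descriptor.
newtype Digest = Digest String deriving (Show, Eq)

data Artifact = Artifact
  { artFilename :: String
  , artHashes   :: NonEmpty Digest  -- at least one digest, by construction
  } deriving (Show, Eq)

-- Admission at serve time: a version with no digest yields no descriptor,
-- so no job can ever be built for it.
admit :: String -> [Digest] -> Maybe Artifact
admit name ds = Artifact name <$> nonEmpty ds

-- The worker can take a digest without handling an "empty" case.
firstDigest :: Artifact -> Digest
firstDigest = NE.head . artHashes
```

For instance, `admit "pkg-1.0.0.tgz" []` is `Nothing`, while any non-empty digest list produces a descriptor from which `firstDigest` is total.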
Constructors
| MirrorArtifact | |
Fields
| |
Instances
| Show MirrorArtifact Source # | |
Defined in Ecluse.Queue Methods showsPrec :: Int -> MirrorArtifact -> ShowS # show :: MirrorArtifact -> String # showList :: [MirrorArtifact] -> ShowS # | |
| Eq MirrorArtifact Source # | |
Defined in Ecluse.Queue Methods (==) :: MirrorArtifact -> MirrorArtifact -> Bool # (/=) :: MirrorArtifact -> MirrorArtifact -> Bool # | |
data QueueMessage Source #
A received message: the MirrorJob to process together with the
ReceiptHandle used to ack it (or extendVisibility on it) once processed.
Constructors
| QueueMessage | |
Fields
| |
Instances
| Show QueueMessage Source # | |
Defined in Ecluse.Queue Methods showsPrec :: Int -> QueueMessage -> ShowS # show :: QueueMessage -> String # showList :: [QueueMessage] -> ShowS # | |
| Eq QueueMessage Source # | |
Defined in Ecluse.Queue | |
Opaque receipt
data ReceiptHandle Source #
An opaque handle identifying a received message for ack /
extendVisibility. It carries the backend's own delivery token — an SQS receipt
handle or a Pub/Sub ackId — as text; the constructor is hidden so neither
provider's representation leaks into worker code, and a handle is only ever
obtained from a QueueMessage returned by receive. Build one (in a backend)
with mkReceiptHandle and read the token back with unReceiptHandle.
Instances
| Show ReceiptHandle Source # | |
Defined in Ecluse.Queue Methods showsPrec :: Int -> ReceiptHandle -> ShowS # show :: ReceiptHandle -> String # showList :: [ReceiptHandle] -> ShowS # | |
| Eq ReceiptHandle Source # | |
Defined in Ecluse.Queue Methods (==) :: ReceiptHandle -> ReceiptHandle -> Bool # (/=) :: ReceiptHandle -> ReceiptHandle -> Bool # | |
| Ord ReceiptHandle Source # | |
Defined in Ecluse.Queue Methods compare :: ReceiptHandle -> ReceiptHandle -> Ordering # (<) :: ReceiptHandle -> ReceiptHandle -> Bool # (<=) :: ReceiptHandle -> ReceiptHandle -> Bool # (>) :: ReceiptHandle -> ReceiptHandle -> Bool # (>=) :: ReceiptHandle -> ReceiptHandle -> Bool # max :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle # min :: ReceiptHandle -> ReceiptHandle -> ReceiptHandle # | |
mkReceiptHandle :: Text -> ReceiptHandle Source #
Wrap a backend's delivery token (an SQS receipt handle, a Pub/Sub ackId)
as an opaque ReceiptHandle. For backend implementations only — worker code
obtains handles from receive, never builds them.
unReceiptHandle :: ReceiptHandle -> Text Source #
Recover the backend's delivery token from a ReceiptHandle, to pass back to
the backend on ack / extendVisibility. For backend implementations only.
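The opaque-receipt idea can be sketched as a wrapper with a smart constructor and an accessor. This assumes the type is a `newtype` over `Text`, which the documentation does not confirm; `roundTrip` is a hypothetical helper added only to show the backend-side wrap and unwrap.

```haskell
import Data.Text (Text)
import qualified Data.Text as T

-- In a real module the export list would name the type without its
-- constructor, e.g. (ReceiptHandle, mkReceiptHandle, unReceiptHandle),
-- so worker code cannot pattern-match on it or build one directly.
newtype ReceiptHandle = ReceiptHandle Text
  deriving (Show, Eq, Ord)

mkReceiptHandle :: Text -> ReceiptHandle
mkReceiptHandle = ReceiptHandle

unReceiptHandle :: ReceiptHandle -> Text
unReceiptHandle (ReceiptHandle t) = t

-- Hypothetical backend-side round trip: wrap a provider token when a
-- message is received, unwrap it again when the worker acks.
roundTrip :: String -> String
roundTrip = T.unpack . unReceiptHandle . mkReceiptHandle . T.pack
```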
Durations
newtype Seconds Source #
A duration in whole seconds, for extendVisibility. A newtype so a raw
Int of seconds is never confused with some other count.
In-memory double
newInMemoryQueue :: IO MirrorQueue Source #
Build a fresh STM-backed in-memory MirrorQueue.
Honours the handle's contract: enqueue appends (FIFO), receive delivers all
currently-visible jobs and moves them in-flight, ack removes an in-flight job,
and an in-flight job that is never acked is redelivered on the next receive
("retry is don't ack"). extendVisibility holds a job in-flight across one such
redelivery pass. This is a test double — there is no long-poll blocking; an empty
receive returns [] at once.
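The receive → ack / redeliver-on-no-ack behaviour can be modelled in a few lines of STM. The sketch below is an assumption-laden simplification, not the module's implementation: the `St` state, integer receipts, and the ordering of redelivered jobs ahead of newer ones are all choices made for illustration, and `extendVisibility` is left out.

```haskell
import Control.Concurrent.STM
  (atomically, modifyTVar', newTVarIO, readTVar, writeTVar)

-- Hypothetical model state: jobs waiting, jobs delivered but unacked,
-- and the next receipt id to hand out.
data St = St { pending :: [String], inFlight :: [(Int, String)], nextId :: Int }

data Queue = Queue
  { enqueue :: String -> IO ()
  , receive :: IO [(Int, String)]  -- (receipt, job)
  , ack     :: Int -> IO ()
  }

newModelQueue :: IO Queue
newModelQueue = do
  var <- newTVarIO (St [] [] 0)
  pure Queue
    { enqueue = \job -> atomically $
        modifyTVar' var (\s -> s { pending = pending s ++ [job] })
    , receive = atomically $ do
        s <- readTVar var
        -- Unacked in-flight jobs become visible again, ahead of newer jobs.
        let jobs = map snd (inFlight s) ++ pending s
            delivered = zip [nextId s ..] jobs
        writeTVar var (St [] delivered (nextId s + length jobs))
        pure delivered
    , ack = \r -> atomically $
        modifyTVar' var (\s -> s { inFlight = filter ((/= r) . fst) (inFlight s) })
    }

-- Usage: ack only the first of two jobs; the second comes back.
demoRedelivery :: IO ([String], [String])
demoRedelivery = do
  q <- newModelQueue
  mapM_ (enqueue q) ["a", "b"]
  first <- receive q
  case first of
    ((r, _) : _) -> ack q r
    []           -> pure ()
  second <- receive q
  pure (map snd first, map snd second)
```

The first `receive` yields both jobs; after acking only "a", the second `receive` yields just "b", which is the retry path a worker test would exercise.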
Bounded in-memory production backend
data MemoryQueueConfig Source #
What the bounded in-memory backend needs: its depth cap and its idle-poll
window. A record (like SqsConfig) so each knob is named rather
than a bare Int; build it with defaultMemoryQueueConfig for the production poll
window.
Constructors
| MemoryQueueConfig | |
Fields
| |
Instances
| Show MemoryQueueConfig Source # | |
Defined in Ecluse.Queue Methods showsPrec :: Int -> MemoryQueueConfig -> ShowS # show :: MemoryQueueConfig -> String # showList :: [MemoryQueueConfig] -> ShowS # | |
| Eq MemoryQueueConfig Source # | |
Defined in Ecluse.Queue Methods (==) :: MemoryQueueConfig -> MemoryQueueConfig -> Bool # (/=) :: MemoryQueueConfig -> MemoryQueueConfig -> Bool # | |
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig Source #
A MemoryQueueConfig for a given depth cap with the idle-poll window at its
production default — 20s, mirroring the SQS long-poll cadence
(defaultSqsConfig) and comfortably under the worker's 120s
heartbeat-staleness budget (workerHeartbeatStaleAfter), so an idle
receive returns a healthy empty poll long before /livez would flag the loop
stalled. The depth cap stays the operator-tunable knob; the poll window is a fixed
cadence, exposed on the record only so a test can shorten it.
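A sketch of what such a config record and its default might look like. The field names `memQueueMaxDepth` and `memQueuePollWaitMicros` appear in this documentation, but their `Int` types, the exact record layout, and the `testConfig` value are assumptions made for illustration.

```haskell
-- Assumed shape of the record; field types are a guess.
data MemoryQueueConfig = MemoryQueueConfig
  { memQueueMaxDepth       :: Int  -- depth cap, operator-tunable
  , memQueuePollWaitMicros :: Int  -- idle-poll window, in microseconds
  } deriving (Show, Eq)

-- Depth cap supplied by the caller; poll window fixed at 20 seconds.
defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
defaultMemoryQueueConfig depth = MemoryQueueConfig
  { memQueueMaxDepth = depth
  , memQueuePollWaitMicros = 20 * 1000000
  }

-- A test can shorten the window with a record update (hypothetical values).
testConfig :: MemoryQueueConfig
testConfig = (defaultMemoryQueueConfig 8) { memQueuePollWaitMicros = 10000 }
```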
newBoundedInMemoryQueue Source #
Arguments
| :: MemoryQueueConfig | The depth cap (and any future knobs). |
| -> (Int -> IO ()) | Invoked on each report-worthy cap-overflow drop with the running total drops,
so the composition root can log it (and, once the |
| -> IO MirrorQueue |
Build a bounded, best-effort in-memory MirrorQueue — the production backend
behind MIRROR_QUEUE_PROVIDER=memory, a TBQueue shared between the serve path's
enqueue and the worker's receive.
It is correctness-safe despite being lossy: mirroring is a demand-driven optimization over the always-available public upstream, so a job lost to the cap or to process teardown just means the package is served from public again and re-enqueued on the next pull — a deferred performance win, never a correctness loss. That admits two deliberate departures from the cloud backends' contract:
- Bounded, drop-newest on overflow. The queue holds at most memQueueMaxDepth jobs; an enqueue that would exceed the cap is rejected (the newest job is dropped) rather than growing memory without bound. This is the load-bearing constraint, since a cold-cache npm ci enqueues thousands of jobs at once. enqueue never throws (it runs on the serve hot path), and each report-worthy drop invokes the injected drop callback with the running drop count, rate-limited by memoryQueueDropReportInterval so a flood does not spam.
- No redelivery; ack / extendVisibility are no-ops. Unlike the cloud backends (and newInMemoryQueue), there is no visibility-timeout in-flight tracking: a receive removes a job for good. A job whose processing fails is therefore not redelivered; it is simply re-enqueued on the next demand. This bounds memory hardest (nothing is retained after delivery) and is admissible precisely because a lost job is safe.
receive is a bounded long-poll: it waits up to memQueuePollWaitMicros for a
job, then drains up to memoryQueueBatchSize without blocking, or returns [] when
the window lapses — the in-process analogue of the cloud long-poll. The bound is
load-bearing: the worker advances its liveness heartbeat only when receive returns
(an empty poll is a healthy idle), so an idle receive that blocked forever would
let the heartbeat go stale and /livez flag the loop stalled. The wait is the
timeout-over-atomically idiom rather than registerDelay so it works on the
non-threaded RTS too; an interrupted poll aborts the STM transaction, consuming
nothing.
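The drop-newest enqueue and the bounded long-poll can be sketched over a `TBQueue` as below. This is an illustration of the timeout-over-atomically idiom under stated assumptions, not the module's code: `offer`, `pollBatch`, and `demoBounded` are hypothetical names, and the capacity, batch size, and 50 ms wait are arbitrary demo values.

```haskell
import Control.Concurrent.STM
  ( STM, TBQueue, atomically, isFullTBQueue, newTBQueueIO
  , readTBQueue, tryReadTBQueue, writeTBQueue )
import System.Timeout (timeout)

-- Drop-newest enqueue: returns False when the job was rejected because the
-- queue was already at its cap. It never blocks and never throws.
offer :: TBQueue a -> a -> IO Bool
offer q x = atomically $ do
  full <- isFullTBQueue q
  if full then pure False else writeTBQueue q x >> pure True

-- Bounded long-poll: wait up to waitMicros for a first job, then take up to
-- batchSize jobs without blocking. The whole read is one STM transaction,
-- so a timeout aborts it and nothing is consumed.
pollBatch :: Int -> Int -> TBQueue a -> IO [a]
pollBatch waitMicros batchSize q = do
  got <- timeout waitMicros . atomically $ do
    x <- readTBQueue q             -- retries (blocks) while the queue is empty
    rest <- drain (batchSize - 1)  -- non-blocking
    pure (x : rest)
  pure (maybe [] id got)
  where
    drain :: Int -> STM [a]
    drain n
      | n <= 0 = pure []
      | otherwise = do
          next <- tryReadTBQueue q
          case next of
            Nothing -> pure []
            Just y  -> (y :) <$> drain (n - 1)

-- Usage: cap of 3, batch size of 2; the fourth offer is dropped and the
-- final poll on the empty queue returns [] once the window lapses.
demoBounded :: IO ([Bool], [Int], [Int], [Int])
demoBounded = do
  q <- newTBQueueIO 3
  accepted <- mapM (offer q) [1, 2, 3, 4 :: Int]
  batch1 <- pollBatch 50000 2 q
  batch2 <- pollBatch 50000 2 q
  idle <- pollBatch 50000 2 q
  pure (accepted, batch1, batch2, idle)
```

Note that the local `drain` signature relies on `q` from the enclosing scope, so it is written with the concrete element type left to inference in practice; if the compiler rejects the annotation, removing the `drain :: Int -> STM [a]` line is the simplest fix.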
memoryQueueBatchSize :: Int Source #
The most jobs one receive delivers from the bounded in-memory backend. Held
at the SQS batch cap so the worker — which processes a batch sequentially and
advances its liveness heartbeat once per poll — sees the same bounded batch shape
regardless of backend, rather than one poll returning a whole cold-cache burst and
starving the heartbeat past its staleness window.
memoryQueueDropReportInterval :: Int Source #
How many cap-overflow drops the bounded in-memory backend absorbs between warning reports. The first drop is always reported, then every drop whose running total is a multiple of this interval, so a sustained flood logs roughly one line per interval rather than one per dropped job.
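The reporting rule reads naturally as a pure predicate on the running drop total. The sketch below uses a hypothetical `shouldReport` helper and an example interval of 100; the actual value of memoryQueueDropReportInterval is not stated here.

```haskell
-- Report the first drop, then every drop whose running total is a multiple
-- of the interval. A non-positive interval reports only the first drop.
shouldReport :: Int -> Int -> Bool
shouldReport interval total =
  total == 1 || (interval > 0 && total `mod` interval == 0)

-- Which of the first n drops would be reported for a given interval.
reportedDrops :: Int -> Int -> [Int]
reportedDrops interval n = filter (shouldReport interval) [1 .. n]
```

With an interval of 100, a burst of 250 drops would report at totals 1, 100, and 200.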