ecluse
Safe Haskell: None
Language: GHC2021

Ecluse.Queue

Description

The mirror-queue handle: the durable hand-off from the request path to the mirror worker.

Mirroring is demand-driven: when a client fetches an artifact whose version passes the rules, the proxy enqueues a MirrorJob and serves the artifact immediately, never blocking on the mirror. A separate worker receives jobs, fetches and verifies the artifact, publishes it to the mirror target, and acks the job (see docs/architecture/cloud-backends.md → "Mirror Queue").

The queue is the one cloud surface with materially different APIs per provider (AWS SQS SendMessage/ReceiveMessage+visibility-timeout/DeleteMessage; GCP Pub/Sub Publish/Pull+ack-deadline/Acknowledge), so it is its own handle — a record of functions (the Handle pattern). Both providers fit the same receive → process → ack shape; their differences (visibility timeout vs ack deadline, batch limits, dead-letter wiring) stay behind the handle, and ReceiptHandle is opaque so neither leaks.

Like the other handles, the effectful fields return IO, not App, so an adapter stays decoupled from the proxy's Env/App (see docs/architecture/technology-stack.md → "Key Decisions").

Conventions

The two cloud backends both give at-least-once delivery, which is safe here because publishing is idempotent (a registry treats versions as immutable). The handle's contract reflects that:

  • enqueue is best-effort. It runs on the request hot path (enqueue, then serve immediately), so a failure must be logged/metered and must never fail the client response: the artifact is served regardless, and a later pull re-enqueues.
  • Retry is "don't ack". A job that fails processing is simply not acked; the visibility timeout / ack deadline redelivers it, and the backend's native dead-letter path catches the persistently failing ones. There is deliberately no nack.
  • extendVisibility lets the worker hold a long publish (a large artifact) past the visibility window. It is an optimization, not correctness-critical, since idempotency already makes redelivery harmless.
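The best-effort rule for enqueue can be sketched as a wrapper that hands any synchronous failure to a logging/metering hook instead of letting it reach the request handler. This is a sketch under assumptions: the trimmed Producer record (only an enqueue field) and the enqueueBestEffort helper are hypothetical, and the real module may apply the rule inside each backend rather than in a wrapper.

```haskell
import Control.Exception (SomeAsyncException (..), SomeException, catch, fromException, throwIO)

-- Hypothetical, trimmed handle: only the producer side matters here.
newtype Producer job = Producer { enqueue :: job -> IO () }

-- Best-effort enqueue for the request hot path (hypothetical helper).
-- A synchronous failure goes to the caller's log/metric hook and is then
-- swallowed, so the client response never fails because of the queue.
-- Assumes `report` itself does not throw.
enqueueBestEffort :: (SomeException -> IO ()) -> Producer job -> job -> IO ()
enqueueBestEffort report producer job =
  enqueue producer job `catch` \e ->
    case fromException e of
      Just (SomeAsyncException _) -> throwIO e  -- propagate timeouts and cancellation
      Nothing                     -> report e   -- log/meter, then carry on serving
```

Asynchronous exceptions are deliberately not swallowed: hiding a timeout or a thread kill would break the caller's own time limits, which is a different concern from a queue outage.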

This module provides the handle and its payload types, plus two STM-backed in-memory implementations:

  • newInMemoryQueue — the test double that models the cloud backends' visibility-timeout semantics (receive → ack / redeliver-on-no-ack), used to exercise the worker's retry path without a cloud queue.
  • newBoundedInMemoryQueue — the bounded, best-effort production backend selected by MIRROR_QUEUE_PROVIDER=memory. See its own Haddock for why it is correctness-safe (a dropped job is re-enqueued on the next demand) and why it deliberately does not redeliver.

Queue handle

data MirrorQueue

The mirror-queue handle — a record of functions over a backend whose private state the closures capture. See the module header for the enqueue / don't-ack-to-retry / no-nack conventions; all fields are IO.

Constructors

MirrorQueue 

Fields

  • enqueue :: MirrorJob -> IO ()

    Producer. Best-effort: runs on the request hot path, so a failure is logged/metered and never fails the client response (see the header).

  • receive :: IO [QueueMessage]

    Consumer. One long-poll for a batch of messages; returns [] on timeout (an empty poll), so the worker loop simply polls again.

  • ack :: ReceiptHandle -> IO ()

    Acknowledge a processed message so it is not redelivered. Not acking is how a failed job is retried (the header's "retry is don't ack").

  • extendVisibility :: ReceiptHandle -> Seconds -> IO ()

    Extend a received message's visibility window to hold a long publish. An optimization, not correctness-critical (redelivery is harmless).
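To make the record-of-functions shape concrete, here is a minimal sketch of the handle together with one pass of a worker loop that follows "retry is don't ack". It is illustrative only: the payload types are simplified String-based stand-ins, and the QueueMessage field names (qmJob, qmReceipt) and the processBatch helper are hypothetical, since the real field names are not listed on this page.

```haskell
import Control.Exception (SomeAsyncException (..), SomeException, fromException, throwIO, try)
import Control.Monad (forM_)

-- Simplified stand-ins for the module's payload types (hypothetical shapes).
newtype MirrorJob = MirrorJob String deriving (Show, Eq)
newtype ReceiptHandle = ReceiptHandle String deriving (Show, Eq)
newtype Seconds = Seconds Int deriving (Show, Eq, Ord)

data QueueMessage = QueueMessage
  { qmJob     :: MirrorJob      -- hypothetical field name
  , qmReceipt :: ReceiptHandle  -- hypothetical field name
  } deriving (Show, Eq)

-- The handle: a record of IO functions closing over backend state.
data MirrorQueue = MirrorQueue
  { enqueue          :: MirrorJob -> IO ()
  , receive          :: IO [QueueMessage]
  , ack              :: ReceiptHandle -> IO ()
  , extendVisibility :: ReceiptHandle -> Seconds -> IO ()
  }

-- One pass of a hypothetical worker loop: ack only on success.
processBatch :: MirrorQueue -> (MirrorJob -> IO ()) -> IO ()
processBatch q process = do
  msgs <- receive q  -- [] on an empty poll; the caller simply polls again
  forM_ msgs $ \msg -> do
    result <- try (process (qmJob msg)) :: IO (Either SomeException ())
    case result of
      Right () -> ack q (qmReceipt msg)
      Left e
        | Just (SomeAsyncException _) <- fromException e -> throwIO e  -- never mask cancellation
        | otherwise -> pure ()  -- "retry is don't ack": left for redelivery, no nack
```

A failed job is simply left unacked, so the backend's visibility timeout or ack deadline brings it back; asynchronous exceptions are rethrown so that stopping the worker is never mistaken for a job failure.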

Payloads

data MirrorJob

A mirror job: everything the worker needs to back-fill one artifact into the mirror target. The version was already gated by the rules at serve time (when the job was enqueued), so the worker does not re-run the rules; it fetches the bytes, verifies them against the serve-time-admitted integrity digest the job carries, and publishes.

The integrity digest and the artifact descriptor are captured at enqueue time (jobArtifact), not re-fetched: the worker mirrors exactly what the rules admitted, so an upstream packument mutated in the enqueue → process window cannot substitute a different artifact for the one that was gated. The descriptor also carries the filename and declared size the worker needs to assemble the publish document.

Constructors

MirrorJob 

Fields

Instances

  • Show MirrorJob
  • Eq MirrorJob

data MirrorArtifact

The serve-time-admitted artifact descriptor carried on a MirrorJob: exactly the fields the worker needs to verify the fetched bytes and assemble the publish document, captured when the version was gated.

maHashes is a NonEmpty because the serve path admits a public version only when it carries at least one integrity digest (the integrity-presence admission policy), so a job with no digest to verify against is unrepresentable — the worker always has a fingerprint to check the bytes against before they reach the private upstream.

Constructors

MirrorArtifact 

Fields

  • maFilename :: Text

    The artifact's on-the-wire filename, the _attachments key in the publish document.

  • maHashes :: NonEmpty Hash

    The serve-time-admitted integrity digests (at least one). The worker verifies the fetched bytes against these before publishing; a mismatch fails the job with no publish.

  • maSize :: Maybe Int

    The declared artifact size in bytes, if the registry reported it.
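The NonEmpty field is what makes a digest-less job unrepresentable. A sketch of how an admission step might enforce it with Data.List.NonEmpty.nonEmpty; the Hash record and the admitArtifact helper are hypothetical stand-ins, not this module's API.

```haskell
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import Data.Text (Text, pack)

-- Hypothetical stand-in for the module's Hash type: algorithm tag plus digest.
data Hash = Hash { hashAlgorithm :: Text, hashDigest :: Text } deriving (Show, Eq)

data MirrorArtifact = MirrorArtifact
  { maFilename :: Text
  , maHashes   :: NonEmpty Hash
  , maSize     :: Maybe Int
  } deriving (Show, Eq)

-- Hypothetical admission helper: with no digest there is no NonEmpty, so no
-- MirrorArtifact (and therefore no MirrorJob) can be built at all.
admitArtifact :: Text -> [Hash] -> Maybe Int -> Maybe MirrorArtifact
admitArtifact filename hashes size = do
  digests <- nonEmpty hashes
  pure MirrorArtifact { maFilename = filename, maHashes = digests, maSize = size }

-- Illustrative input values.
exampleArtifact :: Maybe MirrorArtifact
exampleArtifact =
  admitArtifact (pack "example-1.0.0.tgz") [Hash (pack "sha512") (pack "deadbeef")] (Just 1024)
```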

Instances

  • Show MirrorArtifact
  • Eq MirrorArtifact

data QueueMessage

A received message: the MirrorJob to process together with the ReceiptHandle used to ack it (or extendVisibility on it) once processed.

Constructors

QueueMessage 

Fields

Instances

  • Show QueueMessage
  • Eq QueueMessage

Opaque receipt

data ReceiptHandle

An opaque handle identifying a received message for ack / extendVisibility. It carries the backend's own delivery token — an SQS receipt handle or a Pub/Sub ackId — as text; the constructor is hidden so neither provider's representation leaks into worker code, and a handle is only ever obtained from a QueueMessage returned by receive. Build one (in a backend) with mkReceiptHandle and read the token back with unReceiptHandle.

mkReceiptHandle :: Text -> ReceiptHandle

Wrap a backend's delivery token (an SQS receipt handle, a Pub/Sub ackId) as an opaque ReceiptHandle. For backend implementations only — worker code obtains handles from receive, never builds them.

unReceiptHandle :: ReceiptHandle -> Text

Recover the backend's delivery token from a ReceiptHandle, to pass back to the backend on ack / extendVisibility. For backend implementations only.
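A minimal sketch of the opaque-newtype pattern described above, assuming the token is stored as Text and that the type derives Show and Eq (neither is stated on this page). In a real module the export list would name ReceiptHandle without its constructor; the sketch records that in a comment so it stays runnable as a single file.

```haskell
import Data.Text (Text, pack)

-- In the defining module, the export list would be something like
--   (ReceiptHandle, mkReceiptHandle, unReceiptHandle)
-- i.e. the type without its constructor, so worker code can neither build
-- nor pattern-match a handle. (Derived instances here are an assumption.)
newtype ReceiptHandle = ReceiptHandle Text
  deriving (Show, Eq)

-- Backend side: wrap a provider token (SQS receipt handle or Pub/Sub ackId).
mkReceiptHandle :: Text -> ReceiptHandle
mkReceiptHandle = ReceiptHandle

-- Backend side: recover the token to pass back on ack / extendVisibility.
unReceiptHandle :: ReceiptHandle -> Text
unReceiptHandle (ReceiptHandle token) = token

-- Illustrative round trip with a made-up token.
exampleToken :: Text
exampleToken = unReceiptHandle (mkReceiptHandle (pack "example-ack-123"))
```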

Durations

newtype Seconds

A duration in whole seconds, for extendVisibility. A newtype, so a raw Int of seconds is never confused with some other count.

Constructors

Seconds Int 

Instances

  • Show Seconds
  • Eq Seconds
  • Ord Seconds

In-memory double

newInMemoryQueue :: IO MirrorQueue

Build a fresh STM-backed in-memory MirrorQueue.

Honours the handle's contract: enqueue appends (FIFO), receive delivers all currently-visible jobs and moves them in-flight, ack removes an in-flight job, and an in-flight job that is never acked is redelivered on the next receive ("retry is don't ack"). extendVisibility holds a job in-flight across one such redelivery pass. This is a test double — there is no long-poll blocking; an empty receive returns [] at once.
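The visibility-timeout model can be sketched with two TVars: a FIFO of visible jobs and a list of in-flight deliveries, each under a fresh receipt. This is a minimal sketch, not the module's code: the Queue, Msg and Receipt types and newInMemoryQueueSketch are hypothetical, redelivered jobs are assumed to go ahead of newly enqueued ones, and qExtend ignores the duration and simply grants the one-pass hold the text describes.

```haskell
import Control.Concurrent.STM
import Data.List (partition)

newtype Receipt = Receipt Int deriving (Show, Eq)

data Msg job = Msg { msgJob :: job, msgReceipt :: Receipt } deriving (Show, Eq)

data Queue job = Queue
  { qEnqueue :: job -> IO ()
  , qReceive :: IO [Msg job]
  , qAck     :: Receipt -> IO ()
  , qExtend  :: Receipt -> IO ()  -- duration omitted in this sketch
  }

-- An in-flight delivery: receipt id, job, and how many redelivery passes it
-- may still sit out (set to 1 by qExtend).
data InFlight job = InFlight { ifId :: Int, ifJob :: job, ifHolds :: Int }

newInMemoryQueueSketch :: IO (Queue job)
newInMemoryQueueSketch = do
  visible  <- newTVarIO []          -- FIFO; head is the oldest job
  inFlight <- newTVarIO []
  nextId   <- newTVarIO (0 :: Int)
  let enq job = atomically (modifyTVar' visible (++ [job]))

      rcv = atomically $ do
        flights <- readTVar inFlight
        -- Held deliveries survive this pass (spending their hold); the rest
        -- were never acked, so they are redelivered ahead of new jobs.
        let (held, expired) = partition ((> 0) . ifHolds) flights
            held' = [f { ifHolds = ifHolds f - 1 } | f <- held]
        queued <- readTVar visible
        n <- readTVar nextId
        let fresh = zipWith (\i j -> InFlight i j 0) [n ..] (map ifJob expired ++ queued)
        writeTVar nextId (n + length fresh)
        writeTVar visible []
        writeTVar inFlight (held' ++ fresh)
        pure [Msg (ifJob f) (Receipt (ifId f)) | f <- fresh]

      ackR (Receipt r) = atomically (modifyTVar' inFlight (filter ((/= r) . ifId)))

      extR (Receipt r) = atomically $ modifyTVar' inFlight $
        map (\f -> if ifId f == r then f { ifHolds = 1 } else f)

  pure (Queue enq rcv ackR extR)
```

Issuing a new receipt on every delivery mirrors SQS, where each receive of the same message returns a different receipt handle; acking with a stale receipt is then a harmless no-op.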

Bounded in-memory production backend

data MemoryQueueConfig

What the bounded in-memory backend needs: its depth cap and its idle-poll window. A record (like SqsConfig) so each knob is named rather than a bare Int; build it with defaultMemoryQueueConfig for the production poll window.

Constructors

MemoryQueueConfig 

Fields

  • memQueueMaxDepth :: Int

    The maximum number of jobs the queue holds. An enqueue past this cap is rejected (drop-newest: the incoming job is discarded, not an older one); a dropped job is safe, as it is re-enqueued on the next demand. Must be positive (the config layer enforces it).

  • memQueuePollWaitMicros :: Int

    The idle long-poll window in microseconds: how long a receive waits for a job before returning [] (an empty, healthy poll). Bounds the idle wait so the worker's liveness heartbeat keeps advancing — see newBoundedInMemoryQueue.

defaultMemoryQueueConfig :: Int -> MemoryQueueConfig

A MemoryQueueConfig for a given depth cap with the idle-poll window at its production default — 20s, mirroring the SQS long-poll cadence (defaultSqsConfig) and comfortably under the worker's 120s heartbeat-staleness budget (workerHeartbeatStaleAfter), so an idle receive returns a healthy empty poll long before /livez would flag the loop stalled. The depth cap stays the operator-tunable knob; the poll window is a fixed cadence, exposed on the record only so a test can shorten it.
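The arithmetic behind the default: memQueuePollWaitMicros is in microseconds, so 20 s is 20 * 1,000,000 = 20,000,000, and a 120 s staleness budget fits six consecutive idle polls. A sketch of the record and its default constructor; heartbeatStaleAfterMicros is a hypothetical stand-in for workerHeartbeatStaleAfter, and fastTestConfig shows how a test might shorten the window with a record update.

```haskell
data MemoryQueueConfig = MemoryQueueConfig
  { memQueueMaxDepth       :: Int  -- ^ depth cap (the operator-tunable knob)
  , memQueuePollWaitMicros :: Int  -- ^ idle long-poll window, in microseconds
  } deriving (Show, Eq)

-- 20 s expressed in microseconds, the unit the poll window is stored in.
productionPollWaitMicros :: Int
productionPollWaitMicros = 20 * 1000000

-- Hypothetical stand-in for workerHeartbeatStaleAfter (120 s), in microseconds.
heartbeatStaleAfterMicros :: Int
heartbeatStaleAfterMicros = 120 * 1000000

defaultMemoryQueueConfig :: Int -> MemoryQueueConfig
defaultMemoryQueueConfig depth = MemoryQueueConfig
  { memQueueMaxDepth       = depth
  , memQueuePollWaitMicros = productionPollWaitMicros
  }

-- A test overrides only the poll window and keeps the record shape.
fastTestConfig :: MemoryQueueConfig
fastTestConfig = (defaultMemoryQueueConfig 8) { memQueuePollWaitMicros = 50000 }
```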

newBoundedInMemoryQueue

Arguments

:: MemoryQueueConfig

The depth cap and the idle-poll window (see MemoryQueueConfig).

-> (Int -> IO ())

Invoked on each report-worthy cap-overflow drop with the running total of drops, so the composition root can log it (and, once the ecluse.mirror.* metric catalogue lands, increment a drop counter alongside).

-> IO MirrorQueue 

Build a bounded, best-effort in-memory MirrorQueue — the production backend behind MIRROR_QUEUE_PROVIDER=memory, a TBQueue shared between the serve path's enqueue and the worker's receive.

It is correctness-safe despite being lossy: mirroring is a demand-driven optimization over the always-available public upstream, so a job lost to the cap or to process teardown just means the package is served from public again and re-enqueued on the next pull — a deferred performance win, never a correctness loss. That admits two deliberate departures from the cloud backends' contract:

  • Bounded, drop-newest on overflow. The queue holds at most memQueueMaxDepth jobs; an enqueue that would exceed the cap is rejected (the newest job is dropped) rather than letting memory grow without bound. This is the load-bearing constraint, since a cold-cache npm ci enqueues thousands of jobs at once. enqueue never throws (it runs on the serve hot path), and each report-worthy drop invokes the injected drop callback with the running drop count, rate-limited by memoryQueueDropReportInterval so a flood does not spam the log.
  • No redelivery; ack / extendVisibility are no-ops. Unlike the cloud backends (and newInMemoryQueue), there is no visibility-timeout in-flight tracking: a receive removes a job for good. A job whose processing fails is therefore not redelivered — it is simply re-enqueued on the next demand. This bounds memory hardest (nothing is retained after delivery) and is admissible precisely because a lost job is safe.

receive is a bounded long-poll: it waits up to memQueuePollWaitMicros for a job, then drains up to memoryQueueBatchSize without blocking, or returns [] when the window lapses — the in-process analogue of the cloud long-poll. The bound is load-bearing: the worker advances its liveness heartbeat only when receive returns (an empty poll is a healthy idle), so an idle receive that blocked forever would let the heartbeat go stale and /livez flag the loop stalled. The wait is the timeout-over-atomically idiom rather than registerDelay so it works on the non-threaded RTS too; an interrupted poll aborts the STM transaction, consuming nothing.
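The behaviours above (drop-newest on a full queue, a rate-limited drop callback, and a bounded long-poll that then drains a batch) can be sketched on top of stm's TBQueue and base's System.Timeout.timeout. This is a sketch, not the module's implementation: the Queue record, newBoundedQueueSketch and the dropReportInterval value of 100 are hypothetical, receipts are omitted because ack and extendVisibility would be no-ops, and batchSize is 10 on the assumption that "the SQS batch cap" means ReceiveMessage's MaxNumberOfMessages limit.

```haskell
import Control.Concurrent.STM
import System.Timeout (timeout)

data MemoryQueueConfig = MemoryQueueConfig
  { memQueueMaxDepth       :: Int
  , memQueuePollWaitMicros :: Int
  }

-- Simplified handle; receipts omitted since ack/extendVisibility are no-ops.
data Queue job = Queue { qEnqueue :: job -> IO (), qReceive :: IO [job] }

batchSize :: Int
batchSize = 10          -- assumed: the SQS ReceiveMessage per-call maximum

dropReportInterval :: Int
dropReportInterval = 100  -- hypothetical value

newBoundedQueueSketch :: MemoryQueueConfig -> (Int -> IO ()) -> IO (Queue job)
newBoundedQueueSketch cfg onDrop = do
  q     <- newTBQueueIO (fromIntegral (memQueueMaxDepth cfg))
  drops <- newTVarIO (0 :: Int)
  let enq job = do
        -- Check and write in one transaction: writeTBQueue alone would block
        -- (retry) on a full queue, which the hot path must never do.
        dropped <- atomically $ do
          full <- isFullTBQueue q
          if full
            then do
              n <- (+ 1) <$> readTVar drops
              writeTVar drops n
              pure (Just n)
            else writeTBQueue q job >> pure Nothing
        -- Report the first drop, then every multiple of the interval.
        -- Assumes onDrop does not throw.
        case dropped of
          Just n | n == 1 || n `mod` dropReportInterval == 0 -> onDrop n
          _ -> pure ()

      -- Take up to k more jobs without blocking.
      drainUpTo k
        | k <= 0 = pure []
        | otherwise = do
            next <- tryReadTBQueue q
            case next of
              Nothing  -> pure []
              Just job -> (job :) <$> drainUpTo (k - 1)

      rcv = do
        -- Bounded long-poll: if the window lapses while readTBQueue is
        -- blocked, the transaction is abandoned and nothing is consumed.
        first <- timeout (memQueuePollWaitMicros cfg) (atomically (readTBQueue q))
        case first of
          Nothing  -> pure []  -- a healthy empty poll
          Just job -> (job :) <$> atomically (drainUpTo (batchSize - 1))

  pure (Queue enq rcv)
```

The drop callback runs after the transaction commits because STM code cannot perform IO; the counter update and the fullness check still happen atomically, so concurrent enqueues cannot both slip past the cap.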

memoryQueueBatchSize :: Int

The most jobs one receive delivers from the bounded in-memory backend. Held at the SQS batch cap so the worker — which processes a batch sequentially and advances its liveness heartbeat once per poll — sees the same bounded batch shape regardless of backend, rather than one poll returning a whole cold-cache burst and starving the heartbeat past its staleness window.

memoryQueueDropReportInterval :: Int

How many cap-overflow drops the bounded in-memory backend absorbs between warning reports. The first drop is always reported, then every multiple of this, so a sustained flood logs at most about one line per this many drops rather than one per dropped job.
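The reporting cadence is plain modular arithmetic. A sketch with a hypothetical interval of 100 (the real value of memoryQueueDropReportInterval is not stated here); reportedDrops and isReportedDrop are illustrative helpers, not part of the module.

```haskell
-- Hypothetical interval; stands in for memoryQueueDropReportInterval.
dropReportInterval :: Int
dropReportInterval = 100

-- Whether the k-th cap-overflow drop (1-based running total) is reported.
isReportedDrop :: Int -> Bool
isReportedDrop k = k == 1 || k `mod` dropReportInterval == 0

-- The running totals that trigger a warning during a flood of n drops.
reportedDrops :: Int -> [Int]
reportedDrops n = filter isReportedDrop [1 .. n]
```

With that interval, a flood of 350 drops produces warnings at drops 1, 100, 200 and 300: four lines instead of 350.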