ecluse
Safe Haskell: None
Language: GHC2021

Ecluse.Worker

Description

The mirror worker: the supervised consume loop that turns enqueued jobs into mirrored packages.

The worker is the consumer end of the demand-driven mirror queue (see Ecluse.Queue). runWorker long-polls the queue, and for each received job:

  1. fetches the artifact bytes from the public upstream named on the job,
  2. verifies those bytes against the integrity digest the job carries — the digest the rules admitted at serve time, not a fresh re-fetch,
  3. assembles the npm publish document and publishes it to the mirror target (envRegistry, resolved at the composition root with the bearer from the Ecluse.Credential provider), and
  4. acknowledges the job.
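The four steps above can be sketched as a small pipeline over injected effects. This is a minimal illustration, not the module's implementation: `Job`, `Outcome`, `Handles`, `runJob`, and `toyHandles` are hypothetical names, `String` stands in for the real byte and text types, and the "hash" is a toy (`reverse`) so the example needs nothing beyond base.

```haskell
-- Hypothetical stand-ins; none of these names are taken from Ecluse.
data Job = Job { jobUrl :: String, jobDigest :: String }

data Outcome = Succeeded | Dropped String | Retried String
  deriving (Show, Eq)

-- The effects the pipeline needs, passed in as plain functions so the
-- control flow can be exercised without a queue or a registry.
data Handles m = Handles
  { fetchBytes :: String -> m (Either String String) -- fetch from the upstream
  , digestOf   :: String -> String                   -- hash the fetched bytes
  , publish    :: String -> m (Either String ())     -- publish to the mirror
  }

-- fetch -> verify -> publish; the returned outcome decides the ack.
runJob :: Monad m => Handles m -> Job -> m Outcome
runJob h job = do
  fetched <- fetchBytes h (jobUrl job)
  case fetched of
    Left err -> pure (Retried ("fetch failed: " <> err))
    Right bytes
      -- Verify against the digest carried on the job, never a re-fetched one.
      | digestOf h bytes /= jobDigest job -> pure (Dropped "integrity mismatch")
      | otherwise -> do
          published <- publish h bytes
          pure (either (\e -> Retried ("publish failed: " <> e)) (const Succeeded) published)

-- Toy handles: `reverse` is NOT a hash, it only makes the example checkable.
toyHandles :: Handles IO
toyHandles = Handles
  { fetchBytes = \url -> pure (if url == "missing" then Left "404" else Right "payload")
  , digestOf   = reverse
  , publish    = \_ -> pure (Right ())
  }

main :: IO ()
main = do
  runJob toyHandles (Job "pkg" "daolyap") >>= print     -- digest matches
  runJob toyHandles (Job "pkg" "tampered") >>= print    -- digest differs, no publish
  runJob toyHandles (Job "missing" "daolyap") >>= print -- fetch fails
```

Note that the publish step is only reachable through the matching-digest branch, which is the property the integrity gate relies on.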

The integrity gate is the security crux

A mirrored artifact is later served from the private upstream without re-running the rules, so a corrupt or tampered artifact must never enter it. Verification is therefore the gate: a hash mismatch fails the job with no publish and is logged loudly. Because the digest is the serve-time-admitted one carried on the job, the worker mirrors exactly the bytes the rules cleared: an upstream packument mutated in the enqueue → process window cannot substitute a different artifact.

Loop robustness and supervision

The loop is wrapped so a single bad iteration cannot kill the worker thread: a transient receive, fetch, or publish error, or an undecodable body, is caught and logged, and the loop backs off and continues. (Job-level "retry is don't ack" is a separate concern: it governs whether one message redelivers, but it does not protect the loop, since an escaping exception would still tear the thread down.) The composition root holds the worker under concurrently_ alongside the server, so a genuinely fatal error propagates and takes the process down (fail-stop), while transient faults self-recover here. A successful poll advances the WorkerHeartbeat, so a stalled loop is visible to the liveness probe.
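One way to picture the per-iteration guard is the sketch below, which uses only `Control.Exception` from base. It is an assumption about the general shape, not the module's code: `guardIteration` and `superviseN` are hypothetical names, the loop is bounded so it terminates, and how the real worker distinguishes fatal from transient errors is not shown in this page. Asynchronous exceptions are rethrown so that cancellation by the supervisor still tears the thread down.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeAsyncException (..), fromException, throwIO, try)

-- Run one iteration. A synchronous failure is logged and reported as False;
-- an asynchronous exception (e.g. cancellation) is rethrown untouched.
guardIteration :: (String -> IO ()) -> IO () -> IO Bool
guardIteration logErr step = do
  result <- try step
  case result of
    Right () -> pure True
    Left e
      | Just (SomeAsyncException _) <- fromException e -> throwIO e
      | otherwise -> logErr (show e) >> pure False

-- A bounded stand-in for the unbounded loop: run n iterations, back off
-- after each failure, and return how many iterations failed.
superviseN :: Int -> Int -> (String -> IO ()) -> IO () -> IO Int
superviseN n backoffMicros logErr step = go n 0
  where
    go :: Int -> Int -> IO Int
    go 0 failures = pure failures
    go k failures = do
      ok <- guardIteration logErr step
      if ok
        then go (k - 1) failures
        else threadDelay backoffMicros >> go (k - 1) (failures + 1)

main :: IO ()
main = do
  -- Every iteration throws, yet the loop survives all three of them.
  failures <- superviseN 3 1000 (\msg -> putStrLn ("iteration failed: " <> msg))
                         (ioError (userError "boom"))
  print failures
```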

Shutdown tears the loop down cleanly: the composition root runs it under concurrently_ within the withEnv resource bracket, so process teardown cancels the loop thread and an in-flight, un-acked message simply redelivers — safe, because publishing is idempotent (a version already present is success).

Ack within the visibility budget

A received message is hidden only for the queue's visibility window. The worker acks on success; before a publish that may run long, it calls extendVisibility to hold the message before the window lapses; on a transient failure it does not ack, so the message redelivers. A batch is processed sequentially, so each job has the full visibility budget rather than competing with its batch-mates for it.

See docs/architecture/cloud-backends.md → "Mirror Queue" and "Process model".

Synopsis

Entry point

runWorker :: Env -> IO ()

Run the supervised mirror worker over the composition-root Env: the consume → fetch → verify → publish → ack loop, in the App orchestration monad.

This is a self-contained service entry over the shared Env. The shape is split-ready: the single-process program runs it alongside the server. It does not return under normal operation; its caller brackets it for shutdown.

Loop and job processing (exposed for direct testing)

workerLoop :: App ()

The continuous consume loop: long-poll for a batch, process it, repeat.

Each iteration is wrapped so a single failure — a receive that throws, a fetch or publish error, an undecodable body — is caught and logged, then the loop backs off briefly and continues, so one bad iteration cannot kill the worker thread. A successful poll advances the heartbeat (whether or not the batch was empty), so a liveness probe sees the loop is alive; an idle queue is a healthy empty poll, not a stall.

processBatch :: [QueueMessage] -> App ()

Process one received batch sequentially, so each job gets the full visibility budget rather than competing with its batch-mates for it. A batch is at most the queue's configured batch size (≤ 10), so sequential processing is a deliberate throughput-vs-budget choice, not a scaling bottleneck.

processJob :: ReceiptHandle -> MirrorJob -> App JobOutcome

Process one mirror job end to end: fetch the artifact, verify it against the job's serve-time-admitted integrity digest, and — only on a match — publish it to the mirror target. Returns the JobOutcome that decides ack vs. redeliver.

The receipt handle is taken so a long publish can call extendVisibility to hold the message before its window lapses. The rules are not re-run: the job was gated at serve time.

data JobOutcome

The terminal outcome of processing one mirror job, deciding whether the message is acked or left to redeliver.

Constructors

Succeeded

The publish succeeded, so the job is acked. This covers an idempotent redelivery too: a version already present at the mirror target is a 409 the registry handle treats as success (publishArtifact), so it surfaces here as Succeeded rather than a distinct case.

Dropped Text

A non-retryable fault: the bytes did not match the serve-time digest (tamper), or the publish URL was unformable (misconfiguration). Redelivery cannot help, so the job is dropped after alarming. Carries the reason.

Retried Text

A transient fault: a fetch failure, or a registry rejection worth retrying. The message is left un-acked so it redelivers. Carries the reason.
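The mapping from outcome to queue action can be sketched as a total function. The type below mirrors the constructors documented above but is a local copy (with `String` in place of `Text`), and `QueueAction` and `queueAction` are hypothetical names. Treating `Dropped` as an ack is an assumption drawn from "dropped after alarming": the message has to leave the queue for redelivery to stop.

```haskell
-- Local mirror of the documented constructors, with String instead of Text.
data JobOutcome
  = Succeeded
  | Dropped String
  | Retried String
  deriving (Show, Eq)

-- Hypothetical: what the worker does with the message afterwards.
data QueueAction = Ack | LeaveForRedelivery
  deriving (Show, Eq)

queueAction :: JobOutcome -> QueueAction
queueAction Succeeded   = Ack                -- published, or already present
queueAction (Dropped _) = Ack                -- assumption: remove it, redelivery cannot help
queueAction (Retried _) = LeaveForRedelivery -- no ack, the visibility window lapses

main :: IO ()
main = mapM_ (print . queueAction)
  [ Succeeded
  , Dropped "integrity mismatch"
  , Retried "fetch failed"
  ]
```

Keeping the decision in one pattern match means a new constructor forces the ack policy to be revisited when incomplete-pattern warnings are enabled.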

Instances

Show JobOutcome

Defined in Ecluse.Worker

Eq JobOutcome

Defined in Ecluse.Worker

Liveness

workerHeartbeatStaleAfter :: NominalDiffTime

How long the worker's last successful poll may be stale before the loop is considered stalled — the staleness threshold the liveness probe applies.

It is a generous multiple of the long-poll cadence: a healthy idle worker still completes a poll at least every sqsWaitSeconds (≤ 20s by default), so a gap several times that is a genuine stall, not an idle queue. Set well above one poll window so liveness never flaps on normal scheduling jitter.

heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool

Whether the worker's consume loop is healthy as of now, given its last successful poll. This is the liveness signal the single-process /livez probe folds in (see Ecluse.Server), distinct from HTTP readiness.

  • Nothing (no poll yet) is healthy: the worker is still starting, not stalled.
  • A poll within workerHeartbeatStaleAfter is healthy.
  • A poll older than that is unhealthy: the loop has gone quiet for too long.

>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True
>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True
>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False

heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool

Read the worker heartbeat and decide liveness against the current wall clock — the IO wrapper the liveness probe calls. True while the consume loop is alive (or still starting); False once the last successful poll is staler than workerHeartbeatStaleAfter.
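The pure check and its IO wrapper could look roughly like the sketch below. It is illustrative only: `Heartbeat`, `newHeartbeat`, `beat`, `healthy`, and `healthyNow` are hypothetical names, the heartbeat is assumed to be an `IORef (Maybe UTCTime)`, and the 120-second threshold is an assumed value chosen to agree with the doctests above (10s healthy, 300s stale), not the real `workerHeartbeatStaleAfter`.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Time
  ( NominalDiffTime, UTCTime (UTCTime), addUTCTime, diffUTCTime
  , fromGregorian, getCurrentTime )

-- Assumed threshold for illustration; the real value is not stated here.
staleAfter :: NominalDiffTime
staleAfter = 120

-- Pure decision: no poll yet counts as "still starting", hence healthy.
healthy :: UTCTime -> Maybe UTCTime -> Bool
healthy _ Nothing = True
healthy now (Just lastPoll) = diffUTCTime now lastPoll <= staleAfter

-- Hypothetical heartbeat cell: the time of the last successful poll.
newtype Heartbeat = Heartbeat (IORef (Maybe UTCTime))

newHeartbeat :: IO Heartbeat
newHeartbeat = Heartbeat <$> newIORef Nothing

-- Called by the loop after each successful poll, empty batch or not.
beat :: Heartbeat -> IO ()
beat (Heartbeat ref) = getCurrentTime >>= writeIORef ref . Just

-- The IO wrapper a liveness probe would call.
healthyNow :: Heartbeat -> IO Bool
healthyNow (Heartbeat ref) = healthy <$> getCurrentTime <*> readIORef ref

-- Fixed instants for trying the pure function out.
t0, soon, muchLater :: UTCTime
t0 = UTCTime (fromGregorian 2020 1 1) 0
soon = addUTCTime 10 t0
muchLater = addUTCTime 300 t0

main :: IO ()
main = do
  print (healthy soon (Just t0))      -- within the threshold
  print (healthy muchLater (Just t0)) -- past the threshold
  hb <- newHeartbeat
  healthyNow hb >>= print             -- no poll yet
  beat hb
  healthyNow hb >>= print             -- just polled
```

Splitting the clock read from the comparison keeps the decision testable with fixed instants, which is what the doctests above rely on.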

Integrity verification

data IntegrityResult

The result of verifying fetched bytes against the admitted integrity digests. A sum type, not a Bool, so the mismatch carries the detail an operator needs to explain why a publish was refused.

Constructors

IntegrityVerified

The bytes matched the most authoritative admitted digest.

IntegrityMismatch Text

The bytes failed the integrity gate. Carries a human-readable detail (the digest they were checked against, or that the strongest one was uncomputable).

Instances

Show IntegrityResult

Defined in Ecluse.Worker

Eq IntegrityResult

Defined in Ecluse.Worker

verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult

Verify fetched artifact bytes against the most authoritative integrity digest the version carries — never against a weaker one while a stronger is present.

A real npm version carries both a modern SRI sha512 digest and the legacy SHA-1 shasum. Accepting any match would admit an artifact that matches the weak SHA-1 but fails the strong sha512, and because SHA-1 collision resistance is broken, that is exploitable. The gate therefore ranks the admitted digests by algorithm authority (strongest first: sha512 / blake2b > sha384 > sha256 > sha1 > md5) and checks the bytes against the strongest one present: the bytes pass iff that digest matches. A weaker digest can neither override nor rescue a failed strong one.

If the strongest digest present is in an algorithm the worker cannot compute, the gate fails closed rather than falling back to a weaker digest — a tampered artifact must never be admitted on the strength of a hash an attacker could forge.

This is the tamper gate before a publish: a mismatch fails the job and never publishes a corrupt or substituted artifact into the private upstream.

>>> import Ecluse.Package (mkHash, HashAlg (SHA1))
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "0a4d55a8d778e5022fab701977c5d840bbc486d0")
Right IntegrityVerified
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
Right (IntegrityMismatch "the SHA1 digest did not match the fetched bytes")
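The "strongest digest only, fail closed" rule can be sketched without any crypto dependency by passing the hasher in as a function. Everything here is hypothetical (`Alg`, `Digest`, `Result`, `verifyStrongest`, `toyHasher`); blake2b is omitted for brevity, and the toy hasher deliberately cannot compute SHA512 so the fail-closed branch is reachable. It is not the module's `verifyIntegrity`.

```haskell
import Data.Foldable (maximumBy)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Ord (comparing)

-- Constructor order doubles as authority order: weakest first.
data Alg = MD5 | SHA1 | SHA256 | SHA384 | SHA512
  deriving (Show, Eq, Ord)

data Digest = Digest { digestAlg :: Alg, digestHex :: String }
  deriving (Show, Eq)

data Result = Verified | Mismatch String
  deriving (Show, Eq)

-- The hasher returns Nothing for an algorithm it cannot compute.
verifyStrongest :: (Alg -> String -> Maybe String) -> NonEmpty Digest -> String -> Result
verifyStrongest hasher digests bytes =
  case hasher (digestAlg strongest) bytes of
    -- Fail closed: never fall back to a weaker digest.
    Nothing -> Mismatch ("cannot compute " <> show (digestAlg strongest) <> "; failing closed")
    Just actual
      | actual == digestHex strongest -> Verified
      | otherwise -> Mismatch ("the " <> show (digestAlg strongest) <> " digest did not match")
  where
    -- Safe on NonEmpty: there is always at least one digest.
    strongest = maximumBy (comparing digestAlg) digests

-- Toy hasher, NOT cryptographic: tags the bytes with the algorithm name.
toyHasher :: Alg -> String -> Maybe String
toyHasher SHA512 _ = Nothing
toyHasher alg bytes = Just (show alg <> ":" <> bytes)

main :: IO ()
main = do
  -- SHA1 matches, but the stronger SHA256 does not: refused.
  print (verifyStrongest toyHasher (Digest SHA1 "SHA1:abc" :| [Digest SHA256 "bad"]) "abc")
  -- The strongest digest matches; the stale weaker one is ignored.
  print (verifyStrongest toyHasher (Digest SHA256 "SHA256:abc" :| [Digest SHA1 "stale"]) "abc")
  -- The strongest digest is uncomputable: fails closed despite a SHA1 match.
  print (verifyStrongest toyHasher (Digest SHA512 "x" :| [Digest SHA1 "SHA1:abc"]) "abc")
```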