| Safe Haskell | None |
|---|---|
| Language | GHC2021 |
Ecluse.Worker
Description
The mirror worker: the supervised consume loop that turns enqueued jobs into mirrored packages.
The worker is the consumer end of the demand-driven mirror queue (see
Ecluse.Queue). runWorker long-polls the queue, and for each received job:
- fetches the artifact bytes from the public upstream named on the job,
- verifies those bytes against the integrity digest the job carries — the digest the rules admitted at serve time, not a fresh re-fetch,
- assembles the npm publish document and publishes it to the mirror target (envRegistry, resolved at the composition root with the bearer from the Ecluse.Credential provider), and
- acknowledges the job.
The integrity gate is the security crux
A mirrored artifact is later served from the private upstream without re-running the rules, so a corrupt or tampered artifact must never enter it. Verification is therefore the gate: a hash mismatch fails the job with no publish and is logged loudly. Because the digest is the serve-time-admitted one carried on the job, the worker mirrors exactly the bytes the rules cleared; an upstream packument mutated in the enqueue → process window cannot substitute a different artifact.
Loop robustness and supervision
The loop is wrapped so a single bad iteration cannot kill the worker thread: a
transient receive, fetch, or publish error, or an undecodable body, is caught,
logged, and the loop backs off and continues. (Job-level "retry is don't ack" is a
separate concern — it governs whether one message redelivers; it does not protect
the loop, since an escaping exception would still tear the thread down.) The
composition root holds the worker under concurrently_ alongside the server, so a
genuinely fatal error propagates and takes the process down (fail-stop), while
transient faults self-recover here. A successful poll advances the
WorkerHeartbeat, so a stalled loop is visible to the liveness probe.
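A minimal sketch of the per-iteration guard using only base. The names guardedIteration, isAsync, and runPolls, the Int counter standing in for the WorkerHeartbeat, and the 0.1 s backoff are all hypothetical. One subtlety the sketch makes explicit: a catch-all handler over SomeException also catches asynchronous exceptions, so it must rethrow them; otherwise the thread cancellation that shutdown relies on would be swallowed like a transient fault.

```haskell
module Main (main) where

import Control.Concurrent (threadDelay)
import Control.Exception
  ( AsyncException (ThreadKilled)
  , SomeAsyncException
  , SomeException
  , catch
  , fromException
  , throwIO
  , toException
  )
import Control.Monad (forM_)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | True for asynchronous exceptions such as thread cancellation at shutdown.
-- These must escape the per-iteration guard so teardown still works.
isAsync :: SomeException -> Bool
isAsync e = case (fromException e :: Maybe SomeAsyncException) of
  Just _ -> True
  Nothing -> False

-- | Run one loop iteration. A synchronous failure is logged and reported as
-- False so the caller can back off and continue; an asynchronous exception is
-- rethrown so cancellation still tears the thread down.
guardedIteration :: IO () -> IO Bool
guardedIteration body = (body >> pure True) `catch` handler
  where
    handler :: SomeException -> IO Bool
    handler e
      | isAsync e = throwIO e
      | otherwise = do
          putStrLn ("iteration failed, backing off: " ++ show e)
          pure False

-- | A bounded stand-in for the forever loop: each action is one iteration.
-- The counter (standing in for the heartbeat) advances only on success;
-- a failure triggers a short, hypothetical 0.1 s backoff before continuing.
runPolls :: IORef Int -> [IO ()] -> IO ()
runPolls heartbeat iterations = forM_ iterations $ \iteration -> do
  ok <- guardedIteration iteration
  if ok
    then modifyIORef' heartbeat (+ 1)
    else threadDelay 100000

main :: IO ()
main = do
  heartbeat <- newIORef 0
  runPolls heartbeat [pure (), throwIO (userError "transient receive error"), pure ()]
  readIORef heartbeat >>= print               -- 2: the failure did not stop the loop
  print (isAsync (toException ThreadKilled))  -- True: this one would be rethrown
```

The real loop runs forever in App rather than over a finite list; the list only makes the recovery behaviour observable.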
Shutdown tears the loop down cleanly: the composition root runs it under
concurrently_ within the withEnv resource bracket, so process teardown cancels
the loop thread and an in-flight, un-acked message simply redelivers — safe, because
publishing is idempotent (a version already present is success).
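A toy model of why a redelivered message is harmless, assuming the mirror target can be modelled as a map from (package, version) to tarball bytes. Registry, PublishResult, and publishIdempotent are hypothetical; the real worker publishes over HTTP to envRegistry and treats a "version already present" answer as success.

```haskell
module Main (main) where

import qualified Data.Map.Strict as Map

-- | Hypothetical model of the mirror target: (package, version) -> tarball bytes.
type Registry = Map.Map (String, String) String

data PublishResult = Published | AlreadyPresent
  deriving (Show, Eq)

-- | Publish one version. A version that is already present counts as success
-- and leaves the registry untouched, so replaying an un-acked job is safe.
publishIdempotent :: (String, String) -> String -> Registry -> (PublishResult, Registry)
publishIdempotent key tarball registry
  | Map.member key registry = (AlreadyPresent, registry)
  | otherwise = (Published, Map.insert key tarball registry)

main :: IO ()
main = do
  let key = ("left-pad", "1.3.0")
      (first, afterFirst) = publishIdempotent key "tarball-bytes" Map.empty
      (second, afterSecond) = publishIdempotent key "tarball-bytes" afterFirst
  print (first, second)              -- (Published,AlreadyPresent)
  print (afterFirst == afterSecond)  -- True: the redelivery changed nothing
```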
Ack within the visibility budget
A received message is hidden only for the queue's visibility window. The worker
acks on success; before a publish that may run long, it calls
extendVisibility to hold the message before the window lapses; on a
transient failure it does not ack, so the message redelivers. A batch is
processed sequentially, so each job has the full visibility budget rather than
competing with its batch-mates for it.
See docs/architecture/cloud-backends.md → "Mirror Queue" and "Process model".
Synopsis
- runWorker :: Env -> IO ()
- workerLoop :: App ()
- processBatch :: [QueueMessage] -> App ()
- processJob :: ReceiptHandle -> MirrorJob -> App JobOutcome
- data JobOutcome
- workerHeartbeatStaleAfter :: NominalDiffTime
- heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
- heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
- data IntegrityResult
- verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult
Entry point
runWorker :: Env -> IO ()
Run the supervised mirror worker over the composition-root Env: the
consume → fetch → verify → publish → ack loop, in the App orchestration monad.
This is a self-contained service entry over the shared Env (the split-ready
shape the single-process program runs alongside the server). It does not return
under normal operation; its caller brackets it for shutdown.
Loop and job processing (exposed for direct testing)
workerLoop :: App ()
The continuous consume loop: long-poll for a batch, process it, repeat.
Each iteration is wrapped so a single failure — a receive that throws, a fetch or
publish error, an undecodable body — is caught and logged, then the loop backs off
briefly and continues, so one bad iteration cannot kill the worker thread. A
successful poll advances the heartbeat (whether or not the batch was empty), so a
liveness probe sees the loop is alive; an idle queue is a healthy empty poll, not a
stall.
processBatch :: [QueueMessage] -> App ()
Process one received batch sequentially, so each job gets the full visibility budget rather than competing with its batch-mates for it. A batch is at most the queue's configured batch size (≤ 10), so sequential processing is a deliberate throughput-vs-budget choice, not a scaling bottleneck.
processJob :: ReceiptHandle -> MirrorJob -> App JobOutcome
Process one mirror job end to end: fetch the artifact, verify it against the
job's serve-time-admitted integrity digest, and — only on a match — publish it to
the mirror target. Returns the JobOutcome that decides ack vs. redeliver.
The receipt handle is taken so that a long publish can call extendVisibility
to hold the message before its window lapses. The rules are not re-run: the
job was gated at serve time.
data JobOutcome
The terminal outcome of processing one mirror job, deciding whether the message is acked or left to redeliver.
Constructors
| Constructor | Meaning |
|---|---|
| Succeeded | The publish succeeded, so the job is acked. This covers an idempotent redelivery too: a version already present at the mirror target is a success. |
| Dropped Text | A non-retryable fault: the bytes did not match the serve-time digest (tamper), or the publish URL was unformable (misconfiguration). Redelivery cannot help, so the job is dropped after alarming. Carries the reason. |
| Retried Text | A transient fault: a fetch failure, or a registry rejection worth retrying. The message is left un-acked so it redelivers. Carries the reason. |
Instances
| Instance | Details |
|---|---|
| Show JobOutcome | Defined in Ecluse.Worker. Methods: showsPrec :: Int -> JobOutcome -> ShowS; show :: JobOutcome -> String; showList :: [JobOutcome] -> ShowS |
| Eq JobOutcome | Defined in Ecluse.Worker |
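A pure model of how processJob might map each step's result to one of the three outcomes, assuming the fetch, verify, and publish steps are injected as plain functions. Outcome, PublishStep, processJobModel, and leaveForRedelivery are hypothetical names; the real function runs in App against real clients. Laziness makes the key property checkable: on an integrity mismatch the publish function is never called.

```haskell
module Main (main) where

-- | Mirrors the three JobOutcome constructors documented above.
data Outcome = Succeeded | Dropped String | Retried String
  deriving (Show, Eq)

-- | Hypothetical results of the publish step.
data PublishStep = PublishOk | AlreadyPresent | Transient String | BadUrl String

-- | One job end to end: fetch, then verify, then publish only on a match.
processJobModel
  :: (String -> Either String String)  -- fetch: URL -> bytes, Left on a transient error
  -> (String -> Bool)                  -- verify: do the bytes match the job's digest?
  -> (String -> PublishStep)           -- publish the verified bytes
  -> String                            -- artifact URL carried on the job
  -> Outcome
processJobModel fetch verify publish url =
  case fetch url of
    Left err -> Retried ("fetch failed: " ++ err)
    Right bytes
      | not (verify bytes) -> Dropped "integrity mismatch: not publishing"
      | otherwise -> case publish bytes of
          PublishOk -> Succeeded
          AlreadyPresent -> Succeeded  -- idempotent redelivery
          Transient err -> Retried ("publish rejected: " ++ err)
          BadUrl err -> Dropped ("publish URL unformable: " ++ err)

-- | Only a Retried outcome leaves the message un-acked for redelivery.
-- Assumption: a Dropped job is removed from the queue; the text above says
-- only that it is dropped after alarming.
leaveForRedelivery :: Outcome -> Bool
leaveForRedelivery (Retried _) = True
leaveForRedelivery _ = False

main :: IO ()
main = do
  let url = "https://upstream.example/pkg/-/pkg-1.0.0.tgz"  -- hypothetical URL
      fetchOk _ = Right "bytes"
  print (processJobModel fetchOk (== "bytes") (const PublishOk) url)  -- Succeeded
  print (processJobModel fetchOk (const False) (const PublishOk) url)
```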
Liveness
workerHeartbeatStaleAfter :: NominalDiffTime
How long the worker's last successful poll may be stale before the loop is considered stalled — the staleness threshold the liveness probe applies.
It is a generous multiple of the long-poll cadence: a healthy idle worker still
completes a poll at least every sqsWaitSeconds (≤ 20s by
default), so a gap several times that is a genuine stall, not an idle queue. Set
well above one poll window so liveness never flaps on normal scheduling jitter.
heartbeatHealthy :: UTCTime -> Maybe UTCTime -> Bool
Whether the worker's consume loop is healthy as of now, given its last
successful poll. This is the liveness signal the single-process /livez probe
folds in (see Ecluse.Server), distinct from HTTP readiness.
- Nothing (no poll yet) is healthy: the worker is still starting, not stalled.
- A poll within workerHeartbeatStaleAfter is healthy.
- A poll older than that is unhealthy: the loop has gone quiet for too long.
>>> import Data.Time (UTCTime (UTCTime), fromGregorian, secondsToDiffTime)
>>> let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
>>> heartbeatHealthy t0 Nothing
True

>>> let now = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 10)
>>> heartbeatHealthy now (Just t0)
True

>>> let later = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 300)
>>> heartbeatHealthy later (Just t0)
False
heartbeatHealthyNow :: WorkerHeartbeat -> IO Bool
Read the worker heartbeat and decide liveness against the current wall clock —
the IO wrapper the liveness probe calls. True while the consume loop is alive
(or still starting); False once the last successful poll is staler than
workerHeartbeatStaleAfter.
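A sketch of the pure check plus its IO wrapper. The real value of workerHeartbeatStaleAfter and the representation of WorkerHeartbeat are not given here, so this assumes a threshold of 120 s (six 20 s poll windows, consistent with the doctests: 10 s is healthy, 300 s is stale) and models the heartbeat as a hypothetical IORef holding the last successful poll time. The names staleAfter, healthyAt, Heartbeat, recordPoll, and healthyNow are illustrative.

```haskell
module Main (main) where

import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Time
  ( NominalDiffTime
  , UTCTime (UTCTime)
  , addUTCTime
  , diffUTCTime
  , fromGregorian
  , getCurrentTime
  , secondsToDiffTime
  )

-- | Hypothetical threshold: six 20 s long-poll windows.
staleAfter :: NominalDiffTime
staleAfter = 6 * 20

-- | Pure liveness check: no poll yet means "still starting" (healthy);
-- otherwise the last successful poll must be within the threshold.
-- This sketch treats a poll exactly at the threshold as healthy.
healthyAt :: UTCTime -> Maybe UTCTime -> Bool
healthyAt _ Nothing = True
healthyAt now (Just lastPoll) = diffUTCTime now lastPoll <= staleAfter

-- | Hypothetical stand-in for WorkerHeartbeat: the time of the last good poll.
newtype Heartbeat = Heartbeat (IORef (Maybe UTCTime))

-- | Called by the loop after each successful poll, empty or not.
recordPoll :: Heartbeat -> IO ()
recordPoll (Heartbeat ref) = getCurrentTime >>= writeIORef ref . Just

-- | The IO wrapper a liveness probe would call.
healthyNow :: Heartbeat -> IO Bool
healthyNow (Heartbeat ref) = healthyAt <$> getCurrentTime <*> readIORef ref

main :: IO ()
main = do
  let t0 = UTCTime (fromGregorian 2020 1 1) (secondsToDiffTime 0)
  print (healthyAt t0 Nothing)                    -- True
  print (healthyAt (addUTCTime 10 t0) (Just t0))  -- True
  print (healthyAt (addUTCTime 300 t0) (Just t0)) -- False
  hb <- Heartbeat <$> newIORef Nothing
  recordPoll hb
  healthyNow hb >>= print                         -- True: just polled
```

Keeping the decision pure and the clock read in a thin wrapper is what lets the doctests above pin behaviour to fixed timestamps.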
Integrity verification
data IntegrityResult
The result of verifying fetched bytes against the admitted integrity digests.
A sum type, not a Bool, so the mismatch carries the detail an operator needs to
explain why a publish was refused.
Constructors
| Constructor | Meaning |
|---|---|
| IntegrityVerified | The bytes matched the most authoritative admitted digest. |
| IntegrityMismatch Text | The bytes failed the integrity gate. Carries a human-readable detail (the digest they were checked against, or that the strongest one was uncomputable). |
Instances
| Instance | Details |
|---|---|
| Show IntegrityResult | Defined in Ecluse.Worker. Methods: showsPrec :: Int -> IntegrityResult -> ShowS; show :: IntegrityResult -> String; showList :: [IntegrityResult] -> ShowS |
| Eq IntegrityResult | Defined in Ecluse.Worker. Methods: (==) :: IntegrityResult -> IntegrityResult -> Bool; (/=) :: IntegrityResult -> IntegrityResult -> Bool |
verifyIntegrity :: NonEmpty Hash -> ByteString -> IntegrityResult
Verify fetched artifact bytes against the most authoritative integrity digest the version carries — never against a weaker one while a stronger is present.
A real npm version carries both a modern SRI sha512 digest and the legacy SHA-1
shasum. Passing on any match would let an artifact that matches the weak SHA-1
but fails the strong sha512 through — and SHA-1 collision resistance is broken, so
that is exploitable. So the gate ranks the admitted digests by algorithm authority
(strongest first: sha512 / blake2b > sha384 > sha256 > sha1 > md5), and
checks the bytes against the strongest one present: the bytes pass iff that digest
matches.
A weaker digest can neither override nor rescue a failed strong one.
If the strongest digest present is in an algorithm the worker cannot compute, the gate fails closed rather than falling back to a weaker digest — a tampered artifact must never be admitted on the strength of a hash an attacker could forge.
This is the tamper gate before a publish: a mismatch fails the job and never publishes a corrupt or substituted artifact into the private upstream.
>>> import Ecluse.Package (mkHash, HashAlg (SHA1))
>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "0a4d55a8d778e5022fab701977c5d840bbc486d0")
Right IntegrityVerified

>>> fmap (\h -> verifyIntegrity (h :| []) "Hello World") (mkHash SHA1 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
Right (IntegrityMismatch "the SHA1 digest did not match the fetched bytes")
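A sketch of the strongest-digest rule, not the module's implementation. Alg, rank, Digest, Result, verifyStrongest, and toyCompute are hypothetical, and toyCompute uses toy stand-ins (upper-casing for "SHA1", reversal for "SHA512") instead of real hash functions so the example needs only base. Since sha512 and blake2b share the top rank, the sketch assumes every top-rank digest present must match; the text above does not say how such ties are resolved.

```haskell
module Main (main) where

import Data.Char (toUpper)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.List.NonEmpty as NE

data Alg = MD5 | SHA1 | SHA256 | SHA384 | SHA512 | Blake2b
  deriving (Show, Eq)

-- | Authority rank, strongest highest; SHA512 and Blake2b tie at the top.
rank :: Alg -> Int
rank MD5 = 0
rank SHA1 = 1
rank SHA256 = 2
rank SHA384 = 3
rank SHA512 = 4
rank Blake2b = 4

-- | An admitted digest: its algorithm and the expected encoded value.
data Digest = Digest Alg String

data Result = Verified | Mismatch String
  deriving (Show, Eq)

-- | Check the bytes against the strongest digests present and nothing weaker.
-- `compute` returns Nothing for an algorithm the worker cannot compute; that
-- fails closed instead of falling back to a weaker digest.
verifyStrongest :: (Alg -> Maybe (String -> String)) -> NonEmpty Digest -> String -> Result
verifyStrongest compute digests bytes = checkAll strongest
  where
    top = maximum (fmap (\(Digest alg _) -> rank alg) digests)
    strongest = NE.filter (\(Digest alg _) -> rank alg == top) digests
    checkAll [] = Verified
    checkAll (Digest alg expected : rest) = case compute alg of
      Nothing -> Mismatch ("strongest digest " ++ show alg ++ " is uncomputable")
      Just digestOf
        | digestOf bytes == expected -> checkAll rest
        | otherwise -> Mismatch ("the " ++ show alg ++ " digest did not match")

-- | Toy digest functions for demonstration only; Blake2b is deliberately
-- treated as uncomputable to exercise the fail-closed path.
toyCompute :: Alg -> Maybe (String -> String)
toyCompute SHA1 = Just (map toUpper)
toyCompute SHA512 = Just reverse
toyCompute _ = Nothing

main :: IO ()
main = do
  -- Weak SHA1 matches, strong SHA512 does not: rejected.
  print (verifyStrongest toyCompute (Digest SHA1 "ABC" :| [Digest SHA512 "xyz"]) "abc")
  -- Strong SHA512 matches; the weaker SHA1 is never consulted: Verified.
  print (verifyStrongest toyCompute (Digest SHA512 "cba" :| [Digest SHA1 "wrong"]) "abc")
  -- Strongest present is uncomputable: fails closed despite a matching SHA1.
  print (verifyStrongest toyCompute (Digest Blake2b "?" :| [Digest SHA1 "ABC"]) "abc")
```

The first and third cases are the two ways a weak digest could otherwise rescue a bad artifact; both are refused.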