RabbitMQ: automatic reconnect with topology re-assert on connection/channel loss #120
Barecheck - Code coverage report. Total: 92.78%. Your code coverage diff: 0.00% ▴ ✅ All code changes are covered.
… re-assert Co-authored-by: owens1127 <98496129+owens1127@users.noreply.github.com>
Pull request overview
This pull request implements automatic reconnection with topology re-assertion for RabbitMQ connections and channels, addressing fault tolerance issues where dropped connections or deleted queues required manual API restarts.
Changes:
- Added connection-level resilience with infinite retry loop using jittered exponential backoff (1s to 30s max delay)
- Implemented channel-level resilience with automatic topology re-assertion (queue recreation) on channel loss
- Added comprehensive unit tests for both connection and queue resilience behaviors
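As a rough illustration of the backoff in the first bullet, here is a minimal sketch of a jittered exponential delay. The name `jitteredDelay` appears in the PR's test list, but the signature, the injectable `random` parameter, and the hard-coded defaults are assumptions made for this example, not the PR's actual code.

```typescript
// Assumed defaults mirroring the "1s to 30s" range described above.
const INITIAL_RETRY_DELAY_MS = 1_000
const MAX_RETRY_DELAY_MS = 30_000

/**
 * Delay in milliseconds before retry number `attempt` (0-based).
 * `random` is injectable (hypothetical parameter) so tests can pin the jitter.
 */
function jitteredDelay(
  attempt: number,
  initialMs: number = INITIAL_RETRY_DELAY_MS,
  maxMs: number = MAX_RETRY_DELAY_MS,
  random: () => number = Math.random
): number {
  // Exponential growth, capped at maxMs.
  const capped = Math.min(initialMs * 2 ** attempt, maxMs)
  // Scale by a factor in [0.5, 1.0) so that clients which lost the broker
  // at the same moment do not all retry in lockstep.
  return capped * (0.5 + random() * 0.5)
}
```

With these defaults the first retry waits between 500 ms and 1 s, and from the sixth attempt onward (attempt index 5) the delay stays between 15 s and 30 s.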
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/integrations/rabbitmq/connection.ts | Replaced simple connection logic with retry loop, connection loss handlers, and configurable backoff |
| src/integrations/rabbitmq/connection.test.ts | Added tests for jittered delay, concurrent connection deduplication, reconnection, and $disconnect |
| src/integrations/rabbitmq/queue.ts | Replaced flag-based channel management with channel instance caching and lazy recreation with topology re-assertion |
| src/integrations/rabbitmq/queue.test.ts | Added tests for topology assertion, channel reuse, re-assertion after loss, and concurrent send deduplication |
```typescript
$disconnect(): void {
  this.destroyed = true
  this.conn?.close()
```
The $disconnect() method calls this.conn?.close() without waiting for the close to complete and without handling errors raised during close. If close() fails, or if operations are still in flight, the failure is not handled gracefully.
Consider using await this.conn?.close() if the close operation is async, and wrapping it in try-catch to handle errors during disconnection. This is especially important for graceful shutdown scenarios.
Suggested change:
```diff
-$disconnect(): void {
-  this.destroyed = true
-  this.conn?.close()
+async $disconnect(): Promise<void> {
+  this.destroyed = true
+  if (this.conn) {
+    try {
+      await this.conn.close()
+    } catch (err) {
+      logger.warn(
+        "RABBITMQ_DISCONNECT_FAILED",
+        err instanceof Error ? err : new Error(String(err))
+      )
+    } finally {
+      this.conn = null
+    }
+  }
```
```typescript
const INITIAL_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_INITIAL_DELAY_MS ?? "1000")
const MAX_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_MAX_DELAY_MS ?? "30000")
```
Missing input validation for environment variables: The parseInt() calls for RABBITMQ_RETRY_INITIAL_DELAY_MS and RABBITMQ_RETRY_MAX_DELAY_MS don't validate that the parsed values are valid numbers. If the environment variable contains non-numeric values, parseInt() will return NaN, which will cause unexpected behavior in the jitteredDelay function (resulting in NaN delays).
Consider adding validation to ensure these values are positive numbers, or at minimum check for isNaN() and provide a fallback or throw a clear error message.
Suggested change:
```diff
-const INITIAL_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_INITIAL_DELAY_MS ?? "1000")
-const MAX_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_MAX_DELAY_MS ?? "30000")
+function parsePositiveIntEnv(
+  rawValue: string | undefined,
+  defaultValue: number,
+  envName: string
+): number {
+  if (rawValue === undefined) {
+    return defaultValue
+  }
+  const parsed = Number.parseInt(rawValue, 10)
+  if (Number.isNaN(parsed) || parsed <= 0) {
+    logger.warn(
+      "RABBITMQ_INVALID_ENV",
+      new Error(`Invalid value for ${envName}: ${rawValue}`),
+      { envName, rawValue, defaultValue }
+    )
+    return defaultValue
+  }
+  return parsed
+}
+const INITIAL_RETRY_DELAY_MS = parsePositiveIntEnv(
+  process.env.RABBITMQ_RETRY_INITIAL_DELAY_MS,
+  1000,
+  "RABBITMQ_RETRY_INITIAL_DELAY_MS"
+)
+const MAX_RETRY_DELAY_MS = parsePositiveIntEnv(
+  process.env.RABBITMQ_RETRY_MAX_DELAY_MS,
+  30000,
+  "RABBITMQ_RETRY_MAX_DELAY_MS"
+)
```
```typescript
this.user = args.user
this.password = args.password
this.port = parseInt(args.port.toString())
this.heartbeat = args.heartbeat ?? parseInt(process.env.RABBITMQ_HEARTBEAT ?? "60")
```
Similar to lines 6-7, the heartbeat parsing doesn't validate for NaN. If RABBITMQ_HEARTBEAT contains a non-numeric value, this will pass NaN to the amqplib connect call, which may cause connection issues or unexpected behavior.
Add validation to ensure the heartbeat value is a positive number, with a clear error message if invalid.
Suggested change:
```diff
-this.heartbeat = args.heartbeat ?? parseInt(process.env.RABBITMQ_HEARTBEAT ?? "60")
+const envHeartbeatStr = process.env.RABBITMQ_HEARTBEAT
+const envHeartbeat = parseInt(envHeartbeatStr ?? "60", 10)
+const heartbeat = args.heartbeat ?? envHeartbeat
+if (!Number.isFinite(heartbeat) || heartbeat <= 0) {
+  const providedValue =
+    args.heartbeat !== undefined ? String(args.heartbeat) : envHeartbeatStr ?? "60"
+  throw new Error(
+    `Invalid RabbitMQ heartbeat value "${providedValue}". It must be a positive number.`
+  )
+}
+this.heartbeat = heartbeat
```
```typescript
    await this.connectPromise
  }
  const conn = this.conn
  if (!conn) {
```
Race condition in connection handling: If the connection is lost immediately after connectPromise completes (line 88) but before this.conn is read (line 90), the handleConnectionLoss event handler could set this.conn to null, causing the check at line 91 to throw "Failed to connect to RabbitMQ" even though reconnection is being initiated.
This is a narrow race window, but consider capturing the connection reference immediately after the await completes to avoid this issue, or add additional null checking with appropriate error messaging.
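One possible shape for the "additional null checking with appropriate error messaging" is sketched below. It is not the PR's code: `ConnectionSketch`, `ConnLike`, `ensureConnected`, `maxWaits`, and the injected `connect` function are hypothetical stand-ins, and the real retry/backoff loop is omitted. The idea is to wait again when the reference was cleared after the await, and to report a lost connection differently from a deliberate `$disconnect()`.

```typescript
// Minimal stand-in for the connection object (hypothetical).
interface ConnLike {
  createChannel(): Promise<unknown>
}

class ConnectionSketch {
  conn: ConnLike | null = null
  destroyed = false
  connectPromise: Promise<void> | null = null
  private readonly connect: () => Promise<ConnLike>

  constructor(connect: () => Promise<ConnLike>) {
    this.connect = connect
  }

  // Concurrent callers share a single in-flight connection attempt.
  private ensureConnected(): Promise<void> {
    if (!this.connectPromise) {
      this.connectPromise = this.connect()
        .then((c) => {
          this.conn = c
        })
        .finally(() => {
          this.connectPromise = null
        })
    }
    return this.connectPromise
  }

  async createChannel(maxWaits = 3): Promise<unknown> {
    for (let i = 0; i < maxWaits && !this.destroyed; i++) {
      if (!this.conn) await this.ensureConnected()
      const conn = this.conn // captured immediately after the await
      if (conn) return conn.createChannel()
      // The reference was cleared by a loss handler: loop and wait again.
    }
    throw new Error(
      this.destroyed
        ? "RabbitMQ connection was closed by $disconnect()"
        : "RabbitMQ connection lost while reconnecting"
    )
  }
}
```

Bounding the number of waits keeps a caller from spinning forever if the connection keeps flapping; whether a bound is desirable here is a design choice for the author.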
```typescript
  throw new Error("Failed to create channel")
}

const channel = await this.getChannel()
```
The getChannel() call replaces the previous error-prone connection logic, but errors from channel creation will propagate to callers. Since the sendBuffer method (lines 71-80, outside the diff) catches and logs errors without rethrowing, send operations will silently fail when channel creation fails. This could lead to message loss during connection issues.
Consider documenting this behavior or ensuring that errors from getChannel() are handled appropriately by the caller to avoid silent failures.
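To make the concern concrete, here is a hedged sketch of a send path that still logs but also rethrows, so the caller can retry or drop the message deliberately. The real `sendBuffer` is outside the diff, so this free-function signature, the `ChannelLike` interface, the `logError` callback, and the `RABBITMQ_SEND_FAILED` key are all assumptions; only `sendToQueue(queue, content)` mirrors an actual amqplib channel method.

```typescript
// Narrow view of a channel: just the method this sketch needs.
interface ChannelLike {
  sendToQueue(queue: string, content: Buffer): boolean
}

async function sendBuffer(
  getChannel: () => Promise<ChannelLike>,
  queue: string,
  content: Buffer,
  logError: (key: string, err: Error) => void = (key, err) => console.error(key, err)
): Promise<void> {
  try {
    const channel = await getChannel()
    channel.sendToQueue(queue, content)
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err))
    // Log for observability (hypothetical log key)...
    logError("RABBITMQ_SEND_FAILED", error)
    // ...and rethrow so the failure is visible to the caller instead of
    // the message being dropped silently.
    throw error
  }
}
```

If rethrowing would break existing callers, returning a boolean success flag is an alternative that keeps the failure observable without changing control flow.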
The broker integration had no fault tolerance: a dropped connection or externally deleted queue required a manual restart to recover. This replaces both the connection and queue channel management with resilient implementations.
Connection (`connection.ts`):
- Backoff of `min(1000ms × 2^attempt, 30s) × [0.5, 1.0]`; the jitter prevents a thundering herd
- `close`/`error` listeners on every connection; on either event the reference is nulled and a reconnect is scheduled
- `destroyed` flag: `$disconnect()` exits the retry loop cleanly; concurrent `createChannel()` calls share a single in-flight `connectPromise`
- Configurable via `RABBITMQ_RETRY_INITIAL_DELAY_MS`, `RABBITMQ_RETRY_MAX_DELAY_MS`, `RABBITMQ_HEARTBEAT`
Queue channel (`queue.ts`):
- `ch.assertQueue(name, { durable: true })` is called on each new channel, so queues are recreated if deleted externally
- `close`/`error` listeners on every channel; loss clears the reference so the next `send`/`sendJson` lazily recreates the channel and re-asserts the queue
- Replaces the `isReady`/`isConnecting` flag pair with a direct `channel` instance + `channelPromise` gate for concurrent sends
Tests:
- `connection.test.ts`: `jitteredDelay` range bounds, concurrent connect deduplication, reconnect-after-close, `$disconnect` suppressing future reconnects
- `queue.test.ts`: topology assertion on first send, channel reuse across sends, re-assertion after channel loss, concurrent-send deduplication
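The queue-channel behaviour described above (a cached channel, a promise gate for concurrent sends, and re-assertion on every new channel) could look roughly like the sketch below. The `channel`/`channelPromise` names and the `assertQueue(name, { durable: true })` call come from the description; the class name, the `ChannelLike` interface, the injected `createChannel`, and the `lost` guard are assumptions for illustration.

```typescript
// Narrow view of a channel: only what this sketch uses.
interface ChannelLike {
  assertQueue(name: string, options: { durable: boolean }): Promise<unknown>
  once(event: "close" | "error", listener: (err?: unknown) => void): unknown
}

class QueueChannelSketch {
  private channel: ChannelLike | null = null
  private channelPromise: Promise<ChannelLike> | null = null
  private readonly name: string
  private readonly createChannel: () => Promise<ChannelLike>

  constructor(name: string, createChannel: () => Promise<ChannelLike>) {
    this.name = name
    this.createChannel = createChannel
  }

  getChannel(): Promise<ChannelLike> {
    // Fast path: reuse the cached channel.
    if (this.channel) return Promise.resolve(this.channel)
    // Gate: concurrent callers share one in-flight creation.
    if (!this.channelPromise) {
      this.channelPromise = this.open().finally(() => {
        this.channelPromise = null
      })
    }
    return this.channelPromise
  }

  private async open(): Promise<ChannelLike> {
    const ch = await this.createChannel()
    let lost = false
    const forget = () => {
      lost = true
      if (this.channel === ch) this.channel = null
    }
    ch.once("close", forget)
    ch.once("error", forget)
    // Re-assert topology on every new channel so an externally deleted
    // queue is recreated before the next publish.
    await ch.assertQueue(this.name, { durable: true })
    if (lost) throw new Error("Channel closed during topology assertion")
    this.channel = ch
    return ch
  }
}
```

The `lost` flag covers the case where the channel closes while `assertQueue` is still pending; without it, a dead channel could be cached.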
This pull request was created from Copilot chat.