Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/integrations/rabbitmq/connection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import amqplib from "amqplib"
import { afterAll, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import { RabbitConnection, jitteredDelay } from "./connection"

describe("jitteredDelay", () => {
test("returns a value between 50% and 100% of the exponential delay", () => {
for (let attempt = 0; attempt < 5; attempt++) {
const delay = jitteredDelay(attempt)
const exponential = Math.min(1000 * 2 ** attempt, 30_000)
expect(delay).toBeGreaterThanOrEqual(exponential * 0.5)
expect(delay).toBeLessThanOrEqual(exponential)
}
})

test("caps at MAX_RETRY_DELAY_MS", () => {
const delay = jitteredDelay(100)
expect(delay).toBeLessThanOrEqual(30_000)
})
})

describe("RabbitConnection", () => {
const mockChannel = { on: mock(() => {}) }
const mockCreateChannel = mock(() => Promise.resolve(mockChannel))

function makeMockConn(onHandler?: (event: string, handler: unknown) => void) {
return {
createChannel: mockCreateChannel,
on: mock(onHandler ?? (() => {})),
close: mock(() => {})
}
}

const connectSpy = spyOn(amqplib, "connect")

beforeEach(() => {
connectSpy.mockReset()
mockCreateChannel.mockClear()
mockCreateChannel.mockResolvedValue(mockChannel)
})

afterAll(() => {
connectSpy.mockRestore()
})

test("createChannel connects and returns a channel", async () => {
connectSpy.mockResolvedValueOnce(makeMockConn() as never)

const rabbit = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const channel = await rabbit.createChannel()

expect(connectSpy).toHaveBeenCalledTimes(1)
expect(channel).toBe(mockChannel as never)
})

test("concurrent createChannel calls share a single connect attempt", async () => {
connectSpy.mockResolvedValueOnce(makeMockConn() as never)

const rabbit = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const [ch1, ch2] = await Promise.all([rabbit.createChannel(), rabbit.createChannel()])

expect(connectSpy).toHaveBeenCalledTimes(1)
expect(ch1).toBe(mockChannel as never)
expect(ch2).toBe(mockChannel as never)
})

test("reconnects automatically after connection close event", async () => {
let closeHandler: (() => void) | undefined
const firstConn = makeMockConn((event, handler) => {
if (event === "close") closeHandler = handler as () => void
})
const secondConn = makeMockConn()

connectSpy
.mockResolvedValueOnce(firstConn as never)
.mockResolvedValueOnce(secondConn as never)

const rabbit = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
await rabbit.createChannel()
expect(connectSpy).toHaveBeenCalledTimes(1)

// Simulate connection drop
closeHandler?.()

// Wait for async reconnect to initiate
await new Promise(resolve => setTimeout(resolve, 0))

// The reconnect promise is now in-flight; await createChannel which waits for it
await rabbit.createChannel()
expect(connectSpy).toHaveBeenCalledTimes(2)
})

test("$disconnect sets destroyed flag preventing future reconnects", async () => {
let closeHandler: (() => void) | undefined
const conn = makeMockConn((event, handler) => {
if (event === "close") closeHandler = handler as () => void
})
connectSpy.mockResolvedValueOnce(conn as never)

const rabbit = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
await rabbit.createChannel()

rabbit.$disconnect()
closeHandler?.()

await new Promise(resolve => setTimeout(resolve, 0))

// No additional connect call should have been made after destroy
expect(connectSpy).toHaveBeenCalledTimes(1)
})
})
107 changes: 83 additions & 24 deletions src/integrations/rabbitmq/connection.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,101 @@
import { Logger } from "@/lib/utils/logging"
import amqp from "amqplib"

const logger = new Logger("RABBITMQ")

const INITIAL_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_INITIAL_DELAY_MS ?? "1000")
const MAX_RETRY_DELAY_MS = parseInt(process.env.RABBITMQ_RETRY_MAX_DELAY_MS ?? "30000")

export function jitteredDelay(attempt: number): number {
const exponential = Math.min(INITIAL_RETRY_DELAY_MS * 2 ** attempt, MAX_RETRY_DELAY_MS)
return exponential * (0.5 + 0.5 * Math.random())
}

export class RabbitConnection {
private user: string
private password: string
private port: number
private isReady = false
private isConnecting = false
private conn: Promise<amqp.Connection | null> = Promise.resolve(null)

constructor(args: { user: string; password: string; port: string | number }) {
private readonly user: string
private readonly password: string
private readonly port: number
private readonly heartbeat: number
private conn: amqp.Connection | null = null
private connectPromise: Promise<void> | null = null
private destroyed = false

constructor(args: {
user: string
password: string
port: string | number
heartbeat?: number
}) {
this.user = args.user
this.password = args.password
this.port = parseInt(args.port.toString())
this.heartbeat = args.heartbeat ?? parseInt(process.env.RABBITMQ_HEARTBEAT ?? "60")
}

private async connect() {
if (!this.isConnecting && !this.isReady) {
this.isConnecting = true
this.conn = amqp
.connect(`amqp://${this.user}:${this.password}@localhost:${this.port}`)
.finally(() => {
this.isConnecting = false
this.isReady = true
})
await this.conn
private async connectWithRetry(): Promise<void> {
let attempt = 0
while (!this.destroyed) {
if (attempt > 0) {
const delay = jitteredDelay(attempt - 1)
logger.info("RABBITMQ_RECONNECTING", { attempt, delayMs: Math.round(delay) })
await new Promise(resolve => setTimeout(resolve, delay))
}
try {
const conn = await amqp.connect(
`amqp://${this.user}:${this.password}@localhost:${this.port}`,
{ heartbeat: this.heartbeat }
)
this.conn = conn
logger.info("RABBITMQ_CONNECTED", { attempt })
conn.on("close", () => this.handleConnectionLoss("close"))
conn.on("error", err =>
this.handleConnectionLoss(
"error",
err instanceof Error ? err : new Error(String(err))
)
)
return
} catch (err) {
logger.warn(
"RABBITMQ_CONNECT_FAILED",
err instanceof Error ? err : new Error(String(err)),
{ attempt }
)
attempt++
}
}
}

async createChannel() {
await this.connect()
const conn = await this.conn
private handleConnectionLoss(event: string, err?: Error): void {
if (this.conn) {
logger.warn("RABBITMQ_CONNECTION_LOST", err ?? null, { event })
this.conn = null
}
if (!this.destroyed && !this.connectPromise) {
this.connectPromise = this.connectWithRetry().finally(() => {
this.connectPromise = null
})
}
}

async createChannel(): Promise<amqp.Channel> {
if (!this.conn) {
if (!this.connectPromise) {
this.connectPromise = this.connectWithRetry().finally(() => {
this.connectPromise = null
})
}
await this.connectPromise
}
const conn = this.conn
if (!conn) {
throw new Error("Failed to connect to RabbitMQ")
}
return await conn.createChannel()
return conn.createChannel()
}

$disconnect() {
this.conn.then(conn => conn?.close())
$disconnect(): void {
this.destroyed = true
this.conn?.close()
}
}
122 changes: 122 additions & 0 deletions src/integrations/rabbitmq/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import amqplib from "amqplib"
import { afterAll, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import { RabbitConnection } from "./connection"
import { RabbitQueue } from "./queue"

describe("RabbitQueue", () => {
const mockAssertQueue = mock(() =>
Promise.resolve({ queue: "test_queue", messageCount: 0, consumerCount: 0 })
)
const mockSendToQueue = mock(() => true)

function makeMockChannel(onHandler?: (event: string, handler: unknown) => void) {
return {
assertQueue: mockAssertQueue,
sendToQueue: mockSendToQueue,
on: mock(onHandler ?? (() => {}))
}
}

const mockCreateChannel = mock(() => Promise.resolve(makeMockChannel()))
const mockConn = {
createChannel: mockCreateChannel,
on: mock(() => {}),
close: mock(() => {})
}

const connectSpy = spyOn(amqplib, "connect")

beforeEach(() => {
connectSpy.mockReset()
connectSpy.mockResolvedValue(mockConn as never)
mockAssertQueue.mockClear()
mockSendToQueue.mockClear()
mockCreateChannel.mockClear()
mockCreateChannel.mockResolvedValue(makeMockChannel())
})

afterAll(() => {
connectSpy.mockRestore()
})

test("asserts queue topology when channel is first created", async () => {
const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<number>({ queueName: "test_queue", connection })

await queue.send(1)

expect(mockAssertQueue).toHaveBeenCalledTimes(1)
expect(mockAssertQueue).toHaveBeenCalledWith("test_queue", { durable: true })
})

test("sends message after topology assertion", async () => {
const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<number>({ queueName: "test_queue", connection })

await queue.send(42)

expect(mockSendToQueue).toHaveBeenCalledTimes(1)
expect(mockSendToQueue).toHaveBeenCalledWith(
"test_queue",
Buffer.from("42"),
expect.objectContaining({ contentType: "text/plain" })
)
})

test("reuses channel for multiple sends without re-asserting", async () => {
const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<number>({ queueName: "test_queue", connection })

await queue.send(1)
await queue.send(2)
await queue.send(3)

expect(mockAssertQueue).toHaveBeenCalledTimes(1)
expect(mockSendToQueue).toHaveBeenCalledTimes(3)
})

test("re-asserts queue topology after channel loss", async () => {
let closeHandler: (() => void) | undefined
mockCreateChannel.mockResolvedValue(
makeMockChannel((event, handler) => {
if (event === "close") closeHandler = handler as () => void
})
)

const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<number>({ queueName: "test_queue", connection })

await queue.send(1)
expect(mockAssertQueue).toHaveBeenCalledTimes(1)

// Simulate channel close
closeHandler?.()

await queue.send(2)
expect(mockAssertQueue).toHaveBeenCalledTimes(2)
expect(mockSendToQueue).toHaveBeenCalledTimes(2)
})

test("sends JSON objects", async () => {
const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<{ id: number }>({ queueName: "test_queue", connection })

await queue.sendJson({ id: 99 })

expect(mockSendToQueue).toHaveBeenCalledWith(
"test_queue",
Buffer.from(JSON.stringify({ id: 99 })),
expect.objectContaining({ contentType: "application/json" })
)
})

test("concurrent sends share a single channel creation", async () => {
const connection = new RabbitConnection({ user: "guest", password: "guest", port: 5672 })
const queue = new RabbitQueue<number>({ queueName: "test_queue", connection })

await Promise.all([queue.send(1), queue.send(2), queue.send(3)])

expect(mockAssertQueue).toHaveBeenCalledTimes(1)
expect(mockSendToQueue).toHaveBeenCalledTimes(3)
})
})
Loading
Loading