api-amqp
@rtorcato/api-amqp wraps amqplib with
small typed helpers: JSON encoding, exchange/queue assertion, and
ack-on-success / nack-on-throw. The publisher and consumer take an amqplib
Channel, so they compose with a shared connection and test without a broker.
Install
pnpm add @rtorcato/api-amqp amqplib
amqplib is a peer dependency — you bring your own version.
Usage
import { connect, createPublisher, createConsumer } from '@rtorcato/api-amqp'
const { connection, channel } = await connect('amqp://localhost')
const publish = await createPublisher<{ id: string }>(channel, { exchange: 'orders' })
publish('order.created', { id: '1' })
await createConsumer<{ id: string }>(channel, { queue: 'orders' }, async (order) => {
  // handleOrder stands in for your own async handler
  await handleOrder(order)
})
connect returns { connection, channel } — close the channel then the
connection on shutdown.
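The shutdown order described above could be sketched roughly as follows. The `Closable` interface and the `shutdown` helper are hypothetical names used for illustration and are not exported by this package; the sketch only assumes that both objects expose a promise-returning `close()`, as amqplib's channel and connection do.

```typescript
// Hypothetical helper, not exported by @rtorcato/api-amqp.
// Anything with a promise-returning close() fits this shape.
interface Closable {
  close(): Promise<void>
}

async function shutdown(channel: Closable, connection: Closable): Promise<void> {
  try {
    // Close the channel first, as described above.
    await channel.close()
  } finally {
    // Close the connection even if closing the channel failed.
    await connection.close()
  }
}
```

With the `connection` and `channel` from the Usage example, this might be wired to a signal handler such as `process.once('SIGTERM', () => shutdown(channel, connection))`.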
Behaviour
- Publisher: { exchange, exchangeType?, durable? }. exchangeType defaults to 'topic', durable to true. Messages are published persistent with content-type: application/json.
- Consumer: { queue, durable?, prefetch?, requeueOnError? }. prefetch defaults to 10. The message is acked when the handler resolves and nacked when it throws; set requeueOnError: true to requeue (watch for poison-message loops).
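To make the ack-on-success / nack-on-throw rule concrete, here is a minimal sketch of how such a wrapper might look. It is not this package's actual source: `handleDelivery`, `Delivery`, and `AckChannel` are hypothetical names, the two interfaces are cut-down stand-ins for amqplib's message and Channel types, and nacking on a JSON parse failure is an assumption.

```typescript
// Minimal stand-ins for the parts of amqplib's message and Channel that the
// sketch touches; the real types come from amqplib.
interface Delivery {
  content: { toString(): string }
}

interface AckChannel {
  ack(msg: Delivery): void
  nack(msg: Delivery, allUpTo?: boolean, requeue?: boolean): void
}

// Hypothetical illustration of ack-on-success / nack-on-throw.
async function handleDelivery<T>(
  channel: AckChannel,
  msg: Delivery,
  handler: (payload: T) => Promise<void>,
  requeueOnError = false, // assumed default; the text above only implies it
): Promise<void> {
  try {
    // Parsing happens inside the try, so a malformed body is nacked too
    // (an assumption of this sketch).
    const payload = JSON.parse(msg.content.toString()) as T
    await handler(payload)
    channel.ack(msg)
  } catch {
    // allUpTo = false rejects only this message.
    channel.nack(msg, false, requeueOnError)
  }
}
```

Because the sketch depends only on `ack` and `nack`, a plain object that records calls can replace a broker-backed channel in tests. With `requeueOnError` set to true, a message whose handler always throws is redelivered indefinitely, which is the poison-message loop mentioned above.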
Reconnection
Not included — amqplib doesn't reconnect on its own. For production resilience
wrap connect in a retry loop or use
amqp-connection-manager
and pass its channel to these helpers.
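A retry loop around connect might be sketched as below. `connectWithRetry` and `RetryOptions` are hypothetical names, not part of this package or amqplib, and the exponential backoff with these defaults is just one reasonable choice.

```typescript
// Hypothetical helper, not part of @rtorcato/api-amqp or amqplib.
interface RetryOptions {
  retries?: number // retries allowed after the first failed attempt
  baseDelayMs?: number // delay before the first retry, doubled each time
}

async function connectWithRetry<T>(
  connectFn: () => Promise<T>,
  { retries = 5, baseDelayMs = 500 }: RetryOptions = {},
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await connectFn()
    } catch (err) {
      // Give up once the retry budget is spent.
      if (attempt >= retries) throw err
      const delay = baseDelayMs * 2 ** attempt
      await new Promise<void>((resolve) => setTimeout(resolve, delay))
    }
  }
}
```

With the `connect` export from the Usage section, a call might look like `await connectWithRetry(() => connect('amqp://localhost'))`. This only covers the initial connection; recovering from a connection that drops later also means listening for the connection's 'close' event and re-creating channels and consumers.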