Node.js library for interacting with RabbitMQ based on RxJS streams
Any suggestions, any criticism and any help from the open source community is warmly welcomed!
NabbitMQ is a library that makes it easy for Node.js developers to interact with RabbitMQ. It's built on top of famous amqplib package and it leverages RxJS streams.
Message queues naturally are streams of events, therefore using RxJS with them is an efficient way for developers to solve complex problems in a very elegant fashion.
There are a lot of use cases, when we don't need to setup non standard exchanges and non trivial bindings to queues. In fact, most of the time what we actually need is just a simple queue, just out of the box. And NabbitMQ is here to help you with that! All you need is to provide custom name for the queue and you're ready to go, everything else is handled for you!
However, NabbitMQ allows you to use amqplib's promise-based api directly, so that you can build a more complex solution for your specific needs and still make use of RxJS streams.
Obviously, one of the main reasons for this library to even exist is to have the threshold of entry to RabbitMQ world a bit lower, than it is now, but at the same time to allow us to make use of any piece of API that RabbitMQ provides us with.
The other reason is seamless error handling and helping developers to easily build fault tolerant solutions. For example, NabbitMQ will provide you with an automatically set up dead letter queue that listens to your main queue, unless you just don't need to have.
NabbitMQ has its own set of error classes, therefore it makes it easy for developers to debug and build solutions, that will survive even in the most "cornery" corner cases.
In the end, the main principle and goal is to have a solid and reliable solution out of the box, while working with RabbitMQ.
This snippet demonstrates how you can easily spin up a solid RabbitMQ setup and quickly start to consume a stream of events from it. Under the hood, NabbitMQ creates all necessary bindings, exchanges, dead letter queues and provides you with reconnect logic.
import { ConnectionFactory, ConsumerFactory, PublisherFactory } from 'nabbitmq';
async function main() {
const connectionFactory = new ConnectionFactory();
connectionFactory.setUri('amqp://localhost:5672');
const connection = await connectionFactory.newConnection();
const consumerFactory = new ConsumerFactory(connection);
consumerFactory.setConfigs({queue: {name: 'super_queue'}});
const consumer = await consumerFactory.newConsumer();
consumer.startConsuming().subscribe({next: console.log, error: console.error});
const anotherConnection = await connectionFactory.newConnection();
const publisherFactory = new PublisherFactory(anotherConnection);
publisherFactory.setConfigs({exchange: {name: consumer.getActiveConfigs().exchange.name}});
const publisher = await publisherFactory.newPublisher();
setInterval(() => publisher.publishMessage(Buffer.from('hello hello!'), `${consumer.getActiveConfigs().queue.name}_rk`), 1000);
}
main();
NabbitMQ provides you with two main abstractions: Publisher and Consumer. Each is represented by a class, that implements RabbitMqPeer interface. They are supposed to be instantiated with PublisherFactory and ConsumerFactory classes. However, there is a third abstraction called RabbitMqConnection. This is a class, that holds an active connection data to the used RabbitMQ server. It is injected into publishers and consumers via their factories. Configs to setup RabbitMQ internal structure of exchanges, queues and bindings, are provided to factories in form of plain JavaScript/TypeScript objects. There are interfaces for these objects, called ConsumerConfigs and PublisherConfigs. Most of the values for these objects are optional, the consumers and publishers themselves fill them up with some standard values. For example, if your provide a queue name like my_queue, but don't provide an exchange name, the exchange will be called exchange_my_queue - you can rely on that. Also, if dead letter queue has to be set up (which is optional), but no name for it provided, consumer will also result with default name like my_queue_dlq. Dead letter exchange will have the following form: exchange_my_queue_dlq.
However, it is obvious that you might need to have a rare and not so generic RabbitMQ structure with more than one queue and more than one exchange. Therefore there is an option for you not to provide these configs, but to provide a so called custom setup function. This function accepts a connection object from underlying amqplib package. Inside of this function you can do whatever you need, but it should return a promise that resolves with an object that contains amqplib channel instance and optional consumer's prefetch count, if you use this function to set up a consumer (not mandatory though, a default prefetch value will be set if not provided).
Class: RabbitMqConnection
Public methods:
Description: This class gives you an abstraction over a physical connection to the RabbitMQ server.
Class: ConnectionFactory Public methods:
Description: Factory that produces instances of RabbitMqConnection.
Class: Consumer Public methods:
Generated using TypeDoc