Publisher & Subscriber (Pub Sub) With RabbitMQ

Publisher & Subscriber (Pub Sub) With RabbitMQ

ยท

4 min read

Pub/Sub, short for Publish/Subscribe, is a messaging pattern and architectural design used in distributed systems and communication systems. It facilitates the exchange of messages or events between different components or systems in a decoupled and scalable manner. In the Pub/Sub model, there are two main roles: publishers and subscribers.

Here's how Pub/Sub works:

  • Publishers: Publishers are responsible for producing and sending messages or events to a central message broker or topic/channel. They don't need to know who the subscribers are or what happens to the messages once published. Publishers simply "publish" messages with a specific topic or category.

  • Subscribers: Subscribers express their interest in specific topics or channels and receive messages related to those topics. Subscribers are unaware of the publishers or other subscribers. They "subscribe" to topics they care about and receive relevant messages when they are published.

Key characteristics and concepts of the Pub/Sub pattern:

  • Decoupling: Pub/Sub decouples the producers (publishers) from the consumers (subscribers). Publishers don't need to know who the subscribers are, and subscribers don't need to know who the publishers are. This promotes loose coupling in distributed systems.

  • Scalability: Pub/Sub systems are highly scalable because new publishers and subscribers can be added without affecting the existing components. The message broker or middleware handles the distribution of messages.

  • Flexibility: Subscribers can subscribe to multiple topics or channels, allowing them to receive relevant messages from various sources. Publishers can also publish messages on multiple topics.

  • Asynchrony: Messages are delivered asynchronously, meaning that publishers and subscribers operate independently of each other. This is useful for building real-time and event-driven systems.

  • Message Broker: A message broker or message middleware acts as an intermediary between publishers and subscribers. It manages the distribution of messages, routing messages to the appropriate subscribers based on their subscriptions.

  • Topics/Channels: Messages are categorized into topics or channels, which help in organizing and filtering messages. Subscribers express their interest by subscribing to specific topics or channels.

Use cases for Pub/Sub include:

  • Real-Time Data Streaming: Pub/Sub is commonly used in real-time data streaming systems, such as financial market data feeds, IoT data ingestion, and social media updates.

  • Notifications and Alerts: It's used to deliver notifications and alerts to users or systems when specific events occur, such as email notifications, push notifications in mobile apps, and system monitoring alerts.

  • Distributed Systems: Pub/Sub helps in building loosely coupled and scalable distributed systems, where components need to communicate without direct dependencies.

  • Event-Driven Architectures: Pub/Sub is a fundamental pattern in event-driven architectures, where components communicate and react to events and changes in the system.

Popular technologies and protocols that implement Pub/Sub include Apache Kafka, RabbitMQ, MQTT, and cloud-based message queuing services like Amazon SNS (Simple Notification Service) and Google Cloud Pub/Sub. These systems provide the infrastructure and tools to enable efficient Pub/Sub communication in various application domains.

Pub/Sub With RabbitMQ

In this tutorial, we'll walk through the process of creating a basic job queue system using RabbitMQ and Node.js. Job queues are essential components of many applications, enabling the efficient distribution and processing of tasks. We'll divide this tutorial into two parts: the publisher (producer) and the consumer (worker).

Publisher (Producer) Code:

const amqp = require("amqplib");

const msg = { number: process.argv[2] };

connect();

async function connect() {
    try {
        // Connect to the RabbitMQ server
        const amqpServer = 'amqps://ghqwhblv:s1QHRN66p_eCt3zDHu8dO-wyOGTuPP7L@armadillo.rmq.cloudamqp.com/ghqwhblv';
        const connection = await amqp.connect(amqpServer);
        const channel = await connection.createChannel();

        // Assert the existence of the "jobs" queue
        await channel.assertQueue("jobs");

        // Send a job to the "jobs" queue
        await channel.sendToQueue("jobs", Buffer.from(JSON.stringify(msg)));
        console.log(`Job sent successfully ${msg.number}`);

        // Close the channel and connection
        await channel.close();
        await connection.close();
    } catch (error) {
        console.log(error);
    }
}

Explanation: In this section, we define the publisher (producer) code responsible for sending jobs to the RabbitMQ server. The publisher connects to the RabbitMQ server, asserts the existence of the "jobs" queue, sends a job to the queue, and then closes the channel and connection.

Consumer (Worker) Code:

const amqp = require("amqplib");

connect();

async function connect() {
    try {
        // Connect to the RabbitMQ server
        const ampqServer = "amqps://ghqwhblv:s1QHRN66p_eCt3zDHu8dO-wyOGTuPP7L@armadillo.rmq.cloudamqp.com/ghqwhblv";
        const connection = await amqp.connect(ampqServer);
        const channel = await connection.createChannel();

        // Assert the existence of the "jobs" queue
        await channel.assertQueue("jobs");

        // Set up a message consumer for the "jobs" queue
        channel.consume("jobs", message => {
            const input = JSON.parse(message.content.toString());
            console.log(`Received job with input ${input.number}`);

            // Acknowledge the message if input.number is 7
            if (input.number == 7) {
                channel.ack(message);
            }
        });

        console.log("Waiting for messages")
    } catch (err) {
        console.log(err);
    }
}

In this section, we define the consumer (worker) code responsible for processing jobs from the "jobs" queue. The consumer connects to the RabbitMQ server, asserts the existence of the "jobs" queue, sets up a message consumer, and processes incoming jobs. We also demonstrate message acknowledgement (channel.ack) for successful processing.

ย