Design a Message Queueing System

  • Create your own queue that holds messages in the form of JSON. Standard library queues are not allowed.
  • There is one publisher that generates messages.
  • There are multiple subscribers that will listen to messages satisfying a particular regex.
  • Subscribers should not be tightly coupled to the system and can be added or removed at runtime.
  • When a subscriber is added to the system, it registers the callback function along with it. And this callback function will be invoked in case some message arrives.
  • There can be dependency relationships among subscribers: if subscriber A depends on subscriber B, then B must listen to and process a message first, and only then may A listen to and process it. These dependencies can be many-to-many.
  • There must be a retry mechanism for error cases: when an exception occurs while listening to or processing a message, the operation must be retried.

Message.java

package lld.pubsub;

import java.io.Serializable;

/**
 * A single message travelling through the queueing system.
 *
 * <p>Carries two strings: a {@code regEx} value that {@code Consumer.consume}
 * compares for equality against the consumer's own value, and the message
 * {@code content}. The content is intended to be JSON text; nothing in this
 * class parses or validates it.
 */
public class Message implements Serializable {

    private String regEx;
    // The JSON content
    private String content;

    /**
     * Creates a message.
     *
     * @param regEx   value consumers match against; stored as given
     * @param content message payload; stored as given
     */
    public Message(String regEx, String content) {
        this.content = content;
        this.regEx = regEx;
    }

    /**
     * @return the payload passed to the constructor
     */
    public String getContent() {
        return content;
    }

    /**
     * @return the matching value passed to the constructor
     */
    public String getRegEx() {
        return regEx;
    }

    /**
     * Renders both fields in the form {@code Message{regEx='...', content='...'}}.
     *
     * @return human-readable representation used in console output
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("regEx='").append(regEx).append('\'');
        text.append(", content='").append(content).append('\'');
        text.append('}');
        return text.toString();
    }
}

Queue.java

package lld.pubsub;

import java.util.LinkedList;

/**
 * First-in-first-out queue of {@link Message}s built on a hand-rolled singly
 * linked list.
 *
 * <p>The problem statement rules out standard-library queues, so this class
 * keeps its own {@code head}/{@code tail} nodes instead of delegating to a
 * {@code java.util} collection. Like the list it replaces, it accepts
 * {@code null} elements and performs no synchronization of its own.
 */
public class Queue {

    /** One link in the chain: a queued message plus a pointer to the next link. */
    private static final class Node {
        private final Message value;
        private Node next;

        private Node(Message value) {
            this.value = value;
        }
    }

    // Oldest element (next to be polled); null when the queue is empty.
    private Node head;
    // Newest element (last one offered); null when the queue is empty.
    private Node tail;

    /** Creates an empty queue. */
    public Queue() {
        this.head = null;
        this.tail = null;
    }

    /**
     * Appends a message at the tail of the queue.
     *
     * @param message the message to enqueue
     * @return always {@code true}; the queue is unbounded
     */
    public boolean offer(Message message) {
        Node node = new Node(message);
        if (tail == null) {
            // First element: it is both head and tail.
            head = node;
        } else {
            tail.next = node;
        }
        tail = node;
        return true;
    }

    /**
     * Returns the oldest message without removing it.
     *
     * @return the head message, or {@code null} (after printing a notice) when
     *         the queue is empty
     */
    public Message peek() {
        if (head == null) {
            System.out.println("No messages from publishers to display");
            return null;
        }
        return head.value;
    }

    /**
     * Removes and returns the oldest message.
     *
     * @return the head message, or {@code null} (after printing a notice) when
     *         the queue is empty
     */
    public Message poll() {
        if (head == null) {
            System.out.println("No messages from publishers to display");
            return null;
        }
        Node first = head;
        head = first.next;
        if (head == null) {
            // Removed the last element; the tail must not keep pointing at it.
            tail = null;
        }
        first.next = null;
        return first.value;
    }
}

Topic.java

package lld.pubsub;

/**
 * A named topic that owns one message {@link Queue} and a retry limit.
 *
 * <p>The retry limit is the value {@code Broker.publish} compares its attempt
 * counter against when enqueueing fails.
 */
public class Topic {

    private Queue queue = new Queue();
    private String name;
    private int maxTry;

    /**
     * Creates a topic with a fresh, empty queue.
     *
     * @param name   topic name, stored as given
     * @param maxTry retry limit, stored as given
     */
    public Topic(String name, int maxTry) {
        this.maxTry = maxTry;
        this.name = name;
    }

    /**
     * @return the retry limit passed to the constructor
     */
    public int getMAX_TRY() {
        return maxTry;
    }

    /**
     * @return the queue created for this topic; the same instance on every call
     */
    public Queue getQueue() {
        return queue;
    }

    /**
     * @return the topic name passed to the constructor
     */
    public String getName() {
        return name;
    }
}

Consumer.java

package lld.pubsub;

/**
 * A subscriber that reacts to messages whose {@code regEx} value equals its own.
 *
 * <p>Each consumer has a name, a matching value and a priority. The broker's
 * comparator orders consumers with a larger priority first.
 */
public class Consumer {

    private int priority;
    private String name;
    private String regEx;

    /**
     * Creates a consumer with the default priority of 1.
     *
     * @param name  display name used in console output
     * @param regEx value compared against {@link Message#getRegEx()}
     */
    public Consumer(String name, String regEx) {
        this(name, regEx, 1);
    }

    /**
     * Creates a consumer with an explicit priority.
     *
     * @param name     display name used in console output
     * @param regEx    value compared against {@link Message#getRegEx()}
     * @param priority ordering weight; stored as given
     */
    public Consumer(String name, String regEx, int priority) {
        this.name = name;
        this.regEx = regEx;
        this.priority = priority;
    }

    /**
     * @return the display name passed to the constructor
     */
    public String getName() {
        return name;
    }

    /**
     * @return this consumer's priority (1 unless set explicitly)
     */
    public int getPRIORITY() {
        return priority;
    }

    /**
     * Handles a message: prints a line when the message's {@code regEx} value
     * equals this consumer's, and otherwise does nothing.
     *
     * @param message the message to inspect
     * @return always {@code true}, whether or not the message matched
     */
    public boolean consume(Message message) {
        // Comparison is plain string equality; a non-matching message is not a failure.
        if (!message.getRegEx().equals(this.regEx)) {
            return true;
        }
        System.out.println(this.name + " consumed the message :: " + message);
        return true;
    }
}

Broker.java

package lld.pubsub;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;

/**
 * Connects one publisher-facing {@link Topic} to a set of {@link Consumer}s.
 *
 * <p>Consumers can be registered and removed at runtime. Publishing enqueues the
 * message on the topic's queue and then hands it to every registered consumer,
 * highest priority first. Each consumer gets up to {@code topic.getMAX_TRY()}
 * delivery attempts (at least one); a consumer that returns {@code false} or
 * throws is retried.
 *
 * <p>The consumer registry is guarded by the monitor of the topic's queue object,
 * the same lock the original code used. Consumer callbacks are invoked outside
 * that lock.
 *
 * <p>Note: inside this file the simple name {@code Queue} refers to
 * {@code java.util.Queue} because of the single-type import, so the topic's own
 * queue is only ever reached through {@code topic.getQueue()}.
 */
public class Broker {

    /** Orders consumers so that the largest priority value comes out first. */
    private static final Comparator<Consumer> HIGHEST_PRIORITY_FIRST = new Comparator<Consumer>() {
        @Override
        public int compare(Consumer c1, Consumer c2) {
            return Integer.compare(c2.getPRIORITY(), c1.getPRIORITY());
        }
    };

    private Topic topic;
    private boolean isTopicAssigned = false;
    private Map<String, Consumer> consumerMap;

    /** Creates a broker with no topic and no consumers. */
    public Broker() {
        consumerMap = new HashMap<>();
    }

    /**
     * Assigns the topic this broker serves.
     *
     * @param topic the topic to use; passing {@code null} leaves the broker in
     *              the "topic not assigned" state
     */
    public void setTopic(Topic topic) {
        // Keep the flag truthful so a null topic is reported as "not assigned"
        // instead of surfacing later as a NullPointerException.
        isTopicAssigned = topic != null;
        this.topic = topic;
    }

    /**
     * Registers a consumer under the given key.
     *
     * @param consumerKey unique key for the consumer
     * @param consumer    the consumer to register; must not be {@code null}
     * @return {@code true} once the consumer has been added
     * @throws IllegalStateException    if no topic has been assigned
     * @throws IllegalArgumentException if {@code consumer} is {@code null} or the
     *                                  key is already registered
     */
    public boolean addConsumer(String consumerKey, Consumer consumer) {
        requireTopic();
        if (consumer == null) {
            throw new IllegalArgumentException("Consumer must not be null");
        }
        synchronized (topic.getQueue()) {
            // Check and insert under one lock so two threads cannot both pass the check.
            if (consumerMap.containsKey(consumerKey)) {
                throw new IllegalArgumentException("Consumer already exists!!");
            }
            consumerMap.put(consumerKey, consumer);
        }
        System.out.println("Consumer :" + consumer.getName() + " is added..");
        return true;
    }

    /**
     * Removes the consumer registered under the given key.
     *
     * @param consumerKey key the consumer was registered with
     * @return {@code true} once the consumer has been removed
     * @throws IllegalStateException    if no topic has been assigned
     * @throws IllegalArgumentException if no consumer is registered under the key
     */
    public boolean removeConsumer(String consumerKey) {
        requireTopic();
        Consumer removed;
        synchronized (topic.getQueue()) {
            if (!consumerMap.containsKey(consumerKey)) {
                throw new IllegalArgumentException("Consumer not found!!");
            }
            removed = consumerMap.remove(consumerKey);
        }
        System.out.println("Consumer :" + removed.getName() + " is removed..");
        return true;
    }

    /**
     * Enqueues a message on the topic and delivers it to the consumers.
     *
     * <p>If the queue refuses the message, enqueueing is retried; {@code tryNo}
     * is the number of the first attempt and attempts stop once that counter
     * reaches {@code topic.getMAX_TRY()}.
     *
     * @param message the message to publish; must not be {@code null}
     * @param tryNo   number of the first enqueue attempt (callers pass 1)
     * @throws IllegalStateException    if no topic has been assigned or the queue
     *                                  kept refusing the message
     * @throws IllegalArgumentException if {@code message} is {@code null}
     */
    public void publish(Message message, int tryNo) {
        requireTopic();
        if (message == null) {
            throw new IllegalArgumentException("Message must not be null");
        }
        int attempt = tryNo;
        while (!topic.getQueue().offer(message)) {
            if (attempt >= topic.getMAX_TRY()) {
                throw new IllegalStateException("Maximum try exceeded!!");
            }
            attempt++;
        }
        broadcastToSubscribers(message);
    }

    /**
     * Delivers a message to every registered consumer in descending priority
     * order, retrying consumers that fail.
     *
     * <p>When every consumer succeeds, the head of the topic's queue is removed.
     * When at least one consumer still fails after its retries, the queue is left
     * untouched.
     *
     * @param message the message to deliver; must not be {@code null}
     * @throws IllegalStateException    if no topic has been assigned
     * @throws IllegalArgumentException if {@code message} is {@code null}
     */
    public void broadcastToSubscribers(Message message) {
        requireTopic();
        if (message == null) {
            throw new IllegalArgumentException("Message must not be null");
        }

        PriorityQueue<Consumer> byPriority = new PriorityQueue<Consumer>(HIGHEST_PRIORITY_FIRST);
        synchronized (topic.getQueue()) {
            // Snapshot under the registry lock; callbacks run after it is released.
            byPriority.addAll(consumerMap.values());
        }

        boolean failed = false;
        Consumer consumer;
        // poll() is what yields heap order; iterating a PriorityQueue directly
        // visits elements in no guaranteed order.
        while ((consumer = byPriority.poll()) != null) {
            if (!deliverWithRetry(consumer, message)) {
                failed = true;
            }
        }

        if (!failed) {
            // NOTE(review): this removes the queue head, which is the delivered
            // message only if no earlier message is still waiting in the queue.
            topic.getQueue().poll();
        }
    }

    /** Throws if no usable topic has been assigned yet. */
    private void requireTopic() {
        if (!isTopicAssigned || topic == null) {
            throw new IllegalStateException("Topic not assigned!!");
        }
    }

    /** Calls the consumer until it succeeds or its attempts run out; reports whether it succeeded. */
    private boolean deliverWithRetry(Consumer consumer, Message message) {
        // Always make at least one attempt, even if the topic's limit is zero or negative.
        int maxAttempts = Math.max(1, topic.getMAX_TRY());
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (consumer.consume(message)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // A consumer is outside code: one failing must not stop delivery to the rest.
                System.err.println("Consumer :" + consumer.getName() + " failed on attempt "
                        + attempt + " of " + maxAttempts + " :: " + e);
            }
        }
        return false;
    }
}

Now test the message queue system with a main class.

Driver
package lld.pubsub;

/**
 * Console demo: wires a topic, a broker and four consumers together, then
 * publishes three messages and removes one consumer along the way.
 */
public class Driver {

    /**
     * Runs the demo scenario and prints the results to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // One broker serving a single topic with a retry limit of 3
        Broker broker = new Broker();
        broker.setTopic(new Topic("topic-1", 3));

        // consumer-1, consumer-2 and consumer-3 handle messages tagged "abc"
        for (int id = 1; id <= 3; id++) {
            String consumerName = "consumer-" + id;
            broker.addConsumer(consumerName, new Consumer(consumerName, "abc"));
        }

        // consumer-4 handles messages tagged "def"
        broker.addConsumer("consumer-4", new Consumer("consumer-4", "def"));

        printDivider();

        // All three "abc" consumers are registered at this point
        broker.publish(new Message("abc", "Message for consumer [1,2,3]"), 1);

        printDivider();

        // Drop one of the "abc" consumers at runtime
        broker.removeConsumer("consumer-2");

        broker.publish(new Message("def", "Message for consumer-4"), 1);

        printDivider();

        // consumer-2 has been removed, so only consumer-1 and consumer-3 process this one
        broker.publish(new Message("abc", "Message for consumer [1,2,3]"), 1);
    }

    /** Prints a horizontal rule between the demo's phases. */
    private static void printDivider() {
        String rule = "-----------------------------------------";
        System.out.println(rule);
    }
}

The output:

Consumer :consumer-1 is added..
Consumer :consumer-2 is added..
Consumer :consumer-3 is added..
Consumer :consumer-4 is added..
----------------------------------------------------------
consumer-3 consumed the message :: Message{regEx='abc', content='Message for consumer [1,2,3]'}
consumer-2 consumed the message :: Message{regEx='abc', content='Message for consumer [1,2,3]'}
consumer-1 consumed the message :: Message{regEx='abc', content='Message for consumer [1,2,3]'}
----------------------------------------------------------
Consumer :consumer-2 is removed..
consumer-4 consumed the message :: Message{regEx='def', content='Message for consumer-4'}
----------------------------------------------------------
consumer-3 consumed the message :: Message{regEx='abc', content='Message for consumer [1,2,3]'}
consumer-1 consumed the message :: Message{regEx='abc', content='Message for consumer [1,2,3]'}

Categories: LLD

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s