Machine Coding Problem

Pub-Sub System (Kafka-lite)

Tags: maco30 · maco60 · macoAll · infrastructure · observer & thread-safe offset streams
Commonly Asked By: LinkedIn, Confluent, Uber, Netflix

Functional Scope (In-Scope)

  • Topic Partitions: Each topic is split into multiple partitions so that message load can be spread evenly across them.
  • Dynamic Consumer Groups: Multiple consumer groups can read the same topic's message log independently, each tracking its own offsets.
  • At-Least-Once Delivery: A consumer group's offset advances only when a commit succeeds, so uncommitted messages are delivered again and the log can be replayed reliably.
  • Explicit Offset Rewinds: Consumer groups can move their offsets backwards to replay earlier messages from a partition's log.

Explicit Boundaries (Out-of-Scope)

  • No Cluster Networking or Broker Consensus: Raft/ZooKeeper-style consensus is out of scope; the system runs in-process and keeps everything in memory.
  • No File-System Storage Segments: Messages are not serialized to partition log segment files on disk.

Highly scalable production blueprints in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A single published message: its identifier, its payload and its offset within the
 * partition it was appended to. All fields are final and there are no setters.
 */
class Message {
    private final String messageId;
    private final String body;
    private final long position;

    /**
     * @param id      identifier of the message
     * @param payload message contents
     * @param offset  position of the message within its partition
     */
    public Message(String id, String payload, long offset) {
        messageId = id;
        body = payload;
        position = offset;
    }

    /** @return the identifier supplied at construction */
    public String getId() {
        return messageId;
    }

    /** @return the message contents */
    public String getPayload() {
        return body;
    }

    /** @return the position of this message within its partition */
    public long getOffset() {
        return position;
    }
}

/**
 * One append-only, ordered message log inside a topic.
 *
 * <p>Offsets are assigned sequentially starting at 0 and always equal the message's index in
 * the log, so reads can jump straight to the requested offset instead of scanning. All access
 * to the log is synchronized on this partition instance.
 */
class Partition {
    private final int id;
    // Guarded by "this". Never shrinks, so a message's offset is also its list index.
    // (A CopyOnWriteArrayList would copy the entire log on every append: O(n^2) publishing.)
    private final List<Message> messages = new ArrayList<>();

    public Partition(int id) { this.id = id; }
    public int getId() { return id; }

    /**
     * Appends {@code payload} and assigns it the next offset.
     *
     * @param payload message contents
     * @return the stored message, whose id has the form {@code MSG-<partitionId>-<offset>}
     */
    public synchronized Message append(String payload) {
        // Offset derived from the size under the lock: no separate counter to keep in sync,
        // and it is a long so it matches Message.getOffset().
        long offset = messages.size();
        Message msg = new Message("MSG-" + id + "-" + offset, payload, offset);
        messages.add(msg);
        return msg;
    }

    /**
     * Returns a snapshot copy of every message whose offset is {@code >= offset}, in offset order.
     * A negative offset returns the whole log; an offset at or past the end returns an empty list.
     *
     * @param offset first offset to include
     * @return a new, caller-owned list
     */
    public synchronized List<Message> getMessagesFrom(long offset) {
        int end = messages.size();
        if (offset >= end) {
            return new ArrayList<>();
        }
        // Offset == index, so copy only the requested tail instead of scanning the whole log.
        int start = (int) Math.max(0L, offset);
        return new ArrayList<>(messages.subList(start, end));
    }
}

/**
 * A named message stream split into a fixed number of partitions, numbered 0..n-1.
 * The partition list is created once in the constructor and cannot be modified afterwards.
 */
class Topic {
    private final String name;
    private final List<Partition> partitions;

    /**
     * @param name           topic name
     * @param partitionCount number of partitions to create; must be positive
     * @throws NullPointerException     if {@code name} is null
     * @throws IllegalArgumentException if {@code partitionCount} is not positive
     */
    public Topic(String name, int partitionCount) {
        this.name = Objects.requireNonNull(name, "name");
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive, got " + partitionCount);
        }
        List<Partition> created = new ArrayList<>(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            created.add(new Partition(i));
        }
        // Read-only view so callers cannot add or remove partitions behind the broker's back.
        this.partitions = Collections.unmodifiableList(created);
    }

    public String getName() { return name; }

    /** @return an unmodifiable list in which partition {@code i} is at index {@code i} */
    public List<Partition> getPartitions() { return partitions; }
}

/**
 * Committed read positions for one consumer group.
 *
 * <p>Each stored value is the offset of the next message the group should read. Offsets are
 * scoped by (topic, partition), so a group consuming several topics keeps an independent
 * position in each one. Storage uses {@link ConcurrentHashMap}s; a commit simply overwrites the
 * previous value (last writer wins), which is also how rewinds are expressed.
 */
class ConsumerGroup {
    private final String id;
    // topic name -> (partition id -> next offset to read)
    private final Map<String, Map<Integer, Long>> offsetsByTopic = new ConcurrentHashMap<>();
    // Backs only the deprecated topic-less accessors; kept separate so it never collides with a topic.
    private final Map<Integer, Long> unscopedOffsets = new ConcurrentHashMap<>();

    public ConsumerGroup(String id) { this.id = id; }
    public String getId() { return id; }

    /**
     * @return the next offset to read from {@code partitionId} of {@code topic}, or 0 if nothing
     *     has been committed there yet
     */
    public long getOffset(String topic, int partitionId) {
        Map<Integer, Long> offsets = offsetsByTopic.get(topic);
        return offsets == null ? 0L : offsets.getOrDefault(partitionId, 0L);
    }

    /**
     * Stores {@code offset} as the next offset to read from {@code partitionId} of {@code topic}.
     * A value lower than the current one rewinds the group.
     */
    public void commit(String topic, int partitionId, long offset) {
        offsetsByTopic.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(partitionId, offset);
    }

    /**
     * @deprecated these offsets are not scoped to a topic, so two topics would share them;
     *     use {@link #getOffset(String, int)}.
     */
    @Deprecated
    public long getOffset(int partitionId) {
        return unscopedOffsets.getOrDefault(partitionId, 0L);
    }

    /** @deprecated not scoped to a topic; use {@link #commit(String, int, long)}. */
    @Deprecated
    public void commit(int partitionId, long offset) {
        unscopedOffsets.put(partitionId, offset);
    }
}

/**
 * In-process broker that owns the topic and consumer-group registries and mediates publishing
 * and polling.
 *
 * <p>Registries are {@link ConcurrentHashMap}s, so the broker may be shared between threads.
 * Polling is not atomic with committing: two threads polling the same (group, topic, partition)
 * at the same time can both receive the same messages.
 */
class PubSubBroker {
    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
    private final Map<String, ConsumerGroup> groups = new ConcurrentHashMap<>();

    /**
     * Creates topic {@code name} with {@code partitionCount} partitions. Repeating the call with
     * the same partition count is a no-op, so an existing topic's messages are never discarded
     * (replacing it would also leave committed offsets pointing past the new, empty log).
     *
     * @throws IllegalArgumentException if the topic already exists with a different partition count
     */
    public void createTopic(String name, int partitionCount) {
        Topic existing = topics.putIfAbsent(name, new Topic(name, partitionCount));
        if (existing != null && existing.getPartitions().size() != partitionCount) {
            throw new IllegalArgumentException("Topic '" + name + "' already exists with "
                    + existing.getPartitions().size() + " partitions, requested " + partitionCount);
        }
    }

    /**
     * Registers consumer group {@code groupId}. Re-registering an existing group keeps its
     * committed offsets instead of silently resetting them to 0 (use {@link #commitOffset} to rewind).
     */
    public void registerConsumerGroup(String groupId) {
        groups.putIfAbsent(groupId, new ConsumerGroup(groupId));
    }

    /**
     * Appends {@code payload} to the given partition.
     *
     * @return the stored message with its assigned offset
     * @throws IllegalArgumentException if the topic is unknown or {@code partitionId} is out of range
     */
    public Message publish(String topicName, int partitionId, String payload) {
        return requirePartition(topicName, partitionId).append(payload);
    }

    /**
     * Polls with auto-commit: returns every message at or after the group's committed offset and
     * immediately advances the offset past them. Because the commit happens before the caller
     * processes the messages, this is at-most-once; for at-least-once delivery use
     * {@link #poll(String, int, String, boolean)} with {@code autoCommit = false} followed by
     * {@link #commitOffset} once processing succeeds.
     *
     * @return the new messages, or an empty list if the topic, partition or group is unknown
     */
    public List<Message> poll(String topicName, int partitionId, String groupId) {
        return poll(topicName, partitionId, groupId, true);
    }

    /**
     * Returns every message at or after the group's committed offset for this topic partition.
     *
     * @param autoCommit if true, advance the committed offset past the returned messages;
     *                   if false, leave it unchanged so the messages are redelivered until committed
     * @return the new messages, or an empty list if the topic, partition or group is unknown
     */
    public List<Message> poll(String topicName, int partitionId, String groupId, boolean autoCommit) {
        Partition partition = findPartition(topicName, partitionId);
        ConsumerGroup group = groups.get(groupId);
        if (partition == null || group == null) {
            return Collections.emptyList();
        }

        long nextOffset = group.getOffset(topicName, partitionId);
        List<Message> newMessages = partition.getMessagesFrom(nextOffset);

        if (autoCommit && !newMessages.isEmpty()) {
            long lastOffset = newMessages.get(newMessages.size() - 1).getOffset();
            group.commit(topicName, partitionId, lastOffset + 1);
        }
        return newMessages;
    }

    /**
     * Sets the group's next offset to read for this topic partition. Use it to acknowledge
     * processed messages (pass last processed offset + 1) or to rewind and replay the log
     * (pass an earlier offset). Offsets past the current end are allowed; polls then return
     * nothing until the log reaches them.
     *
     * @throws IllegalArgumentException if the topic, partition or group is unknown, or
     *     {@code nextOffset} is negative
     */
    public void commitOffset(String topicName, int partitionId, String groupId, long nextOffset) {
        requirePartition(topicName, partitionId);
        ConsumerGroup group = requireGroup(groupId);
        if (nextOffset < 0) {
            throw new IllegalArgumentException("Offset must be non-negative, got " + nextOffset);
        }
        group.commit(topicName, partitionId, nextOffset);
    }

    /**
     * @return the group's next offset to read for this topic partition (0 if never committed)
     * @throws IllegalArgumentException if the topic, partition or group is unknown
     */
    public long getCommittedOffset(String topicName, int partitionId, String groupId) {
        requirePartition(topicName, partitionId);
        return requireGroup(groupId).getOffset(topicName, partitionId);
    }

    /** Looks up a partition, returning null for an unknown topic or an out-of-range (incl. negative) id. */
    private Partition findPartition(String topicName, int partitionId) {
        Topic topic = topics.get(topicName);
        if (topic == null) {
            return null;
        }
        List<Partition> partitions = topic.getPartitions();
        return partitionId >= 0 && partitionId < partitions.size() ? partitions.get(partitionId) : null;
    }

    /** Like {@link #findPartition} but throws instead of returning null. */
    private Partition requirePartition(String topicName, int partitionId) {
        Partition partition = findPartition(topicName, partitionId);
        if (partition == null) {
            throw new IllegalArgumentException("Invalid topic or partition: " + topicName + "/" + partitionId);
        }
        return partition;
    }

    /** Returns the registered group or throws if it is unknown. */
    private ConsumerGroup requireGroup(String groupId) {
        ConsumerGroup group = groups.get(groupId);
        if (group == null) {
            throw new IllegalArgumentException("Unknown consumer group: " + groupId);
        }
        return group;
    }
}

/**
 * Demo entry point. Publishes to a two-partition topic from two threads, then shows two
 * consumer groups reading the same partition with independent offsets.
 */
public class PubSubDriver {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== KAFKA-LITE PUB-SUB SYSTEM SIMULATION ===");
        PubSubBroker broker = new PubSubBroker();

        broker.createTopic("orders", 2);
        broker.registerConsumerGroup("billing-service");
        broker.registerConsumerGroup("analytics-service");

        // Concurrent publishing simulation: one producer task per partition.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> producers = new ArrayList<>();
        producers.add(executor.submit(() -> {
            broker.publish("orders", 0, "Order-101-Paid");
            broker.publish("orders", 0, "Order-102-Paid");
        }));
        producers.add(executor.submit(() -> {
            broker.publish("orders", 1, "Order-201-Paid");
            broker.publish("orders", 1, "Order-202-Paid");
        }));

        executor.shutdown();
        // Polling before the producers finish would print a partial, timing-dependent result.
        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            throw new IllegalStateException("Producers did not finish within 1 second");
        }
        // submit() captures task exceptions inside the Future; rethrow them so failures are not silent.
        for (Future<?> producer : producers) {
            try {
                producer.get();
            } catch (ExecutionException e) {
                throw new IllegalStateException("Producer task failed", e);
            }
        }

        // Billing Service Polls
        System.out.println("\n--- Billing Service Polls Partition 0 ---");
        printMessages("Billing", broker.poll("orders", 0, "billing-service"));

        System.out.println("\n--- Billing Service Polls Partition 1 ---");
        printMessages("Billing", broker.poll("orders", 1, "billing-service"));

        // Analytics Service Polls
        System.out.println("\n--- Analytics Service Polls Partition 0 (Independent offsets) ---");
        printMessages("Analytics", broker.poll("orders", 0, "analytics-service"));
    }

    /** Prints one line per message, prefixed with the consuming service's name. */
    private static void printMessages(String consumer, List<Message> messages) {
        for (Message m : messages) {
            System.out.println(consumer + " got: " + m.getPayload() + " (Offset: " + m.getOffset() + ")");
        }
    }
}