Functional Scope (In-Scope)
- Topic Partitions: Support multi-partition message topic distribution to enable balanced load levels.
- Dynamic Consumer Groups: Allow multiple consumer groups to pull the same topic message logs independently.
- At-Least-Once Delivery: Offset is advanced only after polling commits successfully, enabling reliable log replays.
- Explicit Offset Rewinds: Let consumer groups rewind offsets to replay previous partition historical logs.
Explicit Boundaries (Out-of-Scope)
- No Cluster Networking Broker Consensus: Ignores Raft/ZooKeeper consensus; system is in-process memory-based.
- No File System Storage Segments: Does not serialize messages to local partition log files on hard disks.
Highly scalable production blueprints in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
class Message {
private final String id;
private final String payload;
private final long offset;
public Message(String id, String payload, long offset) {
this.id = id;
this.payload = payload;
this.offset = offset;
}
public String getId() { return id; }
public String getPayload() { return payload; }
public long getOffset() { return offset; }
}
class Partition {
private final int id;
private final List<Message> messages = new CopyOnWriteArrayList<>();
private final AtomicInteger offsetGenerator = new AtomicInteger(0);
public Partition(int id) { this.id = id; }
public int getId() { return id; }
public synchronized Message append(String payload) {
int offset = offsetGenerator.getAndIncrement();
String msgId = "MSG-" + id + "-" + offset;
Message msg = new Message(msgId, payload, offset);
messages.add(msg);
return msg;
}
public List<Message> getMessagesFrom(long offset) {
List<Message> result = new ArrayList<>();
for (Message m : messages) {
if (m.getOffset() >= offset) {
result.add(m);
}
}
return result;
}
}
class Topic {
private final String name;
private final List<Partition> partitions = new ArrayList<>();
public Topic(String name, int partitionCount) {
this.name = name;
for (int i = 0; i < partitionCount; i++) {
partitions.add(new Partition(i));
}
}
public String getName() { return name; }
public List<Partition> getPartitions() { return partitions; }
}
class ConsumerGroup {
private final String id;
private final Map<Integer, Long> partitionOffsets = new ConcurrentHashMap<>();
public ConsumerGroup(String id) { this.id = id; }
public String getId() { return id; }
public long getOffset(int partitionId) {
return partitionOffsets.getOrDefault(partitionId, 0L);
}
public void commit(int partitionId, long offset) {
partitionOffsets.put(partitionId, offset);
}
}
class PubSubBroker {
private final Map<String, Topic> topics = new ConcurrentHashMap<>();
private final Map<String, ConsumerGroup> groups = new ConcurrentHashMap<>();
public void createTopic(String name, int partitionCount) {
topics.put(name, new Topic(name, partitionCount));
}
public void registerConsumerGroup(String groupId) {
groups.put(groupId, new ConsumerGroup(groupId));
}
public Message publish(String topicName, int partitionId, String payload) {
Topic topic = topics.get(topicName);
if (topic == null || partitionId >= topic.getPartitions().size()) {
throw new IllegalArgumentException("Invalid topic or partition");
}
return topic.getPartitions().get(partitionId).append(payload);
}
public List<Message> poll(String topicName, int partitionId, String groupId) {
Topic topic = topics.get(topicName);
ConsumerGroup group = groups.get(groupId);
if (topic == null || group == null || partitionId >= topic.getPartitions().size()) {
return Collections.emptyList();
}
Partition partition = topic.getPartitions().get(partitionId);
long nextOffset = group.getOffset(partitionId);
List<Message> newMessages = partition.getMessagesFrom(nextOffset);
if (!newMessages.isEmpty()) {
long lastOffset = newMessages.get(newMessages.size() - 1).getOffset();
group.commit(partitionId, lastOffset + 1);
}
return newMessages;
}
}
public class PubSubDriver {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== KAFKA-LITE PUB-SUB SYSTEM SIMULATION ===");
PubSubBroker broker = new PubSubBroker();
broker.createTopic("orders", 2);
broker.registerConsumerGroup("billing-service");
broker.registerConsumerGroup("analytics-service");
// Concurrent publishing simulation
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
broker.publish("orders", 0, "Order-101-Paid");
broker.publish("orders", 0, "Order-102-Paid");
});
executor.submit(() -> {
broker.publish("orders", 1, "Order-201-Paid");
broker.publish("orders", 1, "Order-202-Paid");
});
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
// Billing Service Polls
System.out.println("\n--- Billing Service Polls Partition 0 ---");
List<Message> billingP0 = broker.poll("orders", 0, "billing-service");
for (Message m : billingP0) {
System.out.println("Billing got: " + m.getPayload() + " (Offset: " + m.getOffset() + ")");
}
System.out.println("\n--- Billing Service Polls Partition 1 ---");
List<Message> billingP1 = broker.poll("orders", 1, "billing-service");
for (Message m : billingP1) {
System.out.println("Billing got: " + m.getPayload() + " (Offset: " + m.getOffset() + ")");
}
// Analytics Service Polls
System.out.println("\n--- Analytics Service Polls Partition 0 (Independent offsets) ---");
List<Message> analyticsP0 = broker.poll("orders", 0, "analytics-service");
for (Message m : analyticsP0) {
System.out.println("Analytics got: " + m.getPayload() + " (Offset: " + m.getOffset() + ")");
}
}
}