Functional Scope (In-Scope)
- Dynamic Heartbeat Reception: Tracks continuous heartbeat signals per user across multiple device channels.
- State Sweeping Eviction Engines: Periodically infers AWAY (500ms idle) and OFFLINE (1.5s idle) states.
- Stateless Subscription Routing: Handles pub-sub user registries to dispatch real-time status transitions.
- State Change Broadcast Filtering: Filters out redundant notifications, ensuring broadcasts only occur on real status changes.
Explicit Boundaries (Out-of-Scope)
- Physical Websocket Layer Buffers: Mocks socket handshakes, networking frames, and raw TCP load balancers.
- Geo-distributed Redis Syncing: Evaluates in-memory data structures rather than implementing dynamic multi-cluster Redis replication.
Production reference implementations demonstrating heartbeat processing, idle sweeps, subscription structures, and async broadcasting in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
/**
 * Coarse availability states tracked for each user.
 *
 * <p>The declaration order (ONLINE, AWAY, OFFLINE) is preserved; the constant
 * names are also what appears in broadcast log messages.
 */
enum PresenceStatus {
    /** Assigned when a heartbeat is received for the user. */
    ONLINE,

    /** Assigned by the idle sweep once the user has been idle longer than the away threshold. */
    AWAY,

    /** Assigned by the idle sweep once the user has been idle longer than the offline threshold. */
    OFFLINE
}
/**
 * Mutable presence record for a single user: current status, timestamp of the
 * most recent heartbeat, and the set of device ids that have reported in.
 *
 * <p>{@code status} and {@code lastHeartbeatMs} are {@code volatile}, and the
 * device set is a concurrent key set, so individual reads and writes are
 * visible across threads. Compound updates (e.g. read status, then set it)
 * are not atomic at this level.
 */
class UserPresence {

    private final String userId;
    private final Set<String> activeDevices = ConcurrentHashMap.newKeySet();

    private volatile PresenceStatus status;
    private volatile long lastHeartbeatMs;

    /**
     * @param userId identifier of the user this record describes
     * @param status initial presence status
     * @param nowMs  timestamp, in milliseconds, stored as the first heartbeat time
     */
    public UserPresence(String userId, PresenceStatus status, long nowMs) {
        this.userId = userId;
        this.lastHeartbeatMs = nowMs;
        this.status = status;
    }

    /** @return the identifier supplied at construction */
    public String getUserId() {
        return userId;
    }

    /** @return the most recently assigned status */
    public PresenceStatus getStatus() {
        return status;
    }

    /** Replaces the current status with {@code status}. */
    public void setStatus(PresenceStatus status) {
        this.status = status;
    }

    /** @return the timestamp (ms) recorded by the constructor or the last {@link #updateHeartbeat} call */
    public long getLastHeartbeatMs() {
        return lastHeartbeatMs;
    }

    /** Records {@code nowMs} as the time of the latest heartbeat. */
    public void updateHeartbeat(long nowMs) {
        this.lastHeartbeatMs = nowMs;
    }

    /**
     * @return the live, mutable device-id set (not a copy); changes made by the
     *         caller are reflected in this record
     */
    public Set<String> getActiveDevices() {
        return activeDevices;
    }
}
/**
 * In-memory presence tracker.
 *
 * <p>Callers report heartbeats per user/device; a single daemon thread sweeps
 * all known users every 100 ms and demotes them to {@link PresenceStatus#AWAY}
 * or {@link PresenceStatus#OFFLINE} based on time since the last heartbeat.
 * Every actual status transition is fanned out to the subscribers of that user
 * by appending one entry per subscriber to an in-memory log and printing it.
 *
 * <p>Status transitions for one user are serialized by locking on that user's
 * {@link UserPresence}, so the heartbeat path and the sweeper cannot overwrite
 * each other's decisions.
 *
 * <p>NOTE(review): neither {@code presenceStore} nor {@code broadcastLog} is
 * ever pruned, so both grow for the lifetime of the instance.
 */
class PresenceSystem {

    /** Idle time (ms) after which a user is considered AWAY; short for demo responsiveness. */
    private static final long AWAY_THRESHOLD_MS = 500;

    /** Idle time (ms) after which a user is considered OFFLINE; short for demo responsiveness. */
    private static final long OFFLINE_THRESHOLD_MS = 1500;

    /** Delay before the first sweep and period between sweeps, in milliseconds. */
    private static final long SWEEP_INTERVAL_MS = 100;

    private final ConcurrentHashMap<String, UserPresence> presenceStore = new ConcurrentHashMap<>();

    /** targetUserId -> ids of users subscribed to that target's status changes. */
    private final ConcurrentHashMap<String, Set<String>> subscriberMap = new ConcurrentHashMap<>();

    private final ConcurrentLinkedQueue<String> broadcastLog = new ConcurrentLinkedQueue<>();

    private final ScheduledExecutorService sweeperExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "PresenceSweeper");
        thread.setDaemon(true); // do not keep the JVM alive just for the sweeper
        return thread;
    });

    /** Creates the system and immediately schedules the periodic idle sweep. */
    public PresenceSystem() {
        sweeperExecutor.scheduleAtFixedRate(
                this::sweepPresenceStates, SWEEP_INTERVAL_MS, SWEEP_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Records a heartbeat for {@code userId} coming from {@code deviceId}.
     *
     * <p>Refreshes the user's last-heartbeat time, adds the device to the
     * user's active set, and marks the user ONLINE. An ONLINE broadcast is sent
     * when the user was previously AWAY/OFFLINE and also when this is the very
     * first heartbeat ever seen for the user; repeated heartbeats while already
     * ONLINE do not broadcast.
     *
     * @param userId   user sending the heartbeat; must not be null
     * @param deviceId device the heartbeat came from; must not be null
     * @throws NullPointerException if either argument is null (no state is modified)
     */
    public void receiveHeartbeat(String userId, String deviceId) {
        // Validate before touching any state so a bad call cannot leave a half-updated record.
        Objects.requireNonNull(userId, "userId");
        Objects.requireNonNull(deviceId, "deviceId");
        long now = System.currentTimeMillis();

        UserPresence presence = presenceStore.get(userId);
        boolean firstSeen = false;
        if (presence == null) {
            UserPresence fresh = new UserPresence(userId, PresenceStatus.ONLINE, now);
            UserPresence winner = presenceStore.putIfAbsent(userId, fresh);
            // Only the thread whose record was actually installed announces the first ONLINE.
            firstSeen = (winner == null);
            presence = firstSeen ? fresh : winner;
        }

        synchronized (presence) {
            presence.updateHeartbeat(now);
            presence.getActiveDevices().add(deviceId);
            if (firstSeen || presence.getStatus() != PresenceStatus.ONLINE) {
                presence.setStatus(PresenceStatus.ONLINE);
                // Broadcast while holding the lock so log order matches transition order.
                triggerBroadcast(userId, PresenceStatus.ONLINE);
            }
        }
    }

    /**
     * Subscribes {@code subscriberId} to status changes of {@code targetUserId}.
     * Subscribing twice has no additional effect.
     *
     * @throws NullPointerException if either argument is null
     */
    public void subscribe(String subscriberId, String targetUserId) {
        Objects.requireNonNull(subscriberId, "subscriberId");
        Objects.requireNonNull(targetUserId, "targetUserId");
        // compute() runs atomically per key, so this cannot interleave with the
        // empty-set removal performed by unsubscribe().
        subscriberMap.compute(targetUserId, (target, subs) -> {
            Set<String> result = (subs != null) ? subs : ConcurrentHashMap.newKeySet();
            result.add(subscriberId);
            return result;
        });
    }

    /**
     * Removes {@code subscriberId} from the subscribers of {@code targetUserId}.
     * Does nothing if no such subscription exists. When the last subscriber is
     * removed, the target's entry is dropped from the registry.
     */
    public void unsubscribe(String subscriberId, String targetUserId) {
        subscriberMap.computeIfPresent(targetUserId, (target, subs) -> {
            subs.remove(subscriberId);
            return subs.isEmpty() ? null : subs; // returning null removes the mapping
        });
    }

    /**
     * @return the live presence record for {@code userId}, or {@code null} if no
     *         heartbeat has ever been received for that user
     */
    public UserPresence getPresence(String userId) {
        return presenceStore.get(userId);
    }

    /** @return a snapshot copy of every broadcast log entry recorded so far, oldest first */
    public List<String> getRecentBroadcasts() {
        return new ArrayList<>(broadcastLog);
    }

    /**
     * Periodic task: re-evaluates every known user against the idle thresholds.
     *
     * <p>Runtime exceptions are caught and reported because a task scheduled
     * with {@code scheduleAtFixedRate} is never run again once one execution
     * throws.
     */
    private void sweepPresenceStates() {
        try {
            long now = System.currentTimeMillis();
            presenceStore.forEach((userId, presence) -> sweepUser(userId, presence, now));
        } catch (RuntimeException e) {
            System.err.println("Presence sweep failed: " + e);
        }
    }

    /** Applies the idle thresholds to one user and broadcasts if the status actually changes. */
    private void sweepUser(String userId, UserPresence presence, long now) {
        synchronized (presence) {
            // Read the heartbeat under the lock so a concurrent heartbeat is either
            // fully visible here or applied entirely after this decision.
            long idleTime = now - presence.getLastHeartbeatMs();
            PresenceStatus oldStatus = presence.getStatus();
            PresenceStatus newStatus = oldStatus;
            if (idleTime > OFFLINE_THRESHOLD_MS) {
                newStatus = PresenceStatus.OFFLINE;
                presence.getActiveDevices().clear();
            } else if (idleTime > AWAY_THRESHOLD_MS) {
                newStatus = PresenceStatus.AWAY;
            }
            if (newStatus != oldStatus) {
                presence.setStatus(newStatus);
                triggerBroadcast(userId, newStatus);
            }
        }
    }

    /**
     * Fan-out simulator: for each subscriber of {@code userId}, appends one
     * entry to the broadcast log and prints it. No-op when there are no subscribers.
     */
    private void triggerBroadcast(String userId, PresenceStatus status) {
        Set<String> subscribers = subscriberMap.get(userId);
        if (subscribers == null || subscribers.isEmpty()) {
            return;
        }
        for (String subId : subscribers) {
            String logEntry = "BROADCAST [" + subId + "] -> User: " + userId + " changed to: " + status;
            broadcastLog.offer(logEntry);
            System.out.println(logEntry);
        }
    }

    /** Stops scheduling further sweeps; does not wait for an in-flight sweep to finish. */
    public void shutdown() {
        sweeperExecutor.shutdown();
    }
}
/**
 * Console demo of {@link PresenceSystem}: two users subscribe to Bob, Bob sends
 * heartbeats from two devices, and the demo sleeps past the AWAY and OFFLINE
 * idle thresholds, printing Bob's status after each step.
 */
public class Main {

    /**
     * Runs the demo scenario.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== JAVA USER PRESENCE SYSTEM DEMO ===");
        PresenceSystem system = new PresenceSystem();
        try {
            // Alice and Charlie both subscribe to Bob's presence
            system.subscribe("Alice", "Bob");
            system.subscribe("Charlie", "Bob");

            // Bob logs in from mobile device
            System.out.println("Bob sends heartbeat from Mobile...");
            system.receiveHeartbeat("Bob", "MobileDevice");
            UserPresence bobsPresence = system.getPresence("Bob");
            System.out.println("Bob current status: " + bobsPresence.getStatus());
            System.out.println("Bob active devices: " + bobsPresence.getActiveDevices());

            // Wait 800ms without heartbeats to transition Bob to AWAY
            System.out.println("Waiting 800ms for AWAY transition...");
            Thread.sleep(800);
            System.out.println("Bob current status: " + bobsPresence.getStatus());

            // Bob sends a heartbeat from Desktop
            System.out.println("Bob sends heartbeat from Desktop...");
            system.receiveHeartbeat("Bob", "DesktopDevice");
            System.out.println("Bob current status: " + bobsPresence.getStatus() + ", active devices: " + bobsPresence.getActiveDevices());

            // Wait 2000ms to transition Bob to OFFLINE
            System.out.println("Waiting 2000ms for OFFLINE transition...");
            Thread.sleep(2000);
            System.out.println("Bob current status: " + bobsPresence.getStatus() + ", active devices: " + bobsPresence.getActiveDevices());
        } finally {
            // Stop the sweeper even if the demo is interrupted or fails part-way.
            system.shutdown();
        }
        System.out.println("=== END OF JAVA DEMO ===");
    }
}