Functional Scope (In-Scope)
- Monotonic Fencing Tokens Engine: Generates incrementing numeric keys per lease to block outdated client calls on resources.
- Lease-Based Auto-Expiration: Releases locks automatically on expiration to prevent system lockouts if a client crashes.
- Compare-And-Swap (CAS) Mechanics: Performs atomic in-memory lock changes securely to prevent collision risks.
- Deadlock Wait-For Dependency Graph: Constructs client lock dependencies and analyzes loops to abort cyclic blocks immediately.
Explicit Boundaries (Out-of-Scope)
- Network Packet Transport Sockets: Ignores physical RPC protocol formats, REST endpoints, and UDP packets.
- Continuous Hard Disk Logging: Skips file sync write barriers and sequential WAL records.
Production reference implementations demonstrating fencing validation, CAS acquisitions, and cycle-based deadlock prevention in Java and Python:
// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
class DistributedLock {
private final String lockId;
private final ReentrantLock internalLock = new ReentrantLock();
private String holderClientId;
private long leaseExpiryTime;
private int reentrancyCount = 0;
private final AtomicLong fencingTokenCounter = new AtomicLong(0);
private long activeFencingToken = 0;
public DistributedLock(String lockId) {
this.lockId = lockId;
}
public String getLockId() { return lockId; }
public String getHolderClientId() {
internalLock.lock();
try { return holderClientId; } finally { internalLock.unlock(); }
}
public long getLeaseExpiryTime() {
internalLock.lock();
try { return leaseExpiryTime; } finally { internalLock.unlock(); }
}
public long getActiveFencingToken() {
internalLock.lock();
try { return activeFencingToken; } finally { internalLock.unlock(); }
}
public long tryAcquire(String clientId, long ttlMillis, long now) {
internalLock.lock();
try {
if (holderClientId == null || leaseExpiryTime < now) {
// Free or expired lock acquisition
this.holderClientId = clientId;
this.leaseExpiryTime = now + ttlMillis;
this.reentrancyCount = 1;
this.activeFencingToken = fencingTokenCounter.incrementAndGet();
return activeFencingToken;
} else if (holderClientId.equals(clientId)) {
// Reentrant acquire
this.reentrancyCount++;
this.leaseExpiryTime = now + ttlMillis; // Renew lease
return activeFencingToken;
}
return -1; // Failed to acquire
} finally {
internalLock.unlock();
}
}
public boolean tryRelease(String clientId) {
internalLock.lock();
try {
if (holderClientId != null && holderClientId.equals(clientId)) {
reentrancyCount--;
if (reentrancyCount == 0) {
this.holderClientId = null;
this.leaseExpiryTime = 0;
}
return true;
}
return false;
} finally {
internalLock.unlock();
}
}
public boolean tryRenew(String clientId, long ttlMillis, long now) {
internalLock.lock();
try {
if (holderClientId != null && holderClientId.equals(clientId) && leaseExpiryTime >= now) {
this.leaseExpiryTime = now + ttlMillis;
return true;
}
return false;
} finally {
internalLock.unlock();
}
}
}
class FencingTokenValidator {
// Resource ID -> Last Accepted Monotonic Fencing Token
private final ConcurrentHashMap<String, Long> resourceTokens = new ConcurrentHashMap<>();
public boolean validateAndWrite(String resourceId, long incomingFencingToken, String payload) {
// Atomic compare-and-swap update via compute
final boolean[] accepted = { false };
resourceTokens.compute(resourceId, (id, currentToken) -> {
if (currentToken == null || incomingFencingToken > currentToken) {
accepted[0] = true;
return incomingFencingToken;
}
return currentToken;
});
if (accepted[0]) {
System.out.println(" [WRITE ACCEPTED] Token " + incomingFencingToken + " wrote: " + payload);
return true;
} else {
System.out.println(" [WRITE REJECTED] Stale Token " + incomingFencingToken + " blocked for write: " + payload);
return false;
}
}
}
class DeadlockDetector {
// Wait-For Graph: Client A waiting for Client B
private final Map<String, Set<String>> waitForGraph = new ConcurrentHashMap<>();
public synchronized void addWaitRelation(String clientA, String clientB) {
waitForGraph.computeIfAbsent(clientA, k -> new HashSet<>()).add(clientB);
}
public synchronized void removeWaitRelation(String clientA, String clientB) {
Set<String> relations = waitForGraph.get(clientA);
if (relations != null) {
relations.remove(clientB);
if (relations.isEmpty()) waitForGraph.remove(clientA);
}
}
public synchronized boolean detectDeadlock() {
Set<String> visited = new HashSet<>();
Set<String> stack = new HashSet<>();
for (String node : waitForGraph.keySet()) {
if (hasCycleDFS(node, visited, stack)) {
return true;
}
}
return false;
}
private boolean hasCycleDFS(String node, Set<String> visited, Set<String> stack) {
if (stack.contains(node)) return true;
if (visited.contains(node)) return false;
visited.add(node);
stack.add(node);
Set<String> neighbors = waitForGraph.get(node);
if (neighbors != null) {
for (String neighbor : neighbors) {
if (hasCycleDFS(neighbor, visited, stack)) {
return true;
}
}
}
stack.remove(node);
return false;
}
}
class DistributedLockManager {
private final ConcurrentHashMap<String, DistributedLock> lockStore = new ConcurrentHashMap<>();
private final DeadlockDetector deadlockDetector = new DeadlockDetector();
public long acquireLock(String lockId, String clientId, long ttlMillis) {
DistributedLock lock = lockStore.computeIfAbsent(lockId, k -> new DistributedLock(lockId));
long now = System.currentTimeMillis();
long token = lock.tryAcquire(clientId, ttlMillis, now);
if (token != -1) {
return token;
}
// Wait-for graph checking
String currentHolder = lock.getHolderClientId();
if (currentHolder != null && !currentHolder.equals(clientId)) {
deadlockDetector.addWaitRelation(clientId, currentHolder);
if (deadlockDetector.detectDeadlock()) {
deadlockDetector.removeWaitRelation(clientId, currentHolder);
throw new IllegalStateException("Deadlock cycle detected! Locking aborted.");
}
}
return -1;
}
public boolean releaseLock(String lockId, String clientId) {
DistributedLock lock = lockStore.get(lockId);
if (lock != null) {
String currentHolder = lock.getHolderClientId();
boolean released = lock.tryRelease(clientId);
if (released && currentHolder != null) {
deadlockDetector.removeWaitRelation(clientId, currentHolder);
}
return released;
}
return false;
}
public boolean renewLock(String lockId, String clientId, long ttlMillis) {
DistributedLock lock = lockStore.get(lockId);
if (lock != null) {
return lock.tryRenew(clientId, ttlMillis, System.currentTimeMillis());
}
return false;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== STARTING DISTRIBUTED LOCK MANAGER SIMULATION ===");
DistributedLockManager manager = new DistributedLockManager();
FencingTokenValidator storageValidator = new FencingTokenValidator();
// Scenario 1: Reentrancy Test
System.out.println("--- Testing Reentrant Locking ---");
long tokenA1 = manager.acquireLock("lock-1", "client-A", 1000);
System.out.println(" Acquired lock-1 (Token: " + tokenA1 + ")");
long tokenA2 = manager.acquireLock("lock-1", "client-A", 1000);
System.out.println(" Reentered lock-1 successfully (Token: " + tokenA2 + ")");
manager.releaseLock("lock-1", "client-A");
manager.releaseLock("lock-1", "client-A");
System.out.println(" Released reentrant lock.");
// Scenario 2: Fencing Token Race Condition Resolution (Martin Kleppmann's classic scenario)
System.out.println("\n--- Testing Fencing Token Race Condition Resolution ---");
// Client A acquires lock-2 with 100ms TTL
long clientAToken = manager.acquireLock("lock-2", "client-A", 100);
System.out.println(" Client A acquires lock-2 (Token: " + clientAToken + ")");
// Client A goes to sleep simulating a long Stop-the-World GC Pause
System.out.println(" [STW GC] Client A enters deep GC pause...");
Thread.sleep(200); // 200ms sleep (> 100ms TTL)
// Lock expires. Client B acquires lock-2 and gets a newer token
long clientBToken = manager.acquireLock("lock-2", "client-B", 1000);
System.out.println(" Client B acquires lock-2 (Token: " + clientBToken + ")");
// Client B successfully writes to storage
storageValidator.validateAndWrite("db-row-777", clientBToken, "Client B Update");
// Client A wakes up from GC and tries to write with stale token
System.out.println(" [GC OVER] Client A wakes up and attempts stale write...");
storageValidator.validateAndWrite("db-row-777", clientAToken, "Client A Stale Update");
// Scenario 3: Deadlock Detection
System.out.println("\n--- Testing Deadlock Detection ---");
manager.acquireLock("resource-X", "client-1", 5000);
manager.acquireLock("resource-Y", "client-2", 5000);
// Simulate wait-for graph: Client 1 wants Y, Client 2 wants X
System.out.println(" Constructing wait-for relations...");
try {
manager.acquireLock("resource-Y", "client-1", 5000);
} catch (Exception e) {
System.out.println(" Immediate request tracking failed or blocked.");
}
try {
System.out.println(" Client 2 attempting lock on resource-X...");
manager.acquireLock("resource-X", "client-2", 5000);
} catch (IllegalStateException e) {
System.out.println(" [DEADLOCK BLOCKED] " + e.getMessage());
}
System.out.println("=== DISTRIBUTED LOCK MANAGER SIMULATION COMPLETE ===");
}
}