Machine Coding Problem

Job Scheduler

Tags: machine coding · 60 min · All · infrastructure · command & observer patterns · priority queue · exponential backoff · concurrent locks
Commonly Asked By: Google, Airbnb, Netflix, Uber

Functional Scope (In-Scope)

  • Advanced Job Execution: Schedule both one-time and interval-based recurring tasks.
  • Exponential Backoff Retries: Delay failed executions exponentially (delay = base × 2^attempt, where attempt is the 1-based retry number, so the first retry waits 2 × base).
  • Dynamic Cancellations: Cancel scheduled tasks thread-safely by their unique string identifiers.
  • Observer Monitoring Telemetry: Emit scheduling and execution state updates through registered observers.

Explicit Boundaries (Out-of-Scope)

  • No Cluster Auto-scaling: Excludes automatic VM provisioning to absorb high CPU load.
  • No Leader-Election Heartbeats: Omits distributed consensus protocols (e.g., Paxos or Raft).

Clean reference designs demonstrating prioritized dispatch loops in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.function.Consumer;

/**
 * Lifecycle listener for scheduled jobs.
 *
 * <p>Methods are listed in the order a job typically moves through them: submission,
 * then zero or more failure/retry pairs, then completion.
 */
interface JobObserver {

    /**
     * Reports that a job was handed to the scheduler.
     *
     * @param jobId identifier of the submitted job
     */
    void onJobSubmitted(String jobId);

    /**
     * Reports that one execution attempt threw.
     *
     * @param jobId    identifier of the failing job
     * @param attempt  1-based number of the attempt that failed
     * @param errorMsg message of the thrown exception; may be {@code null}
     */
    void onJobFailed(String jobId, int attempt, String errorMsg);

    /**
     * Reports that a failed job was re-queued with backoff.
     *
     * @param jobId      identifier of the retried job
     * @param attempt    retry number that was scheduled
     * @param nextFireMs wall-clock time (epoch milliseconds) of the retry
     */
    void onJobRetried(String jobId, int attempt, long nextFireMs);

    /**
     * Reports that an execution attempt finished without throwing.
     *
     * @param jobId identifier of the completed job
     */
    void onJobCompleted(String jobId);
}

/**
 * Description of a unit of work with final fields: what to run, its priority, how many
 * times a failed run may be retried, and how often it recurs.
 */
class Job {
    private final String id;
    private final Runnable task;
    private final int priority;
    private final int maxRetries;
    private final long intervalMs; // 0 for one-time; the scheduler only repeats when > 0

    /**
     * Creates a job description.
     *
     * @param id         identifier used for cancellation and observer events; must not be null
     * @param task       work to execute; must not be null
     * @param priority   tie-breaker between jobs due at the same instant (higher runs first)
     * @param maxRetries number of retries allowed after a failed run within one cycle
     * @param intervalMs delay between recurring runs in milliseconds; 0 for a one-time job
     * @throws NullPointerException if {@code id} or {@code task} is null
     */
    public Job(String id, Runnable task, int priority, int maxRetries, long intervalMs) {
        // Fail fast: a null id would otherwise surface later as an NPE inside the
        // scheduler's dispatcher thread (its ConcurrentHashMap-backed set rejects null),
        // and a null task would fail on every attempt.
        this.id = Objects.requireNonNull(id, "id");
        this.task = Objects.requireNonNull(task, "task");
        this.priority = priority;
        this.maxRetries = maxRetries;
        this.intervalMs = intervalMs;
    }

    /** @return the job identifier (never null) */
    public String getId() { return id; }

    /** @return the work to execute (never null) */
    public Runnable getTask() { return task; }

    /** @return the priority used to break fire-time ties */
    public int getPriority() { return priority; }

    /** @return the number of retries allowed per cycle */
    public int getMaxRetries() { return maxRetries; }

    /** @return the recurrence interval in milliseconds, 0 for a one-time job */
    public long getIntervalMs() { return intervalMs; }
}

/**
 * Mutable queue entry pairing a {@link Job} with its next fire time and the number of
 * retries scheduled in the current cycle.
 *
 * <p>Ordering: earlier {@code nextFireTime} first; on a tie, higher job priority first.
 * This ordering is not consistent with {@code equals}. The fire time should only be
 * changed while the entry is not stored in a priority queue, because the queue does not
 * re-sort on mutation.
 */
class ScheduledJob implements Comparable<ScheduledJob> {
    private final Job job;
    private long nextFireTime;
    private int retryCount = 0;

    /**
     * @param job      the job to run
     * @param fireTime first fire time in epoch milliseconds
     */
    public ScheduledJob(Job job, long fireTime) {
        this.job = job;
        this.nextFireTime = fireTime;
    }

    public Job getJob() { return job; }
    public long getNextFireTime() { return nextFireTime; }
    public int getRetryCount() { return retryCount; }

    /**
     * Increments the retry count and sets the fire time to now plus
     * {@code baseMs * 2^retryCount} milliseconds. Because the count is incremented first,
     * the first retry waits {@code 2 * baseMs}.
     *
     * <p>The arithmetic saturates at {@link Long#MAX_VALUE}. Without saturation, a large
     * retry count or base would wrap to a negative delay and re-fire immediately.
     *
     * @param baseMs base backoff in milliseconds; a non-positive value yields no delay
     */
    public void delayForRetry(long baseMs) {
        retryCount++;
        nextFireTime = saturatingAdd(System.currentTimeMillis(), backoffMs(baseMs, retryCount));
    }

    /**
     * Sets the fire time to now plus the job's interval (measured from when this is
     * called, i.e. after the previous run finished) and resets the retry count for the
     * next cycle.
     */
    public void scheduleNextExecution() {
        nextFireTime = saturatingAdd(System.currentTimeMillis(), job.getIntervalMs());
        retryCount = 0; // Reset retry count for the next cycle
    }

    /** Returns {@code baseMs * 2^attempt}, saturated to {@link Long#MAX_VALUE}; 0 if {@code baseMs <= 0}. */
    private static long backoffMs(long baseMs, int attempt) {
        if (baseMs <= 0) {
            return 0L;
        }
        // A shift is exact for attempt <= 62; beyond that, or when baseMs * 2^attempt
        // would exceed Long.MAX_VALUE, clamp instead of wrapping.
        if (attempt >= Long.SIZE - 1 || baseMs > (Long.MAX_VALUE >> attempt)) {
            return Long.MAX_VALUE;
        }
        return baseMs << attempt;
    }

    /** Returns {@code now + delta}, clamped to {@link Long#MAX_VALUE} for large positive deltas. */
    private static long saturatingAdd(long now, long delta) {
        if (delta > 0 && now > Long.MAX_VALUE - delta) {
            return Long.MAX_VALUE;
        }
        return now + delta;
    }

    @Override
    public int compareTo(ScheduledJob other) {
        if (this.nextFireTime != other.nextFireTime) {
            return Long.compare(this.nextFireTime, other.nextFireTime);
        }
        return Integer.compare(other.job.getPriority(), this.job.getPriority()); // Higher priority first
    }
}

/**
 * Job scheduler with a single dispatcher thread, backed by a fire-time-ordered
 * {@link PriorityQueue}.
 *
 * <p>A dedicated dispatcher thread waits until the earliest queued job is due and hands
 * it to a worker pool. Failed runs are retried with exponential backoff. Recurring jobs
 * are re-queued after every cycle, including a cycle whose retries were exhausted. All
 * queue and cancellation-marker updates happen while holding {@code lock}.
 *
 * <p>Observer callbacks are best-effort telemetry. An exception thrown by an observer is
 * discarded so that it cannot alter scheduling. For example, it cannot turn a successful
 * run into a retry, skip the dispatcher wake-up, or stop later observers from being called.
 */
class JobScheduler {
    /** Base delay handed to {@link ScheduledJob#delayForRetry(long)} on failure. */
    private static final long RETRY_BASE_MS = 1000L;

    private final PriorityQueue<ScheduledJob> queue = new PriorityQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition entryAdded = lock.newCondition();
    // NOTE: with an unbounded LinkedBlockingQueue, ThreadPoolExecutor never creates more
    // than corePoolSize (4) threads; the maximum of 8 has no effect.
    private final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
    );
    private final List<JobObserver> observers = new CopyOnWriteArrayList<>();
    // Ids cancelled when no queued entry existed (e.g. the job was mid-run). A marker is
    // consumed when that run ends, when the id is scheduled again, or by the dispatcher.
    private final Set<String> cancelledJobs = ConcurrentHashMap.newKeySet();
    private final Thread dispatcherThread;
    private volatile boolean running = true;

    /** Creates the scheduler and starts its dispatcher thread. */
    public JobScheduler() {
        dispatcherThread = new Thread(this::dispatcherLoop, "job-scheduler-dispatcher");
        dispatcherThread.start();
    }

    /** Registers an observer for lifecycle events. */
    public void addObserver(JobObserver obs) { observers.add(obs); }

    /**
     * Schedules {@code job} to first fire {@code delayMs} milliseconds from now.
     * Scheduling an id clears any pending cancellation marker left for that id.
     *
     * @throws NullPointerException if {@code job} or its id is null
     */
    public void schedule(Job job, long delayMs) {
        Objects.requireNonNull(job, "job");
        String jobId = Objects.requireNonNull(job.getId(), "job id");
        long fireTime = System.currentTimeMillis() + delayMs;
        // Notify before enqueueing, without holding the lock. Observers then always see
        // "submitted" before any completion or failure event for this job.
        notifyObservers(obs -> obs.onJobSubmitted(jobId));
        lock.lock();
        try {
            cancelledJobs.remove(jobId);
            queue.add(new ScheduledJob(job, fireTime));
            entryAdded.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Cancels the job identified by {@code jobId}.
     *
     * <p>Queued entries with that id are removed immediately. If nothing was queued, for
     * example because the job is executing right now, a marker is recorded instead. The
     * marker stops the current run from being re-queued for a retry or its next interval.
     *
     * @throws NullPointerException if {@code jobId} is null
     */
    public void cancel(String jobId) {
        Objects.requireNonNull(jobId, "jobId");
        lock.lock();
        try {
            boolean removedQueued = queue.removeIf(sj -> jobId.equals(sj.getJob().getId()));
            if (!removedQueued) {
                cancelledJobs.add(jobId);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Dispatcher thread body: waits for the earliest job to become due and hands it to the pool. */
    private void dispatcherLoop() {
        while (running) {
            lock.lock();
            try {
                while (queue.isEmpty() && running) {
                    entryAdded.await();
                }
                if (!running) break;

                long now = System.currentTimeMillis();
                ScheduledJob head = queue.peek();
                if (head.getNextFireTime() <= now) {
                    ScheduledJob sj = queue.poll();
                    // Defensive: an entry whose id still carries a cancellation marker is
                    // dropped, and the marker is consumed.
                    if (!cancelledJobs.remove(sj.getJob().getId())) {
                        workerPool.submit(() -> executeJob(sj));
                    }
                } else {
                    // Not due yet. Sleep until it is, or until a new entry is signalled
                    // that may be due sooner.
                    long sleepMs = head.getNextFireTime() - now;
                    entryAdded.await(sleepMs, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Worker body: runs one attempt of {@code sj} and decides whether it goes back on the queue. */
    private void executeJob(ScheduledJob sj) {
        Job job = sj.getJob();
        String jobId = job.getId();
        try {
            job.getTask().run();
        } catch (Exception e) {
            handleFailure(sj, e);
            return;
        }
        // Kept outside the try: an observer throwing must not be mistaken for a task
        // failure and cause a successful run to be retried.
        notifyObservers(obs -> obs.onJobCompleted(jobId));

        // Handle recurring jobs
        boolean recurring = job.getIntervalMs() > 0;
        if (recurring) {
            sj.scheduleNextExecution();
        }
        finishRun(sj, recurring);
    }

    /** Reports a failed attempt, then schedules a backoff retry, the next interval, or nothing. */
    private void handleFailure(ScheduledJob sj, Exception failure) {
        Job job = sj.getJob();
        String jobId = job.getId();
        int failedAttempt = sj.getRetryCount() + 1;
        notifyObservers(obs -> obs.onJobFailed(jobId, failedAttempt, failure.getMessage()));

        if (sj.getRetryCount() < job.getMaxRetries()) {
            sj.delayForRetry(RETRY_BASE_MS);
            // Capture before re-queueing: once queued, another worker may mutate sj.
            int retryAttempt = sj.getRetryCount();
            long nextFireMs = sj.getNextFireTime();
            if (finishRun(sj, true)) {
                notifyObservers(obs -> obs.onJobRetried(jobId, retryAttempt, nextFireMs));
            }
        } else if (job.getIntervalMs() > 0) {
            // Retries for this cycle are exhausted. Keep the recurring job alive and start
            // a fresh cycle at the next interval instead of silently dropping it.
            sj.scheduleNextExecution();
            finishRun(sj, true);
        } else {
            finishRun(sj, false);
        }
    }

    /**
     * Ends a run. Consumes any cancellation marker for the job's id and, when
     * {@code requeue} is set and the job was not cancelled, puts it back on the queue and
     * wakes the dispatcher.
     *
     * @return true if the job was re-queued
     */
    private boolean finishRun(ScheduledJob sj, boolean requeue) {
        lock.lock();
        try {
            boolean cancelled = cancelledJobs.remove(sj.getJob().getId());
            if (!requeue || cancelled) {
                return false;
            }
            queue.add(sj);
            entryAdded.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Delivers {@code event} to every observer, isolating the scheduler from observer failures. */
    private void notifyObservers(Consumer<JobObserver> event) {
        for (JobObserver obs : observers) {
            try {
                event.accept(obs);
            } catch (RuntimeException ignored) {
                // Telemetry is best-effort. A faulty observer must neither disturb
                // scheduling nor prevent the remaining observers from being notified.
            }
        }
    }

    /**
     * Stops the dispatcher and shuts down the worker pool.
     *
     * <p>Tasks already handed to the pool are allowed to finish. Entries still waiting in
     * the queue are not dispatched.
     */
    public void shutdown() {
        running = false;
        lock.lock();
        try {
            entryAdded.signalAll();
        } finally {
            lock.unlock();
        }
        workerPool.shutdown();
    }
}