Machine Coding Problem

Thumbnail Service

Tags: maco · All · media · async-job-processing
Commonly Asked By: Pinterest, Netflix, Dropbox

Functional Scope (In-Scope)

  • Asynchronous Job Queueing: Offloads image transcoding from the API gateway using an in-memory Blocking Queue and worker pool.
  • Idempotent Operations: Uses hashes computed from original Image IDs and Target Resolutions to reuse existing jobs.
  • Robust Exponential Back-off Retries: Automatically re-schedules failed sub-jobs with adaptive back-off.
  • Atomic State Updates & Event callbacks: Maintains strict thread safety and pushes callbacks to webhook endpoints on completion.

Explicit Boundaries (Out-of-Scope)

  • Physical Image Resizing & Compression: Mocks pixel rendering pipelines and image decoders (like libjpeg).
  • Webhook Delivery / Dynamic Load Balancing: Console logging stands in for real HTTP webhook calls, and no load balancing is implemented.

Production reference implementations demonstrating thread pools, blocking queues, back-off retry logic, and webhook callbacks in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Lifecycle states of a {@code ThumbnailJob}.
 */
enum JobStatus {
    /** Waiting in the queue: either freshly submitted or re-queued for a retry. */
    PENDING,

    /** Picked up by a worker that is currently generating the thumbnails. */
    PROCESSING,

    /** Every requested resolution was generated successfully. */
    DONE,

    /** Processing kept failing until the retry budget was exhausted. */
    FAILED
}

class Resolution {
    private final int width;
    private final int height;

    public Resolution(int width, int height) {
        this.width = width;
        this.height = height;
    }

    public int getWidth() { return width; }
    public int getHeight() { return height; }

    @Override
    public String toString() {
        return width + "x" + height;
    }
}

/**
 * Mutable record of one thumbnail-generation request: the source image, the requested
 * resolutions, and the progress made so far.
 *
 * <p>Thread-safety: {@code status}, {@code retries} and {@code errorMessage} are only read
 * and written through {@code synchronized} accessors on this instance. The generated-path
 * map is a {@link ConcurrentHashMap}. All other fields are final and set once in the
 * constructor.
 */
class ThumbnailJob {
    private final String id;
    private final String originalImageId;
    private final List<Resolution> resolutions;
    private JobStatus status;
    private int retries;
    private final int maxRetries;
    private final Map<String, String> generatedPaths; // resolution -> storagePath
    private String errorMessage;
    private final long createdAtMs;

    /**
     * Creates a job in the {@link JobStatus#PENDING} state with zero retries.
     *
     * @param id              the job identifier
     * @param originalImageId the identifier of the source image
     * @param resolutions     the resolutions to generate; copied, so later changes to the
     *                        caller's list do not affect the job
     * @param maxRetries      the retry budget exposed through {@link #getMaxRetries()}
     * @throws NullPointerException if {@code resolutions} is {@code null}
     */
    public ThumbnailJob(String id, String originalImageId, List<Resolution> resolutions, int maxRetries) {
        this.id = id;
        this.originalImageId = originalImageId;
        // Defensive copy wrapped read-only: the list is handed out by getResolutions() and
        // iterated by workers, so nobody may modify it after construction.
        this.resolutions = Collections.unmodifiableList(new ArrayList<>(resolutions));
        this.status = JobStatus.PENDING;
        this.retries = 0;
        this.maxRetries = maxRetries;
        this.generatedPaths = new ConcurrentHashMap<>();
        this.createdAtMs = System.currentTimeMillis();
    }

    /** @return the job identifier */
    public String getId() {
        return id;
    }

    /** @return the identifier of the source image */
    public String getOriginalImageId() {
        return originalImageId;
    }

    /** @return an unmodifiable view of the requested resolutions */
    public List<Resolution> getResolutions() {
        return resolutions;
    }

    /** @return the current lifecycle state */
    public synchronized JobStatus getStatus() {
        return status;
    }

    /** @param status the new lifecycle state */
    public synchronized void setStatus(JobStatus status) {
        this.status = status;
    }

    /** @return how many times {@link #incrementRetries()} has been called */
    public synchronized int getRetries() {
        return retries;
    }

    /** Increases the retry counter by one. */
    public synchronized void incrementRetries() {
        this.retries++;
    }

    /** @return the retry budget given at construction */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * @return the live, thread-safe map from resolution string to generated storage path;
     *         callers may add entries to it
     */
    public Map<String, String> getGeneratedPaths() {
        return generatedPaths;
    }

    /** @return the last recorded error message, or {@code null} if none was set */
    public synchronized String getErrorMessage() {
        return errorMessage;
    }

    /** @param msg the error message to record; {@code null} clears it */
    public synchronized void setErrorMessage(String msg) {
        this.errorMessage = msg;
    }

    /** @return the creation time as returned by {@link System#currentTimeMillis()} */
    public long getCreatedAtMs() {
        return createdAtMs;
    }
}

/**
 * Strategy that produces one thumbnail of an image at one resolution.
 *
 * <p>Has a single abstract method, so it can also be supplied as a lambda or method reference.
 */
@FunctionalInterface
interface ImageProcessor {
    /**
     * Generates the thumbnail of {@code imageId} at {@code resolution}.
     *
     * @param imageId    the identifier of the source image
     * @param resolution the size to generate
     * @return the storage path of the generated thumbnail
     * @throws Exception if the thumbnail could not be generated
     */
    String process(String imageId, Resolution resolution) throws Exception;
}

/**
 * Mock "high quality" processor that fails randomly to simulate an unstable imaging
 * backend. No image is actually rendered; only a storage path is produced.
 */
class HighQualityImageProcessor implements ImageProcessor {
    /** Failure probability used by the no-argument constructor. */
    private static final double DEFAULT_FAILURE_RATE = 0.15;

    private final double failureRate;

    /** Creates a processor that fails with the default probability of 0.15 per call. */
    public HighQualityImageProcessor() {
        this(DEFAULT_FAILURE_RATE);
    }

    /**
     * Creates a processor with a configurable simulated failure probability.
     *
     * @param failureRate the probability in {@code [0.0, 1.0]} that a single
     *                    {@link #process} call throws; {@code 0.0} never fails and
     *                    {@code 1.0} always fails
     * @throws IllegalArgumentException if {@code failureRate} is NaN or outside {@code [0.0, 1.0]}
     */
    public HighQualityImageProcessor(double failureRate) {
        // Written as a negated range check so that NaN is rejected as well.
        if (!(failureRate >= 0.0 && failureRate <= 1.0)) {
            throw new IllegalArgumentException("failureRate must be within [0.0, 1.0] but was " + failureRate);
        }
        this.failureRate = failureRate;
    }

    /**
     * Returns the storage path for the requested thumbnail, or fails at random.
     *
     * @param imageId the identifier of the source image
     * @param res     the size to generate
     * @return a path of the form {@code /storage/thumbnails/hq_<imageId>_<width>x<height>.jpg}
     * @throws RuntimeException with the configured probability, simulating a transcoder failure
     */
    @Override
    public String process(String imageId, Resolution res) throws Exception {
        // Simulating unstable imaging network.
        // ThreadLocalRandom gives each worker thread its own generator instead of sharing
        // the single generator behind Math.random().
        if (ThreadLocalRandom.current().nextDouble() < failureRate) {
            throw new RuntimeException("Transcoder failed to load image buffers.");
        }
        return "/storage/thumbnails/hq_" + imageId + "_" + res.getWidth() + "x" + res.getHeight() + ".jpg";
    }
}

/**
 * Mock "fast" processor: never fails and only derives the storage path of the thumbnail.
 */
class FastImageProcessor implements ImageProcessor {
    /**
     * Builds the storage path for the requested thumbnail.
     *
     * @param imageId the identifier of the source image
     * @param res     the size to generate
     * @return a path of the form {@code /storage/thumbnails/fast_<imageId>_<width>x<height>.jpg}
     */
    @Override
    public String process(String imageId, Resolution res) throws Exception {
        StringBuilder path = new StringBuilder("/storage/thumbnails/fast_");
        path.append(imageId);
        path.append("_");
        path.append(res.getWidth());
        path.append("x");
        path.append(res.getHeight());
        path.append(".jpg");
        return path.toString();
    }
}

/**
 * In-memory asynchronous thumbnail service.
 *
 * <p>Submitted jobs are identified by an idempotency key derived from the image id and the
 * requested resolutions. Job ids are placed on a blocking queue that a fixed pool of worker
 * threads drains. A failing job is re-queued after an exponentially growing delay until its
 * retry budget is used up. Registered callback URLs are "notified" (logged to the console)
 * when a job reaches {@link JobStatus#DONE} or {@link JobStatus#FAILED}.
 *
 * <p>Known limitation: a callback registered for a job that has already finished is stored
 * but not invoked retroactively; it only fires if the job is run again.
 */
class ThumbnailService {
    /** Base delay of the exponential back-off; attempt {@code n} waits {@code 2^n} times this. */
    private static final long BASE_BACKOFF_MS = 100L;

    private final ConcurrentHashMap<String, ThumbnailJob> jobs = new ConcurrentHashMap<>();
    private final BlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
    private final ConcurrentHashMap<String, CopyOnWriteArrayList<String>> callbacks = new ConcurrentHashMap<>();
    private final ExecutorService workerPool;
    private final ScheduledExecutorService retryScheduler = Executors.newScheduledThreadPool(1);
    private final int defaultMaxRetries = 3;
    private final ImageProcessor imageProcessor;

    /**
     * Creates the service and immediately starts {@code threadCount} queue-draining workers.
     *
     * @param threadCount the number of worker threads
     * @param processor   the strategy used to generate each thumbnail
     * @throws NullPointerException     if {@code processor} is {@code null}
     * @throws IllegalArgumentException if {@code threadCount} is not positive
     */
    public ThumbnailService(int threadCount, ImageProcessor processor) {
        this.imageProcessor = Objects.requireNonNull(processor, "processor");
        this.workerPool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            workerPool.submit(this::processQueue);
        }
    }

    /**
     * Submits a thumbnail job, reusing an existing job for an identical request.
     *
     * <p>An identical request (same image id and same set of resolutions, in any order) maps
     * to the same job id. If such a job exists and has not failed, it is left untouched. If
     * it exists but is {@link JobStatus#FAILED}, it is replaced by a fresh job with a full
     * retry budget and queued again.
     *
     * @param imageId     the identifier of the source image
     * @param resolutions the resolutions to generate
     * @param callbackUrl a URL to notify when the job finishes, or {@code null} for none;
     *                    registering the same URL twice for a job has no additional effect
     * @return the job id
     * @throws NullPointerException if {@code imageId} or {@code resolutions} is {@code null}
     */
    public String submitJob(String imageId, List<Resolution> resolutions, String callbackUrl) {
        Objects.requireNonNull(imageId, "imageId");
        Objects.requireNonNull(resolutions, "resolutions");
        String jobId = calculateIdempotencyKey(imageId, resolutions);

        // Register callback url if provided; addIfAbsent prevents duplicate notifications
        // when the same request is submitted repeatedly.
        if (callbackUrl != null) {
            callbacks.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).addIfAbsent(callbackUrl);
        }

        // The idempotency check and the insertion happen inside one atomic compute() so that
        // concurrent identical submissions create and enqueue the job exactly once.
        final boolean[] enqueue = {false};
        jobs.compute(jobId, (key, existing) -> {
            if (existing != null && existing.getStatus() != JobStatus.FAILED) {
                return existing;
            }
            // New request, or a resubmission of a failed job: start from a fresh job so the
            // retry counter and error message of the failed run do not carry over.
            enqueue[0] = true;
            return new ThumbnailJob(key, imageId, resolutions, defaultMaxRetries);
        });
        if (enqueue[0]) {
            jobQueue.offer(jobId);
        }

        return jobId;
    }

    /**
     * Looks up a job.
     *
     * @param jobId the id returned by {@link #submitJob}
     * @return the job, or {@code null} if no job with that id is known
     */
    public ThumbnailJob getJobStatus(String jobId) {
        return jobs.get(jobId);
    }

    /** Worker loop: takes job ids from the queue and processes them until interrupted. */
    private void processQueue() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String jobId = jobQueue.take(); // Blocks until job is ready
                ThumbnailJob job = jobs.get(jobId);
                if (job == null) {
                    continue;
                }

                processJob(job);
            }
        } catch (InterruptedException e) {
            // Restore the flag so the executor sees the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Generates every outstanding resolution of {@code job}; on success marks it DONE and
     * fires callbacks, on failure delegates to {@link #handleFailure}.
     */
    private void processJob(ThumbnailJob job) {
        job.setStatus(JobStatus.PROCESSING);

        try {
            for (Resolution res : job.getResolutions()) {
                String key = res.toString();
                // A retry only needs to redo what has not been produced by an earlier attempt.
                if (job.getGeneratedPaths().containsKey(key)) {
                    continue;
                }
                String resultPath = imageProcessor.process(job.getOriginalImageId(), res);
                job.getGeneratedPaths().put(key, resultPath);
            }
        } catch (Exception e) {
            handleFailure(job, e);
            return;
        }

        // Kept outside the try block so that a problem while notifying cannot turn a
        // successfully processed job into a retried one.
        job.setStatus(JobStatus.DONE);
        triggerCallbacks(job);
    }

    /**
     * Records a failed attempt: schedules a delayed retry while the retry budget lasts,
     * otherwise marks the job FAILED and fires callbacks.
     */
    private void handleFailure(ThumbnailJob job, Exception e) {
        job.incrementRetries();
        int attempt = job.getRetries();
        if (attempt <= job.getMaxRetries()) {
            // Exponential back-off: 200 ms, 400 ms, 800 ms, ... for attempts 1, 2, 3, ...
            long backoffDelayMs = (1L << attempt) * BASE_BACKOFF_MS;
            job.setStatus(JobStatus.PENDING);
            job.setErrorMessage("Retrying. Error: " + e.getMessage());

            retryScheduler.schedule(() -> jobQueue.offer(job.getId()), backoffDelayMs, TimeUnit.MILLISECONDS);
        } else {
            // Message first, so a reader that observes FAILED also finds the root cause.
            job.setErrorMessage("Max retries reached. Root cause: " + e.getMessage());
            job.setStatus(JobStatus.FAILED);
            triggerCallbacks(job);
        }
    }

    /** Logs one notification line per callback URL registered for {@code job}. */
    private void triggerCallbacks(ThumbnailJob job) {
        List<String> urls = callbacks.get(job.getId());
        if (urls == null) {
            return;
        }

        for (String url : urls) {
            System.out.println("NOTIFY CALLBACK URL (" + url + ") -> Job: " + job.getId() + " Status: " + job.getStatus());
        }
    }

    /**
     * Derives the job id for a request.
     *
     * <p>The resolutions are sorted by width, then height, so their order in the request
     * does not matter. The id is the first 16 hex characters of the SHA-256 digest of
     * {@code imageId|<w>x<h>|<w>x<h>...}.
     */
    private String calculateIdempotencyKey(String imageId, List<Resolution> resolutions) {
        List<Resolution> sorted = new ArrayList<>(resolutions);
        sorted.sort(Comparator.comparingInt(Resolution::getWidth).thenComparingInt(Resolution::getHeight));

        StringBuilder sb = new StringBuilder(imageId);
        for (Resolution res : sorted) {
            sb.append("|").append(res.toString());
        }

        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            // Only the first 8 bytes are needed for a 16-character hex id.
            StringBuilder hexString = new StringBuilder(16);
            for (int i = 0; i < 8; i++) {
                int unsigned = hash[i] & 0xff;
                hexString.append(Character.forDigit(unsigned >>> 4, 16));
                hexString.append(Character.forDigit(unsigned & 0x0f, 16));
            }
            return hexString.toString();
        } catch (java.security.NoSuchAlgorithmException e) {
            // SHA-256 is unavailable on this runtime: fall back to the String hash code.
            return String.valueOf(sb.toString().hashCode());
        }
    }

    /** Stops the workers and the retry scheduler immediately, interrupting running tasks. */
    public void shutdown() {
        workerPool.shutdownNow();
        retryScheduler.shutdownNow();
    }
}

/**
 * Console demo: submits one job, waits for it to finish, prints the outcome, and then
 * submits the identical request again to show that the same job id is returned.
 */
public class Main {
    /** Upper bound on how long the demo waits for the job to reach a terminal state. */
    private static final long MAX_WAIT_MS = 5000L;

    /** Interval between status checks while waiting. */
    private static final long POLL_INTERVAL_MS = 50L;

    public static void main(String[] args) throws Exception {
        System.out.println("=== JAVA THUMBNAIL SERVICE DEMO ===");
        ThumbnailService service = new ThumbnailService(2, new HighQualityImageProcessor());

        // The worker threads are non-daemon: shut down in finally so that an exception
        // anywhere in the demo cannot leave them running and keep the JVM alive.
        try {
            List<Resolution> resolutions = Arrays.asList(
                new Resolution(150, 150),
                new Resolution(300, 300),
                new Resolution(600, 600)
            );

            String imageId = "profile_pic_2026";
            String callbackUrl = "https://myapi.com/webhooks/thumbnails";

            System.out.println("Submitting image processing job...");
            String jobId = service.submitJob(imageId, resolutions, callbackUrl);
            System.out.println("Job submitted. Generated Job ID: " + jobId);

            // Retrieve job status
            ThumbnailJob job = service.getJobStatus(jobId);
            System.out.println("Initial Job Status: " + job.getStatus());

            // Poll until the job is DONE or FAILED (bounded by MAX_WAIT_MS) instead of
            // sleeping a fixed time, so the status printed below really is the final one.
            System.out.println("Waiting for asynchronous workers to process...");
            long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
            JobStatus status = service.getJobStatus(jobId).getStatus();
            while (status != JobStatus.DONE && status != JobStatus.FAILED
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(POLL_INTERVAL_MS);
                status = service.getJobStatus(jobId).getStatus();
            }

            ThumbnailJob finalJobState = service.getJobStatus(jobId);
            System.out.println("Final Job Status: " + finalJobState.getStatus());
            System.out.println("Generated paths: " + finalJobState.getGeneratedPaths());
            if (finalJobState.getErrorMessage() != null) {
                System.out.println("Errors encountered: " + finalJobState.getErrorMessage());
            }

            // Test idempotency
            System.out.println("Submitting identical job to test idempotency...");
            String duplicateJobId = service.submitJob(imageId, resolutions, callbackUrl);
            System.out.println("Duplicate request Job ID: " + duplicateJobId + " (Matches original: " + duplicateJobId.equals(jobId) + ")");
        } finally {
            service.shutdown();
        }
        System.out.println("=== END OF DEMO ===");
    }
}