Machine Coding Problem

Log Search (ELK-style)

Tags: maco30, maco60, macoAll, devops, index-rotation, hot-warm
Commonly Asked By: Splunk, Elastic, Datadog

Functional Specifications

  • Time-based Index Rotation: Seal the current index segment and open a fresh one once incoming event timestamps pass the end of the current segment's time window.
  • Hot-Warm-Cold Storage tiers: Move older segments across storage mediums (Fast SSD → HDD → Cloud Object S3 bucket) based on index age.
  • Inverted Index Keyword Search: Parse and tokenize incoming messages to build fast index references for text searching.
  • Unified Query routing: Route query requests only to index segments overlapping with the target range.

Production reference implementations demonstrating index rotations, inverted keyword maps, tier shifts, and query range intersections:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;

/**
 * Storage tier assigned to an index segment.
 *
 * <p>Constants are declared in the order a segment moves through them as it ages.
 */
enum StorageTier {
    /** Tier every newly created segment starts in. */
    HOT,

    /** First tier an aging segment is demoted to. */
    WARM,

    /** Second tier an aging segment is demoted to. */
    COLD,

    /** Final tier; segments in this tier are skipped by queries. */
    PURGED
}

class LogEvent {
    private final String id;
    private final long timestamp;
    private final String level;
    private final String message;
    private final String service;

    public LogEvent(long timestamp, String level, String message, String service) {
        this.id = UUID.randomUUID().toString();
        this.timestamp = timestamp;
        this.level = level;
        this.message = message;
        this.service = service;
    }

    public String getId() { return id; }
    public long getTimestamp() { return timestamp; }
    public String getLevel() { return level; }
    public String getMessage() { return message; }
    public String getService() { return service; }
}

/**
 * One time-bounded slice of the log index.
 *
 * <p>A segment keeps every event it was given, plus an inverted index that maps each
 * lower-cased, whitespace-delimited token of an event's message to the events containing it.
 * Tokens are not stripped of punctuation, so {@code "timeout."} and {@code "timeout"} are
 * different keys.
 *
 * <p>The event list and the posting lists are concurrent collections, and the tier field is
 * volatile, so readers may run concurrently with a writer.
 */
class IndexSegment {
    private final String id;
    private final long startTime;
    private final long endTime;
    // volatile: the tier is changed by the engine while queries read it without holding a lock.
    private volatile StorageTier tier = StorageTier.HOT;
    private final List<LogEvent> logs = new CopyOnWriteArrayList<>();
    private final ConcurrentHashMap<String, List<LogEvent>> invertedIndex = new ConcurrentHashMap<>();

    /**
     * @param id        identifier of this segment
     * @param startTime start of the time window this segment covers
     * @param endTime   end of the time window this segment covers
     */
    public IndexSegment(String id, long startTime, long endTime) {
        this.id = id;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * Stores the event and registers it under every distinct token of its message.
     *
     * <p>An event whose message is {@code null} is stored but contributes no tokens.
     *
     * @param log event to add
     * @throws NullPointerException if {@code log} is {@code null}
     */
    public void index(LogEvent log) {
        Objects.requireNonNull(log, "log");
        logs.add(log);

        String message = log.getMessage();
        if (message == null) {
            return; // nothing to tokenize; the event stays reachable through getLogs()
        }

        // "\\s+" is the regex for a run of any whitespace. The single-backslash form "\s" is a
        // Java string escape for a plain space and would leave tabs/newlines inside tokens.
        // Locale.ROOT keeps case folding independent of the JVM's default locale.
        String[] rawTokens = message.toLowerCase(Locale.ROOT).split("\\s+");

        // De-duplicate so a word repeated within one message yields one posting, not several.
        Set<String> distinctTokens = new LinkedHashSet<>(Arrays.asList(rawTokens));
        for (String token : distinctTokens) {
            if (token.isEmpty()) {
                continue; // produced by leading whitespace or an empty message
            }
            invertedIndex.computeIfAbsent(token, k -> new CopyOnWriteArrayList<>()).add(log);
        }
    }

    /**
     * Looks up the events whose message contains the given token (case-insensitive).
     *
     * @param keyword single token to look up; {@code null} matches nothing
     * @return an unmodifiable list of matching events, empty when there is no match
     */
    public List<LogEvent> search(String keyword) {
        if (keyword == null) {
            return Collections.emptyList();
        }
        List<LogEvent> postings = invertedIndex.get(keyword.toLowerCase(Locale.ROOT));
        if (postings == null) {
            return Collections.emptyList();
        }
        // Read-only view, consistent with the immutable empty list returned on a miss.
        return Collections.unmodifiableList(postings);
    }

    /** @return identifier of this segment */
    public String getId() { return id; }

    /** @return start of the time window this segment covers */
    public long getStartTime() { return startTime; }

    /** @return end of the time window this segment covers */
    public long getEndTime() { return endTime; }

    /** @return the tier this segment is currently assigned to */
    public StorageTier getTier() { return tier; }

    /** @param tier the tier to assign to this segment */
    public void setTier(StorageTier tier) { this.tier = tier; }

    /** @return the live list of all events stored in this segment */
    public List<LogEvent> getLogs() { return logs; }

    /** @return the live token-to-events map backing {@link #search(String)} */
    public Map<String, List<LogEvent>> getInvertedIndex() { return invertedIndex; }
}

/**
 * Log store that writes into one "hot" segment at a time, rotates to a fresh segment when an
 * incoming event is newer than the hot segment's window, demotes sealed segments through the
 * storage tiers as they age, and answers keyword queries restricted to a time range.
 *
 * <p>{@link #ingest}, {@link #rotate} and {@link #manageTiers} are synchronized on the engine;
 * {@link #query} is not and reads the segment list without locking.
 *
 * <p>Segment windows are expressed in event time when rotation is triggered by {@link #ingest}
 * and in wall-clock time when triggered by {@link #rotate()}; tier aging always compares a
 * segment's end time with the wall clock. Events older than the hot segment's start are still
 * written to the hot segment.
 */
class LogSearchEngine {
    /** Milliseconds past a sealed segment's end time after which it becomes WARM. */
    private static final long WARM_AFTER_MS = 1000L;
    /** Milliseconds past a sealed segment's end time after which it becomes COLD. */
    private static final long COLD_AFTER_MS = 3000L;
    /** Milliseconds past a sealed segment's end time after which it becomes PURGED. */
    private static final long PURGE_AFTER_MS = 5000L;

    private final List<IndexSegment> segments = new CopyOnWriteArrayList<>();
    private IndexSegment currentHotSegment;
    private final long rotationIntervalMs;

    /**
     * Creates an engine and opens its first segment, starting at the current wall-clock time.
     *
     * @param rotationIntervalMs length of each segment's time window in milliseconds
     * @throws IllegalArgumentException if {@code rotationIntervalMs} is not positive
     */
    public LogSearchEngine(long rotationIntervalMs) {
        if (rotationIntervalMs <= 0) {
            throw new IllegalArgumentException(
                    "rotationIntervalMs must be positive, got " + rotationIntervalMs);
        }
        this.rotationIntervalMs = rotationIntervalMs;
        // Private helper rather than rotate(): avoids calling an overridable method from the
        // constructor, and there is nothing to age yet.
        openSegment(System.currentTimeMillis());
    }

    /**
     * Indexes an event into the hot segment, rotating first when the event's timestamp lies
     * beyond the hot segment's end time.
     *
     * @param log event to index
     * @throws NullPointerException if {@code log} is {@code null}
     */
    public synchronized void ingest(LogEvent log) {
        Objects.requireNonNull(log, "log");
        if (log.getTimestamp() > currentHotSegment.getEndTime()) {
            // Start the new window at the event's own timestamp so the segment's range actually
            // contains the event; otherwise range-routed queries could not find it.
            rotateAt(log.getTimestamp());
        }
        currentHotSegment.index(log);
    }

    /**
     * Seals the hot segment, opens a new one starting at the current wall-clock time, and
     * re-evaluates the tiers of all sealed segments.
     */
    public synchronized void rotate() {
        rotateAt(System.currentTimeMillis());
    }

    /** Opens a new hot segment starting at {@code startTime}, then re-evaluates tiers. */
    private void rotateAt(long startTime) {
        openSegment(startTime);

        // Migrate older segments across storage tiers
        manageTiers();
    }

    /** Appends a segment covering [startTime, startTime + interval] and makes it the hot one. */
    private void openSegment(long startTime) {
        // Saturate instead of overflowing for timestamps close to Long.MAX_VALUE.
        long endTime = startTime > Long.MAX_VALUE - rotationIntervalMs
                ? Long.MAX_VALUE
                : startTime + rotationIntervalMs;
        String nextId = "idx-" + (segments.size() + 1);
        IndexSegment next = new IndexSegment(nextId, startTime, endTime);
        segments.add(next);
        currentHotSegment = next;
    }

    /**
     * Assigns a tier to every sealed segment according to how far the wall clock is past the
     * segment's end time. The hot segment is never touched; segments younger than the WARM
     * threshold keep their current tier.
     */
    public synchronized void manageTiers() {
        long now = System.currentTimeMillis();
        for (IndexSegment segment : segments) {
            if (segment == currentHotSegment) {
                continue;
            }
            long age = now - segment.getEndTime();

            // Storage tier thresholds based on time age (simulated thresholds)
            if (age > PURGE_AFTER_MS) {
                segment.setTier(StorageTier.PURGED);
            } else if (age > COLD_AFTER_MS) {
                segment.setTier(StorageTier.COLD);
            } else if (age > WARM_AFTER_MS) {
                segment.setTier(StorageTier.WARM);
            }
        }
    }

    /**
     * Finds events containing {@code keyword} whose timestamp lies in {@code [start, end]}.
     *
     * <p>Only non-purged segments whose window overlaps the range are consulted; hits are then
     * filtered by their own timestamp, because an overlapping segment can also hold events
     * outside the requested range.
     *
     * @param keyword single token to search for; {@code null} matches nothing
     * @param start   inclusive lower bound of the time range
     * @param end     inclusive upper bound of the time range
     * @return a new mutable list of matching events in segment order; empty when nothing
     *         matches or when {@code start > end}
     */
    public List<LogEvent> query(String keyword, long start, long end) {
        List<LogEvent> results = new ArrayList<>();
        if (keyword == null || start > end) {
            return results;
        }
        for (IndexSegment segment : segments) {
            if (segment.getTier() == StorageTier.PURGED) {
                continue;
            }
            // Target segment only if query range overlaps segment time
            if (segment.getStartTime() > end || segment.getEndTime() < start) {
                continue;
            }
            for (LogEvent hit : segment.search(keyword)) {
                long timestamp = hit.getTimestamp();
                if (timestamp >= start && timestamp <= end) {
                    results.add(hit);
                }
            }
        }
        return results;
    }

    /**
     * @return a read-only live view of all segments in creation order, including purged ones
     */
    public List<IndexSegment> getSegments() {
        // Read-only: external additions or removals would break id generation in openSegment.
        return Collections.unmodifiableList(segments);
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== ELK-STYLE LOG SEARCH ENGINE SIMULATION ===");
        // Rotation every 1000 milliseconds for fast testing
        LogSearchEngine engine = new LogSearchEngine(1000L);

        // Ingest into first HOT index segment
        System.out.println("Ingesting logs into hot segment...");
        long t1 = System.currentTimeMillis();
        engine.ingest(new LogEvent(t1, "INFO", "gateway server initialized successfully", "gateway"));
        engine.ingest(new LogEvent(t1 + 100, "ERROR", "database connection timeout occurred", "auth-service"));
        
        // Wait to trigger index rotation
        Thread.sleep(1200L);
        System.out.println("\nIngesting logs post-rotation...");
        long t2 = System.currentTimeMillis();
        engine.ingest(new LogEvent(t2, "WARN", "high cpu utilization threshold warning", "checkout-service"));
        engine.ingest(new LogEvent(t2 + 100, "INFO", "user transaction processed successfully", "payment-service"));

        // Wait to trigger tier shifts
        Thread.sleep(2000L);
        System.out.println("\nTriggering explicit segment rotation to process migrations...");
        engine.rotate();

        // Print Segment Tiers
        System.out.println("\n--- Segment Tiers Assessment ---");
        for (IndexSegment seg : engine.getSegments()) {
            System.out.println("Segment " + seg.getId() + " | Tier: " + seg.getTier() + " | Logs count: " + seg.getLogs().size());
        }

        // Full text search querying
        System.out.println("\n--- Querying 'database' over time range ---");
        List<LogEvent> databaseLogs = engine.query("database", t1 - 1000, t2 + 2000);
        for (LogEvent log : databaseLogs) {
            System.out.println("[" + log.getLevel() + "] " + log.getService() + ": " + log.getMessage());
        }

        System.out.println("\n--- Querying 'successfully' over time range ---");
        List<LogEvent> successfulLogs = engine.query("successfully", t1 - 1000, t2 + 2000);
        for (LogEvent log : successfulLogs) {
            System.out.println("[" + log.getLevel() + "] " + log.getService() + ": " + log.getMessage());
        }

        System.out.println("\nSimulation completed successfully.");
    }
}