Machine Coding Problem

Metrics Aggregator

Tags: maco, All, devops, rollup-logic, percentiles
Commonly Asked By: Datadog, Splunk, Dynatrace

Functional Scope (In-Scope)

  • Time-Series Sample Ingestion: Captures high-frequency raw float samples decorated with metadata labels.
  • Tiered Multi-Resolution Aggregators: Computes tiered windowed summaries (1-minute, 5-minute, and 1-hour rollups) using pyramidal consolidation.
  • Approximate Percentiles (Histogram Buckets): Employs boundary arrays and linear interpolation algorithms to resolve p50, p95, and p99 metrics under low memory overhead.
  • Range Query API: Filters, maps, and returns consolidated stats across custom intervals and resolution levels.

Explicit Boundaries (Out-of-Scope)

  • Full Alerting Engine: Excludes threshold evaluation loops and integration webhooks.
  • Cold Data Compression: Skips filesystem serialization and Gorilla/Chimp XOR float delta-compression techniques to prioritize memory structure and interpolation mechanics.

Production reference implementations demonstrating pyramidal aggregation windows, histogram bucketing, and linear percentile interpolation in Java and Python:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A single raw observation of a metric: a value, the time it was taken, and
 * the labels attached to it.
 *
 * <p>Instances are immutable. The tag map passed to the constructor is copied,
 * and the map returned by {@link #getTags()} cannot be modified.
 */
class MetricSample {
    private final String metricName;
    private final double value;
    private final long timestampMs;
    private final Map<String, String> tags;

    /**
     * Creates a sample.
     *
     * @param metricName  name of the metric this sample belongs to
     * @param value       observed value
     * @param timestampMs observation time in milliseconds
     * @param tags        labels for the sample; {@code null} is treated as "no tags"
     */
    public MetricSample(String metricName, double value, long timestampMs, Map<String, String> tags) {
        this.metricName = metricName;
        this.value = value;
        this.timestampMs = timestampMs;
        // Copy so later changes to the caller's map do not leak in, and wrap so
        // holders of the sample cannot mutate it through getTags().
        this.tags = (tags == null)
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(new HashMap<>(tags));
    }

    /** @return the metric name given at construction */
    public String getMetricName() { return metricName; }

    /** @return the observed value */
    public double getValue() { return value; }

    /** @return the observation time in milliseconds */
    public long getTimestampMs() { return timestampMs; }

    /** @return an unmodifiable view of this sample's tags; never {@code null} */
    public Map<String, String> getTags() { return tags; }
}

/**
 * Running summary (min, max, sum, count, fixed-boundary histogram) of the
 * values observed in one time window.
 *
 * <p>All mutating and reading methods lock on the instance, so a bucket can be
 * shared between threads.
 *
 * <p>Histogram layout: slot {@code i} counts values {@code v} with
 * {@code v <= bucketBoundaries[i]} that did not fit an earlier slot; the final
 * extra slot counts values larger than every boundary.
 */
class RollupBucket {
    private final long startTimeMs;
    private final long endTimeMs;
    private double min;
    private double max;
    private double sum;
    private long count;
    private final double[] bucketBoundaries;
    private final long[] histogramCounts;

    /**
     * Creates an empty bucket.
     *
     * @param startTimeMs      window start in milliseconds
     * @param endTimeMs        window end in milliseconds
     * @param bucketBoundaries upper bounds of the histogram slots; copied, and
     *                         expected to be in ascending order
     */
    public RollupBucket(long startTimeMs, long endTimeMs, double[] bucketBoundaries) {
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        // Identity elements for min/max. Double.MIN_VALUE is the smallest
        // POSITIVE double, so using it as the max seed breaks negative data.
        this.min = Double.POSITIVE_INFINITY;
        this.max = Double.NEGATIVE_INFINITY;
        this.sum = 0.0;
        this.count = 0;
        this.bucketBoundaries = bucketBoundaries.clone();
        this.histogramCounts = new long[bucketBoundaries.length + 1];
    }

    /**
     * Adds one observation to the summary and to its histogram slot.
     *
     * @param val the observed value
     */
    public synchronized void recordValue(double val) {
        this.count++;
        this.sum += val;
        if (val < this.min) this.min = val;
        if (val > this.max) this.max = val;

        // Default to the overflow slot; the first boundary >= val wins.
        int bucketIndex = bucketBoundaries.length;
        for (int i = 0; i < bucketBoundaries.length; i++) {
            if (val <= bucketBoundaries[i]) {
                bucketIndex = i;
                break;
            }
        }
        this.histogramCounts[bucketIndex]++;
    }

    /**
     * Folds another bucket's summary into this one. A {@code null} or empty
     * {@code other} is ignored.
     *
     * @param other bucket to merge in
     * @throws IllegalArgumentException if {@code other} is non-empty and was
     *         built with different histogram boundaries
     */
    public void merge(RollupBucket other) {
        if (other == null) return;

        // Snapshot the other bucket under ITS lock so the five fields are
        // mutually consistent, and release it before taking our own lock so
        // the two locks are never held together.
        final long otherCount;
        final double otherSum;
        final double otherMin;
        final double otherMax;
        final long[] otherHistogram;
        synchronized (other) {
            otherCount = other.count;
            otherSum = other.sum;
            otherMin = other.min;
            otherMax = other.max;
            otherHistogram = other.histogramCounts.clone();
        }
        if (otherCount == 0) return;

        // Boundaries are final and never mutated, so no lock is needed here.
        if (!Arrays.equals(this.bucketBoundaries, other.bucketBoundaries)) {
            throw new IllegalArgumentException("Cannot merge buckets with different histogram boundaries");
        }

        synchronized (this) {
            this.count += otherCount;
            this.sum += otherSum;
            this.min = Math.min(this.min, otherMin);
            this.max = Math.max(this.max, otherMax);
            for (int i = 0; i < this.histogramCounts.length; i++) {
                this.histogramCounts[i] += otherHistogram[i];
            }
        }
    }

    /** @return window start in milliseconds */
    public long getStartTimeMs() { return startTimeMs; }

    /** @return window end in milliseconds */
    public long getEndTimeMs() { return endTimeMs; }

    /** @return the smallest recorded value, or {@code 0.0} if nothing was recorded */
    public synchronized double getMin() { return count == 0 ? 0.0 : min; }

    /** @return the largest recorded value, or {@code 0.0} if nothing was recorded */
    public synchronized double getMax() { return count == 0 ? 0.0 : max; }

    /** @return the sum of all recorded values */
    public synchronized double getSum() { return sum; }

    /** @return the number of recorded values */
    public synchronized long getCount() { return count; }

    /** @return a copy of the histogram slot upper bounds */
    public double[] getBucketBoundaries() { return bucketBoundaries.clone(); }

    /** @return a copy of the per-slot counts (one more entry than there are boundaries) */
    public synchronized long[] getHistogramCounts() { return histogramCounts.clone(); }
}

/**
 * Estimates percentiles from a {@link RollupBucket}'s histogram by linear
 * interpolation inside the histogram slot that contains the target rank.
 */
class PercentileCalculator {
    /**
     * Estimates the value at the given percentile.
     *
     * <p>The interpolation range of each slot is narrowed to the bucket's
     * observed minimum and maximum, so the estimate always lies within
     * {@code [bucket.getMin(), bucket.getMax()]}. Percentile 0 yields the
     * minimum and percentile 100 yields the maximum.
     *
     * @param bucket     the summary to read; {@code null} or an empty bucket yields {@code 0.0}
     * @param percentile requested percentile in the inclusive range 0..100
     * @return the estimated value, or {@code 0.0} when there is no data
     * @throws IllegalArgumentException if {@code percentile} is NaN or outside 0..100
     *         and the bucket holds data
     */
    public static double computePercentile(RollupBucket bucket, double percentile) {
        if (bucket == null) {
            return 0.0;
        }

        // RollupBucket guards its state with its own monitor; holding it here
        // makes count, min, max and the histogram one consistent snapshot
        // instead of four independently-locked reads.
        final long total;
        final double min;
        final double max;
        final long[] counts;
        synchronized (bucket) {
            total = bucket.getCount();
            min = bucket.getMin();
            max = bucket.getMax();
            counts = bucket.getHistogramCounts();
        }
        if (total == 0) {
            return 0.0;
        }
        // Written as a negated range test so that NaN is rejected as well.
        if (!(percentile >= 0.0 && percentile <= 100.0)) {
            throw new IllegalArgumentException("Percentile must be between 0 and 100");
        }

        double target = total * (percentile / 100.0);
        if (target <= 0.0) {
            // Rank 0 is the smallest observation, not an interpolated point.
            return min;
        }

        double[] boundaries = bucket.getBucketBoundaries();
        double cumulativeCount = 0.0;

        for (int i = 0; i < counts.length; i++) {
            long bucketCount = counts[i];
            if (bucketCount > 0 && cumulativeCount + bucketCount >= target) {
                // A non-empty slot holds at least one value v with
                // boundaries[i-1] < v <= boundaries[i] and min <= v <= max,
                // so the narrowed range below is never inverted.
                double lower = (i == 0) ? min : Math.max(boundaries[i - 1], min);
                // The overflow slot (i == boundaries.length) has no upper
                // boundary of its own; the observed maximum closes it.
                double upper = (i < boundaries.length) ? Math.min(boundaries[i], max) : max;
                double ratio = (target - cumulativeCount) / bucketCount;
                return lower + (upper - lower) * ratio;
            }
            cumulativeCount += bucketCount;
        }

        // Only reachable if the histogram total is below the recorded count.
        return max;
    }
}

/**
 * In-memory metrics store with three rollup tiers (1 minute, 5 minutes,
 * 1 hour). Raw samples are queued per metric; {@link #runRollup1m} drains the
 * queue into 1-minute buckets, and each coarser tier is built by merging the
 * buckets of the tier below it.
 */
class MetricsAggregator {
    private static final long ONE_MINUTE_MS = 60_000L;
    private static final long FIVE_MINUTES_MS = 300_000L;
    private static final long ONE_HOUR_MS = 3_600_000L;

    private final ConcurrentHashMap<String, Queue<MetricSample>> rawStores = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, List<RollupBucket>> rollups1m = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, List<RollupBucket>> rollups5m = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, List<RollupBucket>> rollups1h = new ConcurrentHashMap<>();
    private final double[] bucketBoundaries = {10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0};

    /**
     * Queues a sample stamped with the current wall-clock time.
     *
     * @param metricName metric to record against
     * @param value      observed value
     * @param tags       labels for the sample; {@code null} is treated as "no tags"
     */
    public void recordSample(String metricName, double value, Map<String, String> tags) {
        recordSample(metricName, value, System.currentTimeMillis(), tags);
    }

    /**
     * Queues a sample with an explicit timestamp.
     *
     * <p>{@link #runRollup1m} stops draining at the first queued sample that
     * belongs to a later window, so samples should be recorded in
     * non-decreasing timestamp order.
     *
     * @param metricName  metric to record against
     * @param value       observed value
     * @param timestampMs observation time in milliseconds
     * @param tags        labels for the sample; {@code null} is treated as "no tags"
     */
    public void recordSample(String metricName, double value, long timestampMs, Map<String, String> tags) {
        Map<String, String> safeTags = (tags == null) ? Collections.<String, String>emptyMap() : tags;
        rawStores.computeIfAbsent(metricName, k -> new ConcurrentLinkedQueue<>())
                 .offer(new MetricSample(metricName, value, timestampMs, safeTags));
    }

    /**
     * Drains queued samples up to the end of the given 1-minute window into a
     * new bucket and appends it to the 1-minute tier. Does nothing if no
     * sample was ever recorded for the metric.
     *
     * <p>Samples at the head of the queue that are older than
     * {@code windowStartMs} are removed and discarded.
     *
     * @param metricName    metric to roll up
     * @param windowStartMs inclusive start of the window in milliseconds
     */
    public void runRollup1m(String metricName, long windowStartMs) {
        long windowEndMs = windowStartMs + ONE_MINUTE_MS;
        Queue<MetricSample> queue = rawStores.get(metricName);
        if (queue == null) return;

        RollupBucket bucket = new RollupBucket(windowStartMs, windowEndMs, bucketBoundaries);

        // peek() followed by poll() is only correct if no other consumer can
        // remove the head in between; serialize drains of the same queue.
        // Producers still offer() without taking this lock.
        synchronized (queue) {
            MetricSample head;
            while ((head = queue.peek()) != null && head.getTimestampMs() < windowEndMs) {
                queue.poll();
                if (head.getTimestampMs() >= windowStartMs) {
                    bucket.recordValue(head.getValue());
                }
            }
        }

        rollups1m.computeIfAbsent(metricName, k -> new CopyOnWriteArrayList<>()).add(bucket);
    }

    /**
     * Builds the 5-minute bucket for the given window from the 1-minute
     * buckets that lie entirely inside it. Does nothing if the metric has no
     * 1-minute buckets. Re-running for the same window replaces the earlier
     * result.
     *
     * @param metricName    metric to roll up
     * @param windowStartMs inclusive start of the window in milliseconds
     */
    public void runRollup5m(String metricName, long windowStartMs) {
        consolidate(metricName, windowStartMs, FIVE_MINUTES_MS, rollups1m, rollups5m);
    }

    /**
     * Builds the 1-hour bucket for the given window from the 5-minute buckets
     * that lie entirely inside it. Does nothing if the metric has no 5-minute
     * buckets. Re-running for the same window replaces the earlier result.
     *
     * @param metricName    metric to roll up
     * @param windowStartMs inclusive start of the window in milliseconds
     */
    public void runRollup1h(String metricName, long windowStartMs) {
        consolidate(metricName, windowStartMs, ONE_HOUR_MS, rollups5m, rollups1h);
    }

    /**
     * Returns the stored buckets of one tier that lie entirely inside the
     * given time range.
     *
     * @param metricName  metric to query
     * @param startTimeMs inclusive lower bound on bucket start, in milliseconds
     * @param endTimeMs   inclusive upper bound on bucket end, in milliseconds
     * @param resolution  {@code "1m"} or {@code "5m"}; any other value,
     *                    including {@code null}, selects the 1-hour tier
     * @return matching buckets in stored order; empty if there are none
     */
    public List<RollupBucket> query(String metricName, long startTimeMs, long endTimeMs, String resolution) {
        ConcurrentHashMap<String, List<RollupBucket>> tier;
        if ("1m".equals(resolution)) {
            tier = rollups1m;
        } else if ("5m".equals(resolution)) {
            tier = rollups5m;
        } else {
            tier = rollups1h;
        }

        List<RollupBucket> result = new ArrayList<>();
        List<RollupBucket> source = tier.get(metricName);
        if (source == null) return result;

        for (RollupBucket b : source) {
            if (b.getStartTimeMs() >= startTimeMs && b.getEndTimeMs() <= endTimeMs) {
                result.add(b);
            }
        }
        return result;
    }

    /** Merges the finer-tier buckets contained in the window into one bucket and stores it in the coarser tier. */
    private void consolidate(String metricName, long windowStartMs, long widthMs,
                             Map<String, List<RollupBucket>> finerTier,
                             Map<String, List<RollupBucket>> coarserTier) {
        List<RollupBucket> finer = finerTier.get(metricName);
        if (finer == null) return;

        long windowEndMs = windowStartMs + widthMs;
        RollupBucket coarse = new RollupBucket(windowStartMs, windowEndMs, bucketBoundaries);
        for (RollupBucket candidate : finer) {
            if (candidate.getStartTimeMs() >= windowStartMs && candidate.getEndTimeMs() <= windowEndMs) {
                coarse.merge(candidate);
            }
        }
        replaceOrAppend(coarserTier.computeIfAbsent(metricName, k -> new CopyOnWriteArrayList<>()), coarse);
    }

    /** Stores {@code fresh}, overwriting an existing bucket for the same window if there is one. */
    private static void replaceOrAppend(List<RollupBucket> tier, RollupBucket fresh) {
        // A derived bucket is a full recomputation from the tier below, so a
        // second copy for the same window would be double-counted by the next
        // tier up. The lock keeps two concurrent re-runs from both appending.
        synchronized (tier) {
            for (int i = 0; i < tier.size(); i++) {
                RollupBucket existing = tier.get(i);
                if (existing.getStartTimeMs() == fresh.getStartTimeMs()
                        && existing.getEndTimeMs() == fresh.getEndTimeMs()) {
                    tier.set(i, fresh);
                    return;
                }
            }
            tier.add(fresh);
        }
    }
}

/** Small console demo: records a few latency samples, rolls them up and prints the summary. */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final String metric = "api.latency";
        final double[] latencies = {120.5, 85.0, 450.0, 12.0};
        final int[] percentiles = {50, 95, 99};

        System.out.println("=== JAVA METRICS AGGREGATOR SIMULATION ===");
        MetricsAggregator aggregator = new MetricsAggregator();
        Map<String, String> tags = Map.of("env", "prod");

        // Captured before recording so every sample's timestamp is >= startTime.
        long startTime = System.currentTimeMillis();

        for (double latency : latencies) {
            aggregator.recordSample(metric, latency, tags);
        }

        // The window starts 10 s in the past, so it covers the samples just recorded.
        aggregator.runRollup1m(metric, startTime - 10000);

        List<RollupBucket> buckets = aggregator.query(metric, startTime - 20000, startTime + 100000, "1m");
        System.out.println("Query returned buckets: " + buckets.size());

        if (!buckets.isEmpty()) {
            RollupBucket first = buckets.get(0);
            System.out.println("Min: " + first.getMin());
            System.out.println("Max: " + first.getMax());
            System.out.println("Count: " + first.getCount());
            for (int p : percentiles) {
                System.out.println("p" + p + ": " + PercentileCalculator.computePercentile(first, p));
            }
        }
        System.out.println("=== END OF JAVA SIMULATION ===");
    }
}