Interview Prompt
Design a Distributed Metrics Aggregation System.
Clarifying Questions (ask before designing)
| Question | Why it matters |
|---|---|
| Which of these is highest priority: StatsD-style push model, Pre-aggregation at agents, Rollup storage? | Forces scope negotiation — senior candidates trim before drawing boxes. |
| What scale should we design for — DAU, QPS, data volume? | Drives every capacity decision; shows structured thinking. |
| What are the read vs write patterns on the critical path? | Determines caching, DB choice, and replication topology. |
| What consistency and durability guarantees are required? | Separates strong-consistency paths from eventual ones — a senior differentiator. |
Scope
In scope
- StatsD-style push model
- Pre-aggregation at agents
- Rollup storage
- Percentile computation
- Capacity estimation with shown math
Out of scope (state explicitly)
- Application instrumentation SDK design
- Full distributed tracing system (#33)
- On-call paging and escalation policy (#37)
Assumptions
- Clarify scale (DAU, QPS, data volume) for distributed metrics aggregation in the first 5 minutes
- Standard reliability target 99.9%–99.99% unless problem implies higher (payments, booking)
- Managed cloud services (RDS, S3, Kafka, Redis) are acceptable building blocks
Functional Requirements
- Ingest metrics: Receive time-series metrics from thousands of hosts
- Aggregate: Sum, avg, p50/p95/p99, min, max across dimensions
- Downsample: Auto-downsample old data (10s → 1m → 1h → 1d)
- Query: "Average CPU across region=us-east for last 6 hours at 1-minute granularity"
- Alerting integration: Feed metrics to alerting rules
- Dashboarding: Low-latency queries for Grafana-style dashboards
Non-Functional Requirements
- High Throughput: 10M+ data points/sec ingested
- Low Latency Queries: Dashboard queries in < 500 ms
- Long Retention: Raw data 30 days, downsampled data for 2 years or longer
- Scalability: Hundreds of metrics × hundreds of thousands of hosts ≈ 100M time series, and more once high-cardinality labels are added
- High Availability: 99.99%, because metrics drive alerting
- Cardinality Handling: Support high-cardinality labels without explosion
Capacity Estimation
| Metric | Calculation | Value |
|---|---|---|
| Hosts reporting | Assumption | 500K |
| Metrics per host | Assumption | 200 |
| Total unique time series | 500K hosts × 200 metrics/host | 100M |
| Data points / sec | Given (equivalent to 100M series reporting every 10 s) | 10M |
| Data point size | Assumption: timestamp + value | 16 bytes |
| Ingestion throughput | 10M points/s × 16 bytes | 160 MB/s |
| Raw storage / day | 160 MB/s × 86,400 s | ≈ 13.8 TB |
| Total storage with downsampling | Sum of the retention tiers (see Downsampling Strategy) | ~500 TB |
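The arithmetic in the table can be checked with a few lines of Python. This is a minimal sketch that uses only the assumed inputs from the table; the constant and function names are illustrative, not part of any real tool.

```python
# Inputs taken from the capacity table (all are stated assumptions).
HOSTS = 500_000
METRICS_PER_HOST = 200
POINTS_PER_SEC = 10_000_000
BYTES_PER_POINT = 16
SECONDS_PER_DAY = 86_400


def capacity_estimate() -> dict:
    """Derive series count, reporting interval, ingest rate and raw daily volume."""
    series = HOSTS * METRICS_PER_HOST
    ingest_bytes_per_sec = POINTS_PER_SEC * BYTES_PER_POINT
    return {
        "series": series,
        # Seconds between two samples of the same series.
        "report_interval_s": series / POINTS_PER_SEC,
        "ingest_mb_per_s": ingest_bytes_per_sec / 1e6,
        "raw_tb_per_day": ingest_bytes_per_sec * SECONDS_PER_DAY / 1e12,
    }


if __name__ == "__main__":
    for key, value in capacity_estimate().items():
        print(f"{key}: {value}")
```

Showing the derived 10-second reporting interval is useful in an interview because it ties the series count to the ingest rate instead of treating them as unrelated givens.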
Push vs Pull Ingestion
| Model | How | Pros | Cons |
|---|---|---|---|
| Pull (Prometheus) | Central server scrapes targets every 15s | Server controls pace; discovers targets via service discovery | Doesn't scale to millions of targets; need federation |
| Push (StatsD/OTEL) ⭐ | Agents push metrics to gateway | Scales to millions of agents; works for ephemeral workloads | Must handle ingestion spikes; agents need gateway address |
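To make the push model concrete, the sketch below shows how an agent might pre-aggregate StatsD-style lines before pushing one summary per flush interval to the gateway. It assumes the common `name:value|type[|@rate]` line format; the `AgentAggregator` class is hypothetical, gauge deltas such as `+5` are not handled, and the actual network push is left out.

```python
from collections import defaultdict


class AgentAggregator:
    """Aggregates StatsD-style lines in memory between flushes (illustrative only)."""

    def __init__(self) -> None:
        self.counters = defaultdict(float)
        self.gauges = {}
        self.timers = defaultdict(list)

    def ingest(self, line: str) -> None:
        name, rest = line.strip().split(":", 1)
        fields = rest.split("|")
        value, metric_type = float(fields[0]), fields[1]
        # Optional sample rate such as "@0.1": scale counters back up.
        rate = 1.0
        if len(fields) > 2 and fields[2].startswith("@"):
            rate = float(fields[2][1:])
        if metric_type == "c":
            self.counters[name] += value / rate
        elif metric_type == "g":
            self.gauges[name] = value  # last write wins
        elif metric_type == "ms":
            self.timers[name].append(value)
        else:
            raise ValueError(f"unsupported metric type: {metric_type!r}")

    def flush(self) -> dict:
        """Return one aggregated snapshot and reset state for the next interval."""
        snapshot = {
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "timers": {
                name: {"count": len(v), "sum": sum(v), "min": min(v), "max": max(v)}
                for name, v in self.timers.items()
            },
        }
        self.counters.clear()
        self.gauges.clear()
        self.timers.clear()
        return snapshot
```

Pre-aggregation of this kind turns many raw events per second into a handful of data points per flush, which is what keeps gateway load proportional to the number of series rather than the number of events.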
Time-Series Database: Why Specialized?
Regular DB (PostgreSQL):
- At 10M inserts/sec → PostgreSQL dies
- Query "avg cpu for last 6 hours across 1000 hosts" → full table scan → minutes
Time-Series DB (VictoriaMetrics / InfluxDB / Mimir):
1. High write throughput (append-only, LSM-tree / columnar)
2. Time-range queries (data organized by time)
3. Compression (delta-of-delta + XOR → 10× compression)
4. Downsampling (automatic resolution reduction for old data)
Downsampling Strategy
Resolution tiers:
- Raw (10s): retained for 30 days
- 1-min avg: retained for 6 months → 6× reduction
- 1-hour avg: retained for 2 years → 360× reduction
- 1-day avg: retained for 5 years → 8,640× reduction
Total: ~485 TB (vs ~10 PB if raw data were kept for 2 years)
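A rollup step for one tier might look like the sketch below. It is a simplified illustration, not any particular TSDB's implementation: the `Rollup` type and `downsample` function are hypothetical. It stores count and sum rather than only the average, so that a later tier (for example 1-min → 1-hour) can be recomputed correctly; averaging averages is only exact when every bucket holds the same number of points.

```python
from dataclasses import dataclass


@dataclass
class Rollup:
    bucket_start: int  # Unix seconds, aligned to the bucket size
    count: int
    total: float
    minimum: float
    maximum: float

    @property
    def avg(self) -> float:
        return self.total / self.count


def downsample(points, bucket_seconds: int = 60) -> list:
    """Group (unix_ts_seconds, value) pairs into fixed-size time buckets."""
    buckets = {}
    for ts, value in points:
        start = ts - ts % bucket_seconds
        rollup = buckets.get(start)
        if rollup is None:
            buckets[start] = Rollup(start, 1, value, value, value)
        else:
            rollup.count += 1
            rollup.total += value
            rollup.minimum = min(rollup.minimum, value)
            rollup.maximum = max(rollup.maximum, value)
    return [buckets[start] for start in sorted(buckets)]
```

With 10-second raw data, each 1-minute bucket absorbs six points, which matches the 6× reduction listed for the first tier.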
Cardinality Explosion: The #1 Operational Problem
If request_id is used as a label, each unique request_id creates a NEW time series.
10K requests/sec × 86,400 sec/day = 864M unique time series per day
Impact: ingester OOM, unbounded index growth, query timeouts
Solutions:
1. Label cardinality limits: Reject if > 10K unique values per label
2. Relabeling at ingestion: Drop or hash high-cardinality labels
3. Active series limit: Max 1M active series per tenant
4. Monitoring: Dashboard showing top 10 metrics by cardinality
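Solutions 1 and 2 can be combined in a small ingestion-side guard. The sketch below is an assumption-laden illustration: `CardinalityLimiter` is a hypothetical class, the default drop list contains only `request_id`, and tracked values never expire, which a real system would need to handle.

```python
class CardinalityLimiter:
    """Drops known-unbounded labels and rejects series that exceed a per-label limit."""

    def __init__(self, max_values_per_label: int = 10_000,
                 drop_labels=frozenset({"request_id"})) -> None:
        self.max_values = max_values_per_label
        self.drop_labels = drop_labels
        self.seen = {}  # label name -> set of values observed so far

    def admit(self, labels: dict):
        """Return the relabeled label set, or None if the sample is rejected."""
        kept = {k: v for k, v in labels.items() if k not in self.drop_labels}
        # First pass: check limits without mutating state, so a rejected
        # sample does not consume any of the remaining budget.
        for name, value in kept.items():
            values = self.seen.setdefault(name, set())
            if value not in values and len(values) >= self.max_values:
                return None
        # Second pass: record the values of the admitted sample.
        for name, value in kept.items():
            self.seen[name].add(value)
        return kept
```

Already-known label values keep being accepted after the limit is reached, so existing series are not disrupted when a new, unbounded label value shows up.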
Event Bus Design (Kafka)
Topic: distributed_metrics_aggregation-events
Partitions: 64 (scale consumers horizontally)
Partition key: entity_id (here a tenant ID or series ID; preserves per-entity ordering)
Retention: 7 days (compliance) or 24h (high-volume telemetry)
Replication factor: 3, min.insync.replicas: 2
Producer: idempotent producer enabled (enable.idempotence=true)
Consumer: consumer group "distributed_metrics_aggregation-processors"
- At-least-once delivery + idempotent handlers (dedup by event_id)
- DLQ topic: distributed_metrics_aggregation-events-dlq (poison messages after 3 retries)
- Lag alert: consumer lag > 60s → scale workers
For this system, async side effects MUST NOT block the synchronous API response.
Sync path: validate → persist source of truth → publish event → return 201
Async path: consumers update caches, indexes, notifications, aggregates
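The consumer-side rules above (at-least-once delivery, dedup by event_id, dead-lettering poison messages) can be sketched without any Kafka client. Everything here is hypothetical: `EventProcessor` is an illustrative name, the in-memory set stands in for a bounded or TTL-based dedup store, the list stands in for the DLQ topic, and the sketch treats "3 retries" as three total attempts.

```python
class EventProcessor:
    """Idempotent handler wrapper with a retry limit and a dead-letter list."""

    def __init__(self, handler, max_attempts: int = 3) -> None:
        self.handler = handler
        self.max_attempts = max_attempts
        self.processed_ids = set()  # stand-in for a bounded dedup store
        self.dead_letters = []      # stand-in for the DLQ topic

    def process(self, event: dict) -> str:
        event_id = event["event_id"]
        if event_id in self.processed_ids:
            return "duplicate"  # redelivery under at-least-once semantics
        for _ in range(self.max_attempts):
            try:
                self.handler(event)
            except Exception:
                continue  # a real consumer would back off before retrying
            self.processed_ids.add(event_id)
            return "processed"
        self.dead_letters.append(event)
        return "dead-lettered"
```

Recording the event_id only after the handler succeeds means a crash between the two steps causes a reprocess rather than a lost event, which is why the handler itself must also be safe to run twice.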
Write Metrics (Prometheus Remote Write)
POST /api/v1/write
Content-Type: application/x-protobuf
Body: TimeSeries { labels: [{name:"__name__", value:"cpu_usage"}, {name:"host", value:"h1"}], samples: [{timestamp: 1710320000000, value: 85.2}] }
Note: remote-write sample timestamps are in milliseconds; the query API takes Unix seconds.
Query (PromQL)
GET /api/v1/query?query=avg(cpu_usage{region="us-east"})&time=1710320000
GET /api/v1/query_range?query=rate(http_requests_total[5m])&start=1710316400&end=1710320000&step=60
Common Error Responses
400 Bad Request: invalid input, missing fields, or malformed JSON
401 Unauthorized: missing or invalid auth token or API key
403 Forbidden: authenticated but insufficient permissions
404 Not Found: resource ID does not exist
409 Conflict: duplicate write or version conflict; retry with idempotency key
422 Unprocessable Entity: valid syntax but invalid business logic
429 Too Many Requests: rate limit exceeded; honor Retry-After header
500 Internal Error: unexpected server fault; retry with idempotency key
503 Service Unavailable: dependency down or overloaded; use exponential backoff
504 Gateway Timeout: index shard slow; narrow query or retry
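The retry guidance in this list can be expressed as a small client-side policy function. This is a sketch under stated assumptions: `retry_delay` and its parameters are hypothetical, Retry-After is assumed to carry a number of seconds (the header may also be an HTTP date, which is not handled), and 409 is left out of the retryable set because whether to retry it depends on the idempotency-key scheme.

```python
import random

# Statuses the list above marks as worth retrying.
RETRYABLE = {429, 500, 503, 504}


def retry_delay(status: int, attempt: int, retry_after=None,
                base: float = 0.5, cap: float = 30.0, rng=random.random):
    """Return seconds to wait before the next attempt, or None if not retryable.

    `attempt` is zero-based; `rng` is injectable so the jitter can be tested.
    """
    if status not in RETRYABLE:
        return None
    if status == 429 and retry_after is not None:
        return float(retry_after)  # the server's hint takes precedence
    backoff = min(cap, base * 2 ** attempt)
    return backoff * rng()  # "full jitter": uniform in [0, backoff)
```

Jitter matters for a metrics pipeline in particular: thousands of agents retrying on the same schedule after a gateway blip would recreate the spike that caused the failure.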
Time-Series Storage Format
Series: cpu_usage{host="h1", region="us-east", env="prod"}
Stored as:
Series ID: hash(sorted_labels) = 0xABCD1234
Chunk (2-hour block):
Timestamps: delta-of-delta encoded
Values: XOR float encoding
Compressed size: ~1.5 bytes per data point (roughly 10× smaller than the 16-byte raw point)
Label Index (Inverted Index)
Label "region=us-east" → Series IDs: [0xABCD1234, 0xDEF56789, ...]
Label "host=h1" → Series IDs: [0xABCD1234, ...]
Query: cpu_usage{region="us-east", host="h1"}
→ Intersect postings lists → Series ID 0xABCD1234
→ Fetch chunks for that series in the queried time range
| Concern | Solution |
|---|---|
| Ingestion spike | Kafka buffers; auto-scale ingestion workers |
| TSDB node failure | Replicated storage (Mimir uses object store + replicated ingesters) |
| Data loss | Write-ahead log on ingesters; replay on restart |
| High cardinality | Limit labels (reject metrics with >100K unique series per metric name) |
| Query overload | Query concurrency limits; query timeout (30s max); caching layer |
| Clock skew | Accept samples within ±5 minutes of server time; reject timestamps further in the future |
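Returning to the label index described in the storage format section, the lookup path (hash the sorted labels into a series ID, then intersect postings lists) can be sketched as follows. The names `series_id` and `LabelIndex` are hypothetical, truncated SHA-256 is an arbitrary stand-in for whatever hash a real TSDB uses, and Python sets replace the sorted, compressed postings lists a production index would store.

```python
import hashlib


def series_id(labels: dict) -> str:
    """Derive a stable ID from the sorted label set, independent of insertion order."""
    canonical = ",".join(f"{name}={value}" for name, value in sorted(labels.items()))
    return hashlib.sha256(canonical.encode()).hexdigest()[:8]


class LabelIndex:
    """Inverted index: (label name, label value) -> set of series IDs."""

    def __init__(self) -> None:
        self.postings = {}

    def add(self, labels: dict) -> str:
        sid = series_id(labels)
        for pair in labels.items():
            self.postings.setdefault(pair, set()).add(sid)
        return sid

    def lookup(self, matchers: dict) -> set:
        """Return the series IDs that match every equality matcher."""
        lists = [self.postings.get(pair, set()) for pair in matchers.items()]
        if not lists:
            return set()
        lists.sort(key=len)  # start from the smallest postings list
        return set.intersection(*lists)
```

Starting the intersection from the smallest postings list keeps the work proportional to the most selective matcher, which is why a query on `host="h1"` stays cheap even when `region="us-east"` matches millions of series.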
Percentile Calculation at Scale
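You CANNOT compute a global p99 by averaging the p99 of each server! Percentiles are not additive, and an average gives a server with 10 requests the same weight as one with 10,000.
Solutions:
1. Histogram buckets ⭐: Pre-define buckets, count per bucket, merge across servers by summing counts
2. T-Digest: Streaming algorithm, mergeable, ~1% error
3. DDSketch: Deterministic error bound, constant memory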
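The histogram approach can be illustrated with a few helper functions. This is a minimal sketch: the bucket bounds are arbitrary example values in milliseconds, the function names are hypothetical, and the quantile is reported as the upper bound of the containing bucket rather than interpolated.

```python
import bisect

# Example bucket upper bounds in ms (inclusive); one extra overflow bucket follows.
BOUNDS = [10, 25, 50, 100, 250, 500, 1000]


def new_histogram() -> list:
    return [0] * (len(BOUNDS) + 1)


def observe(counts: list, value: float) -> None:
    """Increment the first bucket whose upper bound is >= value."""
    counts[bisect.bisect_left(BOUNDS, value)] += 1


def merge(*histograms: list) -> list:
    """Merging is element-wise addition, so it works across any number of servers."""
    return [sum(column) for column in zip(*histograms)]


def quantile_upper_bound(counts: list, q: float) -> float:
    """Upper bound of the bucket that contains the q-quantile."""
    target = q * sum(counts)
    running = 0
    for i, count in enumerate(counts):
        running += count
        if running >= target:
            return BOUNDS[i] if i < len(BOUNDS) else float("inf")
    return float("inf")
```

As a worked case, take server A with 1,000 requests at 5 ms and server B with 100 requests at 900 ms. Their individual p99 buckets are 10 ms and 1000 ms, so averaging yields 505 ms, while the merged histogram places the global p99 in the 1000 ms bucket.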
Write-Ahead Log (WAL): Ingester Crash Recovery
Every sample → appended to WAL (sequential write, negligible latency)
On crash: new ingester replays WAL → resumes operation
Recovery time: ~30 seconds for 2-hour WAL
Data loss: 0 for samples durably appended before the crash
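The append-then-replay cycle can be sketched with a line-oriented file. This is an illustration only: `WriteAheadLog` and `replay` are hypothetical names, JSON lines stand in for a binary record format, and calling fsync on every sample is a simplification, since real ingesters typically batch or group-commit writes.

```python
import json
import os


class WriteAheadLog:
    """Append-only log of samples, one JSON record per line."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._file = open(path, "a", encoding="utf-8")

    def append(self, series_id: str, timestamp: int, value: float) -> None:
        record = json.dumps({"s": series_id, "t": timestamp, "v": value})
        self._file.write(record + "\n")
        self._file.flush()
        os.fsync(self._file.fileno())  # durable before the write is acknowledged

    def close(self) -> None:
        self._file.close()


def replay(path: str) -> dict:
    """Rebuild in-memory state (series ID -> list of (timestamp, value)) after a crash."""
    state = {}
    if not os.path.exists(path):
        return state
    with open(path, encoding="utf-8") as log:
        for line in log:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                break  # torn final record from a crash mid-write: stop here
            state.setdefault(record["s"], []).append((record["t"], record["v"]))
    return state
```

Stopping at the first undecodable record is what makes the "no data loss" claim precise: only samples that were fully written and synced are guaranteed to survive.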
Multi-Tenancy
Per-tenant rate limits:
- Max samples/sec: 100K
- Max active series: 1M
- Max labels per series: 30
- Max label value length: 2048 chars
Tenant-aware sharding + query isolation + usage tracking + cost estimation
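The limits above could be enforced at the gateway roughly as follows. This is a sketch with hypothetical names (`TenantLimits`, `validate_series`, `TokenBucket`); the defaults mirror the numbers listed, the clock is passed in explicitly to keep the example deterministic, and tracking of the active-series count is left to the caller.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TenantLimits:
    max_samples_per_sec: int = 100_000
    max_active_series: int = 1_000_000
    max_labels_per_series: int = 30
    max_label_value_length: int = 2048


def validate_series(labels: dict, active_series: int,
                    limits: TenantLimits = TenantLimits(),
                    is_new_series: bool = True):
    """Return None if the series is accepted, otherwise a rejection reason."""
    if len(labels) > limits.max_labels_per_series:
        return "too many labels"
    for name, value in labels.items():
        if len(value) > limits.max_label_value_length:
            return f"label value too long: {name}"
    if is_new_series and active_series >= limits.max_active_series:
        return "active series limit reached"
    return None


class TokenBucket:
    """Samples/sec limiter: refills at `rate` tokens per second, capped at `rate`."""

    def __init__(self, rate: float, now: float = 0.0) -> None:
        self.rate = rate
        self.tokens = float(rate)
        self.last = now

    def allow(self, samples: int, now: float) -> bool:
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if samples <= self.tokens:
            self.tokens -= samples
            return True
        return False
```

A rejected batch would map naturally to the 429 response in the API section, with the tenant's remaining budget determining the Retry-After value.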
Interview Walkthrough
- Start with the data model: time-series = metric name + label set + (timestamp, value) — label cardinality is the primary scaling enemy.
- Describe pull-based scraping (Prometheus) vs push-based agents (StatsD/Telegraf) and when each fits operational constraints.
- Route samples through ingesters with a Write-Ahead Log so crash recovery replays uncommitted batches without data loss.
- Enforce per-tenant limits (max 30 labels, 2048-char values, 1M active series) and drop or hash unbounded labels such as user IDs to prevent cardinality explosions.
- For percentiles, use pre-defined histogram buckets or T-Digest — these structures merge correctly across server boundaries.
- Shard by tenant or hash(metric_name) with dedicated query nodes so one noisy tenant cannot starve others.
- Quantify ingest: 10M samples/sec × 16 bytes ≈ 160 MB/s raw; compression (~1.5 bytes per point) cuts that by roughly 10×, and downsampling shrinks long-term storage further.
- Common pitfall: computing global p99 by averaging per-server p99 values — mathematically wrong and produces misleading SLO dashboards.
VictoriaMetrics vs Prometheus vs InfluxDB vs Mimir
Staff interviews expect you to articulate how the system evolves under real growth — not jump straight to the final architecture.
Phase 1: MVP (0 to 100K users)
Monolith or minimal services proving core distributed metrics aggregation flows. Optimize for shipping speed and correctness over scale.
Key components: Single region · Primary DB + Redis cache · Synchronous core path · Basic monitoring
Move to next phase when: p99 latency exceeds SLO or DB CPU sustained above 70%
Phase 2: Growth (100K to 10M users)
Split read/write paths, introduce async processing for non-critical work, add caching layers and horizontal scaling.
Key components: Read replicas or CQRS · Message queue for async work · CDN / edge caching · Service-level SLOs
Move to next phase when: Hot keys, fan-out bottlenecks, or ops toil from manual scaling
Phase 3: Scale (10M+ users)
Shard data plane, multi-region active-active or active-passive, formal DR runbooks, cost optimization.
Key components: Database sharding / partitioning · Multi-region replication · Auto-scaling + chaos testing · Dedicated platform/SRE ownership
Move to next phase when: Regional failure domain risk, compliance data residency, or linear cost growth unsustainable
SLOs & Error Budgets
| Metric | Target | Rationale |
|---|---|---|
| Core user-facing availability | 99.99% | Matches the availability requirement above: metrics drive alerting, so the budget for planned maintenance plus unplanned failures is small. |
| p99 latency (critical path) | Problem-specific — state target early and tie to capacity math | Interview credibility comes from connecting SLO to architecture choices. |
| Error rate (5xx) | < 0.1% | Distinguishes transient blips from systemic failure requiring rollback. |
| Data durability | 99.999999999% (11 nines) for committed writes | Define which operations require fsync/quorum vs async replication. |
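An availability target only becomes actionable once it is converted into an error budget. The sketch below does that conversion for any target; the function names are hypothetical and the 30-day window is an assumption, since the table does not state a measurement window.

```python
def error_budget_minutes(slo_percent: float, window_days: int = 30) -> float:
    """Allowed downtime in minutes for an availability SLO over the window."""
    return (1 - slo_percent / 100) * window_days * 24 * 60


def burn_rate(observed_error_ratio: float, slo_percent: float) -> float:
    """How fast the budget is being consumed; 1.0 means exactly on budget."""
    return observed_error_ratio / (1 - slo_percent / 100)


if __name__ == "__main__":
    for target in (99.9, 99.95, 99.99):
        print(f"{target}% -> {error_budget_minutes(target):.2f} min per 30 days")
```

Over 30 days, 99.9% allows about 43 minutes of downtime, 99.95% about 22 minutes, and 99.99% a little over 4 minutes, which is why each additional nine tends to force architectural changes rather than just tuning.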
Incident Scenarios (2am reality)
| Scenario | How you detect | Mitigation |
|---|---|---|
| Primary database unavailable | Health check failures, connection pool exhaustion alerts, elevated 5xx | Failover to replica / promote standby; enable read-only degraded mode if writes impossible; queue writes if async path exists |
| Traffic spike (10× normal) | RPS anomaly alert, autoscaling lag, latency SLO burn rate | Rate limit non-critical endpoints; scale read path horizontally; pre-warm caches; shed load on expensive operations |
| Bad deploy causing elevated errors | Canary metric regression, error budget burn, deployment correlation | Automated rollback within 5 minutes; feature flag kill switch; maintain N-1 compatibility |
Cost Drivers (Staff lens)
- Egress bandwidth and CDN (often dominates media/data-heavy systems)
- Database storage + IOPS at scale (plan compaction, TTL, tiering)
- Compute for async pipelines (right-size workers, spot instances for batch)
- Managed service premiums vs operational headcount trade-off
Multi-Region & DR
Start single-region with cross-AZ redundancy. Add read replicas in secondary region for DR. Move to active-active only when latency SLO or data residency requires it — accept conflict resolution complexity explicitly.