Machine Coding Problem

Feature Store (ML)

maco · All · ml · point-in-time-correctness
Commonly Asked By: Uber, Feast, Tecton, Airbnb, Google

Functional Specifications

  • Low-Latency Online Serving: A fast feature-lookup interface backed by in-memory stores, with a serving latency budget of under 10 ms.
  • Point-in-Time Correctness (AS-OF Joins): Assemble historical training sets in which every row uses the feature values as they stood at that row's timestamp. This prevents data leakage from values recorded later.
  • Offline Ledger Storage: Maintain a columnar, timestamp-ordered log that records every feature update.
  • Dynamic Pipeline Registration: Register feature-processing pipelines together with their schemas, refresh intervals, and freshness SLA configurations.

Production reference implementation demonstrating an online cache, offline time-series records, floor lookups (greatest timestamp at or before the query time), and point-in-time training joins:

// ─── JAVA BLUEPRINT ──────────────────────────────────────────────────────────
import java.util.*;
import java.util.concurrent.*;

/**
 * Metadata describing one feature: its name, the kind of entity it belongs to,
 * and a human-readable description.
 *
 * <p>All three fields are assigned once in the constructor and never change
 * afterwards. No validation is performed, so any of them may be {@code null}.
 */
class FeatureDefinition {
    /** Feature name, as supplied by the caller. */
    private final String name;
    /** Kind of entity the feature is attached to (for example {@code "user"}). */
    private final String entityType;
    /** Free-form explanation of what the feature represents. */
    private final String description;

    /**
     * Creates a definition from the given metadata.
     *
     * @param name        feature name
     * @param entityType  kind of entity the feature describes
     * @param description human-readable description of the feature
     */
    public FeatureDefinition(String name, String entityType, String description) {
        this.description = description;
        this.entityType = entityType;
        this.name = name;
    }

    /**
     * @return the feature name passed to the constructor
     */
    public String getName() {
        return this.name;
    }

    /**
     * @return the entity type passed to the constructor
     */
    public String getEntityType() {
        return this.entityType;
    }

    /**
     * @return the description passed to the constructor
     */
    public String getDescription() {
        return this.description;
    }
}

/**
 * A single observed feature value paired with the timestamp it was recorded at.
 *
 * <p>Both fields are set once in the constructor. The value is held by reference
 * and is not copied; it may be {@code null}. The timestamp is an opaque
 * {@code long}: this class does not define its unit or epoch.
 */
class FeatureValue {
    /** The observed value; any object, possibly {@code null}. */
    private final Object value;
    /** Timestamp associated with {@link #value}. */
    private final long timestamp;

    /**
     * Creates a value/timestamp pair.
     *
     * @param value     the observed feature value
     * @param timestamp the time the value is associated with
     */
    public FeatureValue(Object value, long timestamp) {
        this.timestamp = timestamp;
        this.value = value;
    }

    /**
     * @return the value passed to the constructor
     */
    public Object getValue() {
        return this.value;
    }

    /**
     * @return the timestamp passed to the constructor
     */
    public long getTimestamp() {
        return this.timestamp;
    }
}

/**
 * In-memory feature store with two views over the ingested data:
 *
 * <ul>
 *   <li>an <em>online</em> view holding only the most recent value per entity and feature;</li>
 *   <li>an <em>offline</em> view holding the full timestamp-ordered history per entity and
 *       feature, used for point-in-time ("as of") lookups.</li>
 * </ul>
 *
 * <p>Every map in this class is a {@code java.util.concurrent} collection, so individual
 * operations may be invoked from multiple threads. The online and offline writes performed by
 * a single {@link #ingestFeature} call are two separate steps and are not atomic as a pair.
 */
class FeatureStore {
    /**
     * Placeholder stored in the offline history when a caller ingests a {@code null} value.
     * {@link ConcurrentSkipListMap} rejects {@code null} values, so nulls are masked on write
     * and unmasked on read.
     */
    private static final Object NULL_VALUE = new Object();

    private final ConcurrentHashMap<String, FeatureDefinition> registry = new ConcurrentHashMap<>();
    
    // Online Store: entityId -> (featureName -> FeatureValue)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, FeatureValue>> onlineStore = new ConcurrentHashMap<>();

    // Offline Store: entityId -> (featureName -> history sorted by timestamp).
    // A ConcurrentSkipListMap is used instead of a TreeMap because the history is written by
    // ingestFeature and read by getHistoricalFeatures without any external lock, and TreeMap
    // is not safe for unsynchronized concurrent access.
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Object>>> offlineStore = new ConcurrentHashMap<>();

    /**
     * Registers a feature so that values for it can be ingested. Registering a definition whose
     * name is already present replaces the earlier definition.
     *
     * @param definition the feature metadata; must not be {@code null} and must have a
     *                   non-null name
     * @throws NullPointerException if {@code definition} or its name is {@code null}
     */
    public void registerFeature(FeatureDefinition definition) {
        Objects.requireNonNull(definition, "definition");
        registry.put(definition.getName(), definition);
    }

    /**
     * Records one observation of a feature for an entity in both the online and offline stores.
     *
     * <p>The online store keeps the observation only if its timestamp is greater than or equal
     * to the one currently held, so late-arriving older data never overwrites a newer value.
     * The offline store always records the observation; a second observation with the same
     * timestamp replaces the first.
     *
     * @param entityId    identifier of the entity the value belongs to
     * @param featureName name of a previously registered feature
     * @param value       the observed value; may be {@code null}
     * @param timestamp   time the value is associated with
     * @throws IllegalArgumentException if {@code featureName} has not been registered
     * @throws NullPointerException     if {@code entityId} or {@code featureName} is {@code null}
     */
    public void ingestFeature(String entityId, String featureName, Object value, long timestamp) {
        if (!registry.containsKey(featureName)) {
            throw new IllegalArgumentException("Feature not registered: " + featureName);
        }

        // 1. Ingest to Online Store (Always keep the latest timestamp)
        onlineStore.computeIfAbsent(entityId, k -> new ConcurrentHashMap<>())
                   .compute(featureName, (k, existing) -> {
                       if (existing == null || timestamp >= existing.getTimestamp()) {
                           return new FeatureValue(value, timestamp);
                       }
                       return existing;
                   });

        // 2. Ingest to Offline Store (Append to timeseries for bulk analysis & training joins)
        offlineStore.computeIfAbsent(entityId, k -> new ConcurrentHashMap<>())
                    .computeIfAbsent(featureName, k -> new ConcurrentSkipListMap<>())
                    .put(timestamp, value == null ? NULL_VALUE : value);
    }

    /**
     * Returns the latest known value of each requested feature for an entity.
     *
     * @param entityId     identifier of the entity to look up
     * @param featureNames names of the features to fetch
     * @return a new mutable map from feature name to latest value; features with no recorded
     *         value are omitted, and the map is empty if the entity is unknown
     */
    public Map<String, Object> getOnlineFeatures(String entityId, List<String> featureNames) {
        Map<String, Object> result = new HashMap<>();
        Map<String, FeatureValue> entityFeatures = onlineStore.get(entityId);
        if (entityFeatures == null) return result;

        for (String name : featureNames) {
            FeatureValue val = entityFeatures.get(name);
            if (val != null) {
                result.put(name, val.getValue());
            }
        }
        return result;
    }

    /**
     * Returns, for each requested feature, the value that was current at a given point in time:
     * the observation with the greatest timestamp less than or equal to {@code asOfTimestamp}.
     *
     * @param entityId      identifier of the entity to look up
     * @param featureNames  names of the features to fetch
     * @param asOfTimestamp the point in time to evaluate the features at (inclusive)
     * @return a new mutable map from feature name to the value as of that time; features with
     *         no observation at or before {@code asOfTimestamp} are omitted, and the map is
     *         empty if the entity is unknown
     */
    public Map<String, Object> getHistoricalFeatures(String entityId, List<String> featureNames, long asOfTimestamp) {
        Map<String, Object> result = new HashMap<>();
        ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Object>> entityFeatures = offlineStore.get(entityId);
        if (entityFeatures == null) return result;

        for (String name : featureNames) {
            ConcurrentSkipListMap<Long, Object> history = entityFeatures.get(name);
            if (history != null) {
                // floorEntry returns the greatest timestamp <= asOfTimestamp (Point-in-time Correct)
                Map.Entry<Long, Object> entry = history.floorEntry(asOfTimestamp);
                if (entry != null) {
                    Object stored = entry.getValue();
                    // Translate the placeholder back to the null the caller originally ingested.
                    result.put(name, stored == NULL_VALUE ? null : stored);
                }
            }
        }
        return result;
    }
}

/**
 * Console demonstration of {@code FeatureStore}: registers two features, ingests three
 * rounds of values for a single user, then prints the online (latest) view followed by
 * a series of point-in-time lookups.
 */
public class Main {
    /**
     * Runs the demonstration and writes its progress to standard output.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        System.out.println("=== ML FEATURE STORE SIMULATION RUNNER ===");
        final FeatureStore featureStore = new FeatureStore();
        final String userId = "user_123";

        // Register features
        FeatureDefinition[] definitions = {
            new FeatureDefinition("session_count", "user", "Total sessions of the user"),
            new FeatureDefinition("fraud_risk", "user", "ML model calculated fraud probability")
        };
        for (FeatureDefinition definition : definitions) {
            featureStore.registerFeature(definition);
        }
        System.out.println("Registered features: session_count, fraud_risk");

        // Ingest features over time: index i of each array is one observation round.
        System.out.println("\nIngesting historical feature data...");
        long[] eventTimes = { 1000L, 2000L, 3000L };
        int[] sessionCounts = { 5, 6, 8 };
        double[] fraudRisks = { 0.02, 0.15, 0.82 };
        for (int round = 0; round < eventTimes.length; round++) {
            featureStore.ingestFeature(userId, "session_count", sessionCounts[round], eventTimes[round]);
            featureStore.ingestFeature(userId, "fraud_risk", fraudRisks[round], eventTimes[round]);
        }
        System.out.println("Ingestion complete.");

        // Query Online Features (should be latest values)
        System.out.println("\n--- ONLINE FEATURE SERVING (Low latency) ---");
        List<String> requested = Arrays.asList("session_count", "fraud_risk");
        System.out.println("Online Features for user_123: "
                + featureStore.getOnlineFeatures(userId, requested));

        // Query Historical Features (Point-in-Time Correctness / AS-OF Joins).
        // The probe times fall before, on, between, and after the ingested timestamps.
        System.out.println("\n--- HISTORICAL POINT-IN-TIME RETRIEVAL (AS-OF JOINS) ---");
        for (long asOf : new long[] { 500L, 1000L, 1500L, 2000L, 2500L, 3500L }) {
            Map<String, Object> snapshot = featureStore.getHistoricalFeatures(userId, requested, asOf);
            System.out.println("AS-OF Timestamp " + asOf + " -> " + snapshot);
        }

        System.out.println("\nSimulation completed successfully.");
    }
}