构建支持流批一体的实时特征存储架构决策与实现


一个生产级的机器学习系统,特别是推荐或风控场景,其模型效果高度依赖于特征的质量与时效性。这就引出了一个核心的技术挑战:系统既需要处理用户实时交互产生的流式特征(例如,用户刚刚点击的商品ID),也需要整合每日或每小时计算生成的批量特征(例如,用户过去7天的购买力画像)。试图用单一的架构范式来应对这两种截然不同的数据模式,往往会导致系统复杂性失控或性能瓶颈。

我们面临的正是这样一个问题。最初的设想是构建一个统一的特征摄入管道,但很快就发现,强制将TB级的离线批量数据“伪装”成数据流塞进Kafka,不仅成本高昂,而且整个链路的稳定性也极易受到上游批量任务延迟或失败的冲击。因此,架构决策的焦点转向了如何设计一个能优雅地处理流式(Streaming)和批量(Batch)两种数据源的混合模式特征存储(Feature Store)。

方案A: 统一流式摄入架构

这个方案的核心思想是“万物皆流”。所有特征,无论来源,最终都被转换为事件流,通过统一的消息队列(如Kafka)进入一个流处理引擎(如Flink),再由该引擎写入在线存储(如Redis)。

graph TD
    subgraph "统一流式摄入"
        A[实时事件源, e.g., Clickstream] --> K[Apache Kafka];
        B[离线批量数据, e.g., Hive Table] -- "ETL Job" --> C[数据文件, e.g., Parquet];
        C -- "文件读取器/流化" --> K;
        K --> F[Apache Flink 作业];
        F --> R[Online Store: Redis];
        F --> O[Offline Store: S3/HDFS];
    end

    ML[ML模型服务] --> R;

优势分析:

  1. 单一数据模型: 整个系统的核心是一套流式处理逻辑,理论上简化了数据处理模型。
  2. 低延迟: 一旦数据进入Kafka,理论上可以实现毫秒级的特征更新。

劣势分析:

  1. 批量任务的流式改造复杂度: 将一个庞大的Hive表或Parquet文件集转换为独立的Kafka消息流,本身就是一个复杂的、资源密集型的任务。这个“流化”过程需要专门的调度和错误处理机制,它本身就是一个潜在的故障点。
  2. 资源浪费与成本: 为了处理批量数据导入时产生的流量洪峰,Kafka和Flink集群需要预留大量资源,而这些资源在大部分时间是闲置的。
  3. 强耦合与风险传导: 离线批量任务的失败或延迟会直接污染核心的实时流处理管道。例如,一个失败的批量任务可能需要重跑,这将导致大量重复或乱序的数据涌入Flink,处理逻辑会变得异常复杂。在真实项目中,保证 Exactly-Once 语义的成本极高。
  4. 原子性问题: 批量特征通常涉及一个实体(如用户)的多个特征的同时更新。将其拆分为单条消息流,很难保证这一组特征更新的原子性。

在生产环境中,一个常见的错误是过度追求架构的“理论优雅”而忽略了运维的实际成本。方案A就是一个典型例子,它将两种不同生命周期和SLA要求的数据源强行统一,最终会导致一个脆弱且难以维护的系统。

方案B: 双模混合摄入架构

这个方案承认并接纳了流式和批量数据的本质区别,为它们设计了各自最优的摄入路径。

graph TD
    subgraph "流式摄入路径 (Streaming Ingestion)"
        A[实时事件源] --> K[Apache Kafka];
        K --> F[轻量级流处理器/消费者];
        F -- "gRPC Push" --> S[核心特征服务 Feature Store Service];
    end

    subgraph "批量摄入路径 (Batch Ingestion)"
        B[离线批量数据] -- "Airflow/Scheduler" --> J[Spark/MapReduce Job];
        J --> DF[特征数据文件 Parquet/ORC];
        DF -- "REST API Bulk Load" --> S;
    end
    
    subgraph "核心服务与存储"
        S -- "写入" --> R[Online Store: Redis];
        S -- "写入" --> O[Offline Store: S3];
    end

    ML[ML模型服务] -- "特征读取" --> S;
    UI[管理与监控UI] -- "状态查询" --> S;

优势分析:

  1. 关注点分离: 流式和批量处理路径物理隔离,技术栈可以独立演进。实时路径可以专注于低延迟和高吞吐,使用gRPC等高性能RPC框架。批量路径则可以专注于吞吐量和成本效益,使用简单的RESTful API进行数据交换。
  2. 鲁棒性: 批量任务的失败不会影响实时特征的更新。运维团队可以独立地对批量任务进行重试、回滚等操作,对核心在线服务无感。
  3. 成本效益: 每条路径可以根据其负载特性独立扩展。批量摄入通常在凌晨的低峰期进行,可以利用可抢占的计算资源,而实时路径则需要保证7x24小时的稳定资源。
  4. 原子性保障: 批量更新可以通过单个API请求完成,服务层可以更容易地实现事务性或原子性更新。

劣势分析:

  1. 实现复杂性: 服务层需要同时提供并维护两种不同风格的API(gRPC和REST)。
  2. 一致性窗口: 在批量数据更新期间,一个实体的特征可能短暂地处于“混合状态”(部分新特征,部分旧特征)。这需要在设计上明确最终一致性的策略,例如基于时间戳的合并。

最终决策:
我们选择了方案B。对于一个需要长期稳定运行的生产系统而言,架构的韧性、可维护性和故障域隔离的重要性,远超过单一模型的理论简洁性。方案B允许我们为正确的问题选择正确的工具,这是务实工程的体现。

核心实现概览: Java后端服务

我们的核心特征服务使用Java和Spring Boot构建。它负责接收数据,写入在线和离线存储,并提供给模型服务读取。

1. gRPC接口定义 (用于流式摄入)

我们使用Protocol Buffers来定义服务接口,这确保了强类型和高性能。

feature_store.proto:

syntax = "proto3";

package com.example.featurestore;

option java_multiple_files = true;
option java_package = "com.example.featurestore.grpc";

// 流式特征推送服务
service FeaturePushService {
  // 客户端以流的方式推送特征
  rpc PushFeatures(stream Feature) returns (PushSummary);
}

// 单个特征值
message FeatureValue {
  oneof value_type {
    string string_value = 1;
    int64 int64_value = 2;
    double double_value = 3;
    bool bool_value = 4;
  }
}

// 特征定义
message Feature {
  // 实体ID,例如 user_id, item_id
  string entity_id = 1;

  // 特征集名称,例如 user_realtime_behavior
  string feature_set_name = 2;

  // 特征名 -> 特征值
  map<string, FeatureValue> features = 3;

  // 事件时间戳 (UTC milliseconds)
  int64 event_timestamp = 4;
}

// 推送结果摘要
message PushSummary {
  bool success = 1;
  int64 processed_count = 2;
  string error_message = 3;
}

2. Java gRPC服务实现

这里是FeaturePushService的Java实现。注意它是一个客户端流式RPC,允许客户端在一个连接中持续推送数据,减少了网络开销。

FeaturePushServiceImpl.java:

import com.example.featurestore.grpc.*;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.concurrent.atomic.AtomicLong;

@GrpcService
public class FeaturePushServiceImpl extends FeaturePushServiceGrpc.FeaturePushServiceImplBase {

    private static final Logger log = LoggerFactory.getLogger(FeaturePushServiceImpl.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public StreamObserver<Feature> pushFeatures(StreamObserver<PushSummary> responseObserver) {
        
        return new StreamObserver<Feature>() {
            private final AtomicLong processedCount = new AtomicLong(0);

            @Override
            public void onNext(Feature feature) {
                try {
                    // 在真实项目中,这里会有更复杂的逻辑,
                    // 包括验证、元数据检查、写入离线存储等。
                    processSingleFeature(feature);
                    processedCount.incrementAndGet();
                } catch (Exception e) {
                    log.error("Failed to process feature for entity: {}", feature.getEntityId(), e);
                    // 考虑实现一个死信队列来处理失败的消息
                }
            }

            @Override
            public void onError(Throwable t) {
                log.error("Error during feature push stream", t);
                PushSummary summary = PushSummary.newBuilder()
                        .setSuccess(false)
                        .setErrorMessage(t.getMessage())
                        .setProcessedCount(processedCount.get())
                        .build();
                responseObserver.onNext(summary);
                responseObserver.onCompleted();
            }

            @Override
            public void onCompleted() {
                log.info("Feature push stream completed. Processed {} features.", processedCount.get());
                PushSummary summary = PushSummary.newBuilder()
                        .setSuccess(true)
                        .setProcessedCount(processedCount.get())
                        .build();
                responseObserver.onNext(summary);
                responseObserver.onCompleted();
            }

            private void processSingleFeature(Feature feature) {
                // Redis Key的设计至关重要。例如: fs:{feature_set_name}:{entity_id}
                String redisKey = String.format("fs:%s:%s", feature.getFeatureSetName(), feature.getEntityId());
                
                // 使用Redis Hash来存储一个实体的所有特征
                feature.getFeaturesMap().forEach((name, value) -> {
                    String stringValue = convertFeatureValueToString(value);
                    if (stringValue != null) {
                        redisTemplate.opsForHash().put(redisKey, name, stringValue);
                    }
                });
                // 还可以设置一个合理的过期时间
                // redisTemplate.expire(redisKey, 24, TimeUnit.HOURS);
            }

            private String convertFeatureValueToString(FeatureValue value) {
                switch (value.getValueTypeCase()) {
                    case STRING_VALUE: return value.getStringValue();
                    case INT64_VALUE: return String.valueOf(value.getInt64Value());
                    case DOUBLE_VALUE: return String.valueOf(value.getDoubleValue());
                    case BOOL_VALUE: return String.valueOf(value.getBoolValue());
                    case VALUETYPE_NOT_SET:
                    default:
                        return null;
                }
            }
        };
    }
}

3. REST API实现 (用于批量摄入)

对于批量摄入,我们提供一个接受JSON格式数据的RESTful API。这对于调度系统如Airflow来说非常友好。

BatchIngestionController.java:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.Valid;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/batch-ingest")
public class BatchIngestionController {

    private static final Logger log = LoggerFactory.getLogger(BatchIngestionController.class);

    @Autowired
    private BatchIngestionService batchIngestionService;

    @PostMapping("/{featureSetName}")
    public ResponseEntity<IngestionResponse> ingestBatch(
            @PathVariable String featureSetName,
            @Valid @RequestBody List<BatchFeatureEntity> payload) {
        
        if (payload == null || payload.isEmpty()) {
            return ResponseEntity.badRequest().body(new IngestionResponse("Payload cannot be empty."));
        }

        log.info("Starting batch ingestion for feature set '{}' with {} entities.", featureSetName, payload.size());

        // 使用异步服务来处理,避免长时间阻塞HTTP请求线程
        // 这是生产级服务必须考虑的要点
        batchIngestionService.processBatch(featureSetName, payload);

        String message = String.format("Accepted %d entities for feature set '%s'. Processing is asynchronous.", 
                                       payload.size(), featureSetName);
        return ResponseEntity.accepted().body(new IngestionResponse(message));
    }
}

// DTOs for request and response
class BatchFeatureEntity {
    @NotNull public String entityId;
    @NotEmpty public Map<String, Object> features;
}

class IngestionResponse {
    public String message;
    public IngestionResponse(String message) { this.message = message; }
}

BatchIngestionService.java:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

@Service
public class BatchIngestionService {

    private static final Logger log = LoggerFactory.getLogger(BatchIngestionService.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Async("taskExecutor") // 指定一个专用的线程池
    public void processBatch(String featureSetName, List<BatchFeatureEntity> entities) {
        long startTime = System.currentTimeMillis();
        // 在真实项目中,我们会使用Redis的pipeline来批量执行命令,以获得极高的吞吐量
        redisTemplate.executePipelined(redis.RedisCallback<Object>() {
            @Override
            public Object doInRedis(redis.RedisConnection connection) throws redis.dao.DataAccessException {
                for (BatchFeatureEntity entity : entities) {
                    String redisKey = String.format("fs:%s:%s", featureSetName, entity.entityId);
                    
                    // Pipeline模式下,所有命令先进入缓冲区,然后一次性发送
                    // 注意,需要自己处理序列化
                    connection.hMSet(
                        redisKey.getBytes(), 
                        convertMapToBytesMap(entity.features)
                    );
                }
                return null;
            }
        });
        long duration = System.currentTimeMillis() - startTime;
        log.info("Completed batch ingestion for '{}'. Processed {} entities in {}ms.", featureSetName, entities.size(), duration);
    }
    
    // 省略 map to byte map 的转换辅助函数...
}

React管理UI与React Testing Library深度测试

一个健壮的后端服务需要一个同样健壮的管理界面。这个界面让数据科学家和运维人员能够监控特征集的健康状况、查看最新的批量任务状态等。我们的UI使用React构建。

一个核心组件是 FeatureSetDashboard,它需要同时展示来自流式管道的实时统计(例如,过去一分钟的更新速率)和来自批量管道的历史作业记录。这正是对我们双模架构的直接体现。

FeatureSetDashboard.jsx:

import React, { useState, useEffect } from 'react';
import api from '../services/api'; // 封装的API请求模块

export const FeatureSetDashboard = ({ featureSetName }) => {
    const [stats, setStats] = useState({ updateRate: 0, lastUpdated: null });
    const [batchJobs, setBatchJobs] = useState([]);
    const [loading, setLoading] = useState(true);
    const [error, setError] = useState(null);

    useEffect(() => {
        const fetchData = async () => {
            setLoading(true);
            setError(null);
            try {
                // 并行获取两种数据
                const [statsResponse, jobsResponse] = await Promise.all([
                    api.get(`/stats/realtime/${featureSetName}`),
                    api.get(`/jobs/batch/${featureSetName}?limit=5`)
                ]);

                if (statsResponse.ok) {
                    setStats(statsResponse.data);
                } else {
                    throw new Error(`Failed to fetch realtime stats: ${statsResponse.status}`);
                }

                if (jobsResponse.ok) {
                    setBatchJobs(jobsResponse.data);
                } else {
                    throw new Error(`Failed to fetch batch jobs: ${jobsResponse.status}`);
                }

            } catch (err) {
                setError(err.message);
            } finally {
                setLoading(false);
            }
        };

        fetchData();
        // 在真实应用中,实时统计部分会使用WebSocket或轮询
    }, [featureSetName]);

    if (loading) {
        return <div>Loading dashboard for {featureSetName}...</div>;
    }

    if (error) {
        return <div role="alert">Error: {error}</div>;
    }

    return (
        <div>
            <h2>{featureSetName}</h2>
            <section>
                <h3>Real-time Stats</h3>
                <p>Update Rate: {stats.updateRate.toFixed(2)} features/sec</p>
                <p>Last Update (UTC): {stats.lastUpdated ? new Date(stats.lastUpdated).toISOString() : 'N/A'}</p>
            </section>
            <section>
                <h3>Recent Batch Jobs</h3>
                {batchJobs.length === 0 ? (
                    <p>No recent batch jobs found.</p>
                ) : (
                    <table>
                        <thead>
                            <tr>
                                <th>Job ID</th>
                                <th>Status</th>
                                <th>Completed At</th>
                            </tr>
                        </thead>
                        <tbody>
                            {batchJobs.map(job => (
                                <tr key={job.id}>
                                    <td>{job.id}</td>
                                    <td className={`status-${job.status.toLowerCase()}`}>{job.status}</td>
                                    <td>{new Date(job.completedAt).toISOString()}</td>
                                </tr>
                            ))}
                        </tbody>
                    </table>
                )}
            </section>
        </div>
    );
};

这个组件的逻辑并不简单:它并行发起两个API请求,并管理着加载、成功、失败三种状态。手动测试非常繁琐且容易遗漏边缘情况。这正是 React Testing Library 发挥巨大价值的地方。

FeatureSetDashboard.test.js:

import React from 'react';
import { render, screen, waitFor } from '@testing-library/react';
import { setupServer } from 'msw/node';
import { rest } from 'msw';
import { FeatureSetDashboard } from './FeatureSetDashboard';

// 使用 MSW (Mock Service Worker) 来拦截和模拟API请求
// 这是测试异步数据获取组件的行业标准实践
const server = setupServer(
    rest.get('/api/stats/realtime/:name', (req, res, ctx) => {
        return res(ctx.json({
            updateRate: 125.7,
            lastUpdated: '2023-10-27T10:00:00Z'
        }));
    }),
    rest.get('/api/jobs/batch/:name', (req, res, ctx) => {
        return res(ctx.json([
            { id: 'job-001', status: 'SUCCESS', completedAt: '2023-10-27T04:00:00Z' },
            { id: 'job-002', status: 'FAILED', completedAt: '2023-10-26T04:05:00Z' }
        ]));
    })
);

beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

describe('FeatureSetDashboard', () => {

    test('should display loading state initially', () => {
        render(<FeatureSetDashboard featureSetName="user_profile" />);
        // 验证加载状态是否正确显示
        expect(screen.getByText(/Loading dashboard for user_profile.../i)).toBeInTheDocument();
    });

    test('should render stats and jobs after successful fetch', async () => {
        render(<FeatureSetDashboard featureSetName="user_profile" />);

        // 使用 findBy* 系列查询,它们会等待元素出现
        expect(await screen.findByText(/125.70 features\/sec/i)).toBeInTheDocument();
        expect(screen.getByText('2023-10-27T10:00:00.000Z')).toBeInTheDocument();

        expect(screen.getByRole('cell', { name: 'job-001' })).toBeInTheDocument();
        expect(screen.getByRole('cell', { name: 'SUCCESS' })).toBeInTheDocument();
        expect(screen.getByRole('cell', { name: 'FAILED' })).toBeInTheDocument();
    });

    test('should display a generic error message if both APIs fail', async () => {
        server.use(
            rest.get('/api/stats/realtime/:name', (req, res, ctx) => res(ctx.status(500))),
            rest.get('/api/jobs/batch/:name', (req, res, ctx) => res(ctx.status(500)))
        );

        render(<FeatureSetDashboard featureSetName="user_profile" />);

        const alert = await screen.findByRole('alert');
        expect(alert).toHaveTextContent('Error: Failed to fetch realtime stats: 500');
    });

    test('should display a specific error if only one API fails', async () => {
        server.use(
            rest.get('/api/jobs/batch/:name', (req, res, ctx) => {
                return res(ctx.status(404), ctx.json({ message: 'Not Found' }));
            })
        );
        render(<FeatureSetDashboard featureSetName="user_profile" />);
        
        // 我们期望看到错误信息,即使部分数据加载成功
        // Promise.all 的 fail-fast 特性决定了这一点
        const alert = await screen.findByRole('alert');
        expect(alert).toHaveTextContent('Error: Failed to fetch batch jobs: 404');

        // 同时,成功加载的数据不应该被渲染
        expect(screen.queryByText(/125.70 features\/sec/i)).not.toBeInTheDocument();
    });

    test('should display a message when no batch jobs are returned', async () => {
        server.use(
            rest.get('/api/jobs/batch/:name', (req, res, ctx) => res(ctx.json([])))
        );

        render(<FeatureSetDashboard featureSetName="user_profile" />);

        expect(await screen.findByText(/No recent batch jobs found./i)).toBeInTheDocument();
        // 验证实时数据仍然正常显示
        expect(screen.getByText(/125.70 features\/sec/i)).toBeInTheDocument();
    });
});

这个测试套件覆盖了加载、成功、部分失败、全部失败、空数据等多种情况,确保了UI在各种后端响应下的行为符合预期。这种测试的投入,对于一个作为核心基础设施的管理平台来说,回报是巨大的,它能有效防止前端代码的回归,并让重构变得充满信心。

架构的扩展性与局限性

当前这套双模架构在扩展性上表现良好。当需要支持新的特征集时,无论是流式还是批量,都只需要配置相应的摄入管道,而无需改动核心服务。流式路径可以通过增加Kafka分区和消费者实例来水平扩展,批量路径的吞吐能力则取决于后端异步处理线程池和Redis集群的写入能力。

然而,这个架构也存在一些局限性和未来的演进方向。
首先,特征的读取逻辑目前相对简单。当一个模型需要同时拉取一个实体的流式和批量特征时,服务需要在读取时进行合并。这个合并逻辑如果处理不当,可能会成为性能瓶颈。一个可能的优化是在写入时就进行预合并(write-time merge),但这又会增加写入路径的复杂性。
其次,系统缺乏一个统一的特征元数据注册中心。目前,特征的定义、类型、所有者等信息都是隐式地存在于生产者和消费者的代码中。构建一个元数据中心,实现特征的自动发现、血缘追踪和版本管理,是这个系统从一个“项目”演进为一个“平台”的关键。
最后,数据一致性模型是最终一致性。在批量数据写入的窗口期内,读取到的特征可能不是最新的。对于某些对一致性要求极高的场景(如金融反欺诈),可能需要引入更复杂的机制,如版本号控制或读写锁,但这会牺牲一部分性能和可用性。明确当前架构的适用边界至关重要。


  目录