Early in the project we faced a thorny requirement: the mobile app needed to generate personalized recommendations from a user's recent text input (search queries, note contents, and so on). These recommendations depend on semantic vector representations of the text, which means we need a reliable NLP model. Running a model on the scale of bert-base-multilingual-cased locally on a mobile device would exhaust its resources outright, while a traditional synchronous API call that blocks until inference finishes would cause unacceptable UI jank. The problem was clear: we needed a system that accepts text-processing requests from a Dart/Flutter client, performs the heavyweight NLP feature extraction asynchronously and reliably on the backend, and persists the results for downstream services, with the whole process remaining traceable and manageable.
The initial idea was simple: an API receives a request, puts it on a message queue, and a background service consumes and processes it. But the devil is in the details. How are model versions managed? How are failed tasks retried? How is the state of the whole pipeline monitored? How does feature generation integrate with our model-experiment tracking platform? This is no longer a simple API plus a background task; it is a small asynchronous feature-engineering platform.
After several rounds of evaluation, we settled on the following stack:
- Task queue: Celery. The de facto standard in the Python ecosystem: mature, stable, with solid retry, scheduling, and monitoring facilities.
- NLP model: Hugging Face Transformers, which offers a huge catalog of pretrained models for fast text vectorization.
- Data store: MariaDB, serving as our online feature store. At our current QPS, its stability and our team's familiarity with it outweigh the benefit of introducing a new NoSQL database; we use its JSON type for some added flexibility.
- MLOps tooling: MLflow, used to track the model, version, and parameters behind every feature-generation task, providing reproducibility and governance.
- Client: Dart. We need a native SDK for the Flutter app to call this pipeline, so we build the client interface layer directly in Dart.
The heart of this architecture is decoupling: Celery fully separates client requests from the heavyweight computation, keeping the client responsive; and feature generation is cleanly separated from storage, which keeps the system easy to maintain and extend.
Architecture Overview
Before diving into the code, the system's interaction flow is captured by the diagram below. The Dart client triggers a task through a lightweight API gateway (Flask, for example), which pushes the task metadata to Celery's broker (Redis or RabbitMQ). A Celery worker pulls the task from the queue, runs the core NLP inference, writes the result to MariaDB, and records execution metadata to the MLflow tracking server.
sequenceDiagram
    participant DartClient as Dart/Flutter App
    participant APIGateway as API Gateway (Flask)
    participant CeleryBroker as Celery Broker (Redis)
    participant CeleryWorker as Celery Worker
    participant HuggingFace as Hugging Face Hub
    participant MariaDB as Feature Store (MariaDB)
    participant MLflow as MLflow Tracking Server

    DartClient->>+APIGateway: POST /api/v1/generate_feature (userId, text)
    APIGateway->>+CeleryBroker: publish task: process_text_feature.delay(userId, text)
    Note over APIGateway: Immediately returns task_id
    APIGateway-->>-DartClient: { "task_id": "..." }
    CeleryBroker-->>-CeleryWorker: deliver task
    CeleryWorker->>+HuggingFace: Download/Load Model
    HuggingFace-->>-CeleryWorker: Model Object
    Note over CeleryWorker: Perform NLP Inference
    CeleryWorker->>+MLflow: Start Run (log model, params)
    MLflow-->>-CeleryWorker: Run ID
    CeleryWorker->>+MariaDB: UPSERT user_features (userId, feature_vector)
    MariaDB-->>-CeleryWorker: Success
    CeleryWorker->>+MLflow: Log Metric (e.g., duration) & End Run
    MLflow-->>-CeleryWorker: Success
Data Storage Layer: MariaDB Schema Design
Our feature store must support fast writes and fast lookups by user ID. The design is pragmatic rather than elaborate: a single table holds user features. A common mistake is to add a new column for every kind of feature, which forces frequent schema changes and is painful to maintain. A better approach is to identify each feature by a feature_name and store its value in a generically typed column. Since text embeddings are high-dimensional float arrays, serializing them to a JSON string in a JSON or TEXT column is a reasonable choice.
-- MariaDB DDL for the user feature store
CREATE TABLE `user_features` (
  `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
  `user_id` VARCHAR(128) NOT NULL COMMENT 'Unique user identifier',
  `feature_name` VARCHAR(255) NOT NULL COMMENT 'Feature name, e.g., search_text_embedding_v1',
  `feature_value` JSON NOT NULL COMMENT 'Feature value, stored as a JSON array',
  `feature_version` VARCHAR(64) NOT NULL COMMENT 'Model or code version that produced the feature',
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_user_feature` (`user_id`, `feature_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Real-time user feature table';
The uk_user_feature unique index is crucial: it guarantees a single row per user per feature. Writes will use INSERT ... ON DUPLICATE KEY UPDATE (an UPSERT) for idempotency, which matters because tasks may be retried.
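To make the storage format concrete, here is a minimal, driver-agnostic sketch of how a feature vector round-trips through the JSON column; the values are illustrative (real embeddings from the MiniLM model used later are 384-dimensional):

```python
import json

# Illustration of the `feature_value` storage format: a vector is
# serialized to a JSON array string on write and parsed back on read.
vector = [0.1, -0.25, 0.37]     # truncated to 3 dimensions for readability
payload = json.dumps(vector)    # the string written to `feature_value`
restored = json.loads(payload)  # what a downstream reader gets back
```

One practical consequence: downstream consumers only need a JSON parser, not knowledge of any binary serialization format, at the cost of some storage overhead versus a packed float blob.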
Core Processing Unit: Integrating the Celery Worker with Transformers
This is the heart of the system. The Celery worker is where the actual computation happens. We define a task that loads the model, runs inference, talks to MLflow, and finally writes to the database.
Project structure
feature_pipeline/
├── api/
│   └── server.py          # Flask API gateway
├── workers/
│   ├── celery_app.py      # Celery app initialization
│   ├── config.py          # Configuration (Redis, MariaDB, MLflow URIs)
│   └── tasks.py           # Core Celery task definitions
├── common/
│   └── db.py              # Database connection utilities
└── requirements.txt
Configuration: workers/config.py
A production-grade application must externalize its configuration; hard-coding is forbidden.
# workers/config.py
import os
# Celery Broker and Backend Configuration
BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
# Database Configuration
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = int(os.environ.get("DB_PORT", 3306))
DB_USER = os.environ.get("DB_USER", "feature_user")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "secret_password")
DB_NAME = os.environ.get("DB_NAME", "feature_store")
DB_URI = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# MLflow Configuration
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000")
MLFLOW_EXPERIMENT_NAME = "realtime_feature_generation"
# Model Configuration
MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
MODEL_VERSION = "1.0.0" # Internal versioning
Celery app initialization: workers/celery_app.py
# workers/celery_app.py
from celery import Celery

from . import config

app = Celery(
    'feature_worker',
    broker=config.BROKER_URL,
    backend=config.RESULT_BACKEND,
    include=['workers.tasks']
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    # Reliability: acknowledge a message only after the task finishes,
    # and requeue tasks whose worker is lost mid-execution.
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

if __name__ == '__main__':
    app.start()
task_acks_late=True is a key production setting. The broker marks a message as processed only after the task has completed successfully; if a worker crashes mid-task, the message is redelivered and executed by another worker.
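For long-running inference tasks, late acks are commonly paired with a prefetch multiplier of 1 so a worker does not hoard messages it has not started. As a sketch (worker_prefetch_multiplier is an addition beyond the original config, not something this project sets), these settings would go into app.conf.update():

```python
# Reliability-oriented Celery settings, shown as a plain dict for clarity;
# in workers/celery_app.py they would be passed to app.conf.update().
reliability_settings = {
    "task_acks_late": True,              # ack only after the task finishes
    "task_reject_on_worker_lost": True,  # requeue if the worker dies mid-task
    "worker_prefetch_multiplier": 1,     # fetch one unacked message at a time
}
```

The trade-off: a prefetch of 1 adds broker round-trips per task, which is negligible when each task spends seconds in model inference.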
Core task implementation: workers/tasks.py
This code is the engine of the whole system: it must handle model loading, inference, database writes, logging, and exceptions.
# workers/tasks.py
import json
import logging
import time

import mlflow
import torch
from sentence_transformers import SentenceTransformer
from sqlalchemy import create_engine, text

from . import config
from .celery_app import app

# --- Setup logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Global objects for connection pooling and model caching ---
# Initialized once in the worker process's global scope so each task does not
# re-create these expensive objects. This is the key performance optimization.
try:
    db_engine = create_engine(config.DB_URI, pool_recycle=3600)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Worker starting. Using device: {device}")
    # sentence-transformers downloads and caches the model automatically.
    model = SentenceTransformer(config.MODEL_NAME, device=device)
    logger.info(f"Model {config.MODEL_NAME} loaded successfully.")
except Exception as e:
    logger.critical(f"Failed to initialize global resources: {e}", exc_info=True)
    # If critical resources fail to load, every subsequent task will fail;
    # let the container or process manager restart the worker.
    db_engine = None
    model = None

# --- MLflow setup ---
mlflow.set_tracking_uri(config.MLFLOW_TRACKING_URI)
try:
    # Create the experiment if it does not exist yet.
    experiment = mlflow.get_experiment_by_name(config.MLFLOW_EXPERIMENT_NAME)
    if experiment is None:
        experiment_id = mlflow.create_experiment(config.MLFLOW_EXPERIMENT_NAME)
    else:
        experiment_id = experiment.experiment_id
    mlflow.set_experiment(experiment_id=experiment_id)
except Exception as e:
    logger.error(f"Could not configure MLflow experiment: {e}")


@app.task(
    bind=True,
    autoretry_for=(Exception,),                      # retry on any exception
    retry_kwargs={'max_retries': 3, 'countdown': 5}  # at most 3 retries, 5 s apart
)
def generate_text_embedding_feature(self, user_id: str, text_input: str):
    """
    Celery task to generate a text embedding and store it.
    """
    if not all([user_id, text_input]):
        logger.warning("Missing user_id or text_input. Skipping task.")
        return {"status": "skipped", "reason": "invalid input"}
    if db_engine is None or model is None:
        logger.error("Global resources (DB or Model) not initialized. Retrying...")
        raise ConnectionError("Worker is not ready, global resources failed to load.")

    task_id = self.request.id
    logger.info(f"[Task {task_id}] Starting feature generation for user: {user_id}")

    with mlflow.start_run(run_name=f"feature_gen_{user_id}_{int(time.time())}"):
        mlflow.set_tag("celery_task_id", task_id)
        mlflow.set_tag("user_id", user_id)
        mlflow.log_param("model_name", config.MODEL_NAME)
        mlflow.log_param("text_length", len(text_input))
        try:
            start_time = time.time()
            # Core inference step.
            embedding = model.encode(text_input, convert_to_tensor=False)
            inference_time = time.time() - start_time
            mlflow.log_metric("inference_time_seconds", inference_time)

            # Convert the numpy array to a plain list for JSON serialization.
            feature_vector = embedding.tolist()

            # --- Database operation ---
            # A parameterized query via `text` with bound parameters prevents
            # SQL injection.
            upsert_query = text("""
                INSERT INTO user_features (user_id, feature_name, feature_value, feature_version)
                VALUES (:user_id, :feature_name, :feature_value, :feature_version)
                ON DUPLICATE KEY UPDATE
                    feature_value = VALUES(feature_value),
                    feature_version = VALUES(feature_version),
                    updated_at = CURRENT_TIMESTAMP;
            """)
            # engine.begin() opens a transaction and commits on success.
            # A plain connect() in SQLAlchemy 2.x does NOT auto-commit.
            with db_engine.begin() as connection:
                connection.execute(upsert_query, {
                    "user_id": user_id,
                    "feature_name": "search_text_embedding_v1",
                    "feature_value": json.dumps(feature_vector),
                    "feature_version": config.MODEL_VERSION
                })

            logger.info(f"[Task {task_id}] Successfully stored feature for user: {user_id}")
            mlflow.set_tag("status", "success")
            return {
                "status": "success",
                "user_id": user_id,
                "feature_shape": len(feature_vector)
            }
        except Exception as e:
            logger.error(f"[Task {task_id}] An error occurred for user {user_id}: {e}", exc_info=True)
            mlflow.set_tag("status", "failed")
            mlflow.log_param("error_message", str(e))
            # The exception is caught by `autoretry_for` and triggers a retry.
            raise
This code reflects several production practices:
- Resource initialization: the database connection pool and the model object are created once as globals when the worker process starts, instead of being re-created for every task.
- Error handling and retries: Celery's autoretry_for decorator argument gracefully handles transient failures such as network blips or database deadlocks.
- Observability: detailed logs cover the task's life cycle, and the MLflow integration adds a higher-level trace: for every feature generation we can see which model and version produced it and how long it took.
- Idempotency: the database UPSERT guarantees that a retried task updates the existing row instead of inserting a duplicate, preserving data consistency.
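The task above uses a fixed 5-second countdown between retries. Celery can also apply exponential backoff natively by passing retry_backoff=True (optionally with retry_backoff_max and retry_jitter) in the task decorator. The delay it produces has a simple shape, sketched here as a standalone function rather than the project's actual configuration:

```python
def retry_delay(retries: int, base: float = 1.0, cap: float = 600.0) -> float:
    """Exponential backoff: base * 2**retries, clamped to a maximum.

    Mirrors the shape of Celery's retry_backoff / retry_backoff_max
    options (jitter omitted for clarity).
    """
    return min(base * (2 ** retries), cap)

# Successive delays: 1 s, 2 s, 4 s, 8 s, ... capped at 600 s.
```

Exponential backoff matters for contended resources such as a deadlocking database: retrying three times at a fixed 5-second interval can pile workers onto the same hot spot, while growing delays spread the load out.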
API Gateway and the Dart Client SDK
The API gateway is the boundary between the outside world and our backend, and it must stay lightweight.
Flask API: api/server.py
# api/server.py
import logging
import os

from celery import Celery
from flask import Flask, request, jsonify

# The API gateway only needs to know how to reach the broker; it does not
# need the full Celery app configuration. Read the broker URL from the
# environment rather than hard-coding it.
celery_app = Celery('tasks', broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"))

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)


@app.route('/api/v1/generate_feature', methods=['POST'])
def trigger_feature_generation():
    data = request.get_json()
    if not data or 'user_id' not in data or 'text' not in data:
        return jsonify({"error": "Missing user_id or text"}), 400

    user_id = data['user_id']
    text = data['text']
    try:
        # Dispatch the task asynchronously by name, so the API and the worker
        # only need to share the task signature, not the code.
        task = celery_app.send_task(
            'workers.tasks.generate_text_embedding_feature',
            args=[user_id, text]
        )
        app.logger.info(f"Dispatched task {task.id} for user {user_id}")
        return jsonify({"status": "queued", "task_id": task.id}), 202
    except Exception as e:
        app.logger.error(f"Failed to dispatch task for user {user_id}: {e}")
        return jsonify({"error": "Failed to queue task"}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
The endpoint is deliberately simple: its only job is to validate the input, hand the task to Celery, and immediately return a task ID. The 202 Accepted status code conveys exactly the right semantics: the request has been accepted, but processing is not yet complete.
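The service as written stops at returning a task_id; a client that wants progress would need a status endpoint. One possible shape (an addition beyond the original code; the route name is hypothetical) is a pure mapping from Celery task states to an API payload, which a /api/v1/task_status/<task_id> route could serve by feeding it AsyncResult(task_id).state:

```python
# Hypothetical helper for a status-polling endpoint. Celery exposes a task's
# current state via AsyncResult(task_id).state; this maps those states onto
# a response body and an HTTP status code.
_TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}

def task_status_response(state: str) -> tuple[dict, int]:
    if state in _TERMINAL_STATES:
        return {"status": state.lower(), "done": True}, 200
    # PENDING / STARTED / RETRY: still in flight. Note that PENDING also
    # covers unknown task IDs, a known ambiguity of Celery result backends.
    return {"status": state.lower(), "done": False}, 200
```

Keeping the mapping as a pure function makes it trivially unit-testable, independent of Flask and the result backend.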
Dart Client SDK
Finally, we need a Dart SDK that wraps this API so the Flutter app can integrate it easily.
// lib/feature_client.dart
import 'dart:convert';

import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';

class FeatureGenerationResponse {
  final String status;
  final String? taskId;
  final String? error;

  FeatureGenerationResponse({required this.status, this.taskId, this.error});

  factory FeatureGenerationResponse.fromJson(Map<String, dynamic> json) {
    return FeatureGenerationResponse(
      status: json['status'] ?? 'error',
      taskId: json['task_id'],
      error: json['error'],
    );
  }

  bool get isSuccess => status == 'queued' && taskId != null;
}

class FeaturePipelineClient {
  final String _baseUrl;
  final http.Client _httpClient;
  final Logger _logger = Logger('FeaturePipelineClient');

  FeaturePipelineClient({
    required String baseUrl,
    http.Client? httpClient,
  })  : _baseUrl = baseUrl,
        _httpClient = httpClient ?? http.Client() {
    // Setup logging
    Logger.root.level = Level.ALL;
    Logger.root.onRecord.listen((record) {
      print('${record.level.name}: ${record.time}: ${record.message}');
    });
  }

  /// Triggers the asynchronous generation of a text-based feature for a user.
  ///
  /// This method is non-blocking and returns immediately with a response
  /// that includes a task ID if the request was successfully queued.
  /// It does not wait for the feature to be computed.
  Future<FeatureGenerationResponse> generateTextFeature({
    required String userId,
    required String text,
    Duration timeout = const Duration(seconds: 10),
  }) async {
    final uri = Uri.parse('$_baseUrl/api/v1/generate_feature');
    final headers = {'Content-Type': 'application/json; charset=UTF-8'};
    final body = jsonEncode({'user_id': userId, 'text': text});
    try {
      _logger.info('Requesting feature generation for user $userId...');
      final response = await _httpClient
          .post(uri, headers: headers, body: body)
          .timeout(timeout);
      final responseBody = jsonDecode(response.body);
      if (response.statusCode == 202) {
        _logger.info(
            'Task successfully queued. Task ID: ${responseBody['task_id']}');
        return FeatureGenerationResponse.fromJson(responseBody);
      } else {
        _logger.warning(
            'API returned non-202 status: ${response.statusCode}. Body: ${response.body}');
        return FeatureGenerationResponse.fromJson(responseBody);
      }
    } catch (e, stackTrace) {
      _logger.severe('Failed to send feature generation request.', e, stackTrace);
      return FeatureGenerationResponse(
        status: 'error',
        error: 'Network or client-side error: ${e.toString()}',
      );
    }
  }

  void dispose() {
    _httpClient.close();
  }
}

// --- Example usage (in a Flutter widget or service) ---
/*
void main() async {
  final client = FeaturePipelineClient(baseUrl: 'http://your-api-gateway.com');
  final response = await client.generateTextFeature(
    userId: 'user-12345',
    text: 'How to optimize battery life on my new phone?',
  );
  if (response.isSuccess) {
    print('Feature generation started with task ID: ${response.taskId}');
    // The app can now continue without blocking.
    // The actual feature will be available in the backend database shortly.
  } else {
    print('Failed to start feature generation: ${response.error}');
  }
  client.dispose();
}
*/
The Dart SDK provides a type-safe calling interface with basic logging and error handling. In a real Flutter app, the FeaturePipelineClient instance would be managed through a service locator or a dependency-injection framework.
Limitations and Future Directions
This architecture solved our original pain points, but it is no silver bullet. A pragmatic engineer has to know its boundaries.
First, latency. Although the pipeline is asynchronous from the client's perspective, there is still an end-to-end delay between triggering a task and the feature becoming available in MariaDB, determined by queue depth, worker compute (CPU vs. GPU), and model inference time. For scenarios that need sub-second features, a task-queue architecture is not enough, and a stream-processing approach (Kafka Streams, Flink) becomes necessary.
Second, worker scalability. Models are currently loaded at worker startup. If we need to support many different NLP models, having every worker load all of them wastes memory. Improvements include a strategy for dynamically loading and unloading models, or dedicated Celery queues and worker pools per model type. In a cloud-native environment, KEDA (Kubernetes-based Event-Driven Autoscaling) can scale the number of Celery worker pods based on queue length, which is standard practice.
Finally, the storage bottleneck. MariaDB performs well as an online feature store at a few thousand reads and writes per second. If the business grows to hundreds of thousands of queries per second, it will become a bottleneck; at that point, caching hot features in Redis or migrating to a dedicated online feature store (Feast backed by Redis or DynamoDB, for example) will be the necessary evolution.
For now, this design solves the problem of running heavyweight AI tasks asynchronously for a mobile client in a way that is cost-controlled, built on a mature stack, and highly observable. It is not the end state, but it is a solid starting point.