一、问题的根源:被历史数据拖垮的OLTP核心
一个典型的技术困境始于一个成功的业务系统。我们的核心交易系统,构建在 SQL Server 之上,稳定运行多年。但随着业务量指数级增长,Orders
、OrderItems
、TransactionLogs
这类核心表的数据量已经膨胀到数十亿行。带来的直接后果是:在线交易的响应时间(P99)劣化,常规的索引优化、查询重构已收效甚微;数据库维护窗口越来越长,备份和恢复变得异常痛苦;更致命的是,业务分析团队对历史数据的查询需求,几乎能随时将生产环境的 CPU 打满,我们不得不严格限制任何超过3个月数据的深度查询。
问题很明确:海量的“冷”数据与需要极致性能的“热”数据混杂在同一个 OLTP 数据库中,造成了严重的资源竞争和性能瓶颈。我们需要一个健壮的方案,将历史数据无缝、可靠地归档到一个专为分析而设计的存储中,并从主库中安全地清理掉。
二、方案权衡:为何放弃传统ETL与纯CDC
在架构选型阶段,我们评估了两种主流方案。
方案A:传统的批处理ETL
这是最直接的思路。使用 SQL Server Integration Services (SSIS) 或者自研的定时任务脚本,在业务低峰期(通常是深夜)执行。
- 优势:
- 技术成熟,有大量的现成工具。
- 逻辑简单,易于理解和实现。
- 劣势:
- 延迟性高: 只能做到 T+1 的数据同步,无法满足业务近实时分析的需求。
- 冲击性强: 批处理任务在运行时会对源数据库产生巨大的IO和CPU压力,即使在深夜,也可能影响到24/7运行的业务。
- 逻辑僵化: 归档规则(比如“归档90天前已完成且未退款的订单”)一旦硬编码在ETL流程中,任何微小的业务逻辑变更都需要开发、测试、重新部署整个ETL包。在我们的场景中,业务规则的调整相当频繁。
方案B:基于日志的变更数据捕获 (CDC)
使用 Debezium 或类似工具捕获 SQL Server 的事务日志,将数据变更实时流入消息队列(如 Kafka),再由消费者写入目标分析型数据库。
- 优势:
- 近乎实时: 数据延迟可以控制在秒级。
- 对源库影响小: 读取事务日志通常比全表扫描或范围查询的侵入性要小。
- 劣势:
- 架构复杂度剧增: 引入了 Debezium、Kafka Connect、Kafka、消费者等一系列分布式组件,运维成本和系统的潜在故障点都大幅增加。
- 不适用于归档场景: CDC 关注的是“变更”,而我们的核心需求是“归档”。我们只需要移动满足特定条件的存量历史数据,而不是实时捕获新数据的每一次 insert/update。
- 规则引擎缺失: CDC本身不解决复杂的业务归档逻辑。我们仍然需要一个服务来消费消息,并判断这条变更是否满足归档条件,这同样会导致逻辑硬编码。
最终决策:可配置的、并发的微服务归档器
两种方案都无法完美解决我们“动态归档规则”和“系统影响可控”的核心诉셔。因此,我们决定设计一个独立的归档服务。该服务的核心架构决策如下:
技术栈:Elixir/Phoenix。 我们需要一个能够轻松处理高并发I/O任务的平台。归档任务本质上是从一个数据源读取,转换,再写入另一个数据源,这是典型的I/O密集型操作。Erlang BEAM 虚拟机的轻量级进程和抢占式调度模型,加上 Elixir 优雅的并发原语(如
Task
,GenServer
),是构建此类数据管道的理想选择。Phoenix 框架虽然常用于Web开发,但其提供的应用生命周期管理和监督树(Supervision Tree)为我们构建一个健壮的后台服务提供了坚实的基础。规则引擎:嵌入式 Lua。 为了将易变的业务规则与稳定的服务代码解耦,我们决定引入一个脚本引擎。Lua 是一个绝佳的选择:它极其轻量、快速,且非常容易嵌入到其他应用程序中。我们可以将归档逻辑(如表名、时间窗口、过滤条件、目标表结构映射)定义在单独的
.lua
文件中。业务变更时,只需修改并热加载脚本,无需重新编译和部署整个服务。目标存储:列式数据库 (ClickHouse)。 关系型数据库用于分析查询的性能在数据量巨大时会急剧下降。列式存储,如 ClickHouse,通过其数据压缩和面向列的查询执行引擎,为聚合和分析查询提供了数量级的性能提升。这正是我们业务分析团队所需要的。
部署一致性:Packer。 这个架构融合了多种技术:Erlang/Elixir 运行时、Lua 解释器、SQL Server 的
odbc
驱动、ClickHouse 的http
客户端。为了避免环境不一致导致的“在我机器上能跑”的问题,我们使用 Packer 来预先构建一个包含所有依赖的黄金镜像(Golden Image),无论是部署到VM还是容器,都能保证环境的绝对一致性。
三、核心实现概览
下面的架构图展示了整个数据流。
graph TD subgraph "生产环境" SQLServer[(SQL Server
OLTP Database)] end subgraph "归档服务 (Packer 构建的镜像)" direction LR Scheduler[Phoenix Application
定时调度器] --> WorkerPool{工作进程池} WorkerPool --> Fetcher(数据拉取
Task) Fetcher -- 批量数据 --> Transformer(数据转换
GenServer) Transformer -- 调用 --> LuaEngine{Lua 脚本引擎
rules.lua} LuaEngine -- 归档规则 --> Transformer Transformer -- 转换后数据 --> Writer(数据写入
Task) Writer -- HTTP/Batch --> ClickHouse[(ClickHouse
OLAP Database)] end SQLServer -- ODBC --> Fetcher subgraph "运维与配置" Packer(Packer) -- 构建 --> GoldenImage[VM/Container Image] LuaScripts(Lua 归档脚本) -.-> |热加载| LuaEngine end GoldenImage -- 部署 --> Scheduler
1. Packer 镜像构建
我们的第一步是确保一个稳定、可复现的运行环境。Packer 配置文件定义了这一切。
archiver.pkr.hcl
:
// archiver.pkr.hcl
packer {
required_plugins {
amazon = {
version = ">= 1.0.0"
source = "github.com/hashicorp/amazon"
}
}
}
variable "aws_access_key" {
type = string
default = env("AWS_ACCESS_KEY_ID")
}
variable "aws_secret_key" {
type = string
default = env("AWS_SECRET_ACCESS_KEY")
sensitive = true
}
source "amazon-ebs" "ubuntu" {
access_key = var.aws_access_key
secret_key = var.aws_secret_key
region = "us-east-1"
instance_type = "t3.medium"
source_ami_filter {
filters = {
name = "ubuntu/images/hvm-ssd/ubuntu-focal-20.04-amd64-server-*"
root-device-type = "ebs"
virtualization-type = "hvm"
}
most_recent = true
owners = ["099720109477"] # Canonical's owner ID
}
ssh_username = "ubuntu"
ami_name = "archiver-app-{{timestamp}}"
}
build {
name = "archiver-builder"
sources = [
"source.amazon-ebs.ubuntu"
]
provisioner "shell" {
environment_vars = [
"DEBIAN_FRONTEND=noninteractive",
]
inline = [
"echo 'Waiting for cloud-init to finish...'",
"cloud-init status --wait",
"sudo apt-get update",
"sudo apt-get install -y wget gnupg software-properties-common curl unixodbc-dev",
// 安装 Erlang 和 Elixir (使用 asdf 版本管理器)
"sudo apt-get install -y git",
"git clone https://github.com/asdf-vm/asdf.git ~/.asdf --branch v0.10.2",
". $HOME/.asdf/asdf.sh",
"asdf plugin add erlang",
"asdf plugin add elixir",
"asdf install erlang 25.0.4",
"asdf install elixir 1.14.0-otp-25",
"asdf global erlang 25.0.4",
"asdf global elixir 1.14.0-otp-25",
// 安装 Lua 5.4
"sudo apt-get install -y lua5.4 liblua5.4-dev",
// 安装 MS ODBC Driver for SQL Server
"curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -",
"curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list",
"sudo apt-get update",
"sudo ACCEPT_EULA=Y apt-get install -y msodbcsql17",
]
}
// 将编译好的 Phoenix 应用和 Lua 脚本上传到镜像中
provisioner "file" {
source = "../_build/prod/rel/archiver/"
destination = "/opt/archiver"
}
provisioner "file" {
source = "../priv/lua/"
destination = "/opt/archiver/rules/"
}
provisioner "shell" {
inline = [
"sudo chown -R ubuntu:ubuntu /opt/archiver"
]
}
}
这份配置定义了一个完整的构建流程:从未经修改的 Ubuntu 20.04 AMI 开始,依次安装了 Erlang/Elixir、Lua 运行时、SQL Server ODBC 驱动,最后将我们预编译好的 Phoenix 应用和 Lua 脚本复制到镜像的指定位置。每次构建都会产生一个包含了所有依赖的、不可变的 AMI。
2. Phoenix 应用:调度与执行核心
应用的核心是一个监督树,它管理着调度器和动态生成的归档工作进程。
application.ex
:
# lib/archiver/application.ex
defmodule Archiver.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# 数据库连接 Repo
Archiver.Repo,
# 归档任务调度器
{Archiver.Scheduler, []}
]
opts = [strategy: :one_for_one, name: Archiver.Supervisor]
Supervisor.start_link(children, opts)
end
end
调度器 Scheduler
会定期(例如每小时)触发归档流程。它会读取 Lua 配置,为每个需要归档的表启动一个独立的监督进程。
scheduler.ex
:
# lib/archiver/scheduler.ex
defmodule Archiver.Scheduler do
use GenServer
require Logger
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts ++ [name: __MODULE__])
end
@impl true
def init(:ok) do
# 立即执行一次,然后每小时执行一次
schedule_work()
{:ok, %{timer: nil}}
end
@impl true
def handle_info(:work, state) do
Logger.info("Starting archiving cycle...")
# 从 Lua 脚本加载归档任务配置
case Archiver.LuaRules.load_tasks() do
{:ok, tasks} ->
for task <- tasks do
# 为每个任务启动一个独立的、受监控的执行器
Archiver.TaskSupervisor.start_task(task)
end
{:error, reason} ->
Logger.error("Failed to load archiving tasks from Lua: #{reason}")
end
schedule_work()
{:noreply, state}
end
defp schedule_work do
# 在生产环境中,可以增加随机抖动,避免所有实例同时执行
interval = :timer.minutes(60)
Process.send_after(self(), :work, interval)
end
end
真正的核心逻辑在 Archiver.Executor
中,它是一个 GenServer
,负责一个完整表的归档生命周期:分批次拉取数据 -> 调用Lua转换 -> 批量写入ClickHouse -> 删除源数据。
executor.ex
:
# lib/archiver/executor.ex
defmodule Archiver.Executor do
use GenServer
require Logger
alias Archiver.Db.SqlServerFetcher
alias Archiver.Db.ClickHouseWriter
alias Archiver.LuaRules
def start_link(task_config) do
GenServer.start_link(__MODULE__, task_config, [])
end
@impl true
def init(task_config) do
Logger.info("Starting executor for task: #{task_config.name}")
# 立即开始执行
send(self(), :execute_batch)
{:ok, %{task_config: task_config, cursor: nil}}
end
@impl true
def handle_info(:execute_batch, state) do
task_config = state.task_config
# 1. 从 SQL Server 拉取一批数据
{:ok, data, next_cursor} =
SqlServerFetcher.fetch_batch(task_config, state.cursor)
if Enum.any?(data) do
# 2. 调用 Lua 引擎进行数据转换和过滤
{:ok, transformed_data} = LuaRules.transform(task_config.name, data)
# 3. 批量写入 ClickHouse
:ok = ClickHouseWriter.write_batch(task_config, transformed_data)
# 4. 在确认写入成功后,从 SQL Server 删除已归档的数据
primary_keys = Enum.map(data, &Map.get(&1, task_config.primary_key))
:ok = SqlServerFetcher.delete_batch(task_config, primary_keys)
Logger.info("Successfully archived #{Enum.count(data)} records for #{task_config.name}")
# 安排下一批次
send(self(), :execute_batch)
{:noreply, %{state | cursor: next_cursor}}
else
# 没有更多数据,此轮归档结束
Logger.info("Archiving finished for task: #{task_config.name}")
{:stop, :normal, state}
end
rescue
error ->
Logger.error("Archiving batch failed for #{task_config.name}: #{inspect(error)}")
# 简单的重试策略:1分钟后重试
Process.send_after(self(), :execute_batch, :timer.minutes(1))
{:noreply, state}
end
end
这里的错误处理非常关键。在真实的生产项目中,我们会引入更复杂的重试逻辑、指数退避、以及一个死信队列来处理无法归档的数据,而不是简单地停止。delete_batch
必须在 write_batch
成功后才执行,以保证至少一次(at-least-once)的交付语义。
3. Lua 规则引擎:动态的业务逻辑
这是解耦的关键。我们使用 luerl
这个纯 Elixir 实现的 Lua 解释器库。
lua_rules.ex
:
# lib/archiver/lua_rules.ex
defmodule Archiver.LuaRules do
@rules_path "/opt/archiver/rules/tasks.lua" # 从配置中读取
# 加载并解析任务定义
def load_tasks do
with {:ok, script} <- File.read(@rules_path),
{:ok, lua_state} <- :luerl.new(),
{:ok, [tasks_table], lua_state_after} <- :luerl.do(script, lua_state) do
tasks = :luerl.decode(tasks_table) |> Enum.map(&to_atom_keys/1)
{:ok, tasks}
else
error -> {:error, error}
end
end
# 调用 Lua 的 transform 函数
def transform(task_name, data) do
# 在生产环境中,我们会缓存编译后的 Lua 脚本以提高性能
with {:ok, script} <- File.read(@rules_path),
{:ok, lua_state} <- :luerl.new(),
# 先执行脚本,加载函数定义
{:ok, _, lua_state_with_funcs} <- :luerl.do(script, lua_state) do
# 调用全局的 transform 函数
# 参数: 任务名,数据列表
encoded_data = :luerl.encode(data)
call = 'transform("#{task_name}", data)'
# 绑定 Elixir 数据到 Lua 环境
:luerl.set_global('data', encoded_data, lua_state_with_funcs)
case :luerl.do(call, lua_state_with_funcs) do
{:ok, [result], _} ->
{:ok, :luerl.decode(result)}
error ->
{:error, "Lua transform execution failed: #{inspect(error)}"}
end
end
end
# 辅助函数,将字符串key转为atom key
defp to_atom_keys(map) when is_map(map) do
for {k, v} <- map, into: %{}, do: {String.to_atom(k), v}
end
end
而对应的 Lua 脚本则清晰地定义了业务逻辑。
tasks.lua
:
-- /opt/archiver/rules/tasks.lua
-- 定义所有归档任务
local tasks = {
-- 任务1: 归档订单表
orders = {
name = "orders",
source_table = "Orders",
target_table = "archived_orders",
primary_key = "OrderID",
timestamp_column = "OrderDate",
-- 归档90天前的数据
retention_days = 90,
batch_size = 5000,
-- 定义源表到目标表的字段映射
schema_mapping = {
OrderID = "id",
CustomerID = "customer_id",
OrderDate = "order_date",
TotalAmount = "total_amount",
Status = "status"
}
},
-- 任务2: 归档日志表
transaction_logs = {
name = "transaction_logs",
source_table = "TransactionLogs",
target_table = "archived_transaction_logs",
primary_key = "LogID",
timestamp_column = "LogTimestamp",
retention_days = 30,
batch_size = 10000,
schema_mapping = {
LogID = "id",
TransactionID = "tx_id",
LogTimestamp = "log_ts",
Message = "message",
Severity = "severity"
}
}
}
-- 全局的转换函数
-- Elixir 将会调用此函数处理每个批次的数据
-- @param task_name: 任务名称, e.g., "orders"
-- @param data_batch: 从 SQL Server 拉取的数据列表,每个元素是一个 map
-- @return 转换后的数据列表
function transform(task_name, data_batch)
local task_config = tasks[task_name]
if not task_config then
-- 在实际代码中,这里应该返回错误而不是空表
return {}
end
local transformed_batch = {}
local mapping = task_config.schema_mapping
for i, row in ipairs(data_batch) do
local new_row = {}
-- 1. 字段重命名
for source_field, target_field in pairs(mapping) do
new_row[target_field] = row[source_field]
end
-- 2. 数据清洗或转换逻辑
if task_name == "orders" then
-- 示例:将状态码转换为可读字符串
local status_map = { [1] = "Pending", [2] = "Completed", [3] = "Cancelled" }
new_row["status"] = status_map[new_row["status"]] or "Unknown"
end
if task_name == "transaction_logs" then
-- 示例:只归档严重级别大于 3 的日志
if row["Severity"] <= 3 then
-- 返回 nil 来过滤掉这条记录
new_row = nil
end
end
if new_row ~= nil then
table.insert(transformed_batch, new_row)
end
end
return transformed_batch
end
-- 返回任务定义,供 Elixir 初始化时加载
return tasks
现在,如果业务需要将订单的保留期从90天改为120天,或者增加一个新的过滤条件,只需修改这个Lua文件并让服务热加载即可,主应用代码无需任何变更。
四、架构的扩展性与局限性
这个架构并非银弹,它有明确的适用边界和需要注意的权衡。
扩展性:
- 垂直扩展: BEAM 虚拟机能极好地利用多核CPU。增加归档服务的CPU和内存,可以直接提升单个节点的处理能力。
- 水平扩展: 可以部署多个归档服务实例。但需要引入分布式锁或任务分片机制,以避免多个实例同时处理同一批数据。例如,可以使用 Redis 或数据库来实现一个简单的租约锁。
- 规则扩展: Lua 脚本的灵活性是无限的。可以轻松添加新的归档任务,或者实现更复杂的、跨越多行数据的转换逻辑。
局限性与潜在风险:
- 非严格实时: 这是一个近实时的批处理系统。它的延迟取决于调度周期和批处理的大小,无法做到像CDC那样的秒级延迟。
- 数据一致性: 在“写入ClickHouse”和“删除SQL Server数据”这两个步骤之间存在一个时间窗口。如果服务在写入后、删除前崩溃,那么下一次运行时会重复处理这批数据。这要求写入ClickHouse的操作是幂等的。我们通常通过在目标表上设置唯一键(如果业务允许)或者在加载逻辑中处理重复来解决。
- Lua脚本的安全性与性能: 允许动态加载脚本带来灵活性的同时,也引入了风险。必须确保Lua运行在一个严格的沙箱环境中,限制其访问文件系统、网络等系统资源。性能不佳的Lua脚本(例如,包含死循环或高复杂度算法)可能会阻塞Elixir的调度器,影响整个应用的性能。在生产环境中,需要对Lua脚本的执行设置超时(timeout)。
- 对源数据库的压力: 虽然是分批次拉取,但高频度的归档仍然会对源OLTP数据库产生压力。查询语句必须经过精心设计,确保能高效地利用索引(通常是在时间戳和主键上的复合索引),避免全表扫描。在真实的生产系统中,我们会监控查询性能,并实现动态调整批处理大小和拉取频率的“背压”机制。