构建连接 SQL Server 与列式存储的动态数据归档管道


一、问题的根源:被历史数据拖垮的OLTP核心

一个典型的技术困境始于一个成功的业务系统。我们的核心交易系统,构建在 SQL Server 之上,稳定运行多年。但随着业务量指数级增长,OrdersOrderItemsTransactionLogs 这类核心表的数据量已经膨胀到数十亿行。带来的直接后果是:在线交易的响应时间(P99)劣化,常规的索引优化、查询重构已收效甚微;数据库维护窗口越来越长,备份和恢复变得异常痛苦;更致命的是,业务分析团队对历史数据的查询需求,几乎能随时将生产环境的 CPU 打满,我们不得不严格限制任何超过3个月数据的深度查询。

问题很明确:海量的“冷”数据与需要极致性能的“热”数据混杂在同一个 OLTP 数据库中,造成了严重的资源竞争和性能瓶颈。我们需要一个健壮的方案,将历史数据无缝、可靠地归档到一个专为分析而设计的存储中,并从主库中安全地清理掉。

二、方案权衡:为何放弃传统ETL与纯CDC

在架构选型阶段,我们评估了两种主流方案。

方案A:传统的批处理ETL

这是最直接的思路。使用 SQL Server Integration Services (SSIS) 或者自研的定时任务脚本,在业务低峰期(通常是深夜)执行。

  • 优势:
    • 技术成熟,有大量的现成工具。
    • 逻辑简单,易于理解和实现。
  • 劣势:
    • 延迟性高: 只能做到 T+1 的数据同步,无法满足业务近实时分析的需求。
    • 冲击性强: 批处理任务在运行时会对源数据库产生巨大的IO和CPU压力,即使在深夜,也可能影响到24/7运行的业务。
    • 逻辑僵化: 归档规则(比如“归档90天前已完成且未退款的订单”)一旦硬编码在ETL流程中,任何微小的业务逻辑变更都需要开发、测试、重新部署整个ETL包。在我们的场景中,业务规则的调整相当频繁。

方案B:基于日志的变更数据捕获 (CDC)

使用 Debezium 或类似工具捕获 SQL Server 的事务日志,将数据变更实时流入消息队列(如 Kafka),再由消费者写入目标分析型数据库。

  • 优势:
    • 近乎实时: 数据延迟可以控制在秒级。
    • 对源库影响小: 读取事务日志通常比全表扫描或范围查询的侵入性要小。
  • 劣势:
    • 架构复杂度剧增: 引入了 Debezium、Kafka Connect、Kafka、消费者等一系列分布式组件,运维成本和系统的潜在故障点都大幅增加。
    • 不适用于归档场景: CDC 关注的是“变更”,而我们的核心需求是“归档”。我们只需要移动满足特定条件的存量历史数据,而不是实时捕获新数据的每一次 insert/update。
    • 规则引擎缺失: CDC本身不解决复杂的业务归档逻辑。我们仍然需要一个服务来消费消息,并判断这条变更是否满足归档条件,这同样会导致逻辑硬编码。

最终决策:可配置的、并发的微服务归档器

两种方案都无法完美解决我们“动态归档规则”和“系统影响可控”的核心诉셔。因此,我们决定设计一个独立的归档服务。该服务的核心架构决策如下:

  1. 技术栈:Elixir/Phoenix。 我们需要一个能够轻松处理高并发I/O任务的平台。归档任务本质上是从一个数据源读取,转换,再写入另一个数据源,这是典型的I/O密集型操作。Erlang BEAM 虚拟机的轻量级进程和抢占式调度模型,加上 Elixir 优雅的并发原语(如Task, GenServer),是构建此类数据管道的理想选择。Phoenix 框架虽然常用于Web开发,但其提供的应用生命周期管理和监督树(Supervision Tree)为我们构建一个健壮的后台服务提供了坚实的基础。

  2. 规则引擎:嵌入式 Lua。 为了将易变的业务规则与稳定的服务代码解耦,我们决定引入一个脚本引擎。Lua 是一个绝佳的选择:它极其轻量、快速,且非常容易嵌入到其他应用程序中。我们可以将归档逻辑(如表名、时间窗口、过滤条件、目标表结构映射)定义在单独的 .lua 文件中。业务变更时,只需修改并热加载脚本,无需重新编译和部署整个服务。

  3. 目标存储:列式数据库 (ClickHouse)。 关系型数据库用于分析查询的性能在数据量巨大时会急剧下降。列式存储,如 ClickHouse,通过其数据压缩和面向列的查询执行引擎,为聚合和分析查询提供了数量级的性能提升。这正是我们业务分析团队所需要的。

  4. 部署一致性:Packer。 这个架构融合了多种技术:Erlang/Elixir 运行时、Lua 解释器、SQL Server 的 odbc 驱动、ClickHouse 的 http 客户端。为了避免环境不一致导致的“在我机器上能跑”的问题,我们使用 Packer 来预先构建一个包含所有依赖的黄金镜像(Golden Image),无论是部署到VM还是容器,都能保证环境的绝对一致性。

三、核心实现概览

下面的架构图展示了整个数据流。

graph TD
    subgraph "生产环境"
        SQLServer[(SQL Server
OLTP Database)] end subgraph "归档服务 (Packer 构建的镜像)" direction LR Scheduler[Phoenix Application
定时调度器] --> WorkerPool{工作进程池} WorkerPool --> Fetcher(数据拉取
Task) Fetcher -- 批量数据 --> Transformer(数据转换
GenServer) Transformer -- 调用 --> LuaEngine{Lua 脚本引擎
rules.lua} LuaEngine -- 归档规则 --> Transformer Transformer -- 转换后数据 --> Writer(数据写入
Task) Writer -- HTTP/Batch --> ClickHouse[(ClickHouse
OLAP Database)] end SQLServer -- ODBC --> Fetcher subgraph "运维与配置" Packer(Packer) -- 构建 --> GoldenImage[VM/Container Image] LuaScripts(Lua 归档脚本) -.-> |热加载| LuaEngine end GoldenImage -- 部署 --> Scheduler

1. Packer 镜像构建

我们的第一步是确保一个稳定、可复现的运行环境。Packer 配置文件定义了这一切。

archiver.pkr.hcl:

// archiver.pkr.hcl
packer {
  required_plugins {
    amazon = {
      version = ">= 1.0.0"
      source  = "github.com/hashicorp/amazon"
    }
  }
}

variable "aws_access_key" {
  type    = string
  default = env("AWS_ACCESS_KEY_ID")
}

variable "aws_secret_key" {
  type      = string
  default   = env("AWS_SECRET_ACCESS_KEY")
  sensitive = true
}

source "amazon-ebs" "ubuntu" {
  access_key    = var.aws_access_key
  secret_key    = var.aws_secret_key
  region        = "us-east-1"
  instance_type = "t3.medium"
  source_ami_filter {
    filters = {
      name                = "ubuntu/images/hvm-ssd/ubuntu-focal-20.04-amd64-server-*"
      root-device-type    = "ebs"
      virtualization-type = "hvm"
    }
    most_recent = true
    owners      = ["099720109477"] # Canonical's owner ID
  }
  ssh_username = "ubuntu"
  ami_name     = "archiver-app-{{timestamp}}"
}

build {
  name = "archiver-builder"
  sources = [
    "source.amazon-ebs.ubuntu"
  ]

  provisioner "shell" {
    environment_vars = [
      "DEBIAN_FRONTEND=noninteractive",
    ]
    inline = [
      "echo 'Waiting for cloud-init to finish...'",
      "cloud-init status --wait",
      "sudo apt-get update",
      "sudo apt-get install -y wget gnupg software-properties-common curl unixodbc-dev",

      // 安装 Erlang 和 Elixir (使用 asdf 版本管理器)
      "sudo apt-get install -y git",
      "git clone https://github.com/asdf-vm/asdf.git ~/.asdf --branch v0.10.2",
      ". $HOME/.asdf/asdf.sh",
      "asdf plugin add erlang",
      "asdf plugin add elixir",
      "asdf install erlang 25.0.4",
      "asdf install elixir 1.14.0-otp-25",
      "asdf global erlang 25.0.4",
      "asdf global elixir 1.14.0-otp-25",

      // 安装 Lua 5.4
      "sudo apt-get install -y lua5.4 liblua5.4-dev",

      // 安装 MS ODBC Driver for SQL Server
      "curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -",
      "curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list",
      "sudo apt-get update",
      "sudo ACCEPT_EULA=Y apt-get install -y msodbcsql17",
    ]
  }

  // 将编译好的 Phoenix 应用和 Lua 脚本上传到镜像中
  provisioner "file" {
    source      = "../_build/prod/rel/archiver/"
    destination = "/opt/archiver"
  }

  provisioner "file" {
    source      = "../priv/lua/"
    destination = "/opt/archiver/rules/"
  }

  provisioner "shell" {
    inline = [
      "sudo chown -R ubuntu:ubuntu /opt/archiver"
    ]
  }
}

这份配置定义了一个完整的构建流程:从未经修改的 Ubuntu 20.04 AMI 开始,依次安装了 Erlang/Elixir、Lua 运行时、SQL Server ODBC 驱动,最后将我们预编译好的 Phoenix 应用和 Lua 脚本复制到镜像的指定位置。每次构建都会产生一个包含了所有依赖的、不可变的 AMI。

2. Phoenix 应用:调度与执行核心

应用的核心是一个监督树,它管理着调度器和动态生成的归档工作进程。

application.ex:

# lib/archiver/application.ex
defmodule Archiver.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # 数据库连接 Repo
      Archiver.Repo,
      # 归档任务调度器
      {Archiver.Scheduler, []}
    ]

    opts = [strategy: :one_for_one, name: Archiver.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

调度器 Scheduler 会定期(例如每小时)触发归档流程。它会读取 Lua 配置,为每个需要归档的表启动一个独立的监督进程。

scheduler.ex:

# lib/archiver/scheduler.ex
defmodule Archiver.Scheduler do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts ++ [name: __MODULE__])
  end

  @impl true
  def init(:ok) do
    # 立即执行一次,然后每小时执行一次
    schedule_work()
    {:ok, %{timer: nil}}
  end

  @impl true
  def handle_info(:work, state) do
    Logger.info("Starting archiving cycle...")
    
    # 从 Lua 脚本加载归档任务配置
    case Archiver.LuaRules.load_tasks() do
      {:ok, tasks} ->
        for task <- tasks do
          # 为每个任务启动一个独立的、受监控的执行器
          Archiver.TaskSupervisor.start_task(task)
        end
      {:error, reason} ->
        Logger.error("Failed to load archiving tasks from Lua: #{reason}")
    end

    schedule_work()
    {:noreply, state}
  end

  defp schedule_work do
    # 在生产环境中,可以增加随机抖动,避免所有实例同时执行
    interval = :timer.minutes(60)
    Process.send_after(self(), :work, interval)
  end
end

真正的核心逻辑在 Archiver.Executor 中,它是一个 GenServer,负责一个完整表的归档生命周期:分批次拉取数据 -> 调用Lua转换 -> 批量写入ClickHouse -> 删除源数据。

executor.ex:

# lib/archiver/executor.ex
defmodule Archiver.Executor do
  use GenServer
  require Logger

  alias Archiver.Db.SqlServerFetcher
  alias Archiver.Db.ClickHouseWriter
  alias Archiver.LuaRules

  def start_link(task_config) do
    GenServer.start_link(__MODULE__, task_config, [])
  end

  @impl true
  def init(task_config) do
    Logger.info("Starting executor for task: #{task_config.name}")
    # 立即开始执行
    send(self(), :execute_batch)
    {:ok, %{task_config: task_config, cursor: nil}}
  end

  @impl true
  def handle_info(:execute_batch, state) do
    task_config = state.task_config
    
    # 1. 从 SQL Server 拉取一批数据
    {:ok, data, next_cursor} = 
      SqlServerFetcher.fetch_batch(task_config, state.cursor)
    
    if Enum.any?(data) do
      # 2. 调用 Lua 引擎进行数据转换和过滤
      {:ok, transformed_data} = LuaRules.transform(task_config.name, data)

      # 3. 批量写入 ClickHouse
      :ok = ClickHouseWriter.write_batch(task_config, transformed_data)

      # 4. 在确认写入成功后,从 SQL Server 删除已归档的数据
      primary_keys = Enum.map(data, &Map.get(&1, task_config.primary_key))
      :ok = SqlServerFetcher.delete_batch(task_config, primary_keys)
      
      Logger.info("Successfully archived #{Enum.count(data)} records for #{task_config.name}")

      # 安排下一批次
      send(self(), :execute_batch)
      {:noreply, %{state | cursor: next_cursor}}
    else
      # 没有更多数据,此轮归档结束
      Logger.info("Archiving finished for task: #{task_config.name}")
      {:stop, :normal, state}
    end
  rescue
    error ->
      Logger.error("Archiving batch failed for #{task_config.name}: #{inspect(error)}")
      # 简单的重试策略:1分钟后重试
      Process.send_after(self(), :execute_batch, :timer.minutes(1))
      {:noreply, state}
  end
end

这里的错误处理非常关键。在真实的生产项目中,我们会引入更复杂的重试逻辑、指数退避、以及一个死信队列来处理无法归档的数据,而不是简单地停止。delete_batch 必须在 write_batch 成功后才执行,以保证至少一次(at-least-once)的交付语义。

3. Lua 规则引擎:动态的业务逻辑

这是解耦的关键。我们使用 luerl 这个纯 Elixir 实现的 Lua 解释器库。

lua_rules.ex:

# lib/archiver/lua_rules.ex
defmodule Archiver.LuaRules do
  @rules_path "/opt/archiver/rules/tasks.lua" # 从配置中读取

  # 加载并解析任务定义
  def load_tasks do
    with {:ok, script} <- File.read(@rules_path),
         {:ok, lua_state} <- :luerl.new(),
         {:ok, [tasks_table], lua_state_after} <- :luerl.do(script, lua_state) do
      tasks = :luerl.decode(tasks_table) |> Enum.map(&to_atom_keys/1)
      {:ok, tasks}
    else
      error -> {:error, error}
    end
  end

  # 调用 Lua 的 transform 函数
  def transform(task_name, data) do
    # 在生产环境中,我们会缓存编译后的 Lua 脚本以提高性能
    with {:ok, script} <- File.read(@rules_path),
         {:ok, lua_state} <- :luerl.new(),
         # 先执行脚本,加载函数定义
         {:ok, _, lua_state_with_funcs} <- :luerl.do(script, lua_state) do
      
      # 调用全局的 transform 函数
      # 参数: 任务名,数据列表
      encoded_data = :luerl.encode(data)
      call = 'transform("#{task_name}", data)'
      
      # 绑定 Elixir 数据到 Lua 环境
      :luerl.set_global('data', encoded_data, lua_state_with_funcs)
      
      case :luerl.do(call, lua_state_with_funcs) do
        {:ok, [result], _} ->
          {:ok, :luerl.decode(result)}
        error ->
          {:error, "Lua transform execution failed: #{inspect(error)}"}
      end
    end
  end
  
  # 辅助函数,将字符串key转为atom key
  defp to_atom_keys(map) when is_map(map) do
    for {k, v} <- map, into: %{}, do: {String.to_atom(k), v}
  end
end

而对应的 Lua 脚本则清晰地定义了业务逻辑。

tasks.lua:

-- /opt/archiver/rules/tasks.lua

-- 定义所有归档任务
local tasks = {
    -- 任务1: 归档订单表
    orders = {
        name = "orders",
        source_table = "Orders",
        target_table = "archived_orders",
        primary_key = "OrderID",
        timestamp_column = "OrderDate",
        -- 归档90天前的数据
        retention_days = 90,
        batch_size = 5000,
        -- 定义源表到目标表的字段映射
        schema_mapping = {
            OrderID = "id",
            CustomerID = "customer_id",
            OrderDate = "order_date",
            TotalAmount = "total_amount",
            Status = "status"
        }
    },
    -- 任务2: 归档日志表
    transaction_logs = {
        name = "transaction_logs",
        source_table = "TransactionLogs",
        target_table = "archived_transaction_logs",
        primary_key = "LogID",
        timestamp_column = "LogTimestamp",
        retention_days = 30,
        batch_size = 10000,
        schema_mapping = {
            LogID = "id",
            TransactionID = "tx_id",
            LogTimestamp = "log_ts",
            Message = "message",
            Severity = "severity"
        }
    }
}

-- 全局的转换函数
-- Elixir 将会调用此函数处理每个批次的数据
-- @param task_name: 任务名称, e.g., "orders"
-- @param data_batch: 从 SQL Server 拉取的数据列表,每个元素是一个 map
-- @return 转换后的数据列表
function transform(task_name, data_batch)
    local task_config = tasks[task_name]
    if not task_config then
        -- 在实际代码中,这里应该返回错误而不是空表
        return {}
    end

    local transformed_batch = {}
    local mapping = task_config.schema_mapping

    for i, row in ipairs(data_batch) do
        local new_row = {}
        -- 1. 字段重命名
        for source_field, target_field in pairs(mapping) do
            new_row[target_field] = row[source_field]
        end

        -- 2. 数据清洗或转换逻辑
        if task_name == "orders" then
            -- 示例:将状态码转换为可读字符串
            local status_map = { [1] = "Pending", [2] = "Completed", [3] = "Cancelled" }
            new_row["status"] = status_map[new_row["status"]] or "Unknown"
        end

        if task_name == "transaction_logs" then
            -- 示例:只归档严重级别大于 3 的日志
            if row["Severity"] <= 3 then
                -- 返回 nil 来过滤掉这条记录
                new_row = nil 
            end
        end
        
        if new_row ~= nil then
          table.insert(transformed_batch, new_row)
        end
    end

    return transformed_batch
end

-- 返回任务定义,供 Elixir 初始化时加载
return tasks

现在,如果业务需要将订单的保留期从90天改为120天,或者增加一个新的过滤条件,只需修改这个Lua文件并让服务热加载即可,主应用代码无需任何变更。

四、架构的扩展性与局限性

这个架构并非银弹,它有明确的适用边界和需要注意的权衡。

扩展性:

  • 垂直扩展: BEAM 虚拟机能极好地利用多核CPU。增加归档服务的CPU和内存,可以直接提升单个节点的处理能力。
  • 水平扩展: 可以部署多个归档服务实例。但需要引入分布式锁或任务分片机制,以避免多个实例同时处理同一批数据。例如,可以使用 Redis 或数据库来实现一个简单的租约锁。
  • 规则扩展: Lua 脚本的灵活性是无限的。可以轻松添加新的归档任务,或者实现更复杂的、跨越多行数据的转换逻辑。

局限性与潜在风险:

  • 非严格实时: 这是一个近实时的批处理系统。它的延迟取决于调度周期和批处理的大小,无法做到像CDC那样的秒级延迟。
  • 数据一致性: 在“写入ClickHouse”和“删除SQL Server数据”这两个步骤之间存在一个时间窗口。如果服务在写入后、删除前崩溃,那么下一次运行时会重复处理这批数据。这要求写入ClickHouse的操作是幂等的。我们通常通过在目标表上设置唯一键(如果业务允许)或者在加载逻辑中处理重复来解决。
  • Lua脚本的安全性与性能: 允许动态加载脚本带来灵活性的同时,也引入了风险。必须确保Lua运行在一个严格的沙箱环境中,限制其访问文件系统、网络等系统资源。性能不佳的Lua脚本(例如,包含死循环或高复杂度算法)可能会阻塞Elixir的调度器,影响整个应用的性能。在生产环境中,需要对Lua脚本的执行设置超时(timeout)。
  • 对源数据库的压力: 虽然是分批次拉取,但高频度的归档仍然会对源OLTP数据库产生压力。查询语句必须经过精心设计,确保能高效地利用索引(通常是在时间戳和主键上的复合索引),避免全表扫描。在真实的生产系统中,我们会监控查询性能,并实现动态调整批处理大小和拉取频率的“背压”机制。

  目录