基于 ActiveMQ 和 Docker 构建从 Storybook 到 AWS 的异步事件观测管道


我们的前端组件库 CI 流程一度是个黑盒。当一个基于 Playwright 的视觉回归测试失败时,我们得到的通常只是一张像素差异截图和一行“断言失败”的日志。对于一个拥有数百个组件、由数十个团队共同维护的系统来说,这远远不够。我们需要一种方法来捕获组件在交互过程中的详细状态和内部事件,并将这些遥测数据稳定地传送至后端进行分析,而不仅仅是依赖 CI Runner 短暂的 stdout

最初的构想很简单:在 Storybook 的测试环境中注入一个全局事件发射器,然后在 Playwright 脚本里监听这些事件。但这很快就暴露出问题:事件数据与测试进程紧密耦合,一旦测试框架崩溃,所有上下文都将丢失。我们需要一个解耦的、持久化的异步通信机制。团队内部的技术栈里,ActiveMQ 是一个被广泛使用的、经过生产验证的消息中间件。虽然将它用于前端开发工具链听起来有些异想天开,但复用现有且稳定的基础设施,远比引入一套全新的技术栈(如 Kafka 或 RabbitMQ)更具成本效益。

我们的目标是:构建一个从 Storybook 内部触发,经由 ActiveMQ 传输,最终被一个运行在 AWS 上的消费者服务处理的事件管道。所有组件都必须通过 Docker 进行容器化,以确保开发、CI 和最终部署环境的一致性。

第一步:建立稳定可靠的通信枢纽 - Dockerized ActiveMQ

ActiveMQ 的标准协议是 OpenWire,而浏览器环境无法直接使用。幸运的是,ActiveMQ 支持多种协议,包括对 Web 非常友好的 STOMP (Simple Text Oriented Messaging Protocol)。我们可以通过 WebSocket 将 STOMP 暴露给前端。

在真实项目中,我们使用的是一个高可用的 ActiveMQ 集群。但在本地开发和演示中,一个简单的 Docker 容器就足够了。这里的关键在于自定义配置文件 activemq.xml,以启用 STOMP over WebSocket 连接器。

首先,是 docker-compose.yml 的定义:

# docker-compose.yml
version: '3.8'

services:
  activemq:
    image: apache/activemq-classic:5.18.3
    container_name: local-activemq
    ports:
      - "61616:61616" # OpenWire/TCP
      - "8161:8161"   # Web Console
      - "61613:61613" # STOMP over WebSocket
    volumes:
      - ./conf/activemq.xml:/opt/apache-activemq/conf/activemq.xml
      - ./conf/jetty.xml:/opt/apache-activemq/conf/jetty.xml
    environment:
      - ACTIVEMQ_CONFIG_NAME=activemq.xml
      - ACTIVEMQ_CONFIG_DEFAULTACCOUNT=false
      - ACTIVEMQ_ADMIN_LOGIN=admin
      - ACTIVEMQ_ADMIN_PASSWORD=admin
    # 日志驱动配置,方便后续接入集中式日志系统
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

这里的核心是挂载了自定义的 activemq.xml。我们需要在 transportConnectors 部分添加一个新的 transportConnector

<!-- ./conf/activemq.xml (关键部分) -->
<beans ...>
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <pendingMessageLimitStrategy>
                      <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

        <transportConnectors>
            <!-- 默认的 OpenWire 连接器 -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            
            <!-- 关键:添加 STOMP over WebSocket 连接器 -->
            <transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>
</beans>

这个配置使 ActiveMQ 在 61613 端口上监听 WebSocket 连接。现在,任何支持 STOMP 的客户端(包括浏览器)都可以与它通信。

第二步:打造事件源头 - 定制化 Storybook Addon

为了在 Storybook 中方便地发送事件,我们创建了一个专用的 Addon。这个 Addon 提供了一个简单的 UI 界面,允许开发者或测试脚本手动/自动触发事件发送。

Addon 的核心是使用 @stomp/stompjs 库,它是目前在 Web 环境下使用 STOMP 最可靠的选择,支持自动重连和心跳检测。

Addon 的文件结构如下:

.storybook/
├── main.js
├── preview.js
└── addons/
    └── event-emitter/
        ├── manager.js      # Addon UI 注册和渲染
        ├── preset.js       # 告诉 Storybook 这是一个 Addon
        └── StompClient.js  # 封装 STOMP 连接逻辑

preset.js 文件非常简单,只是为了让 Storybook 识别这个目录。

// .storybook/addons/event-emitter/preset.js
function config(entry = []) {
  return [...entry, require.resolve("./manager")];
}

module.exports = { config };

.storybook/main.js 中引用它:

// .storybook/main.js
module.exports = {
  // ... other configs
  addons: [
    // ... other addons
    './addons/event-emitter/preset.js', // 引用本地 Addon
  ],
};

StompClient.js 负责封装所有与 ActiveMQ 的交互逻辑,这是为了保持 UI 组件的纯粹。

// .storybook/addons/event-emitter/StompClient.js
import { Client } from '@stomp/stompjs';

const BROKER_URL = 'ws://localhost:61613';
const DESTINATION = '/topic/storybook.events';

class StompClient {
  constructor() {
    this.client = null;
    this.connectionStatus = 'DISCONNECTED';
    this.statusListeners = new Set();
  }

  // 允许外部组件订阅连接状态的变化
  addStatusListener(callback) {
    this.statusListeners.add(callback);
    return () => this.statusListeners.delete(callback);
  }

  _updateStatus(status) {
    this.connectionStatus = status;
    this.statusListeners.forEach(listener => listener(status));
  }

  connect() {
    if (this.client && this.client.active) {
      console.log('STOMP client is already active.');
      return;
    }

    this.client = new Client({
      brokerURL: BROKER_URL,
      // 在生产环境中,这里应该配置用户名和密码
      // connectHeaders: {
      //   login: 'admin',
      //   passcode: 'admin',
      // },
      debug: (str) => {
        console.log('[STOMP]', str);
      },
      reconnectDelay: 5000, // 5秒后自动重连
      heartbeatIncoming: 4000,
      heartbeatOutgoing: 4000,
    });

    this.client.onConnect = (frame) => {
      this._updateStatus('CONNECTED');
      console.log('Connected to ActiveMQ:', frame);
    };

    this.client.onStompError = (frame) => {
      this._updateStatus('ERROR');
      console.error('Broker reported error:', frame.headers['message']);
      console.error('Additional details:', frame.body);
    };

    this.client.onWebSocketClose = () => {
        this._updateStatus('DISCONNECTED');
        console.log('WebSocket connection closed.');
    };
    
    this._updateStatus('CONNECTING');
    this.client.activate();
  }

  disconnect() {
    if (this.client) {
      this.client.deactivate();
      this.client = null;
      this._updateStatus('DISCONNECTED');
    }
  }

  /**
   * 发送事件
   * @param {object} eventPayload - The event data to send.
   * @param {string} eventPayload.componentId - The ID of the component.
   * @param {string} eventPayload.storyId - The ID of the story.
   * @param {string} eventPayload.eventType - The type of the event (e.g., 'INTERACTION', 'STATE_CHANGE').
   * @param {object} eventPayload.payload - The actual event data.
   */
  sendEvent(eventPayload) {
    if (!this.client || !this.client.active) {
      console.error('Cannot send event. STOMP client is not connected.');
      // 这里的错误处理很重要,可以增加一个本地队列来缓存未发送的事件
      return Promise.reject(new Error('Client not connected'));
    }

    try {
      const message = JSON.stringify({
        ...eventPayload,
        timestamp: new Date().toISOString(),
        source: 'storybook-addon',
      });

      this.client.publish({
        destination: DESTINATION,
        body: message,
        headers: { 'content-type': 'application/json' },
      });

      console.log('Event sent:', eventPayload);
      return Promise.resolve();
    } catch (error) {
      console.error('Failed to send event:', error);
      return Promise.reject(error);
    }
  }
}

// 单例模式,确保整个 Storybook 应用共享一个客户端实例
export const stompClient = new StompClient();

最后是 manager.js,它使用 Storybook 的 Addon API 来渲染一个面板,并与 StompClient 交互。

// .storybook/addons/event-emitter/manager.js
import React, { useState, useEffect } from 'react';
import { addons, types } from '@storybook/manager-api';
import { AddonPanel } from '@storybook/components';
import { useStorybookState } from '@storybook/manager-api';
import { stompClient } from './StompClient';

const ADDON_ID = 'event-emitter-addon';
const PANEL_ID = `${ADDON_ID}/panel`;

const EventPanel = () => {
  const [status, setStatus] = useState(stompClient.connectionStatus);
  const [message, setMessage] = useState('');
  const storybookState = useStorybookState();

  useEffect(() => {
    stompClient.connect();
    const unsubscribe = stompClient.addStatusListener(setStatus);
    return () => {
      unsubscribe();
      // 在 addon 卸载时不应断开连接,以保持连接的持久性
      // stompClient.disconnect();
    };
  }, []);
  
  const handleSend = () => {
    if (!storybookState.storyId) {
      alert("No story selected.");
      return;
    }

    const payload = {
      componentId: storybookState.storyId.split('--')[0],
      storyId: storybookState.storyId,
      eventType: 'MANUAL_TRIGGER',
      payload: { customMessage: message || 'Test Event' },
    };
    stompClient.sendEvent(payload).catch(err => alert(err.message));
  };

  return (
    <div style={{ padding: '10px' }}>
      <h3>ActiveMQ Event Emitter</h3>
      <p>Status: <b style={{ color: status === 'CONNECTED' ? 'green' : 'red' }}>{status}</b></p>
      <textarea
        value={message}
        onChange={(e) => setMessage(e.target.value)}
        placeholder="Enter event message (JSON)"
        rows={4}
        style={{ width: '100%', marginBottom: '10px' }}
      />
      <button onClick={handleSend} disabled={status !== 'CONNECTED'}>
        Send Event
      </button>
    </div>
  );
};

addons.register(ADDON_ID, () => {
  addons.add(PANEL_ID, {
    type: types.PANEL,
    title: 'Event Emitter',
    render: ({ active }) => (
      <AddonPanel active={active}>
        <EventPanel />
      </AddonPanel>
    ),
  });
});

现在,启动 Storybook,你会在底部看到一个 “Event Emitter” 面板,它会尝试连接到本地的 ActiveMQ 实例,并允许你发送消息。

第三步:事件的终点 - AWS 上的消费者服务

事件被发送到 ActiveMQ 后,需要一个服务来消费它们。这个服务我们将用 Node.js 实现,并将其容器化,以便部署到 AWS ECS 或 EKS。在真实场景中,这个服务可能会将数据写入 OpenSearch、ClickHouse 或直接推送到 Datadog/New Relic。为了简化,我们的消费者仅将收到的结构化日志打印到 stdout。在 AWS 环境中,这些日志可以被 CloudWatch Logs Agent 自动收集。

// consumer-service/index.js
const { Client } = require('@stomp/stompjs');
const { v4: uuidv4 } = require('uuid');

const BROKER_URL = process.env.BROKER_URL || 'ws://activemq:61613';
const SUBSCRIBE_DESTINATION = '/topic/storybook.events';

const client = new Client({
  brokerURL: BROKER_URL,
  reconnectDelay: 5000,
  heartbeatIncoming: 4000,
  heartbeatOutgoing: 4000,
});

client.onConnect = (frame) => {
  console.log(JSON.stringify({
    level: 'info',
    message: 'Consumer connected to ActiveMQ broker',
    details: frame.headers,
    timestamp: new Date().toISOString(),
  }));

  client.subscribe(SUBSCRIBE_DESTINATION, (message) => {
    try {
      const body = JSON.parse(message.body);
      // 在这里对消息进行处理、扩充,例如加入 traceId
      const enrichedLog = {
        level: 'info',
        message: 'Received Storybook event',
        traceId: uuidv4(), // 模拟分布式追踪的 traceId
        source: 'activemq-consumer',
        event: body,
        receivedAt: new Date().toISOString(),
      };
      // 输出结构化日志
      console.log(JSON.stringify(enrichedLog));
    } catch (error) {
      console.error(JSON.stringify({
        level: 'error',
        message: 'Failed to process message',
        error: error.message,
        rawMessage: message.body,
        timestamp: new Date().toISOString(),
      }));
    }
  });
};

client.onStompError = (frame) => {
  console.error(JSON.stringify({
    level: 'error',
    message: 'Broker reported an error',
    details: frame.headers,
    body: frame.body,
    timestamp: new Date().toISOString(),
  }));
};

console.log(JSON.stringify({
    level: 'info',
    message: `Consumer service starting, attempting to connect to ${BROKER_URL}`
}));

client.activate();

// 优雅停机处理
process.on('SIGINT', () => {
    console.log(JSON.stringify({ level: 'info', message: 'SIGINT received, deactivating client.' }));
    client.deactivate().then(() => {
        process.exit(0);
    });
});

对应的 Dockerfile 非常标准:

# consumer-service/Dockerfile
FROM node:18-alpine

WORKDIR /usr/src/app

COPY package*.json ./
RUN npm install

COPY . .

# 暴露端口并非必须,因为它是出站连接,但作为好习惯保留
# EXPOSE 3000

ENV BROKER_URL=ws://activemq:61613

CMD [ "node", "index.js" ]

最后,更新 docker-compose.yml 以包含这个新的服务,并建立它们之间的网络连接。

# docker-compose.yml (完整版)
version: '3.8'

services:
  activemq:
    image: apache/activemq-classic:5.18.3
    container_name: local-activemq
    ports:
      - "61616:61616"
      - "8161:8161"
      - "61613:61613"
    volumes:
      - ./conf/activemq.xml:/opt/apache-activemq/conf/activemq.xml
      - ./conf/jetty.xml:/opt/apache-activemq/conf/jetty.xml
    environment:
      - ACTIVEMQ_CONFIG_NAME=activemq.xml
      - ACTIVEMQ_CONFIG_DEFAULTACCOUNT=false
      - ACTIVEMQ_ADMIN_LOGIN=admin
      - ACTIVEMQ_ADMIN_PASSWORD=admin
    logging: &logging_config
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  consumer:
    build: ./consumer-service
    container_name: event-consumer
    # 依赖 activemq 先启动,但不保证其完全可用
    depends_on:
      - activemq
    # 环境变量指向 compose 网络中的 activemq 服务
    environment:
      - BROKER_URL=ws://activemq:61613
    restart: always
    logging: *logging_config

架构与数据流

现在,整个本地开发和测试流程已经建立。

graph TD
    subgraph Browser
        A[Storybook UI] --> B{Addon Panel};
    end

    subgraph "Docker Network"
        C[ActiveMQ Container]
        D[Consumer Service Container]
    end
    
    subgraph "Observability Backend (AWS)"
        E[CloudWatch Logs / OpenSearch]
    end

    B -- "1. Send Event (STOMP over WebSocket)" --> C;
    C -- "2. Push to Topic 'storybook.events'" --> D;
    D -- "3. Process & Log (Structured JSON)" --> F((stdout));
    F -- "4. Collected by Docker/AWS Agent" --> E;

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px
    style D fill:#cfc,stroke:#333,stroke-width:2px

当开发者在本地运行 docker-compose up 后,他们可以打开 Storybook,在 Addon 面板中发送事件。终端中会立刻看到消费者服务打印出的结构化日志。在 CI 环境中,Playwright 脚本可以执行 JavaScript 来调用 stompClient.sendEvent,从而在自动化测试的特定步骤中注入遥测数据点。这些数据不再是 ephemeral 的控制台输出,而是持久化、可查询的日志流。

局限性与未来展望

这个方案成功地解决了我们的核心痛点,但它并非没有权衡。

首先,引入 ActiveMQ 增加了系统的复杂性。对于小型项目而言,这无疑是过度设计。我们之所以选择它,是因为它已经是我们技术版图中的一部分。

其次,当前的实现是“尽力而为”的。如果消费者服务宕机,ActiveMQ 中的消息(如果是持久化主题或队列)会保留。但如果 Storybook Addon 在连接断开时尝试发送,事件就会丢失。一个更健壮的实现应该在 StompClient 中加入一个内存队列和重试机制,或者使用浏览器的 IndexedDB 进行离线缓存。

最后,这条管道的终点目前只是结构化日志。真正的价值在于将这些事件与分布式追踪系统(如 AWS X-Ray 或 Jaeger)集成。未来的迭代方向是,在消费者服务中不仅仅是生成一个 traceId,而是根据收到的事件,启动或延续一个完整的 Trace Span,并将其导出到 OpenTelemetry Collector。这将允许我们将一次前端组件交互,与后续可能触发的多个后端微服务调用串联起来,真正实现从用户界面到数据库的全链路可观测性。


  目录