基于 Quarkus 与图数据库构建 mTLS 双向认证的实时数据推送服务


在构建一个高安全等级的内部服务时,一个棘手的技术痛点浮出水面:如何确保调用方(无论是自动化服务还是内部工具)的身份是绝对可信的?传统的 API Key 或 Bearer Token 方案,在密钥泄露时会带来巨大的安全风险。我们需要一种基于密码学的强认证机制,确保通信双方都持有可信的私钥,这便是双向 TLS(mTLS)的用武之地。

我们的目标是构建一个供应链实时异常监控服务的核心引擎。它需要满足两个核心要求:

  1. 通过一个 RESTful API 接收来自可信数据源(例如,仓库管理系统、物流卡车传感器)的事件更新,这些数据源必须通过 mTLS 认证。
  2. 在接收到关键事件后,立即通过图数据库分析其潜在影响,并将告警信息通过安全的 WebSocket 连接实时推送给已认证的监控仪表盘。

这个场景下,选择一个像 Istio 这样的服务网格来处理 mTLS 是一种常见的重量级方案。但在我们的案例中,该服务是边界服务,且为了追求极致的性能和更低的运维复杂度,我们决定在应用层直接实现 mTLS。Quarkus 以其对 GraalVM 原生编译的卓越支持和内建的强大网络与安全功能,成为了实现这一目标的理想技术栈。

第一步:奠定信任的基石,生成证书体系

在真实项目中,证书管理通常由一个专门的公钥基础设施(PKI)系统,如 HashiCorp Vault,来负责。但为了完整地理解 mTLS 的工作原理,我们从根源入手,使用 openssl 手动创建一个微型的证书体系。这是整个安全架构中最容易出错但也是最关键的一环。

我们需要创建以下实体:

  • 一个自签名的证书颁发机构(CA),作为信任的根。
  • 一个由该 CA 签发的服务器证书,用于我们的 Quarkus 应用。
  • 一个由该 CA 签发的客户端证书,用于模拟调用我们 API 的可信微服务。

创建一个脚本 generate-certs.sh 来自动化这个过程:

#!/bin/bash

# 清理旧文件
rm -rf ./certs
mkdir -p ./certs

# 1. 创建根证书颁发机构 (CA)
echo "--- Generating CA ---"
openssl genrsa -out ./certs/ca.key 4096
openssl req -x509 -new -nodes -key ./certs/ca.key -sha256 -days 3650 -out ./certs/ca.pem -subj "/CN=MyInternalCA"

# 2. 创建服务器证书
echo "--- Generating Server Certificate ---"
openssl genrsa -out ./certs/server.key 4096
openssl req -new -key ./certs/server.key -out ./certs/server.csr -subj "/CN=localhost"
openssl x509 -req -in ./certs/server.csr -CA ./certs/ca.pem -CAkey ./certs/ca.key -CAcreateserial -out ./certs/server.crt -days 365 -sha256

# 3. 创建客户端证书
echo "--- Generating Client Certificate ---"
openssl genrsa -out ./certs/client.key 4096
openssl req -new -key ./certs/client.key -out ./certs/client.csr -subj "/CN=trusted-data-source-1"
openssl x509 -req -in ./certs/client.csr -CA ./certs/ca.pem -CAkey ./certs/ca.key -CAcreateserial -out ./certs/client.crt -days 365 -sha256

# 4. 为 Quarkus (Java) 创建 PKCS12 格式的 Keystore 和 Truststore
echo "--- Creating Keystore and Truststore for Server ---"
# 服务器 Keystore: 包含服务器的私钥和证书链
openssl pkcs12 -export -out ./certs/server.keystore.p12 -name server -inkey ./certs/server.key -in ./certs/server.crt -certfile ./certs/ca.pem -password pass:password

# 服务器 Truststore: 包含它信任的 CA 证书
keytool -import -trustcacerts -alias ca -file ./certs/ca.pem -keystore ./certs/server.truststore.p12 -storepass password -noprompt

# 5. 为客户端 (Java/curl) 创建 PKCS12 格式的 Keystore 和 Truststore
echo "--- Creating Keystore and Truststore for Client ---"
# 客户端 Keystore: 包含客户端的私钥和证书链
openssl pkcs12 -export -out ./certs/client.keystore.p12 -name client -inkey ./certs/client.key -in ./certs/client.crt -certfile ./certs/ca.pem -password pass:password

# 客户端 Truststore: 包含它信任的 CA 证书
keytool -import -trustcacerts -alias ca -file ./certs/ca.pem -keystore ./certs/client.truststore.p12 -storepass password -noprompt

echo "--- Certificate generation complete in ./certs directory ---"

这个脚本清晰地定义了信任链:客户端和服务端都拥有由同一个 CA 签发的证书。当它们通信时,服务端会要求客户端出示证书,并用自己的信任库(server.truststore.p12,其中包含 CA 证书)来验证该证书的合法性。反之亦然。

第二步:配置 Quarkus,启用 mTLS 壁垒

Quarkus 的配置文件 application.properties 是声明式配置的核心。我们只需将上一步生成的证书库路径和密码告知 Quarkus,并强制要求客户端认证。

将生成好的 server.keystore.p12server.truststore.p12 放置到 src/main/resources 目录下。

# src/main/resources/application.properties

# --- Core Application Config ---
quarkus.application.name=supply-chain-monitor
quarkus.application.version=1.0.0

# --- HTTP Server & mTLS Configuration ---
# 启用 HTTPS,这是 mTLS 的前提
quarkus.http.ssl.port=8443

# 配置服务器的密钥库 (Keystore)
# 它包含了服务器自己的私钥和证书
quarkus.http.ssl.certificate.key-store-file=server.keystore.p12
quarkus.http.ssl.certificate.key-store-password=password
quarkus.http.ssl.certificate.key-store-file-type=PKCS12

# 配置服务器的信任库 (Truststore)
# 它包含了服务器信任的 CA 证书,用于验证客户端证书
quarkus.http.ssl.certificate.trust-store-file=server.truststore.p12
quarkus.http.ssl.certificate.trust-store-password=password
quarkus.http.ssl.certificate.trust-store-file-type=PKCS12

# 关键: 设置客户端认证策略为 "REQUIRED"
# 这会强制所有连接到 8443 端口的客户端必须提供一个有效的、可信的证书
quarkus.http.ssl.client-auth=REQUIRED

# --- Neo4j Graph Database Configuration ---
# 我们使用官方的 Neo4j Java Driver
quarkus.neo4j.uri=bolt://localhost:7687
quarkus.neo4j.authentication.username=neo4j
quarkus.neo4j.authentication.password=your_neo4j_password

# --- Logging ---
quarkus.log.level=INFO
quarkus.log.category."com.example".level=DEBUG

这里的 quarkus.http.ssl.client-auth=REQUIRED 是整个 mTLS 设置的开关。设置为 REQUEST 会请求证书但非强制,而 REQUIRED 则会拒绝任何没有提供有效证书的连接。在生产环境中,这提供了一道坚固的防线。

第三步:构建图模型与数据接入层

我们的供应链模型可以简化为节点和关系:(:Part)-[:SUPPLIED_BY]->(:Supplier)(:Shipment)-[:CONTAINS]->(:Part)(:Shipment)-[:DELAYED]。当一个 Shipment 延迟,我们需要快速找到所有受影响的 Part 以及它们的 Supplier

Graph Service
这个服务封装了与 Neo4j 的所有交互逻辑。

package com.example.service;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.neo4j.driver.Driver;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveSession;

import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
public class GraphService {

    private static final Logger LOGGER = Logger.getLogger(GraphService.class.getName());

    @Inject
    Driver driver;

    /**
     * 当一个货运批次发生延迟时,记录该事件并找出所有受影响的下游零件。
     * 这是一个典型的图查询场景,在关系型数据库中实现会非常复杂和低效。
     * @param shipmentId 延迟的货运批次ID
     * @return 一个包含受影响零件信息的 Multi (Reactive Stream)
     */
    public Multi<Map<String, Object>> processShipmentDelay(String shipmentId) {
        // 在真实项目中,Cypher 查询会更复杂,可能涉及多层关系和路径分析
        String cypherQuery = """
            MATCH (s:Shipment {id: $shipmentId})
            // 确保事件的幂等性,如果已标记为延迟则不重复操作
            MERGE (s)-[d:IS_DELAYED]->(e:DelayEvent {timestamp: timestamp()})
            WITH s
            MATCH (s)-[:CONTAINS]->(p:Part)
            RETURN p.id AS partId, p.name AS partName
            """;

        // 使用 Mutiny 和 Neo4j Reactive Driver
        // 这种响应式编程模型非常适合 I/O 密集型操作
        return Multi.createFrom().resource(
            () -> driver.session(ReactiveSession.class),
            session -> session.executeRead(tx -> {
                LOGGER.info("Executing Cypher for shipment: " + shipmentId);
                ReactiveResult result = tx.run(cypherQuery, Map.of("shipmentId", shipmentId));
                // 将响应式结果流转换为记录流
                return Multi.createFrom().publisher(result.records())
                    .map(record -> record.asMap());
            })
        ).withFinalizer(ReactiveSession::close);
    }
}

mTLS Protected REST Endpoint
这个 JAX-RS 端点是数据入口。注意,代码本身并不包含任何 mTLS 相关的逻辑。Quarkus 的底层网络服务器(Undertow)在请求到达我们的应用代码之前就已经完成了 TLS 握手和客户端证书验证。

package com.example.api;

import com.example.service.GraphService;
import com.example.websocket.AlertWebSocket;
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.SecurityContext;

import java.security.Principal;
import java.util.logging.Logger;

@Path("/events")
public class ShipmentEventResource {

    private static final Logger LOGGER = Logger.getLogger(ShipmentEventResource.class.getName());

    @Inject
    GraphService graphService;

    @Inject
    AlertWebSocket alertWebSocket;

    @POST
    @Path("/shipment/{id}/delay")
    public void reportDelay(@PathParam("id") String shipmentId, @Context SecurityContext securityContext) {
        Principal principal = securityContext.getUserPrincipal();
        // 关键日志: 验证调用者身份。principal.getName() 会返回客户端证书的主题名称(CN)
        // 例如 "CN=trusted-data-source-1"
        LOGGER.info("Received delay event for shipment " + shipmentId + " from authenticated client: " + principal.getName());

        // 业务逻辑: 处理事件并广播
        graphService.processShipmentDelay(shipmentId)
            .subscribe().with(
                affectedPart -> {
                    // 对于每个受影响的零件,通过 WebSocket 广播一个告警
                    String alertMessage = String.format("ALERT: Shipment %s delay impacts Part %s (%s)",
                        shipmentId, affectedPart.get("partId"), affectedPart.get("partName"));
                    alertWebSocket.broadcast(alertMessage);
                },
                failure -> LOGGER.severe("Error processing delay event: " + failure.getMessage())
            );
    }
}

通过注入 SecurityContext,我们可以获取到已认证客户端的 Principal,其名称就是客户端证书的 Subject DN。这是一个非常有用的审计和授权手段。在真实项目中,可以基于这个 Principal 实现更细粒度的访问控制。

第四步:实现安全的实时 WebSocket 推送

当 REST API 接收到异常事件并经由图数据库分析后,我们需要将结果实时推送出去。WebSocket 是实现这一目标的不二之选。一个常见的误区是认为需要为 WebSocket 单独配置一套安全机制。实际上,WebSocket 的初始连接请求是一个 HTTP GET 请求(Upgrade 头),因此它会完全复用我们在 application.properties 中为整个应用配置的 HTTPS 和 mTLS 策略。

package com.example.websocket;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

@ServerEndpoint("/alerts")
@ApplicationScoped
public class AlertWebSocket {

    private static final Logger LOGGER = Logger.getLogger(AlertWebSocket.class.getName());

    // 使用线程安全的 Set 来存储所有活跃的 WebSocket 会话
    private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<>());

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
        // 在此可以获取客户端证书信息,原理同 REST
        // Principal principal = session.getUserPrincipal();
        // LOGGER.info("WebSocket connection opened from: " + (principal != null ? principal.getName() : "Anonymous"));
        LOGGER.info("WebSocket connection opened: " + session.getId());
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
        LOGGER.info("WebSocket connection closed: " + session.getId());
    }

    /**
     * 向所有连接的客户端广播消息。
     * 在生产环境中,这应该被一个更健壮的消息队列(如 Kafka)或主题订阅模式所取代。
     * @param message 要广播的消息
     */
    public void broadcast(String message) {
        LOGGER.info("Broadcasting alert: " + message);
        sessions.forEach(session -> {
            session.getAsyncRemote().sendText(message, result -> {
                if (result.getException() != null) {
                    LOGGER.warning("Failed to send message to session " + session.getId() + ": " + result.getException().getMessage());
                }
            });
        });
    }
}

第五步:验证与测试

验证 mTLS 配置是否生效,最好的方式是使用 curl。我们需要指定客户端证书、客户端私钥以及 CA 证书来成功发起请求。

成功请求 (使用正确的客户端证书):

curl --verbose \
     --cert ./certs/client.crt \
     --key ./certs/client.key \
     --cacert ./certs/ca.pem \
     -X POST https://localhost:8443/events/shipment/SHP-001/delay

你将会在 Quarkus 应用的日志中看到 Received delay event ... from authenticated client: CN=trusted-data-source-1

失败请求 (不提供客户端证书):

curl --verbose --cacert ./certs/ca.pem -X POST https://localhost:8443/events/shipment/SHP-001/delay

这个请求会立即被服务器拒绝,curl 的输出会包含 TLS 握手失败的信息,例如 alert certificate required。这证明了我们的 mTLS 壁垒正在生效。

架构流程总览

整个数据流和安全验证过程可以用下图清晰地表示:

sequenceDiagram
    participant Client as Trusted Service
    participant Quarkus as Quarkus mTLS Service
    participant Neo4j as Graph Database
    participant Dashboard as Monitoring Dashboard

    Client->>+Quarkus: POST /events/shipment/... (HTTPS with Client Cert)
    Note over Quarkus: TLS Handshake & mTLS Validation
    Quarkus->>-Client: Handshake OK
    Quarkus->>Quarkus: ShipmentEventResource.reportDelay()
    Note over Quarkus: Authenticated Principal: "CN=trusted-data-source-1"
    Quarkus->>+Neo4j: Cypher Query (Find impacted parts)
    Neo4j-->>-Quarkus: Return impacted parts stream
    Quarkus->>Dashboard: WebSocket Broadcast("ALERT: ...")

局限性与未来展望

我们成功地在 Quarkus 中原生实现了 mTLS,为 REST API 和 WebSocket 提供了统一且强大的安全保障,并结合图数据库处理了复杂的关联分析。然而,这个方案并非没有权衡。

首先,证书管理的复杂性。手动通过 openssl 管理证书在生产环境中是不可行的。证书的签发、分发、吊销和轮换需要一个自动化的 PKI 解决方案。若没有这样的基础设施,维护成本会迅速攀升。

其次,WebSocket 的广播模型过于简单。当前的实现将告警广播给所有连接的客户端。在一个多租户或有不同关注点的系统中,需要引入基于主题的发布/订阅模型,可能需要借助 Redis Pub/Sub 或 Apache Kafka 来解耦,实现更精细的消息路由。

最后,原生 mTLS 与服务网格的权衡。我们选择原生实现,获得了极致的性能和较低的运行时依赖。但代价是失去了服务网格提供的统一可观测性、高级流量控制(如金丝雀发布)和集中的策略管理能力。对于一个拥有大量微服务的复杂系统,服务网格的运维优势可能会超过其性能开销。因此,该方案更适用于对性能和安全有极致要求的边界服务,或内部微服务数量可控的场景。


  目录