incubator-seata icon indicating copy to clipboard operation
incubator-seata copied to clipboard

Feature/cpm Client-server and communication

Open Oscarcheng0312 opened this issue 1 month ago • 0 comments

Ⅰ. Describe what this PR did

II. Client–Server Communication

During the heartbeat sending cycle, the client reports connection-pool metrics (HikariCP, Druid) to the server at a fixed interval.

1. Client-side Heartbeat Reporting of Metrics

1) Modifications

core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java

  • Added extension points createMetricsMessage() and shouldReportPoolInfo(), used during the Writer Idle event to determine whether connection-pool metrics should be sent.
  • In the IdleStateEvent.WRITER_IDLE_STATE_EVENT branch, added reporting logic: when shouldReportPoolInfo() is true, send the result of createMetricsMessage().

Example snippet:

// On Writer Idle, send heartbeat and optionally send connection-pool metrics
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
if (shouldReportPoolInfo()) {
    AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), createMetricsMessage());
}

core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java

  • Overrides createMetricsMessage() and shouldReportPoolInfo(),delegating to the client-side processor.
  • Adds setEnableConnectionPoolMetrics(boolean) and setHttpPort(int),for enabling metrics reporting and setting the client HTTP port.
  • Registers the connection-pool metrics processor in registerProcessor() .

Example:

@Override
public Object createMetricsMessage() {
    return clientConnectionPoolMetricsProcessor.createMetricsMessage();
}

@Override
public boolean shouldReportPoolInfo() {
    return clientConnectionPoolMetricsProcessor.shouldReportPoolInfo();
}

// Register processor
clientConnectionPoolMetricsProcessor = new ClientConnectionPoolMetricsProcessor(applicationId);
super.registerProcessor(MessageType.TYPE_CONNECTION_POOL_METRICS, clientConnectionPoolMetricsProcessor, null);

core/src/main/java/org/apache/seata/core/protocol/MessageType.java

  • Added a new type code: TYPE_CONNECTION_POOL_METRICS = 122,used for routing connection-pool metrics messages.

2) Additions

core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientConnectionPoolMetricsProcessor.java

  • Client-side metrics processor: constructs ConnectionPoolMetricsMessage((includes HikariCP and Druid metrics), and controls report throttling (default 10 seconds).
  • Responsible for generating clientUrl(ip:port), allowing the server to identify the client address (for update operations).

Example:

public Object createMetricsMessage() {
    ConnectionPoolMetricsMessage msg = new ConnectionPoolMetricsMessage();
    msg.setApplicationId(applicationId);
    msg.setClientUrl(getClientHttpUrl());
    msg.setDruidMetrics(DataSourceConnectionPoolCollector.collectAllDruidPoolMetrics());
    msg.setHikariMetrics(DataSourceConnectionPoolCollector.collectAllHikariPoolMetrics());
    msg.setSequenceNumber(sequence.incrementAndGet());
    lastReportTime = System.currentTimeMillis();
    msg.setTimestamp(lastReportTime);
    return msg;
}

core/src/main/java/org/apache/seata/core/protocol/ConnectionPoolMetricsMessage.java

  • Metrics message body: carries application ID, client URL, HikariCP/Druid metrics list, sequence number, timestamp. Uses type code TYPE_CONNECTION_POOL_METRICS

2. Server-side Handling of Metrics

1)Modifications

core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java

  • Registers the server-side metrics processor registerProcessor() in ServerConnectionPoolMetricsProcessor,enabling reception and processing of client reports.

2)Additions

core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerConnectionPoolMetricsProcessor.java

  • Server-side processor: parses messages and dispatches them to ConnectionPoolInfoCache,where metrics are aggregated by poolName across multiple clients.

core/src/main/java/org/apache/seata/core/rpc/processor/server/ConnectionPoolInfoCache.java

  • Server-side cache: stores HikariCP/Druid metrics, pool type, and client URL grouped by pool name.
  • Provides refresh(PoolConfigUpdateRequest) to update the configuration snapshot in cache.

Auxiliary types:

  • core/src/main/java/org/apache/seata/core/protocol/PoolType.java
    • Enumeration of connection-pool types.

3.Serialization/Deserialization: Protobuf

1)Modifications

serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/manager/ProtobufConvertManager.java

Purpose: Protobuf Conversion Manager — responsible for registering and managing converters for all message types.

  • Maintains three core mapping tables.
  • Provides registration and lookup for type converters.
  • Supports bi-directional conversion (Java Model ↔ Protobuf Message).
protobufConvertManager.convertorMap.put(
        ConnectionPoolMetricsMessage.class.getName(), new ConnectionPoolMetricsMessageConvertor());

protobufConvertManager.protoClazzMap.put(
        ConnectionPoolMetricsMessageProto.getDescriptor().getFullName(),
        ConnectionPoolMetricsMessageProto.class);

protobufConvertManager.reverseConvertorMap.put(
        ConnectionPoolMetricsMessageProto.class.getName(), new ConnectionPoolMetricsMessageConvertor());

2)Additions

serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/convertor/ConnectionPoolMetricsMessageConvertor.java

Purpose: Handles Protobuf serialization and deserialization of connection-pool metrics messages.

@Override
public ConnectionPoolMetricsMessageProto convert2Proto(ConnectionPoolMetricsMessage msg) {
    // Build protobuf message, handle null fields
    builder.setApplicationId(msg.getApplicationId() == null ? "" : msg.getApplicationId());
    // Convert Hikari & Druid metrics
}

@Override
public ConnectionPoolMetricsMessage convert2Model(ConnectionPoolMetricsMessageProto proto) {
    // Restore Java object from protobuf message
    // Handle conversion for list types
}

serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/connectionPoolMetricsMessage.proto

Purpose: Defines the Protobuf schema for connection-pool metrics messages.

4.Serialization/Deserialization: Seata Codec

1)Modifications

serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java

Purpose: Factory for Seata message codecs; creates corresponding codec instances based on message type.

  1. New imports

    • Added import for ConnectionPoolMetricsMessage
    • Added import for ConnectionPoolMetricsMessageCodec
  2. getMessageCodec method

    • Added support for MessageType.TYPE_CONNECTION_POOL_METRICS
    • Returns instance ofConnectionPoolMetricsMessageCodec
  3. getMessage method

    • Added support for MessageType.TYPE_CONNECTION_POOL_METRICS
    • Returns new ConnectionPoolMetricsMessage instance
case MessageType.TYPE_CONNECTION_POOL_METRICS:
    msgCodec = new ConnectionPoolMetricsMessageCodec(version);
    break;

case MessageType.TYPE_CONNECTION_POOL_METRICS:
    abstractMessage = new ConnectionPoolMetricsMessage();
    break;

2)Additions

serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/ConnectionPoolMetricsMessageCodec.java

Purpose: Implements binary encoding/decoding for ConnectionPoolMetricsMessage supporting compact serialization of connection-pool monitoring data.

  1. Message type support

    • Implements MessageSeataCodec
    • Handles only ConnectionPoolMetricsMessage
  2. Encoding

    • Encodes metrics message into binary
    • Serializes HikariCP/Druid metrics, SQL execution records, slow-SQL lists
  3. Decoding

    • Decodes binary data back into ConnectionPoolMetricsMessage
    • Supports list reconstruction for both pool types and SQL entries
@Override
public Class<?> getMessageClassType() {
    return ConnectionPoolMetricsMessage.class;
}

@Override
public <T> void encode(T t, ByteBuf out) {
    // Binary encoding of connection-pool metrics
}

@Override
public <T> void decode(T t, ByteBuffer in) {
    // Binary decoding of connection-pool metrics
}

5.Explanation of Key Method Changes

  • Client sending timing (core loop):AbstractNettyRemotingClient

    • shouldReportPoolInfo():default false; subclasses determine reporting based on switch and time window.
    • createMetricsMessage():default returns HeartbeatMessage.PING,RM overrides to return ConnectionPoolMetricsMessage
  • RM client settings: RmNettyRemotingClient

    • setEnableConnectionPoolMetrics(boolean):
    • setHttpPort(int):
    • registerProcessor():Registers ClientConnectionPoolMetricsProcessor for message construction & processing.
  • Metrics collection: DataSourceConnectionPoolCollector

    • collectHikariMetrics(...):reads metrics via HikariPoolMXBean and HikariDataSource
    • collectDruidMetrics(...):reads pool metrics from DruidDataSourceand injects SQL execution data.
    • updateConfig(poolName, request):updates pool config using reflection, ensuring safety.
  • Message construction & throttling: ClientConnectionPoolMetricsProcessor

    • createMetricsMessage():wraps application ID, client URL, metrics, sequence, timestamp.
    • shouldReportPoolInfo():throttles based on last report time (default every 10 seconds).
  • Server-side processing chain: ServerConnectionPoolMetricsProcessorConnectionPoolInfoCache

    • Aggregates metrics by poolName and stores pool type + client URL.
  • Protocol & serialization:

    • ConnectionPoolMetricsMessage and MessageType.TYPE_CONNECTION_POOL_METRICS
    • Fully compatible with Netty/Grpc serialization paths.

6.Class Relationship Diagram

┌──────────────────────┐           ┌──────────────────────────────────────┐
│ RmNettyRemotingClient│    uses   │ ClientConnectionPoolMetricsProcessor │
└─────────┬────────────┘           └──────────────┬───────────────────────┘
          │                                       │ builds
          │                                       ▼
          │                         ConnectionPoolMetricsMessage
          │                                       │ carries
          │                                       ▼
heartbeat │                HikariConnectionPoolMetrics / DruidConnectionPoolMetrics
          │                                       ▲
          │                                       │ collects
          │                         DataSourceConnectionPoolCollector
          │                                       ▲
          │                                       │ uses
          │                               SqlCollector(+Slow/Exec)
          │                                       
          ▼
┌────────────────────┐            ┌─────────────────────────────────────┐  
│NettyRemotingServer │   uses     │ ServerConnectionPoolMetricsProcessor│
└────────────────────┘            └─────────────────────────────────────┘
                                                 		│  cache                                     
                                                 		▼
                                        ┌────────────────────────┐  
                                        │ ConnectionPoolInfoCache│
                                        └────────────────────────┘

Ⅴ. Special notes for reviews

Oscarcheng0312 avatar Nov 19 '25 02:11 Oscarcheng0312