Feature/cpm: client-server communication for connection-pool metrics
- [ ] I have read the CONTRIBUTING.md guidelines.
- [ ] I have registered the PR changes.
Ⅰ. Describe what this PR did
Ⅱ. Client–Server Communication
During the heartbeat cycle, the client reports connection-pool metrics (HikariCP, Druid) to the server at a fixed interval.
1. Client-side Heartbeat Reporting of Metrics
1) Modifications
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
- Added extension points `createMetricsMessage()` and `shouldReportPoolInfo()`, used during the Writer Idle event to determine whether connection-pool metrics should be sent.
- In the `IdleStateEvent.WRITER_IDLE_STATE_EVENT` branch, added reporting logic: when `shouldReportPoolInfo()` returns true, send the result of `createMetricsMessage()`.
Example snippet:
```java
// On Writer Idle, send the heartbeat and optionally the connection-pool metrics
AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
if (shouldReportPoolInfo()) {
    AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), createMetricsMessage());
}
```
core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
- Overrides `createMetricsMessage()` and `shouldReportPoolInfo()`, delegating to the client-side processor.
- Adds `setEnableConnectionPoolMetrics(boolean)` and `setHttpPort(int)` for enabling metrics reporting and setting the client HTTP port.
- Registers the connection-pool metrics processor in `registerProcessor()`.
Example:
```java
@Override
public Object createMetricsMessage() {
    return clientConnectionPoolMetricsProcessor.createMetricsMessage();
}

@Override
public boolean shouldReportPoolInfo() {
    return clientConnectionPoolMetricsProcessor.shouldReportPoolInfo();
}

// Register the processor
clientConnectionPoolMetricsProcessor = new ClientConnectionPoolMetricsProcessor(applicationId);
super.registerProcessor(MessageType.TYPE_CONNECTION_POOL_METRICS, clientConnectionPoolMetricsProcessor, null);
```
core/src/main/java/org/apache/seata/core/protocol/MessageType.java
- Added a new type code `TYPE_CONNECTION_POOL_METRICS = 122`, used for routing connection-pool metrics messages.
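For reference, a minimal sketch of the new constant, assuming it follows the `short` convention of the existing codes in `MessageType`:

```java
/**
 * Type code for connection-pool metrics messages.
 * Sketch: assumes the short-typed constant style used by the other codes.
 */
public static final short TYPE_CONNECTION_POOL_METRICS = 122;
```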
2) Additions
core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientConnectionPoolMetricsProcessor.java
- Client-side metrics processor: constructs `ConnectionPoolMetricsMessage` (includes HikariCP and Druid metrics) and controls report throttling (default 10 seconds).
- Responsible for generating `clientUrl` (ip:port), allowing the server to identify the client address (for update operations).
Example:
```java
public Object createMetricsMessage() {
    ConnectionPoolMetricsMessage msg = new ConnectionPoolMetricsMessage();
    msg.setApplicationId(applicationId);
    msg.setClientUrl(getClientHttpUrl());
    msg.setDruidMetrics(DataSourceConnectionPoolCollector.collectAllDruidPoolMetrics());
    msg.setHikariMetrics(DataSourceConnectionPoolCollector.collectAllHikariPoolMetrics());
    msg.setSequenceNumber(sequence.incrementAndGet());
    lastReportTime = System.currentTimeMillis();
    msg.setTimestamp(lastReportTime);
    return msg;
}
```
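The 10-second throttle itself is not shown in the excerpt above; a minimal sketch of how `shouldReportPoolInfo()` could implement it (`lastReportTime` appears in the PR code, while the `enabled` flag and `REPORT_INTERVAL_MS` constant are assumed names):

```java
// Assumed names: 'enabled' and REPORT_INTERVAL_MS are illustrative, not from the PR.
private static final long REPORT_INTERVAL_MS = 10_000L;
private volatile long lastReportTime = 0L;
private volatile boolean enabled = false;

public boolean shouldReportPoolInfo() {
    // Report only when the feature is switched on and the time window has elapsed
    return enabled && System.currentTimeMillis() - lastReportTime >= REPORT_INTERVAL_MS;
}
```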
core/src/main/java/org/apache/seata/core/protocol/ConnectionPoolMetricsMessage.java
- Metrics message body: carries the application ID, client URL, HikariCP/Druid metrics lists, sequence number, and timestamp. Uses type code `TYPE_CONNECTION_POOL_METRICS`.
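A field-level sketch of the message body, inferred from the setters used in the example above; the `AbstractMessage` superclass and the exact list element types are assumptions (the metric type names are taken from the class diagram in section 6):

```java
import java.util.List;

// Sketch: assumes the message extends AbstractMessage like other Seata protocol messages
public class ConnectionPoolMetricsMessage extends AbstractMessage {
    private String applicationId;
    private String clientUrl;  // ip:port of the reporting client
    private List<HikariConnectionPoolMetrics> hikariMetrics;  // assumed element type
    private List<DruidConnectionPoolMetrics> druidMetrics;    // assumed element type
    private long sequenceNumber;
    private long timestamp;

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_CONNECTION_POOL_METRICS;
    }
    // getters/setters omitted
}
```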
2. Server-side Handling of Metrics
1) Modifications
core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java
- Registers the server-side metrics processor `ServerConnectionPoolMetricsProcessor` in `registerProcessor()`, enabling reception and processing of client reports.
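A sketch of what the registration might look like, mirroring the client-side registration shown earlier; whether an executor is passed as the third argument is an assumption:

```java
// Register the server-side connection-pool metrics processor
// (sketch: the 'messageExecutor' argument is an assumption)
ServerConnectionPoolMetricsProcessor metricsProcessor = new ServerConnectionPoolMetricsProcessor();
super.registerProcessor(MessageType.TYPE_CONNECTION_POOL_METRICS, metricsProcessor, messageExecutor);
```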
2) Additions
core/src/main/java/org/apache/seata/core/rpc/processor/server/ServerConnectionPoolMetricsProcessor.java
- Server-side processor: parses messages and dispatches them to `ConnectionPoolInfoCache`, where metrics are aggregated by `poolName` across multiple clients.
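A minimal sketch of the dispatch path, assuming the processor implements Seata's `RemotingProcessor` interface; the `getInstance()`/`put(...)` cache methods are assumed names:

```java
import io.netty.channel.ChannelHandlerContext;
import org.apache.seata.core.protocol.ConnectionPoolMetricsMessage;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.processor.RemotingProcessor;

public class ServerConnectionPoolMetricsProcessor implements RemotingProcessor {

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Extract the metrics payload and hand it to the cache for aggregation
        ConnectionPoolMetricsMessage message = (ConnectionPoolMetricsMessage) rpcMessage.getBody();
        ConnectionPoolInfoCache.getInstance().put(message);  // assumed method names
    }
}
```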
core/src/main/java/org/apache/seata/core/rpc/processor/server/ConnectionPoolInfoCache.java
- Server-side cache: stores HikariCP/Druid metrics, pool type, and client URL grouped by pool name.
- Provides `refresh(PoolConfigUpdateRequest)` to update the configuration snapshot in the cache.
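A sketch of the cache structure described above, assuming a `ConcurrentHashMap` keyed by pool name; `PoolInfoEntry` is an assumed name for the cached value type:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectionPoolInfoCache {

    // poolName -> latest snapshot (metrics, pool type, client URL);
    // 'PoolInfoEntry' is an assumed name for the value type
    private final Map<String, PoolInfoEntry> cache = new ConcurrentHashMap<>();

    /** Stores or overwrites the latest metrics reported for a pool. */
    public void put(ConnectionPoolMetricsMessage message) { /* ... */ }

    /** Updates the configuration snapshot held for a pool. */
    public void refresh(PoolConfigUpdateRequest request) { /* ... */ }
}
```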
Auxiliary types:
core/src/main/java/org/apache/seata/core/protocol/PoolType.java
- Enumeration of connection-pool types.
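Given the two supported pools, the enum is presumably as small as this sketch (constant names assumed):

```java
public enum PoolType {
    HIKARI,  // assumed constant names, inferred from the two supported pools
    DRUID
}
```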
3. Serialization/Deserialization: Protobuf
1) Modifications
serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/manager/ProtobufConvertManager.java
Purpose: Protobuf conversion manager, responsible for registering and managing converters for all message types.
- Maintains three core mapping tables (`convertorMap`, `protoClazzMap`, `reverseConvertorMap`).
- Provides registration and lookup for type converters.
- Supports bi-directional conversion (Java model ↔ Protobuf message).
```java
protobufConvertManager.convertorMap.put(
    ConnectionPoolMetricsMessage.class.getName(), new ConnectionPoolMetricsMessageConvertor());
protobufConvertManager.protoClazzMap.put(
    ConnectionPoolMetricsMessageProto.getDescriptor().getFullName(),
    ConnectionPoolMetricsMessageProto.class);
protobufConvertManager.reverseConvertorMap.put(
    ConnectionPoolMetricsMessageProto.class.getName(), new ConnectionPoolMetricsMessageConvertor());
```
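At serialization time, the manager resolves the registered converter by class name. A hedged usage sketch; the `fetchConvertor` method name and the `PbConvertor` interface are assumptions based on the map names above:

```java
// Sketch: 'fetchConvertor' and 'PbConvertor' are assumed names
PbConvertor convertor = ProtobufConvertManager.getInstance()
        .fetchConvertor(ConnectionPoolMetricsMessage.class.getName());
ConnectionPoolMetricsMessageProto proto =
        (ConnectionPoolMetricsMessageProto) convertor.convert2Proto(message);
```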
2) Additions
serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/convertor/ConnectionPoolMetricsMessageConvertor.java
Purpose: Handles Protobuf serialization and deserialization of connection-pool metrics messages.
```java
@Override
public ConnectionPoolMetricsMessageProto convert2Proto(ConnectionPoolMetricsMessage msg) {
    ConnectionPoolMetricsMessageProto.Builder builder = ConnectionPoolMetricsMessageProto.newBuilder();
    // Build the protobuf message, guarding against null fields
    builder.setApplicationId(msg.getApplicationId() == null ? "" : msg.getApplicationId());
    // ... convert the Hikari & Druid metrics lists ...
    return builder.build();
}

@Override
public ConnectionPoolMetricsMessage convert2Model(ConnectionPoolMetricsMessageProto proto) {
    ConnectionPoolMetricsMessage msg = new ConnectionPoolMetricsMessage();
    // Restore fields from the protobuf message, handling list-typed fields
    return msg;
}
```
serializer/seata-serializer-protobuf/src/main/resources/protobuf/org/apache/seata/protocol/transcation/connectionPoolMetricsMessage.proto
Purpose: Defines the Protobuf schema for connection-pool metrics messages.
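A hedged sketch of what the schema might contain, inferred from the Java message fields above; the field names, field numbers, and elided metric message types are all assumptions:

```proto
// Sketch only: field names/numbers are assumptions inferred from the Java message.
syntax = "proto3";

message ConnectionPoolMetricsMessageProto {
    string applicationId = 1;
    string clientUrl = 2;
    // repeated Hikari/Druid metrics messages would follow here (types elided)
    int64 sequenceNumber = 5;
    int64 timestamp = 6;
}
```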
4. Serialization/Deserialization: Seata Codec
1) Modifications
serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/MessageCodecFactory.java
Purpose: Factory for Seata message codecs; creates corresponding codec instances based on message type.
- New imports:
  - Added imports for `ConnectionPoolMetricsMessage` and `ConnectionPoolMetricsMessageCodec`.
- `getMessageCodec` method:
  - Added support for `MessageType.TYPE_CONNECTION_POOL_METRICS`, returning a `ConnectionPoolMetricsMessageCodec` instance.
- `getMessage` method:
  - Added support for `MessageType.TYPE_CONNECTION_POOL_METRICS`, returning a new `ConnectionPoolMetricsMessage` instance.
```java
// In getMessageCodec(...):
case MessageType.TYPE_CONNECTION_POOL_METRICS:
    msgCodec = new ConnectionPoolMetricsMessageCodec(version);
    break;

// In getMessage(...):
case MessageType.TYPE_CONNECTION_POOL_METRICS:
    abstractMessage = new ConnectionPoolMetricsMessage();
    break;
```
2) Additions
serializer/seata-serializer-seata/src/main/java/org/apache/seata/serializer/seata/protocol/ConnectionPoolMetricsMessageCodec.java
Purpose: Implements binary encoding/decoding for `ConnectionPoolMetricsMessage`, supporting compact serialization of connection-pool monitoring data.
- Message type support:
  - Implements `MessageSeataCodec`; handles only `ConnectionPoolMetricsMessage`.
- Encoding:
  - Encodes the metrics message into binary.
  - Serializes HikariCP/Druid metrics, SQL execution records, and slow-SQL lists.
- Decoding:
  - Decodes binary data back into a `ConnectionPoolMetricsMessage`.
  - Supports list reconstruction for both pool types and SQL entries.
```java
@Override
public Class<?> getMessageClassType() {
    return ConnectionPoolMetricsMessage.class;
}

@Override
public <T> void encode(T t, ByteBuf out) {
    // Binary encoding of connection-pool metrics
}

@Override
public <T> void decode(T t, ByteBuffer in) {
    // Binary decoding of connection-pool metrics
}
```
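The encode/decode bodies are elided in this summary. Seata codecs typically write length-prefixed UTF-8 strings, so the string-handling part of `encode` might look like this sketch (the helper is illustrative, not the actual implementation):

```java
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;

// Illustrative helper (not the actual implementation): length-prefixed
// UTF-8 string write, the usual pattern in Seata codecs.
static void writeString(ByteBuf out, String value) {
    if (value == null) {
        out.writeShort(0);
        return;
    }
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    out.writeShort(bytes.length);
    out.writeBytes(bytes);
}
```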
5. Explanation of Key Method Changes
- Client sending timing (core loop): `AbstractNettyRemotingClient`
  - `shouldReportPoolInfo()`: defaults to false; subclasses decide whether to report based on the switch and the time window.
  - `createMetricsMessage()`: defaults to returning `HeartbeatMessage.PING`; RM overrides it to return a `ConnectionPoolMetricsMessage`.
- RM client settings: `RmNettyRemotingClient`
  - `setEnableConnectionPoolMetrics(boolean)`: enables metrics reporting.
  - `setHttpPort(int)`: sets the client HTTP port.
  - `registerProcessor()`: registers `ClientConnectionPoolMetricsProcessor` for message construction and processing.
- Metrics collection: `DataSourceConnectionPoolCollector` (see the sketch after this list)
  - `collectHikariMetrics(...)`: reads metrics via `HikariPoolMXBean` and `HikariDataSource`.
  - `collectDruidMetrics(...)`: reads pool metrics from `DruidDataSource` and injects SQL execution data.
  - `updateConfig(poolName, request)`: updates the pool configuration using reflection, ensuring safety.
- Message construction & throttling: `ClientConnectionPoolMetricsProcessor`
  - `createMetricsMessage()`: wraps the application ID, client URL, metrics, sequence number, and timestamp.
  - `shouldReportPoolInfo()`: throttles based on the last report time (default every 10 seconds).
- Server-side processing chain: `ServerConnectionPoolMetricsProcessor` → `ConnectionPoolInfoCache`
  - Aggregates metrics by `poolName` and stores the pool type and client URL.
- Protocol & serialization: `ConnectionPoolMetricsMessage` and `MessageType.TYPE_CONNECTION_POOL_METRICS`
  - Fully compatible with the Netty/gRPC serialization paths.
6. Class Relationship Diagram
```
┌──────────────────────┐         ┌──────────────────────────────────────┐
│ RmNettyRemotingClient│  uses   │ ClientConnectionPoolMetricsProcessor │
└─────────┬────────────┘         └──────────────┬───────────────────────┘
          │                                     │ builds
          │                                     ▼
          │                      ConnectionPoolMetricsMessage
          │                                     │ carries
          │                                     ▼
heartbeat │        HikariConnectionPoolMetrics / DruidConnectionPoolMetrics
          │                                     ▲
          │                                     │ collects
          │                      DataSourceConnectionPoolCollector
          │                                     ▲
          │                                     │ uses
          │                      SqlCollector(+Slow/Exec)
          │
          ▼
┌────────────────────┐           ┌─────────────────────────────────────┐
│NettyRemotingServer │   uses    │ ServerConnectionPoolMetricsProcessor│
└────────────────────┘           └─────────────────────────────────────┘
                                                │ cache
                                                ▼
                                 ┌────────────────────────┐
                                 │ ConnectionPoolInfoCache│
                                 └────────────────────────┘
```