pulsar
[improve][txn] PIP-160 Metrics stats of Transaction buffered writer
Master Issue: #15370
Motivation
see #15370
Modifications
I will complete proposal #15370 with these pull requests (the current pull request is part of step 7-1):
1. Write the batch transaction log handler: TxnLogBufferedWriter
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs (admin and configuration doc).
   GET /admin/v3/transactions/coordinatorStats
   GET /admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName
7. Append metrics support for transaction batch log.
   7-1. Metrics of Txn Buffered Writer.
   7-2. TransactionLog and PendingAckStore enable the metrics of Txn Buffered Writer.
The desired effect
TransactionLog should create TxnLogBufferedWriter with params:
{
"metricsPrefix": "pulsar_txn_tc",
"labelNames": "coordinatorId",
"labelValues": "1"
}
The metrics output of TransactionLog will look like this:
# HELP pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records A metric for how many batches were triggered due to threshold "batchedWriteMaxRecords".
# TYPE pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records Counter
pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records{coordinatorId="1"} 15
...
...
...
# HELP pulsar_txn_tc_batched_log_records_count_per_entry A metric for how many records are in each batch written by the component[pulsar_txn_tc].
# TYPE pulsar_txn_tc_batched_log_records_count_per_entry Histogram
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="10"} 1
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="50"} 3
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="100"} 5
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="500"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="1000"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="+Inf"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_count{coordinatorId="1"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_sum{coordinatorId="1"} 5432
PendingAckStore works the same way, except that the PendingAckStores will not be differentiated by Subscription labels (because there are too many subscriptions).
Manage the registered collectors ourselves.
To build a Metrics Stat, we need to execute these two steps:
- Create a Collector and register it to the CollectorRegistry; the Collector may be a Histogram or a Counter.
- Register labels to the Collector and get the Collector.Child (held by the Metrics Stat). This step can also be omitted because we can execute collector.labels(labelValues) to get the Collector.Child.
In the Transaction log scenario, multiple Transaction Logs share the same Collector, and each has its own Collector.Child, so when we build metrics stat for each Transaction Log, we call collector.labels(labelValues) to get the Collector.Child. However, the CollectorRegistry does not provide an API like this:
public Collector getRegistedCollector(String name);
and it throws IllegalArgumentException when we register a collector with the same name more than once, see:
https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/CollectorRegistry.java#L49-L65
So we have to manage the registered collectors ourselves.
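A minimal sketch of what "managing the registered collectors ourselves" could look like. To stay self-contained it does not use the real client_java classes: StrictRegistry is a stand-in that only mimics the "reject duplicate names" behaviour described above, and RegisteredCollectors and getOrRegister are hypothetical names, not part of this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Stand-in (assumption) for a registry that rejects duplicate names, like CollectorRegistry. */
final class StrictRegistry {
    private final Map<String, Object> byName = new ConcurrentHashMap<>();

    void register(String name, Object collector) {
        if (byName.putIfAbsent(name, collector) != null) {
            throw new IllegalArgumentException("Collector already registered: " + name);
        }
    }
}

/** Hypothetical cache that remembers every collector we registered, keyed by metric name. */
final class RegisteredCollectors {
    private final StrictRegistry registry;
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    RegisteredCollectors(StrictRegistry registry) {
        this.registry = registry;
    }

    /** Returns the collector registered under name, creating and registering it on first use. */
    @SuppressWarnings("unchecked")
    <T> T getOrRegister(String name, Supplier<T> factory) {
        return (T) cache.computeIfAbsent(name, n -> {
            T created = factory.get();
            registry.register(n, created); // reached only once per name
            return created;
        });
    }
}
```

With this shape, every Transaction Log asks the cache for its collector by name, and only the first caller touches the registry.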
Each Metrics Stat instance holds its Collector.Child
To save the overhead of collector.labels(labelValues), we make each Metrics Stat hold a reference to its Collector.Child, because this method is not lightweight enough:
https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/SimpleCollector.java#L63-L80
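The idea of resolving the child once and keeping the reference can be sketched as follows. LabelledCounter is a simplified stand-in, not the client_java implementation; it only assumes that labels(...) costs a map lookup per call. WriterStat and its method name are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Stand-in (assumption) for a labelled counter: one child per distinct list of label values. */
final class LabelledCounter {
    static final class Child {
        private final LongAdder value = new LongAdder();

        void inc() {
            value.increment();
        }

        long get() {
            return value.sum();
        }
    }

    private final Map<List<String>, Child> children = new ConcurrentHashMap<>();

    /** A map lookup on every call; this is the cost the stat object avoids by caching. */
    Child labels(String... labelValues) {
        return children.computeIfAbsent(List.of(labelValues), k -> new Child());
    }
}

/** Hypothetical per-writer stat that resolves its child once, at construction time. */
final class WriterStat {
    private final LabelledCounter.Child triggeredByRecords;

    WriterStat(LabelledCounter counter, String... labelValues) {
        this.triggeredByRecords = counter.labels(labelValues);
    }

    /** Hot path: no label lookup, just an increment on the cached child. */
    void onFlushTriggeredByMaxRecords() {
        triggeredByRecords.inc();
    }
}
```

Two stats built with the same label values end up sharing one child, which matches the "multiple Transaction Logs share the same Collector" scenario described earlier.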
Code will be removed in the next PR (7-2)
In the complete design, we should have two implementations as in the UML below, one for when the batch feature is enabled and another for when it is disabled:

To reduce later maintenance costs, I'd like to ditch 'DisabledMetricsStat' and always use the implementation 'MetricsStatimpl', even if the Txn buffered writer disables the batch feature. The constructor without 'param-metricsStats' and the null checks will be removed in the next PR. They exist only because the work is split across PRs, so that each PR contains less code.
Documentation
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc
- [ ] doc-complete
@asafm Could you please help review the PR?
Hi @tjiuming
I don't see where TxnLogBufferedWriterMetricsDefinition.java's labelNames and labelValues have been set, except in the tests.
- TxnLogBufferedWriterMetricsDefinition will be created by MlTransactionLogImpl and MlPendingAckStore in the next PR.
- In the test TxnLogBufferedWriterTest, labelNames and labelValues are set by the initializer block at the top of the class.
/pulsarbot rerun-failure-checks
I started writing a few comments, then I realized I have some profound suggestions, and I thought it would be a better idea to raise them first.
I think this PR is currently quite complicated, in my opinion. It took me 2 hours to get the hang of it all. Here is what I suggest IMO to make it a bit simpler:
1. There is a lot of logic related to removing labels once the buffered writer is closing. It's even a bit more complicated because some buffered writers share the same label values, so they have one child of the histogram collector, for example, but many usages. It's very complicated.
   BufferedWriter is used on two occasions: Pending Ack stores and Transaction Log.
   Transaction Log doesn't need that complicated logic, as it creates a single Buffered Writer.
   Pending Ack Stores do. Since they have a provider class, we can make this logic "applicative" and place it in the MLPendingAckStoreProvider.
   How? BufferedWriter will get a BufferedWriterMetrics in the constructor. BufferedWriterMetrics will get a BufferedWriterMetricsConfiguration in its constructor.
   BufferedWriterMetricsConfiguration is the same as the definition you have: labelNames, labelValues, metricsPrefix (you called it component) and enabledMetrics.
   Since you said the pending ack stores will share the same labels and won't be differentiated by the label values, you can create a single BufferedWriterMetrics instance and use it whenever you create a new Buffered Writer.
   When a ledger is closed, its buffered writer is closed.
   BufferedWriterMetrics will be closed by its creator: MLPendingAckStoreProvider will know when to close its only instance, since it can easily keep track of (or already has) the number of open ack stores. TransactionLog will keep its only instance and will close it upon closing the managed ledger / buffered writer.
2. I wouldn't bother with unregistering the metric. It's only relevant when there are no pending ack stores. The only cost in this case is 2 lines emitted in the Prometheus output (help and type), since there are no samples to print.
3. In the BufferedWriterMetrics init, I would use the same optimization trick I saw at FunctionStatsManager. Step 1: create all metrics (histogram, collectors, etc.), mainly supplying label names. Step 2: create the child of each collector by supplying the label values. Save it as a variable and use it.
4. In close(), just remove the labels from the collector.
5. One larger change I would do: have the BufferedWriterMetrics class expose action methods such as batchFlushedTriggedByForceFlush(), with arguments containing anything you need for your metrics update. Hide everything inside, including the appendistogram you have there.
6. I would get rid of the disabled static instance, and simply do nothing in each action method in BufferedWriterMetrics if metricsEnabled is false. Encapsulate it.
I have many more comments, but I thought it's best to discuss the big ones first.
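For illustration, the "action method" and "do nothing when disabled" suggestions could be sketched like this. It is only a sketch under stated assumptions: plain LongAdder fields stand in for the Prometheus collectors, and the class name, the method name batchFlushedTriggeredByMaxRecords and its parameters are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical metrics holder: callers report events, the metric objects stay private. */
final class BufferedWriterMetricsSketch {
    private final boolean metricsEnabled;
    // Stand-ins (assumption) for the counter and histogram children of the real design.
    private final LongAdder flushesByMaxRecords = new LongAdder();
    private final LongAdder recordsFlushed = new LongAdder();
    private final LongAdder bytesFlushed = new LongAdder();

    BufferedWriterMetricsSketch(boolean metricsEnabled) {
        this.metricsEnabled = metricsEnabled;
    }

    /** Action method: one call updates every metric affected by this kind of flush. */
    void batchFlushedTriggeredByMaxRecords(int recordCount, long batchSizeBytes) {
        if (!metricsEnabled) {
            return; // disabled: no separate "disabled" implementation is needed
        }
        flushesByMaxRecords.increment();
        recordsFlushed.add(recordCount);
        bytesFlushed.add(batchSizeBytes);
    }

    long flushesByMaxRecords() {
        return flushesByMaxRecords.sum();
    }

    long recordsFlushed() {
        return recordsFlushed.sum();
    }

    long bytesFlushed() {
        return bytesFlushed.sum();
    }
}
```

The writer then never needs a null check or a metrics-enabled branch of its own; it calls the action method unconditionally.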
Hi @asafm
Here is what I suggest IMO to make it a bit simpler: 1-6
I've taken care of all the suggestions, could you review this PR again?
Hi @asafm
There is a way to solve this issue by making sure we're not defining metrics twice: We know that we plan to create only one Metrics instance per metric-prefix. So in that case, both TxLog and PendingAckStoreProvider will create one with its own prefix, and that's it. No need to verify it was created before. In the event a future developer makes a mistake, it will fail in the constructor in some test right away, since CollectorRegistry.register() will fail on a duplicate.
That's a good way to do it, but it can make the code confusing:
- When the Transaction Log Provider opens a Transaction Log, it passes the Histogram to the Transaction Log. That is OK.
- When the Transaction Log closes, it removes the labels. That is OK too.
But the Transaction Log Provider will hold all the Collectors of the Txn Buffered Writer; this is confusing.
Hi @asafm
I still see the comment.
/** Metrics. **/.
Already removed.
Hi @asafm
Ok. So the if metrics != null will go away as well, right?
Yes, everything works, except metrics
Hi @asafm
I don't understand this entire explanation. Why does a metric indicating you flushed due to accepting a single large record obscure anything?
These comments mainly describe the following.
For example, suppose we write this data:
// max batch size = 100, max batch records = 5, max delay = 10 milliseconds
write data_01{ size = 2}
write data_02{ size = 2}
write data_03{ size = 2}
write data_04{ size = 2}
write data_05{ size = 2}
write data_06{ size = 111}
We have two plans for the metrics: include data_06 or not.
- Exclude data_06
  - avg ( batch records count ) = 5
  - avg ( batch size ) = 10
- Include data_06
  - avg ( batch records count ) = 3
  - avg ( batch size ) = 60
The first plan (exclude data_06) presents metrics that make more sense for adjusting the threshold.
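The arithmetic behind the two plans can be checked with a short sketch. It assumes the six writes produce two flushes: one batch of five 2-byte records (5 records, 10 bytes), then data_06 written alone (1 record, 111 bytes) because it exceeds the max batch size. BatchAverages is a hypothetical helper, not code from this PR.

```java
/** Hypothetical helper that averages per-flush values for the example above. */
final class BatchAverages {
    /** Arithmetic mean of the given values; 0 for an empty array. */
    static double average(int[] values) {
        if (values.length == 0) {
            return 0;
        }
        double sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        // Plan 1: only the real batch is observed.
        System.out.println("exclude data_06: records=" + average(new int[] {5})
                + " size=" + average(new int[] {10}));
        // Plan 2: the oversized single-record write is observed as well.
        System.out.println("include data_06: records=" + average(new int[] {5, 1})
                + " size=" + average(new int[] {10, 111}));
    }
}
```

Including data_06 gives an average of 3 records and 60.5 bytes per flush (the 60 quoted above, before rounding), which says little about how full the regular batches are.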
Hi @asafm
But the Transaction Log Provider will hold all the Collector of Txn Buffered Writer; this is confusing
Can you explain this further? I didn't understand how the provider would hold all the Collectors (metrics). It should only be to a single instance of the metric class.
When MLTransactionMetadataStoreProvider is initialized, we create the Collectors like this:
public MLTransactionMetadataStoreProvider() {
    this.recordsPerBatchMetric = ...
    this.batchSizeBytesMetric = ...
    this.oldestRecordInBatchDelayTimeSecondsMetric = ...
    this.batchFlushTriggeredByMaxRecordsMetric = ...
    this.batchFlushTriggeredByMaxSizeMetric = ...
    this.batchFlushTriggeredByMaxDelayMetric = ...
}
And when creating MlTransactionLogImpl, we pass these Collectors to MlTransactionLogImpl like this:
public class MLTransactionMetadataStoreProvider {
    public TransactionMetadataStore openStore(...) {
        TransactionMetadataStore store = ...;
        setMetrics(store);
        return store;
    }
    private void setMetrics(TransactionMetadataStore store) {
        store.recordsPerBatchMetric = this.recordsPerBatchMetric;
        store.batchSizeBytesMetric = this.batchSizeBytesMetric;
        store.oldestRecordInBatchDelayTimeSecondsMetric = this.oldestRecordInBatchDelayTimeSecondsMetric;
        ...
    }
}
The MLTransactionMetadataStoreProvider will hold all the Collectors of the Txn Buffered Writer; this is confusing.
Hi @asafm
I've taken care of all the comments, could you review this PR again?
Hi @asafm
I asked if you will remove the if metrics != null statements in the next PRs.
Yes, I will
Hi @asafm
IMO this explanation is confusing and doesn't serve what you truly want to convey. I would add the following explanation:
Good suggestion. Already fixed, thanks.
Hi @asafm
As I understand, this only happens once per instance of MLTransactionLogImpl. So I don't understand why you want to pass metric by metric. Something doesn't add up here. I guess my main question is: how many MLTransactionLogImpl are there in a single broker process? More than 1?
Yes, maybe more than one.
Hi @asafm
I've taken care of all the comments, could you review this PR again?
Hi @asafm
As I understand, this only happens once per instance of MLTransactionLogImpl. So I don't understand why you want to pass metric by metric. Something doesn't add up here. I guess my main question is: how many MLTransactionLogImpl are there in a single broker process? More than 1?
Yes, maybe more than one.
Ok, if more than one, then the design must change. I thought the whole idea was that you have a single instance of metrics per metric prefix. If not, after much thought I suggest the following:
abstract class BufferMetrics {
    protected abstract void observeRecordsPerBatch(int value);
    protected abstract void incFlushTriggeredByMaxRecords(int value);
}
class MLTransactionMetadataStoreBufferedWriterMetrics extends BufferMetrics {
    // Registered once per class; every instance shares this collector.
    private static final Histogram recordsPerBatchMetric = new Histogram.Builder()
            .name("pulsar_tx_store_bufferedwriter_batch_record_count")
            .labelNames(new String[]{"txCoordinatorId"})
            .help("Records per batch histogram")
            .buckets(RECORD_COUNT_PER_ENTRY_BUCKETS)
            .register(registry);
    private final Histogram.Child recordsPerBatchHistogram;
    public MLTransactionMetadataStoreBufferedWriterMetrics(String txCoordinatorId) {
        // Resolve the child from the shared collector, not from the (still unset) child field.
        recordsPerBatchHistogram = recordsPerBatchMetric.labels(txCoordinatorId);
    }
    @Override
    protected void observeRecordsPerBatch(int value) {
        recordsPerBatchHistogram.observe(value);
    }
}
The pros:
- It's explicit
- No confusing pass of label names multiple times which after 2nd time are not really used.
The cons:
- A bit awkward
Another approach which I disliked a bit, but it's still ok: Add to Pulsar Common:
class PrometheusRegistryChecker {
    static Map<String, Collector> defaultMetricRegistryNameToCollector = new HashMap<>();
    static Collector registerIfNotExists(Collector collector) { ... }
}
Like FunctionCollectorRegistryImpl
Hi @asafm
I've taken care of all the comments, could you review this PR again?
Hi @asafm @codelipenghui
I've taken care of all the comments, could you review this PR again?
Hi @asafm
As I understand, this only happens once per instance of MLTransactionLogImpl. So I don't understand why you want to pass metric by metric. Something doesn't add up here. I guess my main question is: how many MLTransactionLogImpl are there in a single broker process? More than 1?
Yes, maybe more than one.
Ok, if more than one, then the design must change. I thought the whole idea was that you have a single instance of metrics per metric prefix. If not, after much thought I suggest the following:
abstract class BufferMetrics {
    protected abstract void observeRecordsPerBatch(int value);
    protected abstract void incFlushTriggeredByMaxRecords(int value);
}
class MLTransactionMetadataStoreBufferedWriterMetrics extends BufferMetrics {
    private static final Histogram recordsPerBatchMetric = new Histogram.Builder()
            .name("pulsar_tx_store_bufferedwriter_batch_record_count")
            .labelNames(new String[]{"txCoordinatorId"})
            .help("Records per batch histogram")
            .buckets(RECORD_COUNT_PER_ENTRY_BUCKETS)
            .register(registry);
    private final Histogram.Child recordsPerBatchHistogram;
    public MLTransactionMetadataStoreBufferedWriterMetrics(String txCoordinatorId) {
        recordsPerBatchHistogram = recordsPerBatchMetric.labels(txCoordinatorId);
    }
    @Override
    protected void observeRecordsPerBatch(int value) {
        recordsPerBatchHistogram.observe(value);
    }
}
The pros:
- It's explicit
- No confusing pass of label names multiple times which after 2nd time are not really used.
The cons:
- A bit awkward
Another approach which I disliked a bit, but it's still ok: Add to Pulsar Common:
class PrometheusRegistryChecker {
    static Map<String, Collector> defaultMetricRegistryNameToCollector = new HashMap<>();
    static Collector registerIfNotExists(Collector collector) { ... }
}
Like FunctionCollectorRegistryImpl
agreed, easier is better.
@tjiuming @poorbarcode I actually found this pattern already in Pulsar. See ComponentStatsManager
Hi @asafm
ComponentStatsManager
Yes, I know it exists, but there is a package dependency problem: it would need to be pulled into pulsar-common.
Hi @asafm @tjiuming
I have removed the static variable Collector_cache from the class TxnLogBufferedWriterMetricsStat, because I changed the design:
- The Transaction Log Provider will create many instances of ML Transaction Log, each with its own new instance of Txn buffered writer.
- All ML Transaction Logs will use the same shared TxnLogBufferedWriterMetricsStat instance.
The same applies to the Pending Ack Store.
After that, the TxnLogBufferedWriterMetricsStats for a given metrics prefix will be created only once, so the static variable Collector_cache is unnecessary.
@tjiuming @poorbarcode I actually found this pattern already in Pulsar. See ComponentStatsManager
Why - this was only given as an example. Not relevant now since you already modified the original code.
/pulsarbot rerun-failure-checks
I have rebased branch-master to solve the problem of Pulsar CI broker 1
/pulsarbot rerun-failure-checks