PIP-215: Configurable TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView
Motivation
Currently, the topic compaction logic implemented in TwoPhaseCompactor keeps only the last message for each key.
Here, we want to make topic compaction configurable with different strategies. For example, to support the Conflict State Resolution (Race Conditions) in PIP-192 (https://github.com/apache/pulsar/issues/16691), we need to compact messages by keeping the first valid state for each key.
Goal
- Create another Topic compactor, `StrategicTwoPhaseCompactor`, where we can configure a compaction strategy, `TopicCompactionStrategy`.
- Update the `TableViewConfigurationData` to load and consider the `TopicCompactionStrategy` when updating the internal key-value map in `TableView`.
- Add `TopicCompactionStrategy` in Topic-level Policy to run `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when executing compaction.
API Changes
public interface TopicCompactionStrategy<T> {

    /**
     * Returns the schema object for this strategy.
     * @return the schema object for this strategy
     */
    Schema<T> getSchema();

    /**
     * Tests if the compaction needs to keep the left (previous message value)
     * rather than the right (current message value) for the same key.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return True if it needs to keep the prev and ignore the cur. Otherwise, False.
     */
    boolean shouldKeepLeft(T prev, T cur);

    /**
     * Checks if the merge is enabled. If enabled, it will run T merge(..).
     *
     * @return True if the merge is enabled.
     */
    boolean isMergeEnabled();

    /**
     * Merges the previous message value and the current message value.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return the merged value
     */
    T merge(T prev, T cur);

    static TopicCompactionStrategy load(String topicCompactionStrategy) {
        if (topicCompactionStrategy == null) {
            return null;
        }
        try {
            // Instantiate the strategy class by name through its no-arg constructor.
            Class<?> clazz = Class.forName(topicCompactionStrategy);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (TopicCompactionStrategy) instance;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error when loading topic compaction strategy.", e);
        }
    }
}
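As a rough illustration of how a strategy might be written against this interface, the sketch below keeps the previous state whenever the current one is not a valid transition, in the spirit of the PIP-192 conflict resolution. `SimpleCompactionStrategy` is a hypothetical, trimmed-down stand-in for `TopicCompactionStrategy` (it omits `getSchema()` so the example compiles without Pulsar on the classpath), and the `State` enum and its transition rules are assumptions made up for the example.

```java
// Hypothetical stand-in for TopicCompactionStrategy; getSchema() is omitted
// so that the sketch compiles without the Pulsar client on the classpath.
interface SimpleCompactionStrategy<T> {
    boolean shouldKeepLeft(T prev, T cur);
    boolean isMergeEnabled();
    T merge(T prev, T cur);
}

// Assumed example states; not part of the proposal.
enum State { FREE, ASSIGNED, OWNED }

// Keeps the previous state whenever the current one is not a valid transition.
class FirstValidStateStrategy implements SimpleCompactionStrategy<State> {

    // Assumed transition rules, for illustration only.
    private static boolean isValidTransition(State from, State to) {
        switch (from) {
            case FREE:
                return to == State.ASSIGNED;
            case ASSIGNED:
                return to == State.OWNED || to == State.FREE;
            case OWNED:
                return to == State.FREE;
            default:
                return false;
        }
    }

    @Override
    public boolean shouldKeepLeft(State prev, State cur) {
        if (prev == null) {
            return false; // nothing to keep yet, accept the first message
        }
        return !isValidTransition(prev, cur);
    }

    @Override
    public boolean isMergeEnabled() {
        return false;
    }

    @Override
    public State merge(State prev, State cur) {
        return cur; // not used because merge is disabled
    }
}
```

With such a strategy, two brokers racing to publish `ASSIGNED` for the same key would resolve to the first `ASSIGNED` message, because the second one is not a valid transition from `ASSIGNED`.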
public interface TableView<T> extends Closeable {
    /**
     * Performs the given action for each future entry in this map until the action throws an exception.
     *
     * @param action The action to be performed for each entry
     */
    void listen(BiConsumer<String, T> action);
public class TableViewConfigurationData implements Serializable, Cloneable {
    ...
+    private String topicCompactionStrategy;
public class TableViewImpl<T> implements TableView<T> {
    private final TableViewConfigurationData conf;
    ...
+    private TopicCompactionStrategy<T> compactionStrategy;
    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        ...
+        this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
public class ReaderConfigurationData<T> implements Serializable, Cloneable {
+    private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;
+    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
public class CompactionReaderImpl<T> extends ReaderImpl<T> {
    ConsumerBase<T> consumer;

    private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
                                 ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
                                 Schema<T> schema) {
        super(client, readerConfiguration, executorProvider, consumerFuture, schema);
        this.consumer = getConsumer();
    }

    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
                                                     CompletableFuture<Consumer<T>> consumerFuture) {
        ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
        conf.setTopicName(topic);
        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
        conf.setStartMessageId(MessageId.earliest);
        conf.setAutoUpdatePartitions(true);
        conf.setAutoUpdatePartitionsIntervalSeconds(30);
        conf.setReadCompacted(true);
        conf.setPoolMessages(true);
        // Unlike a regular reader, the compaction reader uses a durable subscription starting from the earliest message.
        conf.setSubscriptionMode(SubscriptionMode.Durable);
        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
    }
    ...
    @Override
    public CompletableFuture<Message<T>> readNextAsync() {
        return consumer.receiveAsync();
    }
    ...
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
    }
}
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
    ...
    public <T> CompletableFuture<Long> compact(String topic,
                                               TopicCompactionStrategy<T> strategy) {
        CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
        CompactionReaderImpl<T> reader = CompactionReaderImpl.create(
                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture);
        // Start compacting only after the underlying consumer has been created.
        return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
    }
    ...
pulsar-admin topicPolicies set-compaction-strategy options
pulsar-admin topicPolicies get-compaction-strategy options
Implementation
# Goal 1:
- Create another Topic compactor, `StrategicTwoPhaseCompactor`, where we can configure a compaction strategy,
`TopicCompactionStrategy`
`StrategicTwoPhaseCompactor` will have two phases.
First Phase:
Using `CompactionReader<T>` instead of `RawReader`, it will iterate over each message and compact messages with the same key by following `merge()` in `TopicCompactionStrategy`.
The `CompactionReader<T>` will be added to pulsar-broker only (not to pulsar-client).
Second Phase: The compacted messages will be written to a ledger.
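The two phases can be sketched roughly as follows. This is a simplified in-memory model, not the actual compactor: `SimpleStrategy`, `KeyedMessage`, and `compact` are hypothetical names, messages are plain key/value pairs instead of Pulsar messages, and the returned map stands in for the entries that the second phase would write to a ledger. The order in which `shouldKeepLeft` and `merge` are consulted, and passing `null` as `prev` for the first message of a key, are also assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down mirror of TopicCompactionStrategy (no getSchema()).
interface SimpleStrategy<T> {
    boolean shouldKeepLeft(T prev, T cur);
    boolean isMergeEnabled();
    T merge(T prev, T cur);
}

// Hypothetical stand-in for a keyed Pulsar message.
final class KeyedMessage<T> {
    final String key;
    final T value;

    KeyedMessage(String key, T value) {
        this.key = key;
        this.value = value;
    }
}

final class StrategicCompactionSketch {
    // Phase one: fold the messages into one surviving value per key.
    // The returned map models what phase two would write to the ledger.
    static <T> Map<String, T> compact(List<KeyedMessage<T>> messages, SimpleStrategy<T> strategy) {
        Map<String, T> compacted = new LinkedHashMap<>();
        for (KeyedMessage<T> msg : messages) {
            T prev = compacted.get(msg.key);
            if (strategy.shouldKeepLeft(prev, msg.value)) {
                continue; // the strategy rejects the current message
            }
            T next = (prev != null && strategy.isMergeEnabled())
                    ? strategy.merge(prev, msg.value)
                    : msg.value;
            compacted.put(msg.key, next);
        }
        return compacted;
    }
}
```

A strategy whose `shouldKeepLeft` always returns false and whose merge is disabled reduces this loop to the existing "last message per key" behavior.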
# Goal 2:
- Update the `TableViewConfigurationData` to load and consider the `TopicCompactionStrategy` when updating the internal key-value map in `TableView`.
When updating the internal key-value map, `TableView` will follow the same compaction logic defined in `TopicCompactionStrategy`.
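A minimal sketch of that update path, under stated assumptions, might look like the following. `TableViewSketch` and `handleMessage` are hypothetical names rather than the real `TableViewImpl`, the strategy is reduced to a `BiPredicate` standing in for `shouldKeepLeft`, and skipping listener notification for rejected messages is an assumption, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Hypothetical model of a table view; not the real TableViewImpl.
final class TableViewSketch<T> {
    private final Map<String, T> data = new HashMap<>();
    private final List<BiConsumer<String, T>> listeners = new ArrayList<>();
    // Stand-in for TopicCompactionStrategy.shouldKeepLeft; null means "no strategy".
    private final BiPredicate<T, T> shouldKeepLeft;

    TableViewSketch(BiPredicate<T, T> shouldKeepLeft) {
        this.shouldKeepLeft = shouldKeepLeft;
    }

    void listen(BiConsumer<String, T> action) {
        listeners.add(action);
    }

    // Applies one incoming message to the map, consulting the strategy first.
    void handleMessage(String key, T value) {
        T prev = data.get(key);
        if (shouldKeepLeft != null && shouldKeepLeft.test(prev, value)) {
            return; // keep the previous value; listeners are not notified
        }
        data.put(key, value);
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, value);
        }
    }

    T get(String key) {
        return data.get(key);
    }
}
```

Applying the same check on the client side keeps the view consistent with what the broker-side compactor would retain, even for messages that have not been compacted yet.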
# Goal 3:
- Add `TopicCompactionStrategy` in Topic-level Policy to run `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when executing compaction.
When running compaction, the broker will look up the `TopicCompactionStrategy` in the Topic-level Policy and, if one is configured, run `StrategicTwoPhaseCompactor`. By default, it runs `TwoPhaseCompactor`.
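The selection rule could be sketched as below. `CompactorSelectionSketch`, `CompactorKind`, and `select` are hypothetical names, the string argument models the strategy class name stored in the topic-level policy, and treating a blank value as "not configured" is an assumption.

```java
// Hypothetical sketch; the enum and method names are not part of the proposal.
final class CompactorSelectionSketch {
    enum CompactorKind { TWO_PHASE, STRATEGIC_TWO_PHASE }

    // strategyClassName models the strategy configured in the topic-level policy.
    static CompactorKind select(String strategyClassName) {
        if (strategyClassName == null || strategyClassName.trim().isEmpty()) {
            return CompactorKind.TWO_PHASE; // default when nothing is configured
        }
        return CompactorKind.STRATEGIC_TWO_PHASE;
    }
}
```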
Alternatives
Why not resolve conflicts in a single broker (the leader broker) using two topics: non-compacted and compacted (pre-filter)?
- Brokers broadcast messages to the non-compacted topic first.
- Only the leader broker consumes this non-compacted topic and compacts any conflicting messages. Then the leader produces the compacted messages to the compacted topic (conflicts are resolved by the single writer).
- Brokers consume the compacted topic (no need to compact the messages separately).
Anything else?
Raised a local-fork PR for the implementation reference: https://github.com/heesung-sn/pulsar/pull/12
Raised a PR to the apache/master branch. https://github.com/apache/pulsar/pull/18195
PIP Discussion email: https://lists.apache.org/thread/m721nc0vwzo3wxg0tv3tprfc6z7xs1tj
PIP Vote email: https://lists.apache.org/thread/6y8407pw4fv21n2n0cbrvsspg5tok0h7