apache-pulsar-cheat-sheet
apache-pulsar-cheat-sheet copied to clipboard
This cheat sheet is filled with some handy tips, commands and code snippets to get you streaming data using Apache Pulsar in no time!
= Cheat Sheet for Apache Pulsar 🚀 :toc: :toc-placement!:
This cheat sheet is filled with some handy tips, commands and code snippets to get you streaming data using Apache Pulsar in no time!
Of course, this list isn't exhaustive at all. But contributing to this project is easy! Just send a pull request and make the list growth! Any feedback and bug reports are also greatly appreciated!
Did this list help you? Please ⭐ this repository to say 🙏 to the contributors!
toc::[]
== Use Pulsar in Docker
[source,bash]
$ docker run -it
-p 6650:6650
-p 8080:8080
--mount source=pulsardata,target=/pulsar/data
--mount source=pulsarconf,target=/pulsar/conf
apachepulsar/pulsar:latest
bin/pulsar standalone
== Basic CLI Commands
=== Create a tenant
[source,bash]
$ pulsar-admin tenants create demo
=== Create a namespace
[source,bash]
$ pulsar-admin namespaces create demo/tests
=== Create a partitioned-topic with defaults tenant/namespace (default/public)
[source,bash]
$ pulsar-admin topics create-partitioned-topic cheat_sheet_topic --partitions 3
Note: by default a topic is persistent, i.e. messages are persisted to storage nodes (bookies)
=== Create a partitioned-topic for a specific tenant and namespace.
[source,bash]
$ pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/cheat_sheet_topic --partitions 3
=== List partitioned-topics
[source,bash]
$ pulsar-admin topics list-partitioned-topics public/default
=== Get stats about a topic
[source,bash]
$ pulsar-admin topics stats persistent://public/default/cheat_sheet_topic
=== Produce messages
[source,bash]
$ pulsar-client produce cheat_sheet_topic --messages "first message, second message, third message"
=== Consume messages
[source,bash]
$ pulsar-client consume -s "my-subscription" cheat_sheet_topic -n 0 -p Earliest
=== Reset cursor to a specific time
[source,bash]
$ pulsar-admin persistent reset-cursor persistent://public/default/cheat_sheet_topic -s my-subscription --time '1d'
== Client API
=== Dependencies
==== Java (using Maven) [source,xml]
=== Client: Create basic PulsarClient
==== Java
[source,java]
PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://broker:6650") .build();
=== Producer: Create basic Producer
==== Java
[source,java]
Producer<String> producer = client.newProducer(Schema.STRING) .topic("cheat_sheet_topic") .create();
=== Producer: Create Producer with RoundRobin Routing Mode
==== Java
[source,java]
Producer<String> producer = client.newProducer(Schema.STRING) .topic("cheat_sheet_topic") .hashingScheme(HashingScheme.Murmur3_32Hash) // default is Java.hashCode() .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) .create();
=== Producer: Create Producer with Single Partition Routing Mode
If no key is provided on the message, the producer will randomly pick one single partition and publish all the messages into that partition.
==== Java
[source,java]
Producer<String> producer = client.newProducer(Schema.STRING) .topic("cheat_sheet_topic") .hashingScheme(HashingScheme.Murmur3_32Hash) // default is Java.hashCode() .messageRoutingMode(MessageRoutingMode.SinglePartition) .create();
=== Producer: Create Producer with Custom Router
==== Java [source,java]
Producer<String> producer = client.newProducer(Schema.STRING) .topic("cheat_sheet_topic") .messageRoutingMode(MessageRoutingMode.CustomPartition) .messageRouter(new MessageRouter() { @Override public int choosePartition(Message<?> msg, TopicMetadata metadata) { String key = msg.getProperty("routing_key"); return MathUtils.signSafeMod(Murmur3_32Hash.getInstance().makeHash(key), metadata.numPartitions()); } }) .create();
=== Producer: Create Batching Producer
==== Java
[source,java]
Producer<String> producer = client.newProducer(Schema.STRING) .topic("cheat_sheet_topic") .enableBatching(true) .batchingMaxBytes(5 * 1024 * 1024) // 5MB .batchingMaxPublishDelay(200, TimeUnit.MILLISECONDS) .blockIfQueueFull(true) .sendTimeout(30, TimeUnit.SECONDS) .compressionType(CompressionType.ZSTD) .batcherBuilder(BatcherBuilder.KEY_BASED) .hashingScheme(HashingScheme.Murmur3_32Hash) .create();
=== Consumer: Create a durable Consumer with Key Shared subscription
==== Java [source,java]
try(Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic("cheat_sheet_topic") .subscriptionName("cheatSeetsubscription") .subscriptionMode(SubscriptionMode.Durable) .subscriptionType(SubscriptionType.Key_Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe() ) {
while (true) {
Message<String> message = consumer.receive();
try {
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
message.getKey(),
message.getValue(),
message.getTopicName(),
message.getMessageId().toString());
consumer.acknowledge(message);
} catch (Exception e) {
// Failed to process message, mark it for redelivery
consumer.negativeAcknowledge(message);
}
}
}
=== Consumer: Read a partitioned-topic from the beginning to last published message.
==== Java [source,java]
// Create a PulsarClient PulsarClient client = ...
// List all partitions for topic List<String> topics = client.getPartitionsForTopic("test_hello").get();
// Create as many readers as topic-partitions List<CompletableFuture<Reader<String>>> readers = topics.stream() .map(topic -> client.newReader(Schema.STRING) .topic(topic) .startMessageId(MessageId.earliest) .createAsync() ).collect(Collectors.toList());
// Create a fixed-sized Thread pool. ExecutorService service = Executors.newFixedThreadPool(readers.size());
// Submit one task for each reader for (CompletableFuture<Reader<String>> future : readers) { service.submit(() -> { try (Reader<String> reader = future.get()) { while (reader.hasMessageAvailable()) { Message<String> message = reader.readNext(); System.out.printf( "Message received: key=%s, value=%s, topic=%s, id=%s%n", message.getKey(), message.getValue(), message.getTopicName(), message.getMessageId().toString()); } System.err.printf("[%s]No message available for topic %s %n", Thread.currentThread().getName(), reader.getTopic()); } catch (IOException ignore) { } catch (Exception e) { throw new RuntimeException("Cannot get reader", e); } }); } service.shutdown(); service.awaitTermination(5, TimeUnit.MINUTES); client.close();
=== Consumer: Reset consumer subscription to either Earliest or Latest
==== Java [source,java]
public void resetSubscriptionOffsetsTo(final Consumer<?> consumer, final SubscriptionInitialPosition strategy) throws PulsarClientException { Objects.requireNonNull(consumer, "consumer cannot be null"); Objects.requireNonNull(strategy, "strategy cannot be null"); System.out.printf( "Resetting partition %s for subscription %s to %s position %n", consumer.getTopic(), consumer.getSubscription(), strategy ); consumer.seek(strategy == SubscriptionInitialPosition.Earliest ? MessageId.earliest : MessageId.latest); }
Note: this operation can only be done on non-partitioned topics.
== Admin API
=== Dependencies
==== Java (using Maven) [source,xml]
=== Admin: Create basic PulsarAdmin
==== Java [source,java]
PulsarAdmin admin = PulsarAdmin .builder() .serviceHttpUrl("http://localhost:8080") .build();
=== Admin: Check if topic exists
==== Java [source,java]
public boolean topicExists(final PulsarAdmin admin, final String topicName) throws PulsarAdminException { int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions; if (partitionNum == 0) { try { admin.topics().getStats(topicName); } catch (PulsarAdminException.NotFoundException e) { return false; } } return true; }
=== Admin: Check if topic is partitioned
==== Java [source,java]
public boolean isTopicPartitioned(final PulsarAdmin admin, final String topicName) throws PulsarAdminException { return admin.topics().getPartitionedTopicMetadata(topicName).partitions > 0; }
=== Admin: Create topic
==== Java [source,java]
public void createTopic(final PulsarAdmin admin, final String topicName, final int defaultPartitionNum) throws PulsarAdminException { if (defaultPartitionNum > 0) admin.topics().createPartitionedTopic(topicName, defaultPartitionNum); else admin.topics().createNonPartitionedTopic(topicName); }
=== Admin: Delete a topic
==== Java [source,java]
public void deleteTopic(final PulsarAdmin admin, final String topicName) throws PulsarAdminException { if (isTopicPartitioned(admin, topic)) admin.topics().deletePartitionedTopic(topicName, true); else admin.topics().delete(topicName, true); }
== License Copyright 2020 StreamThoughts.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.