zio-pulsar icon indicating copy to clipboard operation
zio-pulsar copied to clipboard

Apache Pulsar client for Scala with ZIO and ZIO Streams integration.

ZIO Pulsar

CI Release Snapshot
CI Release Artifacts Snapshot Artifacts

Purely functional Scala wrapper over the official Pulsar client.

  • Type-safe (utilizes Scala type system to reduce runtime exceptions present in the official Java client)
  • Streaming-enabled (naturally integrates with ZIO Streams)
  • ZIO integrated (uses common ZIO primitives like ZIO effect and ZManaged to reduce the boilerplate and increase expressiveness)

Compatibility

ZIO Pulsar is a Scala 3 library, so it's compatible with Scala 3 applications as well as Scala 2.13.6+ (see forward compatibility for more information.

Getting started

Add the following dependency to your build.sbt file:

Scala 3

libraryDependencies += "com.github.jczuchnowski" %% "zio-pulsar" % zioPulsarVersion

Scala 2.13.6+ (sbt 1.5.x)

libraryDependencies += 
  ("com.github.jczuchnowski" %% "zio-pulsar" % zioPulsarVersion).cross(CrossVersion.for2_13Use3)

ZIO Pulsar also needs ZIO and ZIO Streams to be provided:

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"         % zioVersion,
  "dev.zio" %% "zio-streams" % zioVersion
)

Simple example of consumer and producer:

import org.apache.pulsar.client.api.{ PulsarClientException, Schema }
import zio._
import zio.pulsar._

object Main extends App:

  val pulsarClient = PulsarClient.live("localhost", 6650)

  val topic = "my-topic"

  val app: ZManaged[PulsarClient, PulsarClientException, Unit] =
    for
      builder  <- ConsumerBuilder.make(Schema.STRING).toManaged_
      consumer <- builder
                    .topic(topic)
                    .subscription(
                      Subscription(
                        "my-subscription", 
                        SubscriptionType.Shared))
                    .build
      producer <- Producer.make(topic, Schema.STRING)
      _        <- producer.send("Hello!").toManaged_
      m        <- consumer.receive.toManaged_
    yield ()
    
  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    app.provideCustomLayer(pulsarClient).useNow.exitCode

Running examples locally

To try the examples from the examples subproject you'll need a Pulsar instance running locally. You can set one up using docker:

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  --network pulsar \
  apachepulsar/pulsar:2.7.0 \
  bin/pulsar standalone