io icon indicating copy to clipboard operation
io copied to clipboard

Pubsub performance extremely slow

Open nbren12 opened this issue 3 years ago • 0 comments

StreamDataset.from_pubsub is about 50-100x slower than a simple multi-threaded implementation built on the Pub/Sub Python SDK. I can easily get 100 MB/s of throughput with the latter, but only about 2 MB/s with tfio. Is this expected?

I loaded a topic with 1 MB messages and ran the following script on a Google Cloud VM:

import tensorflow_io as tfio
import tensorflow as tf
import time
import queue
import os
from google.cloud import pubsub_v1

# Report the library versions in use; the comment under each print is the
# value the author observed.
print("tfio:", tfio.version.VERSION)
# tfio: 0.24.0
print("tensorflow:", tf.version.VERSION)
# tensorflow: 2.8.0


# Bounded hand-off queue (at most 100 items): filled by `callback`, drained
# by `_gen`.
q = queue.Queue(maxsize=100)


def callback(message):
    """Acknowledge an incoming message, then enqueue its payload on ``q``.

    ``q`` is created with ``maxsize=100``, so the ``put`` blocks while the
    queue is full.
    """
    message.ack()
    payload = message.data
    q.put(payload)


def _gen(subscription):
    """Yield message payloads received on ``subscription`` indefinitely.

    Starts a streaming pull whose callback (``callback``) feeds the
    module-level queue ``q``, then drains that queue forever. The generator
    never finishes on its own; the consumer is expected to stop iterating.

    Args:
        subscription: Subscription path passed straight to
            ``SubscriberClient.subscribe``.

    Yields:
        Each item that ``callback`` placed on ``q`` (the message ``data``).
    """
    with pubsub_v1.SubscriberClient() as subscriber:
        future = subscriber.subscribe(subscription, callback)
        try:
            while True:
                yield q.get()
        finally:
            # Runs when the consumer closes the generator: stop the streaming
            # pull so callbacks stop firing before the client is closed.
            future.cancel()


def from_pubsub_via_python_sdk(sub):
    """Build a ``tf.data`` dataset of scalar strings backed by ``_gen(sub)``."""

    def _messages():
        # from_generator needs a zero-argument callable; bind `sub` here.
        return _gen(sub)

    scalar_string = tf.TensorSpec(shape=(), dtype=tf.string)
    return tf.data.Dataset.from_generator(_messages, output_signature=scalar_string)


def from_pubsub_via_tfio(sub):
    """Build a tensorflow-io streaming dataset of message payloads for ``sub``.

    Args:
        sub: Subscription path to read from. Previously this argument was
            ignored in favour of a hard-coded path; it is now passed through
            so both readers consume the subscription the caller names.

    Returns:
        The ``from_pubsub`` stream dataset mapped to each element's ``data``
        field.
    """
    stream = tfio.experimental.IODataset.stream().from_pubsub(
        sub, endpoint="pubsub.googleapis.com"
    )
    return stream.map(lambda x: x.data)


def benchmark(ds, target_bytes=50_000_000):
    """Measure read throughput of ``ds`` and print a one-line summary.

    Iterates ``ds``, summing ``len(a.numpy())`` for every element, and stops
    as soon as at least ``target_bytes`` bytes have been seen. It then prints
    the elapsed seconds, the throughput, and the size of the last element.
    The throughput is bytes per second divided by ``2 ** 20``; the printed
    unit label is kept as ``"Mb/s"``.

    The clock starts when the first element arrives (to exclude initial
    spin-up), so the first element's bytes are counted but the time spent
    waiting for it is not. Nothing is printed if ``ds`` ends before
    ``target_bytes`` is reached.

    Args:
        ds: Iterable whose elements expose ``.numpy()`` returning a sized
            object.
        target_bytes: Number of bytes to read before reporting. Defaults to
            the original hard-coded 50,000,000.
    """
    total = 0
    start = None
    for a in ds:
        # start time inside loop since there is some initial spinup
        if start is None:
            # perf_counter is monotonic, unlike wall-clock time.time().
            start = time.perf_counter()
        size = len(a.numpy())
        total += size
        if total >= target_bytes:
            elapsed = time.perf_counter() - start
            # Guard against a zero interval (e.g. the very first element
            # already satisfies the target on a coarse clock).
            rate = total / elapsed / 2 ** 20 if elapsed > 0 else float("inf")
            print(elapsed, "sec", rate, "Mb/s", size)
            break


# Subscription path handed to both benchmark runs below.
SUB = "projects/myproj/subscriptions/mysub"

# tensorflow-io reader; the comment underneath is the output the author
# reported.
benchmark(from_pubsub_via_tfio(SUB))
# 16.859482049942017 sec 2.8283025349806934 Mb/s 1000000
# Python SDK reader; again followed by the reported output.
benchmark(from_pubsub_via_python_sdk(SUB))
# 0.44029998779296875 sec 108.29824470204987 Mb/s 1000000

I tried parallelizing the tfio version using tf.data.Dataset.range(10).flat_map(lambda x: from_pubsub_via_tfio(SUB), num_parallel_calls=10), but it wasn't any faster.

nbren12 avatar Mar 21 '22 07:03 nbren12