io
io copied to clipboard
Pubsub performance extremely slow
StreamDataset.from_pubsub is about 50-100x slower than a simple multi-threaded implementation built on the Pub/Sub Python SDK. I can easily get 100 MB/s throughput with the latter, but get only about 2 MB/s with tfio. Is this expected?
I loaded up a topic with 1 MB messages and ran the following script on a Google Cloud VM:
import tensorflow_io as tfio
import tensorflow as tf
import time
import queue
import os
from google.cloud import pubsub_v1
# Report the library versions this measurement was taken with.
print(f"tfio: {tfio.version.VERSION}")
# tfio: 0.24.0
print(f"tensorflow: {tf.version.VERSION}")
# tensorflow: 2.8.0
# Bounded hand-off queue: `callback` puts payloads in, `_gen` takes them out.
q = queue.Queue(maxsize=100)
def callback(message):
    """Acknowledge a received message and hand its payload to the queue.

    Args:
        message: The received message object; it must provide ``ack()``
            and a ``data`` attribute.

    The payload is put on the module-level queue ``q``. That queue is
    bounded, so this call blocks while the queue is full.
    """
    message.ack()
    payload = message.data
    q.put(payload)
def _gen(subscription):
    """Yield message payloads received on a Pub/Sub subscription, forever.

    Opens a subscriber client, registers ``callback`` for ``subscription``
    and then yields whatever ``callback`` places on the module-level queue
    ``q``. The generator never finishes on its own; it ends only when the
    consumer closes it.

    Args:
        subscription: Subscription path passed through to
            ``subscriber.subscribe``.

    Yields:
        The items taken from ``q`` (the ``data`` of each received message).
    """
    with pubsub_v1.SubscriberClient() as subscriber:
        streaming_pull = subscriber.subscribe(subscription, callback)
        try:
            while True:
                yield q.get()
        finally:
            # Stop the streaming pull when the consumer closes the generator,
            # instead of leaving it running until the client is torn down.
            streaming_pull.cancel()
def from_pubsub_via_python_sdk(sub):
    """Build a ``tf.data.Dataset`` of scalar strings fed by ``_gen(sub)``.

    Args:
        sub: Subscription path forwarded to ``_gen``.

    Returns:
        A dataset created with ``tf.data.Dataset.from_generator`` whose
        elements are scalar ``tf.string`` tensors.
    """

    def payloads():
        # from_generator expects a zero-argument callable returning an iterable.
        return _gen(sub)

    scalar_string = tf.TensorSpec(shape=(), dtype=tf.string)
    return tf.data.Dataset.from_generator(payloads, output_signature=scalar_string)
def from_pubsub_via_tfio(sub):
    """Build a tfio streaming dataset of message payloads for ``sub``.

    Args:
        sub: Subscription path to read from. It is passed to
            ``from_pubsub``; previously it was ignored in favour of a
            hard-coded path, so callers silently read a different
            subscription than the one they asked for.

    Returns:
        The tfio Pub/Sub stream dataset mapped to each element's ``data``
        field.
    """
    return (
        tfio.experimental.IODataset.stream()
        .from_pubsub(sub, endpoint="pubsub.googleapis.com")
        .map(lambda x: x.data)
    )
def benchmark(ds, target_bytes=50_000_000):
    """Time how long it takes to read ``target_bytes`` bytes from ``ds``.

    Iterates ``ds``, adding up ``len(element.numpy())``. Once the running
    total reaches ``target_bytes`` it prints one line and stops:

        <elapsed> sec <total / elapsed / 2**20> Mb/s <size of last element>

    Nothing is printed if ``ds`` is exhausted before the target is reached.

    Args:
        ds: Iterable of elements exposing ``numpy()`` that returns a sized
            object (e.g. bytes).
        target_bytes: Number of bytes to consume before reporting.
            Defaults to the original hard-coded 50,000,000.

    Returns:
        None.
    """
    total = 0
    start = None
    for element in ds:
        # start time inside loop since there is some initial spinup
        # NOTE: the clock starts once the first element has arrived, but
        # that element's bytes are still counted in the total.
        if start is None:
            start = time.time()
        # Convert once; the size is needed for both the total and the report.
        size = len(element.numpy())
        total += size
        if total >= target_bytes:
            elapsed = time.time() - start
            # Guard against a zero interval (target hit within clock resolution).
            rate = total / elapsed / 2 ** 20 if elapsed > 0 else float("inf")
            print(elapsed, "sec", rate, "Mb/s", size)
            break
SUB = "projects/myproj/subscriptions/mysub"
# Benchmark both dataset builders against the same subscription, tfio first.
# Recorded output, in call order:
# 16.859482049942017 sec 2.8283025349806934 Mb/s 1000000
# 0.44029998779296875 sec 108.29824470204987 Mb/s 1000000
for _build_dataset in (from_pubsub_via_tfio, from_pubsub_via_python_sdk):
    benchmark(_build_dataset(SUB))
I tried parallelizing the tfio version using tf.data.Dataset.range(10).flat_map(lambda x: from_pubsub_via_tfio(SUB), num_parallel_calls=10), but it wasn't any faster.