make_spark_converter returns NumPy arrays in a binary serialized format.
Hey guys! If I combine two examples from the tutorial:
- create a Spark DataFrame with a Petastorm schema
- then convert it to a TF Dataset
the NumPy array field is returned during iteration of the TF Dataset like this:
b"\x93NUMPY\x01\x00v\x00{'descr': '<i4', 'fortran_order': False, 'shape': (5, 5), }
which looks like the serialized binary NPY format. How do I decode it properly?
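Since the payload starts with the `\x93NUMPY` magic string, I'd expect plain NumPy to be able to round-trip it outside of Petastorm (a minimal sketch, assuming the field really is a standard NPY payload; `decode_npy_bytes` is just an illustrative helper name):

```python
import io
import numpy as np

def decode_npy_bytes(raw_bytes):
    # np.load understands the NPY header (b"\x93NUMPY...") when given a file-like object
    return np.load(io.BytesIO(raw_bytes))
```

Full code to reproduce: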
import numpy as np
from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.spark import SparkDatasetConverter, make_spark_converter
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('test_arr', np.int32, (5, 5), NdarrayCodec(), False),
])
def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'test_arr': np.random.randint(1, 9, size=(5, 5), dtype=np.int32)}
def generate_hello_world_dataset(output_url='file:///tmp/hello_world_dataset'):
    rows_count = 10
    rowgroup_size_mb = 256
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
        return spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema())
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sparkDf = generate_hello_world_dataset('file:///tmp/test_dataset.spark')
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///tmp/tf_dataset_cache')
converter = make_spark_converter(sparkDf)
with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    for elem in dataset:
        print(elem.test_arr.numpy())
I came up with this, but I'm not sure it's the intended approach:
with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    # parsed_dataset = dataset.map(_parse_function)
    for elem in dataset:
        bytes_ = elem.test_arr.numpy()[0]
        print(NdarrayCodec().decode(None, bytes_))
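For completeness, here is a rough sketch of what the commented-out `_parse_function` could look like if the decoding were moved into `dataset.map` instead of the Python loop (just my guess, not necessarily the intended usage; it assumes the dataset elements are namedtuple batches, every array is 5x5, and `_decode_batch` is a hypothetical helper):

```python
import io
import numpy as np
import tensorflow as tf

def _decode_batch(arr_bytes):
    # arr_bytes: numpy array of NPY-serialized byte strings, one per row in the batch
    return np.stack([np.load(io.BytesIO(b)) for b in arr_bytes]).astype(np.int32)

def _parse_function(batch):
    # tf.numpy_function runs the Python decoder on each batch at iteration time
    decoded = tf.numpy_function(_decode_batch, [batch.test_arr], tf.int32)
    decoded.set_shape((None, 5, 5))  # restore the static (batch, 5, 5) shape
    return batch._replace(test_arr=decoded)

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    parsed_dataset = dataset.map(_parse_function)
    for elem in parsed_dataset:
        print(elem.test_arr.numpy())
```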