make_spark_converter returns Numpy in binary serialized format.

apatsekin opened this issue 4 years ago · 0 comments

Hey guys! If I combine two examples from the tutorial:

  1. create a Spark dataframe with a Petastorm schema
  2. then try to convert it to a TF Dataset

During iteration of the TF Dataset, the Numpy array field is returned like: b"\x93NUMPY\x01\x00v\x00{'descr': '<i4', 'fortran_order': False, 'shape': (5, 5), } which looks like the serialized binary NPY format.

How do I decode it properly?
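Those bytes start with the NPY magic (\x93NUMPY), so as a sanity check I can load them back with plain numpy. A minimal sketch (decode_npy_bytes is just a helper name I made up, not a Petastorm API):

import io

import numpy as np


def decode_npy_bytes(bytes_):
    # The payload looks like a standard NPY container, so np.load can
    # parse it straight from an in-memory buffer.
    return np.load(io.BytesIO(bytes_))

Full code to reproduce: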

import numpy as np
from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from petastorm.spark import SparkDatasetConverter, make_spark_converter

HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('test_arr', np.int32, (5, 5), NdarrayCodec(), False),
])


def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'test_arr': np.random.randint(1, 9, size=(5, 5), dtype=np.int32)}


def generate_hello_world_dataset(output_url='file:///tmp/hello_world_dataset'):
    rows_count = 10
    rowgroup_size_mb = 256
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        return spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema())


spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sparkDf = generate_hello_world_dataset('file:///tmp/test_dataset.spark')
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///tmp/tf_dataset_cache')
converter = make_spark_converter(sparkDf)

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    for elem in dataset:
        print(elem.test_arr.numpy())

I came up with this, but I'm not sure it's the intended approach:

with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    # parsed_dataset = dataset.map(_parse_function)
    for elem in dataset:
        bytes_ = elem.test_arr.numpy()[0]
        print(NdarrayCodec().decode(None, bytes_))
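
Another option I was considering is decoding inside the pipeline, which is roughly what the commented-out _parse_function line was meant to become. A rough sketch using tf.numpy_function (continuing from the converter above; again, just an assumption, not sure this is the intended Petastorm way either):

import io

import numpy as np
import tensorflow as tf


def _decode_batch(ids, arr_bytes):
    # arr_bytes arrives as a numpy array of NPY-encoded byte strings,
    # one payload per row in the batch.
    arrays = np.stack([np.load(io.BytesIO(b)) for b in arr_bytes])
    return ids, arrays


with converter.make_tf_dataset(batch_size=1, num_epochs=1) as dataset:
    parsed_dataset = dataset.map(
        lambda batch: tf.numpy_function(
            _decode_batch, [batch.id, batch.test_arr], [tf.int32, tf.int32]))
    for ids, arrs in parsed_dataset:
        print(arrs.numpy())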

apatsekin · Aug 06 '20 18:08