How to handle Nullable fields from Spark parquet (ValueError: cannot convert float NaN to integer)

Open · rcammisola-perform opened this issue 6 years ago · 39 comments

Hi,

I've found mentions in the documentation of dealing with NULL/NaN when writing parquet files using fastparquet, but very little with regard to reading parquet files. I'm trying to read a file that was written in Spark and has nullable fields, and I keep getting the following error when I try to read the whole file:

In [12]: pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet', verify=True).to_pandas()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-12-08b5f59eff8e> in <module>()
----> 1 pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet', verify=True).to_pandas()

/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in to_pandas(self, columns, categories, filters, index)
    426                              for (name, v) in views.items()}
    427                     self.read_row_group(rg, columns, categories, infile=f,
--> 428                                         index=index, assign=parts)
    429                     start += rg.num_rows
    430         else:

/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in read_row_group(self, rg, columns, categories, infile, index, assign)
    258                 infile, rg, columns, categories, self.schema, self.cats,
    259                 self.selfmade, index=index, assign=assign,
--> 260                 scheme=self.file_scheme)
    261         if ret:
    262             return df

/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade, index, assign, scheme)
    343         raise RuntimeError('Going with pre-allocation!')
    344     read_row_group_arrays(file, rg, columns, categories, schema_helper,
--> 345                           cats, selfmade, assign=assign)
    346 
    347     for cat in cats:

/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, selfmade, assign)
    320         read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
    321                  selfmade=selfmade, assign=out[name],
--> 322                  catdef=out.get(name+'-catdef', None))
    323 
    324         if _is_map_like(schema_helper, column):

/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_col(column, schema_helper, infile, use_cat, grab_dict, selfmade, assign, catdef)
    256             max_defi = schema_helper.max_definition_level(cmd.path_in_schema)
    257             part = assign[num:num+len(defi)]
--> 258             part[defi != max_defi] = my_nan
    259             if d and not use_cat:
    260                 part[defi == max_defi] = dic[val]

ValueError: cannot convert float NaN to integer

Is there a way to coerce or specify the dtype of columns to try and avoid forcing NaN into an integer field? Or is the best practice to pre-process the file and change the datatype or fill NA values?

Huge apologies if this issue is addressed somewhere, but any help would be hugely appreciated.

rcammisola-perform avatar Sep 11 '18 10:09 rcammisola-perform

fastparquet can, in theory, handle nullable int fields - they should become float columns in pandas. So something different is going on here. Can you print the schema according to Spark, and the following from the Python side:

pf = ParquetFile('...', verify=True)
print(pf.schema.text)

Do you know, or can you find out which column is causing the problem?

martindurant avatar Sep 11 '18 13:09 martindurant

Thanks for the quick response. Apologies I haven't been able to get information back to you faster, but I can't disclose the real parquet schema I'm using, so I'm going to have to rename the fields.

If it's any help, though, it only seems to be happening when I try to use fields that contain NULLs, which I've checked with:

pf.statistics['null_count']
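
For example, along these lines (a rough sketch; I'm assuming pf.statistics['null_count'] maps each column to its per-row-group counts, which can be None where the statistics are missing):

    from fastparquet import ParquetFile

    pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet')
    # columns whose null_count is unknown or non-zero in any row group
    suspect = [col for col, counts in pf.statistics['null_count'].items()
               if any(c is None or c > 0 for c in counts)]
    print(suspect)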

rcammisola-perform avatar Sep 13 '18 10:09 rcammisola-perform

Here's the Spark schema:

root
 |-- current_phase: integer (nullable = true)
 |-- bx: decimal(10,2) (nullable = true)
 |-- by: decimal(10,2) (nullable = true)
 |-- bz: decimal(10,2) (nullable = true)
 |-- bspeed: decimal(10,2) (nullable = true)
 |-- version_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- inprogress: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- target_id: integer (nullable = true)
 |-- group_id: integer (nullable = true)
 |-- individual_id: integer (nullable = true)
 |-- x: decimal(10,2) (nullable = true)
 |-- y: decimal(10,2) (nullable = true)
 |-- z: decimal(10,2) (nullable = true)
 |-- target_properties: map (nullable = true)
 |    |-- key: integer
 |    |-- value: float (valueContainsNull = true)
 |-- start_id: integer (nullable = true)
 |-- start_phase: integer (nullable = true)
 |-- end_id: integer (nullable = true)
 |-- end_phase: integer (nullable = true)
 |-- x_size: decimal(10,2) (nullable = true)
 |-- y_size: decimal(10,2) (nullable = true)
 |-- min: integer (nullable = true)
 |-- bx_norm: decimal(38,18) (nullable = true)
 |-- by_norm: decimal(38,18) (nullable = true)
 |-- x_norm: decimal(38,18) (nullable = true)
 |-- y_norm: decimal(38,18) (nullable = true)
 |-- id_in_phase: integer (nullable = true)

pf.schema.text:

- spark_schema: 
| - current_phase: INT32, OPTIONAL
| - bx: INT64, DECIMAL, OPTIONAL
| - by: INT64, DECIMAL, OPTIONAL
| - bz: INT64, DECIMAL, OPTIONAL
| - bspeed: INT64, DECIMAL, OPTIONAL
| - version_id: INT32, OPTIONAL
| - id: INT32, OPTIONAL
| - inprogress: INT32, OPTIONAL
| - state: INT32, OPTIONAL
| - target_id: INT32, OPTIONAL
| - group_id: INT32, OPTIONAL
| - individual_id: INT32, OPTIONAL
| - x: INT64, DECIMAL, OPTIONAL
| - y: INT64, DECIMAL, OPTIONAL
| - z: INT64, DECIMAL, OPTIONAL
| - target_properties: MAP, OPTIONAL
|   - key_value: REPEATED
|   | - key: INT32, REQUIRED
|     - value: FLOAT, OPTIONAL
| - start_id: INT32, OPTIONAL
| - start_phase: INT32, OPTIONAL
| - end_id: INT32, OPTIONAL
| - end_phase: INT32, OPTIONAL
| - x_size: INT64, DECIMAL, OPTIONAL
| - y_size: INT64, DECIMAL, OPTIONAL
| - min: INT32, OPTIONAL
| - bx_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - by_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - x_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - y_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
  - id_in_phase: INT32, OPTIONAL

The columns that appear to be causing the problems are these:

| - start_id: INT32, OPTIONAL
| - start_phase: INT32, OPTIONAL
| - end_id: INT32, OPTIONAL
| - end_phase: INT32, OPTIONAL
| - x_size: INT64, DECIMAL, OPTIONAL
| - y_size: INT64, DECIMAL, OPTIONAL
| - min: INT32, OPTIONAL

So is it related to having NULLs in integer columns? The other columns don't have NULL values.

rcammisola-perform avatar Sep 14 '18 10:09 rcammisola-perform

Please try using the columns= keyword to see if it's one particular column showing the problem, or all. I notice you have INT/DECIMAL types in here, which are pretty rare.
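
Something like the following should narrow it down (a rough sketch, assuming pf.columns gives the flat column names):

    from fastparquet import ParquetFile

    pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet')
    for col in pf.columns:
        try:
            pf.to_pandas(columns=[col])
        except ValueError as e:
            print(col, 'failed:', e)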

martindurant avatar Sep 14 '18 13:09 martindurant

On my reduced data set (approx. 200k rows), start_id was the only column that caused problems, but id_in_phase caused a lot of problems on the full data (4 million rows).

rcammisola-perform avatar Sep 14 '18 15:09 rcammisola-perform

@martindurant I am currently in a similar situation where I am trying to load a dataframe created by Spark with a lot of nullable columns, and I get the

ValueError: cannot convert float NaN to integer

@rcammisola-perform did you figure out a way to overcome this?

PGryllos avatar Nov 04 '18 16:11 PGryllos

@PGryllos, since I don't know what the root cause is here, I'm afraid I can only ask you to go through investigative steps like the ones above, to try to find out what's going on.

martindurant avatar Nov 04 '18 17:11 martindurant

@martindurant I can see that if I try to load specific columns with small chunks of the data it succeeds most of the time, so I assume there are sparse problems with the input data; I'm not sure if it's coming from missing values or if I have to enable some explicit type conversion. Do you have any pointers for how I can look into it?

PGryllos avatar Nov 04 '18 18:11 PGryllos

@martindurant if I try using the pyarrow engine I get a NotImplementedError coming from a call to to_pandas_dtype;

it looks like this:

NotImplementedError: struct<true_secs: int32, tz: int32, os: string>

PGryllos avatar Nov 04 '18 18:11 PGryllos

Even if there are no NULLs in a small section of the data, the dtype of the column will be float if there could be NULLs anywhere along the column; see api.ParquetFile._dtypes: if any row-group doesn't have statistics, or has statistics but doesn't have num_nulls, or num_nulls is ever non-zero, then the whole column is float. So it seems that either this logic is failing, or not running at all. A few well-placed debugger break-points should figure out which.
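
In pseudo-code, the decision is roughly the following (an illustrative sketch, not the literal fastparquet source; column_chunks would be the chunks of one column across all row groups):

    # Illustrative sketch only, not the literal fastparquet source.
    def column_may_have_nulls(column_chunks):
        for chunk in column_chunks:
            stats = chunk.meta_data.statistics
            if stats is None or stats.null_count is None:
                return True   # no information, so assume NULLs are possible
            if stats.null_count:
                return True   # NULLs are definitely present
        return False

    # if this returns True for an int/bool column, the whole column is
    # loaded as float so that NaN can stand in for NULL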

martindurant avatar Nov 04 '18 18:11 martindurant

So your data is nested? fastparquet explicitly does not handle that, nor does pyarrow (although they should!). Since the struct is simple, you may want to recode on the Spark side to make three separate columns instead.
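
Roughly like this on the Spark side (a sketch only; df and the struct column name 'meta' are placeholders, the field names are taken from your error message):

    # Sketch: flatten the struct into top-level columns before writing.
    # 'df' and the struct column name 'meta' are placeholders.
    from pyspark.sql import functions as F

    flat = (df
            .withColumn('true_secs', F.col('meta.true_secs'))
            .withColumn('tz', F.col('meta.tz'))
            .withColumn('os', F.col('meta.os'))
            .drop('meta'))
    flat.write.parquet('flattened.parquet')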

martindurant avatar Nov 04 '18 18:11 martindurant

Oops :/ I didn't realise that. So there is no way to read in nested structures? Unfortunately, restructuring on the Spark side is not an option. The schema is fairly big, with a lot of hierarchy (the construction is part of an ETL that takes in collections of Thrift objects and converts them automatically to parquet).

Does that mean that no parquet file with such a structure can be loaded into a Dask dataframe?

PGryllos avatar Nov 04 '18 18:11 PGryllos

The docs here seem to state that fastparquet can read nested schemas: https://fastparquet.readthedocs.io/en/latest/details.html#reading-nested-schema

PGryllos avatar Nov 04 '18 18:11 PGryllos

No, there currently isn't. This "flat" case would be doable: you'll notice that the .schema attribute correctly imputes the nested nature and gives the columns names of the form 'struct.field', but it seems that somewhere this is falling down. The trouble is that there are multiple levels on which you can have NULL: either a component of a struct in a row is NULL, or the whole struct is NULL. That's probably why the dtype has been set wrong (you have one type of NULL and not the other).

It probably isn't a massive amount of work to cope with this kind of data, but it is formally out of scope for fastparquet, and I don't intend to work on it. I am a little surprised that pyarrow doesn't load the data correctly. You may want to try the highly experimental library awkward array.

the doc here seems to state that fastparquet can read nested schemas

It says that fastparquet can read nested schemas that fit the very specific forms presented. Yours maybe meets this.

If you force a float dtype for ALL integer columns in api.ParquetFile._dtypes:

if dt.kind in ['i', 'b']:
    dtype[col] = np.dtype('f8')

does loading work?

martindurant avatar Nov 04 '18 18:11 martindurant

@martindurant it does, actually! I'm still a bit confused about what exactly this means.

I am keen on putting in a little work myself to make our ingestion work with Dask. I have been investigating Dask the past week as part of my effort to unite the repos we currently have, to facilitate the training of models under a single, easily versioned and reproducible data flow without the dependency on Spark; Spark ftm makes the development effort cumbersome and not easily contained in one place. So if I can find a way to feed our ETL results directly into our training pipelines, that would already be a nice benefit. The results of our ETL pipelines, unfortunately, are complicated parquet structures (:

PGryllos avatar Nov 04 '18 18:11 PGryllos

In that case, it seems like the logic change is needed only in the body of that loop: not just to look for the possibility of NULLs in the column itself, but also in the struct that contains it.

martindurant avatar Nov 04 '18 19:11 martindurant

How do you propose I go from here? Does it make sense to investigate and open a PR?

btw big thanks for taking the time :)

PGryllos avatar Nov 04 '18 19:11 PGryllos

I think that within the if dt.kind in ['i', 'b']: block, if it is a nested field, then int should go to float if the parent in the schema is OPTIONAL. That's probably safest; I'm not sure whether we can expect the parent to have any statistics to look up.

Maybe it's enough to call if not self.schema.is_required(col); worth trying. Otherwise you'd have to extract the column's schema element and its parent's from self.schema. That depends on the dotted naming convention, struct.field, which is maybe too simplistic.
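
For example, something like this (untested sketch; it assumes is_required accepts each dotted prefix of the path):

    # Untested sketch: treat a column as nullable if the column itself, or
    # any ancestor implied by the dotted naming convention, is OPTIONAL.
    def column_or_parent_optional(schema, col):
        parts = col.split('.')
        return any(not schema.is_required('.'.join(parts[:i]))
                   for i in range(1, len(parts) + 1))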

martindurant avatar Nov 04 '18 19:11 martindurant

@martindurant What I understand is happening is that, for every column, this loop checks whether the column may have null values, based on heuristic checks on the meta_data, and if it does, it converts the column to float. In nested cases the checks fail for some reason? How does it relate to the parent being optional? (In our case the parent of the whole structure is optional, along with many other fields in different parts of the schema.)

PGryllos avatar Nov 04 '18 19:11 PGryllos

Does chunk.meta_data.statistics refer to the parent of the chunk?

Also, is the conversion to float a generic first step for dealing with incoming null values?

PGryllos avatar Nov 04 '18 19:11 PGryllos

In our case the parent of the whole structure is optional

This is exactly the problem. Maybe the int field is required, and you never have a situation where struct.field=NULL - these are the conditions that the block of code checks for when looking at the statistics. However, you may still have struct=NULL. So the REQUIRED/OPTIONAL status of the parent is the difference here. Actually, it occurs to me that you could also check (chunk.meta_data.num_values == rg.num_values). If they equal for all chunks, no NULLs.

martindurant avatar Nov 04 '18 19:11 martindurant

Does chunk.meta_data.statistics refer to the parent of the chunk?

No, to the chunk itself - we have no statistics about the parent struct.

Also, is the conversion to float a generic first step

It is the only thing we can do with pandas, which has no way (yet) to represent an int column with missing values.

martindurant avatar Nov 04 '18 19:11 martindurant

Maybe the int field is required, and you never have a situation where ...

Ah yeah, that's definitely happening; the parent is optional but nested fields are required in several layers of the schema.

Actually, it occurs to me that you could also check (chunk.meta_data.num_values == rg.num_values). If they equal for all chunks, no NULLs.

You mean rg.num_rows? This check doesn't work. Looking into the previous suggestions. I am thinking that when the parent is NULL there are probably no chunks to iterate over, right?

PGryllos avatar Nov 04 '18 19:11 PGryllos

@martindurant the

                if not self.schema.is_required(col):
                    dtype[col] = np.dtype('f8')

seems to work

PGryllos avatar Nov 04 '18 19:11 PGryllos

You mean rg.num_rows?

Yes, sorry.

I'll have to think a little bit about whether the simple fix is appropriate or whether it's overly pessimistic. In any case, the three sizes of float in the if num_nulls: block should still apply, since there's no need to use float64 for bools-with-NULL or int32-with-null.
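
I.e., something along these lines (a sketch of the kind of mapping meant, not necessarily the exact sizes fastparquet picks):

    # Sketch of the size mapping meant above (not necessarily the exact
    # choices fastparquet makes): smaller source types can use smaller floats.
    import numpy as np

    def nullable_float_dtype(dt):
        if dt.kind == 'b':
            return np.dtype('f2')   # bool-with-NULL
        if dt.itemsize <= 4:
            return np.dtype('f4')   # int8/int16/int32-with-NULL
        return np.dtype('f8')       # int64-with-NULL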

You would be very welcome to submit this as a PR, if you like, and ideally include a small file in the test-data folder which demonstrates the problem before the PR and the fix, with an extra test in tests/test_api.py.

martindurant avatar Nov 04 '18 20:11 martindurant

I will gladly take the time to do it.

whether the simple fix is appropriate or whether it's overly pessimistic.

Seems a bit pessimistic indeed :/ Optional columns may be frequent without necessarily bearing null data. To me it's still not clear why the null struct is not captured in the statistics.

Also, I don't understand this check:

                    if chunk.meta_data.statistics.null_count:
                        num_nulls = True
                        break

Since the check just above it checks for null_count == None, this is only checking if null_count == 0. Shouldn't it be doing the exact opposite and check if the null count is not zero?

PGryllos avatar Nov 04 '18 20:11 PGryllos

null_count is None is not the same as == 0: it means null count is not given, so we cannot assume it is zero.

martindurant avatar Nov 04 '18 20:11 martindurant

Yeah, I understand that; what I mean is that, the way the check above is structured, it's explicitly checking for count == 0, which is counter-intuitive to me. Don't we want to apply the conversion if null values are found?

PGryllos avatar Nov 04 '18 20:11 PGryllos

it's explicitly checking for count == 0.

No, it checks for None, i.e., information unavailable. If null_count were 0, it would not trigger any of the if-conditions.

martindurant avatar Nov 04 '18 20:11 martindurant

Uh, sorry, excuse my confusion (it's already very late here :P). This check indeed checks for not None and for not 0.

If it is only checking for None, then what is the purpose of this check just above it:

                    if chunk.meta_data.statistics.null_count is None:
                        num_nulls = True
                        break

PGryllos avatar Nov 04 '18 20:11 PGryllos