fastparquet
How to handle Nullable fields from Spark parquet (ValueError: cannot convert float NaN to integer)
Hi,
I've found mentions in the documentation of dealing with NULL/NaN when writing parquet files using fastparquet, but very little with regard to reading parquet files. I'm trying to read a file that was written by Spark and has nullable fields, and I keep getting the following error when I try to read the whole file:
In [12]: pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet', verify=True).to_pandas()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-12-08b5f59eff8e> in <module>()
----> 1 pf = ParquetFile('historic_919161/part-00000-28ad9cb0-dcd2-4355-ae6c-ac3c7472c475.snappy.parquet', verify=True).to_pandas()
/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in to_pandas(self, columns, categories, filters, index)
426 for (name, v) in views.items()}
427 self.read_row_group(rg, columns, categories, infile=f,
--> 428 index=index, assign=parts)
429 start += rg.num_rows
430 else:
/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/api.pyc in read_row_group(self, rg, columns, categories, infile, index, assign)
258 infile, rg, columns, categories, self.schema, self.cats,
259 self.selfmade, index=index, assign=assign,
--> 260 scheme=self.file_scheme)
261 if ret:
262 return df
/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade, index, assign, scheme)
343 raise RuntimeError('Going with pre-allocation!')
344 read_row_group_arrays(file, rg, columns, categories, schema_helper,
--> 345 cats, selfmade, assign=assign)
346
347 for cat in cats:
/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, selfmade, assign)
320 read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
321 selfmade=selfmade, assign=out[name],
--> 322 catdef=out.get(name+'-catdef', None))
323
324 if _is_map_like(schema_helper, column):
/USERPATH/anaconda/lib/python2.7/site-packages/fastparquet/core.pyc in read_col(column, schema_helper, infile, use_cat, grab_dict, selfmade, assign, catdef)
256 max_defi = schema_helper.max_definition_level(cmd.path_in_schema)
257 part = assign[num:num+len(defi)]
--> 258 part[defi != max_defi] = my_nan
259 if d and not use_cat:
260 part[defi == max_defi] = dic[val]
ValueError: cannot convert float NaN to integer
Is there a way to coerce or specify the dtype of columns to try and avoid forcing NaN into an integer field? Or is the best practice to pre-process the file and change the datatype or fill NA values?
Huge apologies if this issue is addressed somewhere already, but any help would be hugely appreciated.
fastparquet can, in theory, handle nullable int fields - they should become float columns in pandas. So something different is going on here. Can you print the schema according to spark, and the following from the python side:
pf = ParquetFile('...', verify=True)
print(pf.schema.text)
Do you know, or can you find out which column is causing the problem?
Thanks for the quick response. Apologies for not getting information back to you faster, but I can't disclose the real parquet schema I'm using, so I've had to rename the fields.
If it's any help, though, it only seems to be happening when I try to use fields that contain NULLs which I've checked with:
pf.statistics['null_count']
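For reference, roughly what I mean (a rough sketch, assuming pf.statistics['null_count'] is a dict of column name to per-row-group counts, with None where a chunk records no count):
from fastparquet import ParquetFile

# list the columns whose statistics report any NULLs
# (None entries, i.e. unknown counts, are treated as "no NULLs" here for simplicity)
pf = ParquetFile('...', verify=True)
null_counts = pf.statistics['null_count']
cols_with_nulls = [col for col, counts in null_counts.items() if any(counts)]
print(cols_with_nulls)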
Here's the Spark schema:
root
|-- current_phase: integer (nullable = true)
|-- bx: decimal(10,2) (nullable = true)
|-- by: decimal(10,2) (nullable = true)
|-- bz: decimal(10,2) (nullable = true)
|-- bspeed: decimal(10,2) (nullable = true)
|-- version_id: integer (nullable = true)
|-- id: integer (nullable = true)
|-- inprogress: integer (nullable = true)
|-- state: integer (nullable = true)
|-- target_id: integer (nullable = true)
|-- group_id: integer (nullable = true)
|-- individual_id: integer (nullable = true)
|-- x: decimal(10,2) (nullable = true)
|-- y: decimal(10,2) (nullable = true)
|-- z: decimal(10,2) (nullable = true)
|-- target_properties: map (nullable = true)
| |-- key: integer
| |-- value: float (valueContainsNull = true)
|-- start_id: integer (nullable = true)
|-- start_phase: integer (nullable = true)
|-- end_id: integer (nullable = true)
|-- end_phase: integer (nullable = true)
|-- x_size: decimal(10,2) (nullable = true)
|-- y_size: decimal(10,2) (nullable = true)
|-- min: integer (nullable = true)
|-- bx_norm: decimal(38,18) (nullable = true)
|-- by_norm: decimal(38,18) (nullable = true)
|-- x_norm: decimal(38,18) (nullable = true)
|-- y_norm: decimal(38,18) (nullable = true)
|-- id_in_phase: integer (nullable = true)
pf.schema.text:
- spark_schema:
| - current_phase: INT32, OPTIONAL
| - bx: INT64, DECIMAL, OPTIONAL
| - by: INT64, DECIMAL, OPTIONAL
| - bz: INT64, DECIMAL, OPTIONAL
| - bspeed: INT64, DECIMAL, OPTIONAL
| - version_id: INT32, OPTIONAL
| - id: INT32, OPTIONAL
| - inprogress: INT32, OPTIONAL
| - state: INT32, OPTIONAL
| - target_id: INT32, OPTIONAL
| - group_id: INT32, OPTIONAL
| - individual_id: INT32, OPTIONAL
| - x: INT64, DECIMAL, OPTIONAL
| - y: INT64, DECIMAL, OPTIONAL
| - z: INT64, DECIMAL, OPTIONAL
| - target_properties: MAP, OPTIONAL
| - key_value: REPEATED
| | - key: INT32, REQUIRED
| - value: FLOAT, OPTIONAL
| - start_id: INT32, OPTIONAL
| - start_phase: INT32, OPTIONAL
| - end_id: INT32, OPTIONAL
| - end_phase: INT32, OPTIONAL
| - x_size: INT64, DECIMAL, OPTIONAL
| - y_size: INT64, DECIMAL, OPTIONAL
| - min: INT32, OPTIONAL
| - bx_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - by_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - x_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - y_norm: FIXED_LEN_BYTE_ARRAY, DECIMAL, OPTIONAL
| - id_in_phase: INT32, OPTIONAL
The columns that appear to be causing the problems are these:
| - start_id: INT32, OPTIONAL
| - start_phase: INT32, OPTIONAL
| - end_id: INT32, OPTIONAL
| - end_phase: INT32, OPTIONAL
| - x_size: INT64, DECIMAL, OPTIONAL
| - y_size: INT64, DECIMAL, OPTIONAL
| - min: INT32, OPTIONAL
So is it related to having NULLs in integer columns? Other columns don't have NULL values
Please try using the columns= keyword to see if it's one particular column showing the problem, or all. I notice you have INT/DECIMAL types in here, which are pretty rare.
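For example, something along these lines (a minimal sketch, reusing a few column names from the schema above):
from fastparquet import ParquetFile

# try each suspect column on its own to find the one(s) that trigger the error
pf = ParquetFile('...', verify=True)
for col in ['start_id', 'start_phase', 'end_id', 'end_phase',
            'x_size', 'y_size', 'min', 'id_in_phase']:
    try:
        pf.to_pandas(columns=[col])
        print(col, 'ok')
    except ValueError as e:
        print(col, 'failed:', e)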
On my reduced data set (approx. 200k rows) start_id was the only column that caused problems, but id_in_phase caused a lot of problems on the full data (4 million rows).
@martindurant I am currently in a similar situation, trying to load a dataframe created by Spark with a lot of nullable columns, and I get the
ValueError: cannot convert float NaN to integer
@rcammisola-perform did you figure out a way to overcome this?
@PGryllos , since I don't know what the root cause is here, I'm afraid I can only ask you to go through investigative steps like the ones above, to try to find out what's going on.
@martindurant I can see that if I try to load specific columns with small chunks of the data it succeeds most of the time, so I assume there are sparse problems in the input data; not sure if they come from missing values or if I have to enable some explicit type conversion. Do you have any pointers for how I can look into it?
@martindurant if I try using the pyarrow engine I get a NotImplementedError coming from a call to to_pandas_dtype; it looks like this:
NotImplementedError: struct<true_secs: int32, tz: int32, os: string>
Even if there are no NULLs in a small section of the data, the dtype of the column will be float if there could be NULLs anywhere along the column; see api.ParquetFile._dtypes: if any row-group doesn't have statistics, or has statistics but doesn't have num_nulls, or num_nulls is ever non-zero, then the whole column is float. So it seems that either this logic is failing, or not running at all. A few well-placed debugger break-points should figure out which.
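In pseudo-ish form, the logic amounts to something like this (a paraphrase based on the description above, not the exact fastparquet source):
import numpy as np

# paraphrase of the heuristic: an int/bool column is promoted to float
# if NULLs are possible anywhere along it
def column_may_have_nulls(pf, col):
    counts = pf.statistics.get('null_count', {}).get(col)
    if not counts:
        return True        # no statistics at all: assume NULLs are possible
    for c in counts:
        if c is None:      # statistics present but no null count recorded
            return True
        if c:              # a non-zero null count in some row group
            return True
    return False

# roughly what _dtypes then does for such a column:
# if dt.kind in ['i', 'b'] and column_may_have_nulls(pf, col):
#     dtype[col] = np.dtype('f8')   # NaN requires a float column in pandas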
So your data is nested? fastparquet explicitly does not handle that, nor does pyarrow (although they should!). Since the struct is simple, you may want to recode on the spark side to make three separate columns instead.
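Roughly like this on the Spark side (a hedged sketch: the struct column name 'info' and the output path are made up, the field names come from the error above, and df stands for the existing DataFrame):
from pyspark.sql import functions as F

# select every non-nested column plus each struct field as its own top-level column
flat = df.select(
    *[c for c in df.columns if c != 'info'],
    F.col('info.true_secs').alias('info_true_secs'),
    F.col('info.tz').alias('info_tz'),
    F.col('info.os').alias('info_os'),
)
flat.write.parquet('flattened_output')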
oops :/ didn't realise that. So there is no way to read in nested structures? Unfortunately restructuring on the Spark side is not an option; the schema is fairly big, with a lot of hierarchy. (The construction is part of an ETL that takes in collections of Thrift objects and converts them automatically to parquet.)
does that mean that no parquet with such structure can be loaded in a dask dataframe?
the doc here seems to state that fastparquet can read nested schemas https://fastparquet.readthedocs.io/en/latest/details.html#reading-nested-schema
No, there currently isn't. This "flat" case would be doable: you'll notice that the .schema attribute correctly imputes the nested nature and gives columns names of the form 'struct.field', but it seems that somewhere this is falling down. The trouble is that there are multiple levels on which you can have NULL: either a component of a struct in a row is NULL, or the whole struct is NULL. That's probably why the dtype has been set wrong (you have one type of NULL and not the other).
It probably isn't a massive amount of work to cope with this kind of data, but it is formally out of scope for fastparquet, and I don't intend to work on it. I am a little surprised that pyarrow doesn't load the data correctly. You may want to try the highly experimental library awkward array.
the doc here seems to state that fastparquet can read nested schemas
It says that fastparquet can read nested schemas that fit the very specific forms presented. Yours may meet this.
If you put float columns for ALL integer columns in api.ParquetFile._dtypes:
if dt.kind in ['i', 'b']:
    dtype[col] = np.dtype('f8')
does loading work?
@martindurant it does, actually! Still a bit confused about what exactly this means.
I am keen to put in a little work myself to make our ingestion work with Dask. I have been investigating Dask for the past week as part of an effort to unite the repos we currently have into a single, easily versioned and reproducible data flow for training models, without the dependency on Spark; Spark for the moment makes the development effort cumbersome and hard to contain in one place. So if I can find a way to feed our ETL results directly into our training pipelines, that would already be a nice benefit. The results of our ETL pipelines unfortunately are complicated parquet structures (:
In that case, it seems like the logic change is needed only in the body of that loop: not just to look for the possibility of NULLs in the column itself, but also in the struct that contains it.
How do you propose I go from here? Does it make sense to investigate and open a PR?
btw big thanks for taking the time :)
I think that within the if dt.kind in ['i', 'b']: block, if it is a nested field, then int should go to float if the parent in the schema is OPTIONAL. That's probably safest; I'm not sure whether we can expect the parent to have any statistics to look up.
Maybe it's enough to call if not self.schema.is_required(col), worth trying. Otherwise you'd have to extract the column's schema element and the parent's from self.schema. That depends on the dotted naming convention, struct.field, which is maybe too simplistic.
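Something like this for the second idea (a sketch only; schema_is_required here stands in for whatever lookup self.schema actually provides):
# treat a dotted column as nullable if any ancestor in its path is not REQUIRED
def nullable_through_parents(col, schema_is_required):
    parts = col.split('.')
    for depth in range(1, len(parts) + 1):
        path = '.'.join(parts[:depth])   # 'struct', then 'struct.field', ...
        if not schema_is_required(path):
            return True
    return False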
@martindurant what I understand is happening is that for every column this loop checks whether the column may have null values, based on heuristic checks on meta_data, and if it may, it converts the column to float. In nested cases the checks fail for some reason? How does it relate to the parent being optional? (In our case the parent of the whole structure is optional, along with many other fields in different parts of the schema.)
Does chunk.meta_data.statistics refer to the parent of the chunk?
Also, is the conversion to float a generic first step for dealing with incoming null values?
In our case the parent of the whole structure is optional
This is exactly the problem. Maybe the int field is required, and you never have a situation where struct.field=NULL - these are the conditions that the block of code checks for when looking at the statistics. However, you may still have struct=NULL. So the REQUIRED/OPTIONAL status of the parent is the difference here.
Actually, it occurs to me that you could also check (chunk.meta_data.num_values == rg.num_values). If they are equal for all chunks, no NULLs.
Does chunk.meta_data.statistics refer to the parent of the chunk?
No, to the chunk itself - we have no statistics about the parent struct.
Also, is the conversion to float a generic first step
It is the only thing we can do with pandas, which has no way (yet) to represent an int column with missing values.
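For example (plain pandas, nothing fastparquet-specific):
import pandas as pd

# a missing value in an integer column silently upcasts the whole column to float
s = pd.Series([1, 2, None])
print(s.dtype)   # float64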
Maybe the int field is required, and you never have a situation where ...
ah yea that's definitely happening; the parent is optional but nested fields are required in several layers of the schema.
Actually, it occurs to me that you could also check (chunk.meta_data.num_values == rg.num_values). If they are equal for all chunks, no NULLs.
You mean rg.num_rows? This check doesn't work. Looking into the previous suggestions. I am thinking that when the parent is NULL there are probably no chunks to iterate over, right?
@martindurant the following seems to work:
if not self.schema.is_required(col):
    dtype[col] = np.dtype('f8')
You mean rg.num_rows?
Yes, sorry.
I'll have to think a little bit about whether the simple fix is appropriate or whether it's overly pessimistic. In any case, the three sizes of float in the if num_nulls: block should still apply, since there's no need to use float64 for bools-with-NULL or int32-with-NULL.
You would be very welcome to submit this as a PR, if you like, and ideally include a small file in the test-data folder which demonstrates the problem before the PR and the fix, with an extra test in tests/test_api.py.
I will gladly take the time to do it.
whether the simple fix is appropriate or whether it's overly pessimistic.
Seems a bit pessimistic indeed :/ Optional columns may be frequent without necessarily bearing null data. To me it's still not clear why the NULL struct is not captured in the statistics.
Also, I don't understand this check:
if chunk.meta_data.statistics.null_count:
    num_nulls = True
    break
Since the check just above it checks for null_count == None, this one is only checking if null_count == 0. Shouldn't it be doing the exact opposite and checking whether the null count is not zero?
null_count is None is not the same as == 0: it means the null count is not given, so we cannot assume it is zero.
Yeah, I understand that; what I mean is that the way the check above is structured, it's explicitly checking for count == 0, which is counter-intuitive to me. Don't we want to apply the conversion if null values are found?
it's explicitly checking for count == 0.
No, it checks for None, i.e., information unavailable. If null_count were 0, it would not trigger any of the if-conditions.
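Taken together, the two adjacent checks behave like this (a restatement in isolation, not the actual fastparquet code):
# a chunk forces the column to float when its null count is unknown or non-zero
def chunk_forces_float(null_count):
    if null_count is None:   # statistics give no null_count: unknown, be pessimistic
        return True
    if null_count:           # a non-zero count: the column definitely has NULLs
        return True
    return False             # exactly zero: this chunk proves nothing against int

assert chunk_forces_float(None)
assert chunk_forces_float(3)
assert not chunk_forces_float(0)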
Uh, sorry, excuse my confusion (it's already very late here :P); this check indeed triggers for not None and not 0. But if it is only checking for None, then what is the purpose of the check just above it:
if chunk.meta_data.statistics.null_count is None:
    num_nulls = True
    break