Can't we use INT with missing values?
It seems that we can't use an INT feature when some of its values are missing. For example, validating the CSV below against the following schema fails:
schema.pbtxt:
feature {
  name: "f1"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 0.5
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "f2"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
train.csv:
f1,f2
0,0
,1
2,2
The validation code I tried is below:
import pandas as pd
import tensorflow_data_validation as tfdv
df_train = pd.read_csv("train.csv")
schema = tfdv.load_schema_text("schema.pbtxt")
result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))
@sfujiwara, can you please share the error message? Thanks!
@rmothukuru In my environment, I get the error below:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1349 if _fn_takes_side_inputs(fn):
-> 1350 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1351 else:
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
534 assert table.num_rows == 1
--> 535 return (table, validate_instance(table, options))
536
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
519 feature_statistics_list = (
--> 520 stats_impl.generate_statistics_in_memory(instance, options))
521 anomalies = validate_statistics(feature_statistics_list, options.schema,
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
732 partial_stats = generate_partial_statistics_in_memory(
--> 733 table, options, stats_generators)
734 return extract_statistics_output(partial_stats, stats_generators)
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
712 result.append(
--> 713 generator.add_input(generator.create_accumulator(), table))
714
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
179 feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180 flattened_values = leaf_array.flatten()
181 unweighted_counts = collections.Counter()
AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten'
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
<ipython-input-3-ed30d4744f14> in <module>
----> 1 result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/utils/validation_lib.py in validate_examples_in_csv(data_location, stats_options, column_names, delimiter, output_path, pipeline_options)
191 shard_name_template='',
192 coder=beam.coders.ProtoCoder(
--> 193 statistics_pb2.DatasetFeatureStatisticsList)))
194
195 return stats_gen_lib.load_statistics(output_path)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
425 def __exit__(self, exc_type, exc_val, exc_tb):
426 if not exc_type:
--> 427 self.run().wait_until_finish()
428
429 def visit(self, visitor):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
405 self.to_runner_api(use_fake_coders=True),
406 self.runner,
--> 407 self._options).run(False)
408
409 if self._options.view_as(TypeOptions).runtime_type_check:
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
418 finally:
419 shutil.rmtree(tmpdir)
--> 420 return self.runner.run_pipeline(self, self._options)
421
422 def __enter__(self):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
127 runner = BundleBasedDirectRunner()
128
--> 129 return runner.run_pipeline(pipeline, options)
130
131
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_pipeline(self, pipeline, options)
369
370 self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
--> 371 default_environment=self._default_environment))
372 return self._latest_run_result
373
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_via_runner_api(self, pipeline_proto)
376 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
377 # the teststream (if any), and all the stages).
--> 378 return self.run_stages(stage_context, stages)
379
380 @contextlib.contextmanager
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_stages(self, stage_context, stages)
458 stage,
459 pcoll_buffers,
--> 460 stage_context.safe_coders)
461 metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
462 monitoring_infos_by_stage[stage.name] = (
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in _run_stage(self, worker_handler_factory, pipeline_components, stage, pcoll_buffers, safe_coders)
736 num_workers=self._num_workers)
737
--> 738 result, splits = bundle_manager.process_bundle(data_input, data_output)
739
740 def input_for(ptransform_id, input_id):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
1717 self._get_input_coder_impl, self._bundle_descriptor,
1718 self._progress_frequency, self._registered).process_bundle(
-> 1719 part, expected_outputs), part_inputs):
1720
1721 split_result_list += split_result
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result_iterator()
596 # Careful not to keep a reference to the popped future
597 if timeout is None:
--> 598 yield fs.pop().result()
599 else:
600 yield fs.pop().result(end_time - time.monotonic())
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
433 raise CancelledError()
434 elif self._state == FINISHED:
--> 435 return self.__get_result()
436 else:
437 raise TimeoutError()
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
382 def __get_result(self):
383 if self._exception:
--> 384 raise self._exception
385 else:
386 return self._result
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in <lambda>(part)
1717 self._get_input_coder_impl, self._bundle_descriptor,
1718 self._progress_frequency, self._registered).process_bundle(
-> 1719 part, expected_outputs), part_inputs):
1720
1721 split_result_list += split_result
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
1655 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
1656 process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1657 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1658
1659 split_results = []
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in push(self, request)
1083 self._uid_counter += 1
1084 request.instruction_id = 'control_%s' % self._uid_counter
-> 1085 response = self.worker.do_instruction(request)
1086 return ControlFuture(request.instruction_id, response)
1087
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
341 # E.g. if register is set, this will call self.register(request.register))
342 return getattr(self, request_type)(getattr(request, request_type),
--> 343 request.instruction_id)
344 else:
345 raise NotImplementedError
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
367 with self.maybe_profile(instruction_id):
368 delayed_applications, requests_finalization = (
--> 369 bundle_processor.process_bundle(instruction_id))
370 response = beam_fn_api_pb2.InstructionResponse(
371 instruction_id=instruction_id,
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
661 instruction_id, expected_transforms):
662 input_op_by_transform_id[
--> 663 data.ptransform_id].process_encoded(data.data)
664
665 # Finish all operations.
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
141 decoded_value = self.windowed_coder_impl.decode_from_stream(
142 input_stream, True)
--> 143 self.output(decoded_value)
144
145 def try_split(self, fraction_of_remainder, total_buffer_size):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
444 if traceback == Ellipsis:
445 _, _, traceback = sys.exc_info()
--> 446 raise exc.with_traceback(traceback)
447
448 else:
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1348 'Received %r instead.' % (fn))
1349 if _fn_takes_side_inputs(fn):
-> 1350 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1351 else:
1352 wrapper = lambda x: [fn(x)]
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
533 # Verify that we have a single row.
534 assert table.num_rows == 1
--> 535 return (table, validate_instance(table, options))
536
537
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
518 raise ValueError('options must include a schema.')
519 feature_statistics_list = (
--> 520 stats_impl.generate_statistics_in_memory(instance, options))
521 anomalies = validate_statistics(feature_statistics_list, options.schema,
522 environment)
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
731 stats_generators = get_generators(options, in_memory=True) # type: List[stats_generator.CombinerStatsGenerator]
732 partial_stats = generate_partial_statistics_in_memory(
--> 733 table, options, stats_generators)
734 return extract_statistics_output(partial_stats, stats_generators)
735
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
711 for generator in stats_generators:
712 result.append(
--> 713 generator.add_input(generator.create_accumulator(), table))
714
715 return result
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
178 if (feature_path in self._categorical_features or
179 feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180 flattened_values = leaf_array.flatten()
181 unweighted_counts = collections.Counter()
182 # Compute unweighted counts.
AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten' [while running 'DetectAnomalies/DetectAnomaliesInExamples']
Hi Shuhei -- This is due to a bug in our handling of null arrays in the top-k and uniques combiner stats generator. We will have a fix out shortly for this. Thanks for bringing it to our attention!
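For anyone hitting this, here is a minimal sketch of what the generator trips over (my own illustration, not the actual TFDV code): when every value of a column in a single-row slice is missing, pyarrow infers a NullArray, which, unlike a list<int64> array, does not provide flatten() in the pyarrow version shown in the traceback.

import pyarrow as pa

int_column = pa.array([[0]])     # inferred as list<int64>; flatten() works
null_column = pa.array([None])   # inferred as null type (NullArray); no flatten()
print(int_column.flatten())      # -> [0]
print(hasattr(null_column, "flatten"))  # -> False with the pyarrow used here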
Thanks! Do you have a plan to support nullable INT? I think there are some difficulties:
- NumPy does not allow a nullable int (np.nan is float only)
- Though pandas now supports the nullable integer dtype 'Int64', pd.read_csv and other functions convert an int column with missing values to float by default

I would be very happy if:
- generating and validating a schema on a pd.DataFrame that has a nullable int ('Int64') column works correctly, that is, the type is INT and min_fraction is less than 1.0
- generating and validating a schema on a CSV with nullable int works correctly
- it does not depend on pandas and NumPy

A small sketch of the pandas 'Int64' behavior I mean follows below.
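For illustration only, assuming a pandas version whose read_csv accepts extension dtypes (0.24 or later):

import pandas as pd

# Read with the nullable extension dtype so f1 stays an integer column even
# though one value is missing (by default the column would become float64).
df = pd.read_csv("train.csv", dtype={"f1": "Int64", "f2": "Int64"})
print(df.dtypes)  # f1 and f2 are Int64, not float64
print(df)         # the missing f1 value stays a missing integer, not a float NaN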
@sfujiwara
tfdv.generate_statistics_from_dataframe is a utility function for users with in-memory data represented as a pandas DataFrame, and as you know pandas DataFrames support nullable int, so you could try that path. For your reference, I have attached a link here; please take a look and see if it helps.
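A rough sketch of that suggestion, assuming the installed TFDV and pyarrow versions handle the nullable 'Int64' column during the internal conversion (the exact behavior may differ across versions):

import pandas as pd
import tensorflow_data_validation as tfdv

# Keep f1 as a nullable integer column instead of letting it decay to float.
df_train = pd.read_csv("train.csv", dtype={"f1": "Int64", "f2": "Int64"})

# Compute statistics from the in-memory DataFrame and validate them against
# the existing schema, instead of going through validate_examples_in_csv.
stats = tfdv.generate_statistics_from_dataframe(df_train)
schema = tfdv.load_schema_text("schema.pbtxt")
anomalies = tfdv.validate_statistics(stats, schema)
print(anomalies)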
Thank you!
Hi, @sfujiwara
Closing this issue due to inactivity. Please feel free to reopen if the issue still persists. Thank you!