data-validation
We cannot use INT with missing values?
It seems that we can't use INT with missing values. For example, validation fails with the schema and the CSV below:
schema.pbtxt:
feature {
name: "f1"
type: INT
bool_domain {
}
presence {
min_fraction: 0.5
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "f2"
type: INT
bool_domain {
}
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
train.csv:
f1,f2
0,0
,1
2,2
The validation I tried is below:
import pandas as pd
import tensorflow_data_validation as tfdv
df_train = pd.read_csv("train.csv")
schema = tfdv.load_schema_text("schema.pbtxt")
result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))
@sfujiwara, can you please share the error message? Thanks!
@rmothukuru In my environment, I get the error below:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1349 if _fn_takes_side_inputs(fn):
-> 1350 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1351 else:
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
534 assert table.num_rows == 1
--> 535 return (table, validate_instance(table, options))
536
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
519 feature_statistics_list = (
--> 520 stats_impl.generate_statistics_in_memory(instance, options))
521 anomalies = validate_statistics(feature_statistics_list, options.schema,
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
732 partial_stats = generate_partial_statistics_in_memory(
--> 733 table, options, stats_generators)
734 return extract_statistics_output(partial_stats, stats_generators)
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
712 result.append(
--> 713 generator.add_input(generator.create_accumulator(), table))
714
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
179 feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180 flattened_values = leaf_array.flatten()
181 unweighted_counts = collections.Counter()
AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten'
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
<ipython-input-3-ed30d4744f14> in <module>
----> 1 result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/utils/validation_lib.py in validate_examples_in_csv(data_location, stats_options, column_names, delimiter, output_path, pipeline_options)
191 shard_name_template='',
192 coder=beam.coders.ProtoCoder(
--> 193 statistics_pb2.DatasetFeatureStatisticsList)))
194
195 return stats_gen_lib.load_statistics(output_path)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
425 def __exit__(self, exc_type, exc_val, exc_tb):
426 if not exc_type:
--> 427 self.run().wait_until_finish()
428
429 def visit(self, visitor):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
405 self.to_runner_api(use_fake_coders=True),
406 self.runner,
--> 407 self._options).run(False)
408
409 if self._options.view_as(TypeOptions).runtime_type_check:
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
418 finally:
419 shutil.rmtree(tmpdir)
--> 420 return self.runner.run_pipeline(self, self._options)
421
422 def __enter__(self):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
127 runner = BundleBasedDirectRunner()
128
--> 129 return runner.run_pipeline(pipeline, options)
130
131
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_pipeline(self, pipeline, options)
369
370 self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
--> 371 default_environment=self._default_environment))
372 return self._latest_run_result
373
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_via_runner_api(self, pipeline_proto)
376 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
377 # the teststream (if any), and all the stages).
--> 378 return self.run_stages(stage_context, stages)
379
380 @contextlib.contextmanager
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_stages(self, stage_context, stages)
458 stage,
459 pcoll_buffers,
--> 460 stage_context.safe_coders)
461 metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
462 monitoring_infos_by_stage[stage.name] = (
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in _run_stage(self, worker_handler_factory, pipeline_components, stage, pcoll_buffers, safe_coders)
736 num_workers=self._num_workers)
737
--> 738 result, splits = bundle_manager.process_bundle(data_input, data_output)
739
740 def input_for(ptransform_id, input_id):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
1717 self._get_input_coder_impl, self._bundle_descriptor,
1718 self._progress_frequency, self._registered).process_bundle(
-> 1719 part, expected_outputs), part_inputs):
1720
1721 split_result_list += split_result
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result_iterator()
596 # Careful not to keep a reference to the popped future
597 if timeout is None:
--> 598 yield fs.pop().result()
599 else:
600 yield fs.pop().result(end_time - time.monotonic())
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
433 raise CancelledError()
434 elif self._state == FINISHED:
--> 435 return self.__get_result()
436 else:
437 raise TimeoutError()
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
382 def __get_result(self):
383 if self._exception:
--> 384 raise self._exception
385 else:
386 return self._result
~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in <lambda>(part)
1717 self._get_input_coder_impl, self._bundle_descriptor,
1718 self._progress_frequency, self._registered).process_bundle(
-> 1719 part, expected_outputs), part_inputs):
1720
1721 split_result_list += split_result
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
1655 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
1656 process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1657 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1658
1659 split_results = []
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in push(self, request)
1083 self._uid_counter += 1
1084 request.instruction_id = 'control_%s' % self._uid_counter
-> 1085 response = self.worker.do_instruction(request)
1086 return ControlFuture(request.instruction_id, response)
1087
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
341 # E.g. if register is set, this will call self.register(request.register))
342 return getattr(self, request_type)(getattr(request, request_type),
--> 343 request.instruction_id)
344 else:
345 raise NotImplementedError
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
367 with self.maybe_profile(instruction_id):
368 delayed_applications, requests_finalization = (
--> 369 bundle_processor.process_bundle(instruction_id))
370 response = beam_fn_api_pb2.InstructionResponse(
371 instruction_id=instruction_id,
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
661 instruction_id, expected_transforms):
662 input_op_by_transform_id[
--> 663 data.ptransform_id].process_encoded(data.data)
664
665 # Finish all operations.
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
141 decoded_value = self.windowed_coder_impl.decode_from_stream(
142 input_stream, True)
--> 143 self.output(decoded_value)
144
145 def try_split(self, fraction_of_remainder, total_buffer_size):
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/src/sandbox/venv/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
444 if traceback == Ellipsis:
445 _, _, traceback = sys.exc_info()
--> 446 raise exc.with_traceback(traceback)
447
448 else:
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1348 'Received %r instead.' % (fn))
1349 if _fn_takes_side_inputs(fn):
-> 1350 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1351 else:
1352 wrapper = lambda x: [fn(x)]
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
533 # Verify that we have a single row.
534 assert table.num_rows == 1
--> 535 return (table, validate_instance(table, options))
536
537
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
518 raise ValueError('options must include a schema.')
519 feature_statistics_list = (
--> 520 stats_impl.generate_statistics_in_memory(instance, options))
521 anomalies = validate_statistics(feature_statistics_list, options.schema,
522 environment)
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
731 stats_generators = get_generators(options, in_memory=True) # type: List[stats_generator.CombinerStatsGenerator]
732 partial_stats = generate_partial_statistics_in_memory(
--> 733 table, options, stats_generators)
734 return extract_statistics_output(partial_stats, stats_generators)
735
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
711 for generator in stats_generators:
712 result.append(
--> 713 generator.add_input(generator.create_accumulator(), table))
714
715 return result
~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
178 if (feature_path in self._categorical_features or
179 feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180 flattened_values = leaf_array.flatten()
181 unweighted_counts = collections.Counter()
182 # Compute unweighted counts.
AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten' [while running 'DetectAnomalies/DetectAnomaliesInExamples']
Hi Shuhei -- This is due to a bug in our handling of null arrays in the top-k and uniques combiner stats generator. We will have a fix out shortly for this. Thanks for bringing it to our attention!
Thanks! Do you have a plan to support nullable INT? I think there are some difficulties:
- NumPy does not allow nullable int (np.nan is only float)
- Though pandas now supports nullable int ('Int64'), pd.read_csv and other functions convert int with missing values to float by default
I would be very happy if:
- generating and validating a schema on a pd.DataFrame that has a nullable int ('Int64') column worked correctly
  - That is, type is INT and min_fraction is less than 1.0
- generating and validating a schema on a CSV with nullable int worked correctly
  - Could it avoid depending on pandas and NumPy?
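The dtype behavior described in the list above can be checked with pandas alone. This is a small sketch that reuses the train.csv contents from this issue as an in-memory string; the variable names are only for illustration.

```python
import io

import pandas as pd

# Same contents as train.csv above: f1 is missing in the second row.
CSV_TEXT = "f1,f2\n0,0\n,1\n2,2\n"

# Default parsing: the column with a missing value is promoted to float64.
df_default = pd.read_csv(io.StringIO(CSV_TEXT))

# Explicitly requesting the nullable integer dtype keeps the column integral.
df_nullable = pd.read_csv(
    io.StringIO(CSV_TEXT), dtype={"f1": "Int64", "f2": "Int64"}
)

print(df_default.dtypes)
print(df_nullable.dtypes)
```

The nullable dtype has to be requested per column, which is why the default CSV path loses the integer type.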
@sfujiwara
tfdv.generate_statistics_from_dataframe is a utility function for users with in-memory data represented as a pandas DataFrame. As you know, pandas DataFrames support nullable int, so you can try using it that way. For your reference, I have attached a link here; please look into it and see if it helps.
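A minimal sketch of that suggestion, assuming the schema.pbtxt from the original report is on disk. Whether TFDV accepts 'Int64' columns depends on the installed TFDV, pandas, and pyarrow versions, so treat this as something to try rather than a confirmed workaround. The helper names `build_nullable_frame` and `validate_frame` are hypothetical.

```python
import pandas as pd


def build_nullable_frame():
    """Build the train.csv data with nullable integer ('Int64') columns."""
    return pd.DataFrame(
        {
            "f1": pd.array([0, None, 2], dtype="Int64"),
            "f2": pd.array([0, 1, 2], dtype="Int64"),
        }
    )


def validate_frame(df, schema_path="schema.pbtxt"):
    """Generate statistics from a DataFrame and validate them against a schema."""
    # Imported here so the DataFrame helper above works without TFDV installed.
    import tensorflow_data_validation as tfdv

    schema = tfdv.load_schema_text(schema_path)
    stats = tfdv.generate_statistics_from_dataframe(df)
    # Returns an Anomalies proto; an empty anomaly_info means no anomalies found.
    return tfdv.validate_statistics(statistics=stats, schema=schema)


df_train = build_nullable_frame()
print(df_train.dtypes)
```

Calling `validate_frame(df_train)` would then run the statistics and validation steps over the whole DataFrame.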
Thank you!
Hi, @sfujiwara
Closing this issue due to inactivity. Please feel free to reopen if this still exists. Thank you!