
Can we not use INT features with missing values?

Open sfujiwara opened this issue 4 years ago • 4 comments

It seems that INT features cannot have missing values. For example, validating train.csv below against schema.pbtxt fails:

schema.pbtxt:

feature {
  name: "f1"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 0.5
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "f2"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}

train.csv:

f1,f2
0,0
,1
2,2

Here is the validation code I ran:

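import pandas as pd
import tensorflow_data_validation as tfdv

# Load the data with pandas (not used by the validation call below).
df_train = pd.read_csv("train.csv")
# Load the text-format schema shown above.
schema = tfdv.load_schema_text("schema.pbtxt")

# Validate each example in the CSV against the schema; this is the call that fails.
result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))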

sfujiwara, Nov 15 '19 02:11

@sfujiwara, can you please share the error message? Thanks!

rmothukuru, Nov 15 '19 09:11

@rmothukuru In my environment, I get the following error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
   1349   if _fn_takes_side_inputs(fn):
-> 1350     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1351   else:

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
    534   assert table.num_rows == 1
--> 535   return (table, validate_instance(table, options))
    536 

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
    519   feature_statistics_list = (
--> 520       stats_impl.generate_statistics_in_memory(instance, options))
    521   anomalies = validate_statistics(feature_statistics_list, options.schema,

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
    732   partial_stats = generate_partial_statistics_in_memory(
--> 733       table, options, stats_generators)
    734   return extract_statistics_output(partial_stats, stats_generators)

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
    712     result.append(
--> 713         generator.add_input(generator.create_accumulator(), table))
    714 

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
    179           feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180         flattened_values = leaf_array.flatten()
    181         unweighted_counts = collections.Counter()

AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
<ipython-input-3-ed30d4744f14> in <module>
----> 1 result = tfdv.validate_examples_in_csv("train.csv", tfdv.StatsOptions(schema=schema))

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/utils/validation_lib.py in validate_examples_in_csv(data_location, stats_options, column_names, delimiter, output_path, pipeline_options)
    191             shard_name_template='',
    192             coder=beam.coders.ProtoCoder(
--> 193                 statistics_pb2.DatasetFeatureStatisticsList)))
    194 
    195   return stats_gen_lib.load_statistics(output_path)

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    425   def __exit__(self, exc_type, exc_val, exc_tb):
    426     if not exc_type:
--> 427       self.run().wait_until_finish()
    428 
    429   def visit(self, visitor):

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    405           self.to_runner_api(use_fake_coders=True),
    406           self.runner,
--> 407           self._options).run(False)
    408 
    409     if self._options.view_as(TypeOptions).runtime_type_check:

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    418       finally:
    419         shutil.rmtree(tmpdir)
--> 420     return self.runner.run_pipeline(self, self._options)
    421 
    422   def __enter__(self):

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    127       runner = BundleBasedDirectRunner()
    128 
--> 129     return runner.run_pipeline(pipeline, options)
    130 
    131 

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_pipeline(self, pipeline, options)
    369 
    370     self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
--> 371         default_environment=self._default_environment))
    372     return self._latest_run_result
    373 

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_via_runner_api(self, pipeline_proto)
    376     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    377     #   the teststream (if any), and all the stages).
--> 378     return self.run_stages(stage_context, stages)
    379 
    380   @contextlib.contextmanager

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in run_stages(self, stage_context, stages)
    458               stage,
    459               pcoll_buffers,
--> 460               stage_context.safe_coders)
    461           metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
    462           monitoring_infos_by_stage[stage.name] = (

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in _run_stage(self, worker_handler_factory, pipeline_components, stage, pcoll_buffers, safe_coders)
    736         num_workers=self._num_workers)
    737 
--> 738     result, splits = bundle_manager.process_bundle(data_input, data_output)
    739 
    740     def input_for(ptransform_id, input_id):

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
   1717           self._get_input_coder_impl, self._bundle_descriptor,
   1718           self._progress_frequency, self._registered).process_bundle(
-> 1719               part, expected_outputs), part_inputs):
   1720 
   1721         split_result_list += split_result

~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result_iterator()
    596                     # Careful not to keep a reference to the popped future
    597                     if timeout is None:
--> 598                         yield fs.pop().result()
    599                     else:
    600                         yield fs.pop().result(end_time - time.monotonic())

~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.pyenv/versions/3.7.4/lib/python3.7/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in <lambda>(part)
   1717           self._get_input_coder_impl, self._bundle_descriptor,
   1718           self._progress_frequency, self._registered).process_bundle(
-> 1719               part, expected_outputs), part_inputs):
   1720 
   1721         split_result_list += split_result

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
   1655         process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
   1656             process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1657     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1658 
   1659     split_results = []

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py in push(self, request)
   1083       self._uid_counter += 1
   1084       request.instruction_id = 'control_%s' % self._uid_counter
-> 1085     response = self.worker.do_instruction(request)
   1086     return ControlFuture(request.instruction_id, response)
   1087 

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    341       # E.g. if register is set, this will call self.register(request.register))
    342       return getattr(self, request_type)(getattr(request, request_type),
--> 343                                          request.instruction_id)
    344     else:
    345       raise NotImplementedError

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    367         with self.maybe_profile(instruction_id):
    368           delayed_applications, requests_finalization = (
--> 369               bundle_processor.process_bundle(instruction_id))
    370           response = beam_fn_api_pb2.InstructionResponse(
    371               instruction_id=instruction_id,

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    661             instruction_id, expected_transforms):
    662           input_op_by_transform_id[
--> 663               data.ptransform_id].process_encoded(data.data)
    664 
    665       # Finish all operations.

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    141       decoded_value = self.windowed_coder_impl.decode_from_stream(
    142           input_stream, True)
--> 143       self.output(decoded_value)
    144 
    145   def try_split(self, fraction_of_remainder, total_buffer_size):

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.Operation.output()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.receive()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/src/sandbox/venv/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
    444         if traceback == Ellipsis:
    445             _, _, traceback = sys.exc_info()
--> 446         raise exc.with_traceback(traceback)
    447 
    448 else:

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.DoFnRunner.process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-darwin.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/src/sandbox/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
   1348         'Received %r instead.' % (fn))
   1349   if _fn_takes_side_inputs(fn):
-> 1350     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1351   else:
   1352     wrapper = lambda x: [fn(x)]

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in _detect_anomalies_in_example(table, options)
    533   # Verify that we have a single row.
    534   assert table.num_rows == 1
--> 535   return (table, validate_instance(table, options))
    536 
    537 

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/api/validation_api.py in validate_instance(instance, options, environment)
    518     raise ValueError('options must include a schema.')
    519   feature_statistics_list = (
--> 520       stats_impl.generate_statistics_in_memory(instance, options))
    521   anomalies = validate_statistics(feature_statistics_list, options.schema,
    522                                   environment)

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_statistics_in_memory(table, options)
    731   stats_generators = get_generators(options, in_memory=True)  # type: List[stats_generator.CombinerStatsGenerator]
    732   partial_stats = generate_partial_statistics_in_memory(
--> 733       table, options, stats_generators)
    734   return extract_statistics_output(partial_stats, stats_generators)
    735 

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py in generate_partial_statistics_in_memory(table, options, stats_generators)
    711   for generator in stats_generators:
    712     result.append(
--> 713         generator.add_input(generator.create_accumulator(), table))
    714 
    715   return result

~/src/sandbox/venv/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/top_k_uniques_combiner_stats_generator.py in add_input(self, accumulator, input_table)
    178       if (feature_path in self._categorical_features or
    179           feature_type == statistics_pb2.FeatureNameStatistics.STRING):
--> 180         flattened_values = leaf_array.flatten()
    181         unweighted_counts = collections.Counter()
    182         # Compute unweighted counts.

AttributeError: 'pyarrow.lib.NullArray' object has no attribute 'flatten' [while running 'DetectAnomalies/DetectAnomaliesInExamples']

sfujiwara, Nov 21 '19 01:11

Hi Shuhei, this is due to a bug in how the top-k and uniques combiner stats generator handles null arrays. We will have a fix out shortly. Thanks for bringing it to our attention!

caveness, Nov 21 '19 14:11

Thanks! Do you plan to support nullable INT features? I can see a few difficulties:

  • NumPy has no nullable integer type (np.nan is a float, so an integer array containing it is upcast to float)
  • Although pandas now supports a nullable integer dtype ('Int64'), pd.read_csv and other functions convert integer columns with missing values to float by default

I would be very happy if:

  • generating and validating a schema on a pd.DataFrame with a nullable int ('Int64') column worked correctly
    • That is, the inferred type is INT and min_fraction is less than 1.0
  • generating and validating a schema on a CSV with nullable int values worked correctly
    • since, I assume, that path does not depend on pandas or NumPy
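The two difficulties listed above can be reproduced with the train.csv from this issue. This is a small sketch using only NumPy and pandas; the variable names are illustrative.

```python
import io

import numpy as np
import pandas as pd

CSV = "f1,f2\n0,0\n,1\n2,2\n"  # the train.csv from this issue

# NumPy: np.nan is a float, so an "integer" array containing it is upcast to float64.
arr = np.array([0, np.nan, 2])
print(arr.dtype)  # float64

# pandas, default parsing: the gap in f1 becomes NaN, so f1 is read as float64.
df_default = pd.read_csv(io.StringIO(CSV))
print(df_default["f1"].dtype, df_default["f2"].dtype)  # float64 int64

# pandas, nullable integer dtype: f1 stays integral and the gap becomes pd.NA.
df_nullable = pd.read_csv(io.StringIO(CSV), dtype="Int64")
print(df_nullable["f1"].dtype)            # Int64
print(df_nullable["f1"].isna().tolist())  # [False, True, False]
```

Passing `dtype="Int64"` is opt-in, which is why data loaded with default settings reaches downstream tools as float rather than as a nullable integer.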

sfujiwara, Nov 21 '19 19:11

@sfujiwara

tfdv.generate_statistics_from_dataframe is a utility function for users whose in-memory data is represented as a pandas DataFrame. Since pandas DataFrames support nullable integers, you could try using it. For reference, I have attached a link here; please take a look and see whether it helps.
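A minimal sketch of that suggestion, assuming tensorflow_data_validation is installed and that the installed version accepts pandas' nullable "Int64" dtype (neither is verified here). `tfdv.generate_statistics_from_dataframe` and `tfdv.infer_schema` are TFDV functions; the wrapper `stats_and_schema_from_dataframe` is a hypothetical helper, and TFDV is imported inside it only so the DataFrame preparation can run on its own.

```python
import pandas as pd


def stats_and_schema_from_dataframe(df: pd.DataFrame):
    """Hypothetical helper: compute TFDV statistics for an in-memory DataFrame
    and infer a schema from them.

    Assumes tensorflow_data_validation is installed and handles pandas'
    nullable "Int64" dtype; neither assumption is checked here.
    """
    import tensorflow_data_validation as tfdv  # heavy dependency, imported lazily

    stats = tfdv.generate_statistics_from_dataframe(df)
    schema = tfdv.infer_schema(stats)
    return stats, schema


# The data from this issue, with both columns held as pandas' nullable Int64.
df = pd.DataFrame({
    "f1": pd.array([0, None, 2], dtype="Int64"),
    "f2": pd.array([0, 1, 2], dtype="Int64"),
})
```

With TFDV installed, calling `stats, schema = stats_and_schema_from_dataframe(df)` and inspecting `tfdv.get_feature(schema, "f1")` would show whether f1 is inferred as INT with a presence min_fraction below 1.0 in your version.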

Thank you!

gaikwadrahul8, Nov 07 '22 00:11

Hi, @sfujiwara

Closing this issue due to inactivity. Please feel free to reopen it if the problem persists. Thank you!

gaikwadrahul8, Nov 22 '22 09:11