[Bug]: Python TypeError when converting Avro `logicalType` `timestamp-millis` to Beam Schema
What happened?
When an Avro file's schema contains a field of type:
```json
{"type": "long", "logicalType": "timestamp-millis"}
```
converting the records to Beam schema rows (`beam.Row`), e.g. with `ReadFromAvro(..., as_rows=True)`, fails with a `TypeError` raised in `RowCoderImpl`:
```
apache_beam/coders/coder_impl.py:209: in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:248: in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
    ???
apache_beam/coders/coder_impl.py:1824: in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
    ???
E   TypeError: an integer is required [while running 'ReadFromAvro/Map(<lambda at avroio.py:633>)']
```
Avro files with `timestamp-millis` logical types are produced, for example, when BigQuery table data with a `TIMESTAMP` column is exported to `.avro` files.
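For context: the Avro specification defines `timestamp-millis` as a `long` holding milliseconds since the Unix epoch (UTC), but readers such as fastavro decode it into a timezone-aware `datetime.datetime`. The stdlib-only sketch below illustrates the resulting mismatch, assuming the decoded `datetime` is what reaches the INT64 encoding step. `decode_timestamp_millis` is a hypothetical stand-in for the reader's behaviour and `encode_int64` (built on `struct.pack`) is a hypothetical stand-in for `RowCoderImpl`'s INT64 encoding; neither is Beam or fastavro code.

```python
import datetime
import struct

_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def decode_timestamp_millis(raw: int) -> datetime.datetime:
    """Hypothetical stand-in for how an Avro reader decodes timestamp-millis."""
    return _EPOCH + datetime.timedelta(milliseconds=raw)


def encode_int64(value) -> bytes:
    """Hypothetical stand-in for an INT64 field encoder (big-endian, 8 bytes)."""
    return struct.pack(">q", value)


# 2024-06-03T14:14:04.765Z, the same instant used in the unit test.
raw_millis = 1717424044765
decoded = decode_timestamp_millis(raw_millis)
print(decoded.isoformat())

encode_int64(raw_millis)  # the raw long encodes fine
try:
    encode_int64(decoded)  # the decoded datetime is not an integer
except struct.error as exc:
    print("encoding failed:", exc)
```

The failure here is `struct.error` rather than Beam's `TypeError`, but the cause is analogous: an integer encoder is handed a `datetime`.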
Unit test to reproduce the issue:
```python
import datetime
import tempfile
import unittest

import apache_beam as beam
import fastavro


class TestBeamSchemaConversions(unittest.TestCase):
    def test_convert_timestamp_millis(self):
        """Demonstrate bug: Avro-to-Beam schema conversion cannot handle timestamp-millis logical type."""
        avro_schema = {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            ],
        }

        # Write test records into a temp file using fastavro.
        with tempfile.NamedTemporaryFile(delete=True) as input_avro_file:
            fastavro.writer(
                input_avro_file,
                avro_schema,
                [
                    {"name": "Alice", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                    {"name": "Bob", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                ],
                validator=True,
            )
            input_avro_file.flush()

            # Reading with as_rows=True triggers the conversion to beam.Row, which fails.
            with self.assertRaises(TypeError) as context:
                with beam.Pipeline() as p:
                    avro_records = p | beam.io.ReadFromAvro(input_avro_file.name, as_rows=True)
                    avro_records | beam.LogElements()

            self.assertIn("an integer is required", str(context.exception))


if __name__ == "__main__":
    unittest.main()
```
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
Part of the solution could be in `avro_value_to_beam_value()`, by checking for a `beam_type` matching this `FieldType`:
```
logical_type {
  urn: "beam:logical_type:millis_instant:v1"
  representation {
    atomic_type: INT64
  }
}
```
The converter could then return a Beam `Timestamp` (e.g. via `Timestamp.of`) in place of the `datetime.datetime` value read from the Avro file.
.take-issue