Add Avro formatter
This PR adds a new formatter that writes data in the Avro format, using fastavro as the backend.
Avro is designed to be parallelizable. To support streaming writes (which seems to be the only interface available to formatters, given their header and footer APIs), the formatter uses a sync_interval of 0, which forces fastavro's Writer to emit a block end (essentially a 16-byte random sync marker) after every record write. This ensures the validity of the output created by the exporters writers.
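For illustration, below is a minimal standalone sketch of that trick, assuming a made-up schema and records (this is not the actual AvroExportFormatter code from this PR): with sync_interval=0, fastavro's Writer dumps a complete block, data plus the 16-byte sync marker, after every single record, so the output is a readable Avro stream after each write.

```python
# Minimal sketch of the sync_interval=0 behaviour; not the PR's formatter code.
# Note: older fastavro releases expose the class as fastavro.writer.Writer
# instead of fastavro.write.Writer.
import io

from fastavro import reader
from fastavro.write import Writer

schema = {
    "namespace": "example",
    "type": "record",
    "name": "TestRecord",
    "fields": [
        {"name": "country_code", "type": "string"},
        {"name": "city", "type": "string"},
    ],
}

buf = io.BytesIO()
# sync_interval=0 means every write() exceeds the block-size threshold,
# so the Writer flushes a full block (data + 16-byte sync marker) per record.
writer = Writer(buf, schema, sync_interval=0)

writer.write({"country_code": "us", "city": "new york"})
writer.write({"country_code": "es", "city": "alicante"})

# Every record already sits in a closed block, so the buffer holds a
# complete, readable Avro stream without any explicit flush.
buf.seek(0)
print(list(reader(buf)))
```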
Not sure if the Travis failure is related to my PR (tests passed locally):
./tests/test_formatters.py:172:71: E203 whitespace before ','
./tests/test_formatters.py:197:1: W391 blank line at end of file
make: *** [test] Error 1
ERROR: InvocationError: '/usr/bin/make test'
I see some PEP 8 related flags that I can fix right away.
I tried running it with the following config:
{
    "exporter_options": {
        "formatter": {
            "name": "exporters.export_formatter.AvroExportFormatter",
            "options": {
                "schema": "{\"namespace\": \"random.names\", \"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [{\"type\": \"string\", \"name\": \"country_code\"}, {\"type\": \"string\", \"name\": \"city\"}, {\"default\": \"N/A\", \"type\": \"string\", \"name\": \"state\"}, {\"default\": 0, \"type\": \"int\", \"name\": \"value\"}]}"
            }
        }
    },
    "reader": {
        "name": "exporters.readers.random_reader.RandomReader",
        "options": {
            "number_of_items": 100,
            "batch_size": 10
        }
    },
    "writer": {
        "name": "exporters.writers.fs_writer.FSWriter",
        "options": {
            "filebase": "/tmp/avro_format_test_",
            "items_per_buffer_write": 100
        }
    }
}
And it crashed with:
ERROR:export-pipeline: -- EXPORTMANAGER -- 'dict' object has no attribute 'encode'
Traceback (most recent call last):
File "bin/export.py", line 41, in <module>
run(args)
File "bin/export.py", line 36, in run
exporter.export()
File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 224, in export
self._run_pipeline()
File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 167, in _run_pipeline
self._run_pipeline_iteration()
File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 76, in _run_pipeline_iteration
self.writer.write_batch(batch=next_batch)
File "/home/elias/hacking/scrapinghub/exporters/exporters/writers/base_writer.py", line 111, in write_batch
self.write_buffer.buffer(item)
File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/base.py", line 37, in buffer
self.items_group_files.add_item_to_file(item, key)
File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/grouping.py", line 115, in add_item_to_file
buffer_file.add_item_to_file(item)
File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/grouping.py", line 84, in add_item_to_file
content = self.formatter.format(item)
File "/home/elias/hacking/scrapinghub/exporters/exporters/export_formatter/avro_export_formatter.py", line 62, in format
self.writer.write(item)
File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 537, in write
write_data(self.io, record, self.schema)
File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 432, in write_data
return fn(fo, datum, schema)
File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 363, in write_record
name, field.get('default')), field['type'])
File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 432, in write_data
return fn(fo, datum, schema)
File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 178, in write_utf8
write_bytes(fo, utob(datum))
File "fastavro/_six.py", line 48, in fastavro._six.py2_utob (fastavro/_six.c:2462)
File "fastavro/_six.py", line 49, in fastavro._six.py2_utob (fastavro/_six.c:2320)
AttributeError: 'dict' object has no attribute 'encode'
I'm probably doing something wrong. Any help, @manugarri? :)
@eliasdorneles the issue you are facing is that the schema you provided does not match what RandomReader produces: the schema declares city as a string, so fastavro tries to UTF-8 encode the nested city dict and fails with that AttributeError.
The items produced by it look like this:
{'city': {'name': u'alicante', 'district': u'dist3'}, 'state': u'val\xe9ncia', 'value': 1301, 'country_code': u'us', 'key': 0}
So you would need the following Avro schema to describe those items:
[
    {
        "type": "record",
        "name": "City",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "district", "type": "string"}
        ]
    },
    {
        "type": "record",
        "name": "TestRecord",
        "fields": [
            {"name": "city", "type": "City"},
            {"name": "state", "type": "string"},
            {"name": "value", "type": "int", "default": 0},
            {"name": "key", "type": "int"},
            {"name": "country_code", "type": "string"}
        ]
    }
]
Basically, a City record nested inside the general TestRecord. I tested it and it does work.
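For reference, here is a hypothetical standalone check (not part of this PR) that the nested schema above accepts an item of the shape RandomReader produces; it assumes a reasonably recent fastavro that ships parse_schema and the fastavro.validation helper.

```python
# Hypothetical check that the nested schema accepts RandomReader-shaped items;
# requires a fastavro version providing parse_schema and fastavro.validation.
from fastavro import parse_schema
from fastavro.validation import validate

# Top-level union: defining City first lets TestRecord reference it by name.
nested_schema = parse_schema([
    {
        "type": "record",
        "name": "City",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "district", "type": "string"},
        ],
    },
    {
        "type": "record",
        "name": "TestRecord",
        "fields": [
            {"name": "city", "type": "City"},
            {"name": "state", "type": "string"},
            {"name": "value", "type": "int", "default": 0},
            {"name": "key", "type": "int"},
            {"name": "country_code", "type": "string"},
        ],
    },
])

# An item shaped like the RandomReader output shown above.
item = {
    "city": {"name": u"alicante", "district": u"dist3"},
    "state": u"val\xe9ncia",
    "value": 1301,
    "country_code": u"us",
    "key": 0,
}

# Raises a ValidationError if the item does not conform to the schema.
validate(item, nested_schema)
```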
@manugarri we are planning to update exporters to Python 3 soon; if possible, could you make sure your code reflects that? Thanks
@tsrdatatech I ran 2to3 on my code and the only change it suggests is in tests/test_formatters.py:
- with self.assertRaisesRegexp(ConfigurationError, "requires at least one of"):
+ with self.assertRaisesRegex(ConfigurationError, "requires at least one of"):
CSVExportFormatter({})
Which is not really my code (it's the test for CSVExportFormatter).
Testing on Python 3 raises all kinds of errors from the library that are not related to this PR.
No worries, we will fix this.
Awesome, thanks @tsrdatatech. Let me know if you need anything else on my side; I'd rather not have to keep my own fork when this PR can benefit the community :)
You're welcome @manugarri, we should have the Python 3 version ready by early next week; I agree this PR should be able to go in then. Thanks for your contribution.
Awesome, thanks @tsrdatatech and @eliasdorneles :)
Any progress on this, guys?
@tribeclickgh There is a Python 3 branch at https://github.com/scrapinghub/exporters/tree/python3 that is closer to being complete, but we have not had time to finish it; you're more than welcome to pick it up if you want. Not sure when we will have this complete.