dbf
Add function options to export()
Addition of two optional arguments to export(): one to ignore errors when writing rows to files, and one to check for and remove null bytes found in rows of data.
ignore_errors: wraps each row of the DBF table in a try/except block. If a row causes an error and cannot be exported, this option allows you to skip the line and continue exporting.
remove_null_bytes: checks for and removes null bytes if found in a row of data.
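For illustration only, a call using the proposed keywords might look like the sketch below (the table path is hypothetical and the keyword names follow the patch further down; the released API may differ):

import dbf

# hypothetical existing table; keyword names follow the proposed patch
table = dbf.Table('customers.dbf')
table.open(dbf.READ_ONLY)
dbf.export(
    table,
    filename='customers.csv',
    format='csv',
    ignore_errors=True,        # skip rows that raise an exception during export
    remove_null_bytes=True,    # strip null bytes from field data before writing
    )
table.close()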
This looks good. Can you add a couple test cases to make sure the code is doing what it is supposed to?
I will run some test cases and get back to you.
I have attached a test case which best mimics the scenario in which I used the above. A few things to note:
- I ran some additional prints within the export() function and have not committed these prints to the repo; I will include them below.
- The error I was bypassing with ignore_errors is due to an encoding issue with \x81; I do not seem to be able to insert this into a temp table due to the same issue. However, the exception handler is a catch-all and should be easily testable.
Hopefully this gives you an idea of what it is looking for. I found a few issues with iterables and indentation and have committed fixes for them. Let me know if you need me to provide any more info.
def export(table_or_records, filename=None, field_names=None, format='csv', header=True, dialect='dbf', encoding=None, ignore_errors=False, remove_null_bytes=False):
    """
    writes the records using CSV or tab-delimited format, using the filename
    given if specified, otherwise the table name
    if table_or_records is a collection of records (not an actual table) they
    should all be of the same format
    ignore_errors will skip rows which raise an exception and continue with the export
    remove_null_bytes removes all null (0x00) bytes from the output
    """
    table = source_table(table_or_records[0])
    if filename is None:
        filename = table.filename
    if field_names is None:
        field_names = table.field_names
    if isinstance(field_names, basestring):
        field_names = [f.strip() for f in field_names.split(',')]
    format = format.lower()
    if format not in ('csv', 'tab', 'fixed'):
        raise DbfError("export format: csv, tab, or fixed -- not %s" % format)
    if format == 'fixed':
        format = 'txt'
    if encoding is None:
        encoding = table.codepage.name
    encoder = codecs.getencoder(encoding)
    header_names = field_names
    # encoding = table.codepage.name
    # encoder = codecs.getencoder(encoding)
    if isinstance(field_names[0], unicode):
        header_names = [encoder(f) for f in field_names]
    else:
        header_names = field_names
    base, ext = os.path.splitext(filename)
    if ext.lower() in ('', '.dbf'):
        filename = base + "." + format
    with codecs.open(filename, 'w', encoding=encoding) as fd:
        if format == 'csv':
            csvfile = csv.writer(fd, dialect=dialect)
            if header:
                csvfile.writerow(header_names)
            for record in table_or_records:
                try:
                    fields = []
                    for fieldname in field_names:
                        data = record[fieldname]
                        if remove_null_bytes:
                            if '\x00' in str(data):
                                print(f'Removing null bytes from {fieldname}:\n{record}')
                                data = data.replace('\x00', '')
                        fields.append(unicode(data))
                    csvfile.writerow(fields)
                except Exception as e:
                    print(f'Skipping line due to error\n{e} at\n{fieldname}')
                    # re-raise unless the caller asked to skip problem rows
                    if not ignore_errors:
                        raise e
                    continue
        elif format == 'tab':
            if header:
                fd.write('\t'.join(header_names) + '\n')
            for record in table_or_records:
                try:
                    fields = []
                    for fieldname in field_names:
                        data = record[fieldname]
                        if remove_null_bytes:
                            if '\x00' in str(data):
                                data = data.replace('\x00', '')
                        fields.append(unicode(data))
                    fd.write('\t'.join(fields) + '\n')
                except Exception as e:
                    if not ignore_errors:
                        raise e
                    continue
        else:  # format == 'fixed'
            with codecs.open("%s_layout.txt" % os.path.splitext(filename)[0], 'w', encoding=encoding) as header:
                header.write("%-15s Size\n" % "Field Name")
                header.write("%-15s ----\n" % ("-" * 15))
                sizes = []
                for field in field_names:
                    size = table.field_info(field).length
                    sizes.append(size)
                    header.write("%-15s %3d\n" % (field, size))
                header.write('\nTotal Records in file: %d\n' % len(table_or_records))
            for record in table_or_records:
                try:
                    fields = []
                    for i, fieldname in enumerate(field_names):
                        data = record[fieldname]
                        if remove_null_bytes:
                            if '\x00' in str(data):
                                data = data.replace('\x00', '')
                        fields.append("%-*s" % (sizes[i], data))
                    fd.write(''.join(fields) + '\n')
                except Exception as e:
                    if not ignore_errors:
                        raise e
                    continue
    return len(table_or_records)
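For reference, a minimal standalone test along the lines described above; this is only a sketch (not the attached test file) and assumes the package's in-memory table support (on_disk=False) and that null bytes in a character field round-trip through append:

import dbf

# throwaway in-memory table with one clean row and one row containing null bytes
table = dbf.Table('temp_export_test', 'name C(20); notes C(40)', on_disk=False)
table.open(dbf.READ_WRITE)
table.append(('clean row', 'no problems here'))
table.append(('dirty row', 'bad\x00data\x00here'))

# export with the new options; both rows should be written and any null bytes
# should be absent from the output file
dbf.export(
    table,
    filename='temp_export_test.csv',
    format='csv',
    ignore_errors=True,
    remove_null_bytes=True,
    )
table.close()

with open('temp_export_test.csv') as fh:
    assert '\x00' not in fh.read()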
Apologies for the delay. ignore_errors and strip_nulls are now supported (v0.99.009).
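A quick usage sketch with the released keywords (table path hypothetical; see the package docs for the exact signature):

import dbf  # v0.99.009 or later

table = dbf.Table('customers.dbf')   # hypothetical existing table
table.open(dbf.READ_ONLY)
dbf.export(table, filename='customers.csv', format='csv',
           ignore_errors=True, strip_nulls=True)
table.close()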