elasticsearch-py
elasticsearch-py copied to clipboard
parallel_bulk() fails with "TypeError: 'NoneType' object is not callable"
This is my first time using ES and I apologise if I'm missing something obvious. I'm currently using streaming_bulk() and it's working fine but if I switch to using parallel_bulk(), I get this error.
Relevant code:
class Resident(InnerDoc):
id = Integer()
first_name = Text()
last_name = Text()
archived = Boolean()
class BaseReading(InnerDoc):
id = Integer(index=False)
auto = Boolean()
safe = Boolean()
when = Date()
class HeartRate(BaseReading):
value = Float()
class VitalSign(Document):
class Index:
name = 'vitals-*'
using = es
settings = {}
date = Date()
resident = Nested(Resident)
heart_rate = Nested(HeartRate)
def process_vitals(r_id, index_name, start):
today_str = start.strftime('%Y-%m-%d')
v = VitalSign({
'date': date(2017, 10, 9),
'resident': {'id': 12, 'first_name': 'Agatha', 'last_name': 'Christie', 'archived': False},
'heart_rate': [
{'id': 17, 'auto': False, 'when': datetime(2017, 10, 9, 1, 30), 'safe': False, 'value': 35}
]
})
yield {
'_op_type': 'index',
'_index': index_name,
'_type': 'doc',
'_id': f'{r_id}:{today_str}',
'_source': v.to_dict(),
}
generator_list = []
index_name = 'vitals-1'
start = datetime.today().date()
for r_id in range(1, 5):
g = process_vitals(r_id, index_name, start)
generator_list.append(g)
iterable = parallel_bulk(
es,
(i for g in generator_list for i in g),
index=index_name,
doc_type='doc',
raise_on_exception=False,
raise_on_error=False,
)
for ok, result in iterable:
if ok:
print('OK')
continue
Traceback:
File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/home/r3b311i0n/Projects/humient/backend/app/common/elasticsearch/vital_signs/full_index.py", line 177, in <module>
full_index()
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 697, in main
rv = self.invoke(ctx)
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 535, in invoke
return callback(*args, **kwargs)
File "/home/r3b311i0n/Projects/humient/backend/app/common/elasticsearch/vital_signs/full_index.py", line 157, in full_index
for ok, result in iterable:
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 306, in parallel_bulk
_chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py", line 699, in next
raise value
File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 305, in <lambda>
lambda bulk_chunk: list(_process_bulk_chunk(client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs)),
File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 95, in _process_bulk_chunk
resp = client.bulk('\n'.join(bulk_actions) + '\n', *args, **kwargs)
TypeError: 'NoneType' object is not callable
Env:
elasticsearch 6.3.0
elasticsearch-dsl 6.2.1
Flask-Elasticsearch 0.2.5
I need to know what your gendata is... just something dummy to reproduce, I can't reproduce with the code snippet provided. Thanks
Thanks for the response! I updated the OP.