elasticsearch-py icon indicating copy to clipboard operation
elasticsearch-py copied to clipboard

parallel_bulk() fails with "TypeError: 'NoneType' object is not callable"

Open r3b311i0n opened this issue 7 years ago • 2 comments

This is my first time using ES and I apologise if I'm missing something obvious. I'm currently using streaming_bulk() and it's working fine but if I switch to using parallel_bulk(), I get this error.

Relevant code:

class Resident(InnerDoc):
    id = Integer()
    first_name = Text()
    last_name = Text()
    archived = Boolean()


class BaseReading(InnerDoc):
    id = Integer(index=False)
    auto = Boolean()
    safe = Boolean() 
    when = Date()


class HeartRate(BaseReading):
    value = Float()


class VitalSign(Document):
    class Index:
        name = 'vitals-*'
        using = es
        settings = {}

    date = Date()
    resident = Nested(Resident)
    heart_rate = Nested(HeartRate)
def process_vitals(r_id, index_name, start):
    today_str = start.strftime('%Y-%m-%d')
    v = VitalSign({
        'date': date(2017, 10, 9),
        'resident': {'id': 12, 'first_name': 'Agatha', 'last_name': 'Christie', 'archived': False},
        'heart_rate': [
            {'id': 17, 'auto': False, 'when': datetime(2017, 10, 9, 1, 30), 'safe': False, 'value': 35}
        ]
    })
    yield {
        '_op_type': 'index',
        '_index':   index_name,
        '_type':    'doc',
        '_id':      f'{r_id}:{today_str}',
        '_source':  v.to_dict(),
    }

generator_list = []
index_name = 'vitals-1'
start = datetime.today().date()
for r_id in range(1, 5):
    g = process_vitals(r_id, index_name, start)
    generator_list.append(g)
iterable = parallel_bulk(
    es,
    (i for g in generator_list for i in g),
    index=index_name,
    doc_type='doc',
    raise_on_exception=False,
    raise_on_error=False,
)

for ok, result in iterable:
    if ok:
        print('OK')
        continue

Traceback:

  File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/r3b311i0n/Projects/humient/backend/app/common/elasticsearch/vital_signs/full_index.py", line 177, in <module>
    full_index()
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/r3b311i0n/Projects/humient/backend/app/common/elasticsearch/vital_signs/full_index.py", line 157, in full_index
    for ok, result in iterable:
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py", line 699, in next
    raise value
  File "/home/r3b311i0n/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 305, in <lambda>
    lambda bulk_chunk: list(_process_bulk_chunk(client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs)),
  File "/home/r3b311i0n/.pyenv/versions/viduthi-backend/lib/python3.6/site-packages/elasticsearch/helpers/__init__.py", line 95, in _process_bulk_chunk
    resp = client.bulk('\n'.join(bulk_actions) + '\n', *args, **kwargs)
TypeError: 'NoneType' object is not callable

Env:

elasticsearch 6.3.0
elasticsearch-dsl 6.2.1
Flask-Elasticsearch 0.2.5

r3b311i0n avatar Nov 02 '18 09:11 r3b311i0n

I need to know what your gendata is... just something dummy to reproduce, I can't reproduce with the code snippet provided. Thanks

fxdgear avatar Nov 16 '18 19:11 fxdgear

Thanks for the response! I updated the OP.

r3b311i0n avatar Nov 18 '18 22:11 r3b311i0n