jina
jina copied to clipboard
Can't catch error, process continues even after error happened
Describe the bug
from datetime import datetime
from jina import Flow, Executor, DocumentArray, Document
class MyExecutor(Executor):
def foo(self, **kwargs):
print('foo')
f = Flow().add(uses=MyExecutor)
with f:
da = DocumentArray([Document(text='hello', tags={'date': datetime.now()})])
res = f.post(on='/', inputs=da) # process should stop here and raise an exception
print('***** failed on previous line but still prints this')
f.post
failed because of some protobuf serialisation error but code continues to run.
Describe how you solve it
Throw exception and exit.
EDIT:
The client should not throw an exception because one wrong document array should not break the entire post call.
Nevertheless users should be able to catch this exception and act on it (example exit their program or retry to send)
Therefore the best would that this error is raised in the on_error callback.
this is not so weird. A request failing is not necessarily a problem of the complete Flow
. Can you show the exact stack trace?
My use case is this: if data is indexed successfully, I save the ID of the last indexed data. I want to index only the newer entries from the old ones next time. In this case, the function finished successfully, the last row's id was saved, but the indexing process failed. As a result, I am missing batches from the indexing process.
If it throws an error, I am gonna fix my bug and re-run it. It will not miss data.
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:64853 │
│ 🔒 Private 192.168.3.59:64853 │
│ 🌍 Public 87.191.159.105:64853 │
╰──────────────────────────────────────────╯
CRITI… JINA@42940 inputs is not valid! ValueError('Field `tags` is problematic', [09/07/22 13:31:41]
'Unexpected type')
╭─────────────────── Traceback (most recent call last) ────────────────────╮
│ ...venv/lib/python… │
│ in request_generator │
│ │
│ 69 │ │ │ if not isinstance(data, Iterable): │
│ 70 │ │ │ │ data = [data] │
│ 71 │ │ │ for batch in batch_iterator(data, request_size): │
│ ❱ 72 │ │ │ │ yield _new_data_request_from_batch( │
│ 73 │ │ │ │ │ _kwargs=kwargs, │
│ 74 │ │ │ │ │ batch=batch, │
│ 75 │ │ │ │ │ data_type=data_type, │
│ │
│ ...venv/lib/python… │
│ in _new_data_request_from_batch │
│ │
│ 12 │ req = _new_data_request(endpoint, target, parameters) │
│ 13 │ │
│ 14 │ # add docs fields │
│ ❱ 15 │ _add_docs(req, batch, data_type, _kwargs) │
│ 16 │ │
│ 17 │ return req │
│ 18 │
│ │
│ ...venv/lib/python… │
│ in _add_docs │
│ │
│ 69 │ │ else: │
│ 70 │ │ │ d, data_type = _new_doc_from_data(content, data_type, * │
│ 71 │ │ │ da.append(d) │
│ ❱ 72 │ req.data.docs = da │
│ 73 │
│ │
│ ...venv/lib/python… │
│ in docs │
│ │
│ 51 │ │ │ │
│ 52 │ │ │ :param value: a DocumentArray │
│ 53 │ │ │ """ │
│ ❱ 54 │ │ │ self.set_docs_convert_arrays(value, None) │
│ 55 │ │ │
│ 56 │ │ def set_docs_convert_arrays( │
│ 57 │ │ │ self, value: DocumentArray, ndarray_type: Optional[str │
│ │
│ ...venv/lib/python… │
│ in set_docs_convert_arrays │
│ │
│ 64 │ │ │ if value is not None: │
│ 65 │ │ │ │ self._loaded_doc_array = None │
│ 66 │ │ │ │ self._content.docs.CopyFrom( │
│ ❱ 67 │ │ │ │ │ value.to_protobuf(ndarray_type=ndarray_type) │
│ 68 │ │ │ │ ) │
│ 69 │ │ │
│ 70 │ │ @property │
│ │
│ ...venv/lib/python… │
│ in to_protobuf │
│ │
│ 351 │ │ │
│ 352 │ │ dap = DocumentArrayProto() │
│ 353 │ │ for d in self: │
│ ❱ 354 │ │ │ dap.docs.append(d.to_protobuf(ndarray_type)) │
│ 355 │ │ return dap │
│ 356 │ │
│ 357 │ @classmethod │
│ │
│ ...venv/lib/python… │
│ in to_protobuf │
│ │
│ 20 │ │ """ │
│ 21 │ │ from docarray.proto.io import flush_proto │
│ 22 │ │ │
│ ❱ 23 │ │ return flush_proto(self, ndarray_type) │
│ 24 │
│ │
│ ...venv/lib/python… │
│ in flush_proto │
│ │
│ 50 │ │ │ │ │ docs = getattr(pb_msg, key) │
│ 51 │ │ │ │ │ docs.append(d.to_protobuf()) │
│ 52 │ │ │ elif key == 'tags': │
│ ❱ 53 │ │ │ │ pb_msg.tags.update(value) │
│ 54 │ │ │ elif key == '_metadata': │
│ 55 │ │ │ │ pb_msg._metadata.update(value) │
│ 56 │ │ │ elif key in ('scores', 'evaluations'): │
│ │
│ ...venv/lib/python… │
│ in update │
│ │
│ 817 │
│ 818 def update(self, dictionary): # pylint: disable=invalid-name │
│ 819 │ for key, value in dictionary.items(): │
│ ❱ 820 │ _SetStructValue(self.fields[key], value) │
│ 821 │
│ 822 collections.abc.MutableMapping.register(Struct) │
│ 823 │
│ │
│ ...venv/lib/python… │
│ in _SetStructValue │
│ │
│ 748 │ struct_value.list_value.Clear() │
│ 749 │ struct_value.list_value.extend(value) │
│ 750 else: │
│ ❱ 751 │ raise ValueError('Unexpected type') │
│ 752 │
│ 753 │
│ 754 def _GetStructValue(struct_value): │
╰──────────────────────────────────────────────────────────────────────────╯
ValueError: ('Field `tags` is problematic', 'Unexpected type')
***** failed on previous line but still prints this
I want the process to fail or at least stop. The on_error
and on_done
callbacks are not invoked.
If this is expected behavior, how can I know whether my request failed or not?
You can use check_input
method to make sure inputs
will be valid.
Can I ask for a reproducible example of some input that gives this behavior?