Subtle issue in ExtractFieldValues
The tests of ExtractFieldValues pass, but when I used it, I saw that the field was not added.
The reason, I think, is that the process method modifies the stream in place instead of creating a new stream generator that applies the change to the input streams:
for name in multi_stream:
    for instance in multi_stream[name]:
        instance[self.to_field] = values_to_keep
return multi_stream
If you add the following before the return:
for name in multi_stream:
    for instance in multi_stream[name]:
        print(instance)
You will see that each instance (which is actually a new instance, fetched again from multi_stream[name]) does not include the change.
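The behavior described above can be reproduced with a toy model, assuming (as the symptoms suggest) that a stream re-runs its generator every time it is iterated. LazyStream, source and add_field are hypothetical names for illustration, not unitxt APIs. The first loop mutates instances that are thrown away right after; wrapping the input in a new generator makes the change appear on every pass.

```python
from typing import Callable, Dict, Iterator

Instance = Dict[str, object]


class LazyStream:
    """Hypothetical stand-in for a generator-backed stream: each iteration
    calls the generator function again and yields freshly built instances."""

    def __init__(self, generator: Callable[[], Iterator[Instance]]):
        self.generator = generator

    def __iter__(self) -> Iterator[Instance]:
        return self.generator()


def source() -> Iterator[Instance]:
    # Builds new dicts on every call, like re-reading data from its origin.
    for i in range(3):
        yield {"text": f"example {i}"}


stream = LazyStream(source)

# In-place modification: the mutated dicts are discarded after the loop.
for instance in stream:
    instance["values"] = ["a", "b"]
print(["values" in inst for inst in stream])  # [False, False, False]


def add_field(inner: LazyStream, field: str, value: object) -> LazyStream:
    # Wrap the input in a new generator that applies the change on every pass.
    def generator() -> Iterator[Instance]:
        for inst in inner:
            inst[field] = value
            yield inst

    return LazyStream(generator)


fixed = add_field(stream, "values", ["a", "b"])
print([inst["values"] for inst in fixed])  # [['a', 'b'], ['a', 'b'], ['a', 'b']]
```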
I think the implementation should be similar to the SpreadSplit class (an InstanceOperatorWithMultiStreamAccess), which has access to the multi-stream but adds a single value to each instance. Something like:
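from typing import Dict, Optional

# InstanceOperatorWithMultiStreamAccess and MultiStream come from unitxt;
# calculate_extracted_values stands in for the existing frequency logic.


class ExtractFieldValues(InstanceOperatorWithMultiStreamAccess):
    field: str
    stream_name: str
    overall_top_frequency_percent: Optional[int] = 100
    min_frequency_percent: Optional[int] = 0
    to_field: str
    process_every_value: Optional[bool] = False

    def prepare(self):
        # Extracted values are computed once, on the first instance, then reused.
        self.local_cache = None

    def verify(self):
        return super().verify()

    def process(
        self, instance: Dict[str, object], multi_stream: MultiStream
    ) -> Dict[str, object]:
        try:
            if self.local_cache is None:
                self.local_cache = calculate_extracted_values(multi_stream)
            # Add the cached values to this instance and hand it back,
            # instead of mutating the multi-stream in place.
            instance[self.to_field] = self.local_cache
            return instance
        except Exception as e:
            raise Exception(
                f"Unable to fetch instances from '{self.stream_name}' to '{self.to_field}'"
            ) from e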
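A library-free sketch of the proposed pattern, with hypothetical names throughout: CachedValuesAdder, apply, and a Counter-based stand-in for calculate_extracted_values that just ranks distinct values by frequency and ignores the percent thresholds. The reference stream is scanned once, the result is cached, and each instance is yielded from a new generator instead of being modified inside the original stream.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, List, Optional

Instance = Dict[str, object]


class CachedValuesAdder:
    """Hypothetical sketch: compute values once from a reference stream,
    then add them to every instance of the processed stream."""

    def __init__(self, field: str, to_field: str):
        self.field = field
        self.to_field = to_field
        self.local_cache: Optional[List[object]] = None

    def calculate_extracted_values(self, stream: Iterable[Instance]) -> List[object]:
        # Stand-in for the real logic: distinct values, most common first.
        counts = Counter(inst[self.field] for inst in stream)
        return [value for value, _ in counts.most_common()]

    def process(self, instance: Instance, reference: Iterable[Instance]) -> Instance:
        if self.local_cache is None:  # the reference stream is scanned only once
            self.local_cache = self.calculate_extracted_values(reference)
        instance[self.to_field] = self.local_cache
        return instance

    def apply(
        self, stream: Iterable[Instance], reference: Iterable[Instance]
    ) -> Iterator[Instance]:
        # Yields processed instances from a new generator.
        for instance in stream:
            yield self.process(instance, reference)


train = [{"label": "pos"}, {"label": "neg"}, {"label": "pos"}]
test = [{"label": "neg"}, {"label": "pos"}]
op = CachedValuesAdder(field="label", to_field="all_labels")
out = list(op.apply(test, train))
print(out[0]["all_labels"])  # ['pos', 'neg']
```

Caching on the operator keeps the cost of scanning the reference stream to a single pass, at the price of every instance sharing the same cached object.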
Hi Yoav, very nice and important catch! I managed to reproduce it, wrote a test for it, and solved it with deep copies. Please see the PR.
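The PR is not reproduced here, so this is only an assumption about one way deep copies can help: in the process method proposed above, instance[self.to_field] = self.local_cache gives every instance the same list object, so mutating it through one instance changes it for all of them. The hypothetical snippet below contrasts that aliasing with a per-instance copy.deepcopy.

```python
import copy
from typing import List

shared_values: List[str] = ["pos", "neg"]

# Without copying: every instance references the same list object.
aliased = [{"id": i, "values": shared_values} for i in range(2)]
aliased[0]["values"].append("neutral")
print(aliased[1]["values"])  # ['pos', 'neg', 'neutral']

shared_values = ["pos", "neg"]
# With deep copies: each instance owns an independent copy.
copied = [{"id": i, "values": copy.deepcopy(shared_values)} for i in range(2)]
copied[0]["values"].append("neutral")
print(copied[1]["values"])  # ['pos', 'neg']
```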