scio
Add timeout to sc.distCache
sc.distCache can be used to download some files and distribute them across workers. However, currently it can be done only once, when DistCache.apply() is called.
I need to update local DB files every day/month, which seems to be impossible with the current implementation.
My proposal is to replace the current lazy val with a synchronised var plus timeout logic.
One open question is how to avoid the problem of all workers trying to download a heavy asset at the same moment. We could solve that by adding a randomised window (0-60 sec) within which each worker updates the asset.
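A minimal sketch of the "synchronised var plus timeout" idea, in plain Scala with no scio or Beam dependency. TimedCache, ttlMillis, maxJitterMillis and the injectable clock are hypothetical names, not part of scio's DistCache API. The random jitter (e.g. up to 60 s) is picked once per instance, so workers that start together still expire at different times.

```scala
import scala.util.Random

// Hypothetical sketch, not scio's DistCache: a thread-safe holder that reloads
// its value once it is older than ttlMillis plus a per-instance random jitter.
final class TimedCache[T](
    load: () => T,
    ttlMillis: Long,
    maxJitterMillis: Long,
    clock: () => Long = () => System.currentTimeMillis(),
    random: Random = new Random()
) {
  // Jitter in [0, maxJitterMillis), chosen once per instance (i.e. per worker).
  private val jitterMillis: Long =
    if (maxJitterMillis <= 0L) 0L
    else (random.nextDouble() * maxJitterMillis).toLong

  // The "synchronised var": only read or written while holding this object's lock.
  private var value: Option[T] = None
  private var loadedAt: Long = 0L

  def get(): T = synchronized {
    val now = clock()
    value match {
      // Still fresh: reuse the cached value.
      case Some(v) if now - loadedAt < ttlMillis + jitterMillis => v
      // Missing or expired: reload and remember when.
      case _ =>
        val fresh = load()
        value = Some(fresh)
        loadedAt = now
        fresh
    }
  }
}
```

Holding the lock while load() runs means concurrent callers on the same worker wait for one download instead of each starting their own; the trade-off is that reads block during a refresh.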
Happy to contribute the implementation if this is something that can be considered.
This is a streaming job, I assume? By timeout, do you mean overwriting the same file URIs and having DistCache instances re-download them? That might lead to race conditions and other issues, though.
OTOH, all workers downloading the same asset shouldn't be an issue, assuming it's on GCS, since GCS can usually handle such loads without problems?
I wonder if it's better handled this way:
- Have a refreshing side input (example) that checks an asset for updates
- Trigger new events when there's an update, together with the new URI, assuming each version is an immutable snapshot
- Have a custom DoFn that manages this shared asset, and update it upon receiving the event from the step above
This would also work for non-file-based assets like a database lookup, since the resource loading mechanism can be abstracted away. WDYT?
For the abstraction, we'd need something like this:
- A resource ID type R, e.g. URI/String of the GCS path or DB connection
- A def checkForUpdate(): (R, Boolean), where _._2 indicates an update is available
- A resource type T, e.g. Map[String, String], that is loaded from the resource file/DB
- A reload resource method, e.g. def load(resourceId: R): T, called when checkForUpdate returns true
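The four pieces listed above could be sketched as a small trait plus a holder that only reloads when checkForUpdate() reports a change. All names here (SharedResource, ResourceHolder, SnapshotResource) are hypothetical and only illustrate the shape of the abstraction; SnapshotResource assumes immutable snapshots whose paths sort lexicographically by version, e.g. a date suffix.

```scala
// Hypothetical abstraction: R is the resource ID, T the loaded resource.
trait SharedResource[R, T] {
  /** Latest resource ID, and whether it differs from the one seen last time. */
  def checkForUpdate(): (R, Boolean)
  /** Loads the resource (e.g. parses a file into a Map) for the given ID. */
  def load(resourceId: R): T
}

// Keeps the loaded T and reloads it only when an update is reported.
final class ResourceHolder[R, T](resource: SharedResource[R, T]) {
  private var current: Option[T] = None

  def get(): T = synchronized {
    val (id, updated) = resource.checkForUpdate()
    current match {
      case Some(v) if !updated => v
      case _ =>
        val fresh = resource.load(id)
        current = Some(fresh)
        fresh
    }
  }
}

// Example: immutable snapshots; the lexicographically greatest path is the newest.
final class SnapshotResource(
    listPaths: () => Seq[String],
    read: String => Map[String, String]
) extends SharedResource[String, Map[String, String]] {
  private var lastSeen: Option[String] = None

  def checkForUpdate(): (String, Boolean) = synchronized {
    val latest = listPaths().max // assumes at least one snapshot exists
    val updated = !lastSeen.contains(latest)
    lastSeen = Some(latest)
    (latest, updated)
  }

  def load(resourceId: String): Map[String, String] = read(resourceId)
}
```

Note that checkForUpdate() is stateful here: the resource itself remembers the last ID it reported, which is what lets a plain Boolean signal "changed since last time".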
This is a streaming job I assume?
Yup, that's right.
Trigger new events when there's an update, together with the new URI, assuming each version is an immutable snapshot
Unfortunately, that's not the case, at least for our job. Most of the DBs we use are updated in place by third parties. But couldn't the refreshing side input be time-based?
Nevertheless, it seems like a feasible approach to us, so I'll try to implement it that way.
As for the abstraction, the same problem arises: checkForUpdate(): (R, Boolean) would not work for our use case, as our DBs have a static URI.
Actually, we could make checkForUpdate() return Option[R], where R would be a new URI for immutable snapshots, but the same DB connection in your case? The method would abstract away the update logic, e.g. sorting by timestamp or checking a DB field. If it returns None, the DoFn can just ignore it and reuse the existing resource T instance.
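A sketch of the Option[R] variant, again with hypothetical names (RefreshableResource, RefreshingHolder, InPlaceDb). InPlaceDb stands in for a DB updated in place behind a static URI and assumes some cheap way to read a last-modified value; a snapshot-based implementation would instead return Some(newUri) when a newer path appears.

```scala
// Hypothetical abstraction: Some(id) means "reload from id", None means "keep current T".
trait RefreshableResource[R, T] {
  def checkForUpdate(): Option[R]
  def load(resourceId: R): T
}

// Loads once up front, then reloads only when checkForUpdate() returns Some.
final class RefreshingHolder[R, T](resource: RefreshableResource[R, T], initialId: R) {
  private var current: T = resource.load(initialId)

  def get(): T = synchronized {
    resource.checkForUpdate().foreach(id => current = resource.load(id))
    current // unchanged when checkForUpdate() returned None
  }
}

// Example: a DB updated in place behind a static URI; updates are detected by
// comparing a (hypothetical) last-modified value with the one seen before.
final class InPlaceDb(
    uri: String,
    lastModified: () => Long,
    read: String => Map[String, String]
) extends RefreshableResource[String, Map[String, String]] {
  private var seen: Long = lastModified()

  def checkForUpdate(): Option[String] = synchronized {
    val m = lastModified()
    if (m != seen) { seen = m; Some(uri) } else None
  }

  def load(resourceId: String): Map[String, String] = read(resourceId)
}
```

In a pipeline, RefreshingHolder would be the state the custom DoFn keeps (for example created in a @Setup method), and get() would be called when the refreshing side input signals a possible update rather than on every element; that wiring is not shown here.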