scio

Add timeout to sc.distCache

Open chuwy opened this issue 3 years ago • 3 comments

sc.distCache can be used to download files and distribute them across workers. However, currently the download happens only once, when DistCache.apply() is called.

I need to update local DB files every day/month, which seems to be impossible with the current implementation.

My proposal is to replace the current lazy val with a synchronised var plus timeout logic.

One outstanding question is how to avoid problems caused by all workers trying to download a heavy asset at the same moment. We could solve that by adding a randomised period (0 to 60 sec) within which each worker updates the asset.
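A minimal sketch of the proposal, in plain Scala with no scio or Beam APIs: a synchronised var replaces the lazy val, the value is reloaded once it is older than a TTL, and each instance (one per worker JVM) adds its own random jitter so workers do not all re-download at once. `RefreshingCache`, its parameters, and the injectable `clock` are hypothetical names for illustration, not part of scio's `DistCache`.

```scala
import scala.util.Random

// Hypothetical sketch: a value that is loaded on first access and reloaded
// once it is older than ttlMillis + a per-instance random jitter.
final class RefreshingCache[T](
    ttlMillis: Long,
    maxJitterMillis: Long,
    load: () => T,
    clock: () => Long = () => System.currentTimeMillis(),
    random: Random = new Random()
) {
  // Drawn once per instance, in [0, maxJitterMillis), so that workers started
  // at the same time expire their copies at different moments.
  private val jitterMillis: Long =
    if (maxJitterMillis <= 0) 0L else (random.nextDouble() * maxJitterMillis).toLong

  // Synchronised mutable state instead of a lazy val.
  private var value: Option[T] = None
  private var loadedAt: Long = 0L

  def get: T = synchronized {
    val now = clock()
    val expired = now - loadedAt >= ttlMillis + jitterMillis
    if (value.isEmpty || expired) {
      value = Some(load()) // e.g. re-download the DB file (assumption)
      loadedAt = now
    }
    value.get
  }
}
```

With a 60-second `maxJitterMillis`, this matches the "randomised period (0 to 60 sec)" idea. The `clock` parameter exists only to make the timing testable.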

Happy to contribute the implementation if this is something that can be considered.

chuwy • Jul 30 '20 10:07

This is a streaming job I assume? By timeout, do you mean overwriting the files at the same URIs and having DistCache instances re-download them? That might lead to race conditions and other issues, though.

OTOH, all workers downloading the same asset shouldn't be an issue if it's on GCS, since GCS can usually handle such loads without a problem?

I wonder if it's better handled this way:

  • Having a refreshing side input (example) that checks an asset
  • Trigger new events when there's an update, together with the new URI, assuming each version is an immutable snapshot
  • Have a custom DoFn that manages this shared asset, and update it upon receiving the event from the above step
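The third step can be sketched in plain Scala, without Beam or scio APIs: the state a custom DoFn might keep per worker, swapping the shared asset only when an update event carries a URI it has not loaded yet. `AssetUpdated`, `SharedAssetHolder`, and the `gs://bucket/...` URIs are hypothetical; in a real pipeline the events would arrive through the refreshing side input and the holder would live inside the DoFn.

```scala
// Hypothetical update event: emitted when a new immutable snapshot appears.
final case class AssetUpdated(uri: String)

// Hypothetical stand-in for the custom DoFn's shared state.
final class SharedAssetHolder[T](load: String => T) {
  // The URI currently loaded, together with the loaded asset.
  private var current: Option[(String, T)] = None

  // Reload only when the event points at a snapshot we don't have yet;
  // since each version is immutable, the same URI never needs a reload.
  def onUpdate(event: AssetUpdated): Unit = synchronized {
    if (!current.exists(_._1 == event.uri)) {
      current = Some(event.uri -> load(event.uri))
    }
  }

  // The asset used when processing elements; None until the first event.
  def asset: Option[T] = synchronized { current.map(_._2) }
}
```

Keying the reload on the URI relies on the immutable-snapshot assumption from the second step; with in-place updates, the URI alone would not reveal a change.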

This would also work for non-file-based assets like a database lookup, since the resource loading mechanism can be abstracted away. WDYT?

For the abstraction, we'd need something like this:

  • A resource ID type R, e.g. a URI/String for the GCS path, or a DB connection
  • A def checkForUpdate(): (R, Boolean), where _._2 indicates whether an update is available
  • A resource type T, e.g. a Map[String, String] loaded from the resource file/DB
  • A reload method, e.g. def load(resourceId: R): T, called when checkForUpdate returns true
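The four pieces above could be written as a Scala trait, plus a small holder showing how a DoFn might use it. This is a sketch under assumptions: `UpdatableResource` and `ResourceManager` are hypothetical names, and only `checkForUpdate` and `load` come from the list.

```scala
// Hypothetical trait for the proposed abstraction:
//   R = resource ID (e.g. a GCS path or a DB connection string)
//   T = loaded resource (e.g. Map[String, String])
trait UpdatableResource[R, T] {
  // Returns the resource ID to use; _._2 is true when an update is available.
  def checkForUpdate(): (R, Boolean)

  // Loads the resource identified by resourceId.
  def load(resourceId: R): T
}

// Hypothetical holder a DoFn could keep: loads on first use, then reloads
// only when checkForUpdate() reports an update.
final class ResourceManager[R, T](resource: UpdatableResource[R, T]) {
  private var current: Option[T] = None

  def get(): T = synchronized {
    val (id, updated) = resource.checkForUpdate()
    if (current.isEmpty || updated) current = Some(resource.load(id))
    current.get
  }
}
```

Here `get()` calls `checkForUpdate()` on every access for simplicity. A real implementation would probably rate-limit the check (for example, driven by the refreshing side input) rather than hitting GCS or the DB for every element.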

nevillelyh • Aug 03 '20 19:08

This is a streaming job I assume?

Yup, that's right.

Trigger new events when there's an update, together with the new URI, assuming each version is an immutable snapshot

Unfortunately, that's not the case, at least for our job. Most of the DBs we use are updated in place by 3rd parties. But couldn't the refreshing side input be time-based?

Nevertheless, it feels like a feasible approach to us; I'll try to implement it that way.

As for the abstraction, the same problem arises: checkForUpdate(): (R, Boolean) would not work for our use case, as our DBs have static URIs.

chuwy • Aug 10 '20 15:08

Actually, we could make checkForUpdate() return Option[R], where R would be a new URI for immutable snapshots but the same DB connection in your case? The method would abstract away the update logic, e.g. sorting by timestamp or checking a DB field. If it returns None, the DoFn can just ignore it and reuse the existing resource T instance.
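A sketch of the Option[R] variant, assuming plain Scala and hypothetical names throughout: `RefreshableResource`, `SnapshotResource`, `InPlaceDbResource`, `ResourceHolder`, and the `listSnapshots`/`readVersion`/`loadFn` functions are illustrations, not scio APIs. It covers both cases from the thread: immutable snapshots, where the newest URI is chosen by timestamp, and an in-place DB with a static connection, where a version field is checked instead.

```scala
// Hypothetical revision: Some(id) means "(re)load from id", None means "keep current".
trait RefreshableResource[R, T] {
  def checkForUpdate(): Option[R]
  def load(resourceId: R): T
}

// Immutable snapshots: each version has its own URI. listSnapshots returns
// (uri, timestampMillis) pairs; an update is reported when the newest URI changes.
final class SnapshotResource[T](listSnapshots: () => Seq[(String, Long)], loadFn: String => T)
    extends RefreshableResource[String, T] {
  private var lastUri: Option[String] = None

  def checkForUpdate(): Option[String] = {
    val newest = listSnapshots().sortBy(_._2).lastOption.map(_._1)
    if (newest.isEmpty || newest == lastUri) None
    else { lastUri = newest; newest }
  }

  def load(resourceId: String): T = loadFn(resourceId)
}

// In-place updates: the connection string is static, so the check reads a
// version field (hypothetical) and returns the same connection when it changes.
final class InPlaceDbResource[T](connection: String, readVersion: () => Long, loadFn: String => T)
    extends RefreshableResource[String, T] {
  private var lastVersion: Option[Long] = None

  def checkForUpdate(): Option[String] = {
    val version = readVersion()
    if (lastVersion.contains(version)) None
    else { lastVersion = Some(version); Some(connection) }
  }

  def load(resourceId: String): T = loadFn(resourceId)
}

// The DoFn side: reload on Some, otherwise reuse the existing T instance.
final class ResourceHolder[R, T](resource: RefreshableResource[R, T]) {
  private var current: Option[T] = None

  def get(): T = synchronized {
    resource.checkForUpdate().foreach(id => current = Some(resource.load(id)))
    current.getOrElse(throw new IllegalStateException("resource not loaded yet"))
  }
}
```

The DoFn-side code stays identical for both cases. Only the implementation of checkForUpdate() decides what counts as an update, which is the point of moving the update logic behind Option[R].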

nevillelyh • Aug 10 '20 17:08