Flatterer on Azure Databricks - OOM Error
I'm getting the following error while running a large JSON file through flatterer on Azure Databricks:
The Python process exited with exit code 137 (SIGKILL: killed). This may have been caused by an OOM error. Check your command's memory usage.
The process runs for almost 4 hours before this error pops up. I am using a single-node cluster with 8 vCPUs and 64 GB of RAM (with Photon enabled). Running the exact same code and JSON file on a Windows 11 VM (8 vCPUs, 32 GB RAM) works fine and finishes in around the same 4 hours.
Here is the file that I am working with (publicly accessible data): https://uhc-tic-mrf.azureedge.net/public-mrf/2023-10-01/2023-10-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Choice-Plus-POS_8_in-network-rates.json.gz
The error is a bit cryptic and hints at OOM, but maybe it's something else? Any help would be appreciated. Please let me know if you need any other details.
@alevites Could you give me the options/code you are using for this file?
flatterer aims to be memory efficient for large files, so it is interesting that this fails.
I have not used Azure Databricks, but many notebook-style services use a RAM disk for local storage, so it might be that the output is too big for the instance.
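One quick way to test the RAM-disk theory is to check how much space backs the output path before flattening. A minimal sketch (the specific paths to check on the Databricks driver are assumptions; `shutil.disk_usage` works on any mounted path):

```python
import shutil

def report_free_space(path: str) -> int:
    """Print and return the free bytes on the filesystem backing `path`."""
    usage = shutil.disk_usage(path)
    print(f"{path}: {usage.free / 1e9:.1f} GB free of {usage.total / 1e9:.1f} GB")
    return usage.free

# On a notebook service you would point this at the planned output
# location, e.g. report_free_space("/tmp") before calling flatten().
free_bytes = report_free_space("/")
```

If the output path is a RAM disk (tmpfs), its reported total will be a fraction of instance RAM rather than the instance's disk size, which would explain output-size pressure turning into memory pressure.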
Thanks for the quick response! Here's the code and some thoughts on what I think is happening:
import urllib.request
from datetime import datetime
from urllib.parse import urlparse

import flatterer

print("Current Time =", datetime.now().strftime("%H:%M:%S"))

# Download the gzipped JSON to local storage
url = 'https://uhc-tic-mrf.azureedge.net/public-mrf/2023-10-01/2023-10-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_Choice-Plus-POS_8_in-network-rates.json.gz'
split = urlparse(url)
filename = "/tmp/" + split.path.split("/")[-1]
urllib.request.urlretrieve(url, filename)

# Flatten to Parquet only (no CSV), overwriting any existing output
inputPath = filename
outputPath = "/tmp/output/"
output = flatterer.flatten(inputPath, outputPath, csv=False, parquet=True, force=True)

print("Current Time =", datetime.now().strftime("%H:%M:%S"))
I also found the following blurb from Azure:
For Azure Databricks Filesystem (DBFS) - Support only files less than 2GB in size. Note: If you use local file I/O APIs to read or write files larger than 2GB you might see corrupted files. Instead, access files larger than 2GB using the DBFS CLI, dbutils.fs, or Spark APIs or use the /dbfs/ml folder.
One of the resulting parquet tables is certainly larger than 2GB, so I think this could be the issue.
I am going to try mounting ADLS in Databricks, which would bypass this limit.
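If the 2GB local-file-API limit is the culprit, another possible workaround is to flatten to the driver's local disk first and copy the finished output to the mount afterwards in one bulk pass. A sketch, not tested on Databricks (on a real cluster the copy step would likely be `dbutils.fs.cp("file:" + local_out, final_out, recurse=True)`, which is only available inside a notebook; `shutil.copytree` stands in here so the sketch runs anywhere):

```python
import shutil

def copy_output(local_out: str, final_out: str) -> None:
    """Copy a finished flatterer output directory to its final destination.

    Stand-in for dbutils.fs.cp(..., recurse=True) on Databricks; copying
    a completed directory avoids streaming large writes through DBFS.
    """
    shutil.copytree(local_out, final_out, dirs_exist_ok=True)

# Hypothetical usage, reusing the paths from the code above:
# output = flatterer.flatten(filename, "/tmp/output/", csv=False,
#                            parquet=True, force=True)   # local disk first
# copy_output("/tmp/output/", "/dbfs/mnt/test/output/")  # then one bulk copy
```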
@alevites Tried this file out, and it was taking up around 10GB of memory for me.
This is due to fairly large objects (around 10MB each): flatterer has an internal queue with a maximum size of 1000. That queue size was optimal for performance on the data I tested, but that data had objects of at most 1MB.
I also tested this data with the queue capped at 100, and memory usage went down, as expected, to under 1GB. I might give users a way to tweak this themselves, or add a low_memory mode which would set a much lower queue size of, say, 2 * the number of threads.
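The memory math here can be illustrated with a toy bounded queue (a conceptual sketch only; flatterer's actual queue lives in its Rust internals). The producer can run at most `maxsize` items ahead of the consumer, so peak buffered memory is roughly `maxsize × object size` — 1000 × 10MB ≈ 10GB, versus under 1GB with a cap of 100:

```python
import queue
import threading

OBJECT_SIZE = 10   # stand-in for a ~10MB JSON object
QUEUE_SIZE = 100   # a smaller cap bounds the producer/consumer gap

q = queue.Queue(maxsize=QUEUE_SIZE)
peak = 0

def producer():
    for _ in range(1000):
        q.put("x" * OBJECT_SIZE)  # blocks once the queue is full
    q.put(None)                   # sentinel: no more items

def consumer():
    global peak
    while True:
        item = q.get()
        if item is None:
            break
        peak = max(peak, q.qsize())

t = threading.Thread(target=producer)
t.start()
consumer()
t.join()

# Buffered items never exceed QUEUE_SIZE, so queue memory is bounded by
# QUEUE_SIZE * object size rather than 1000 * object size.
print("peak buffered items:", peak)
```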
Thanks for the update! I will try on Databricks again if/when you make that update.
Also, I tried mounting Azure Blob Storage to Databricks, and flatterer does connect and attempt to write to it, but then throws this error:
RuntimeError: Could not remove directory /dbfs/mnt/test/output/
Caused by: Resource temporarily unavailable (os error 11)
Location: /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/convert/mod.rs:716:9
Oddly enough, the "output" folder gets created and "tmp" has a few CSVs created inside of it, but no other files/folders get created (no "csv" folder, no "parquet" folder, no fields.csv/tables.csv). The error sounds like some sort of network issue (a dropped connection, perhaps), but it happens every single time (I've run it 5-10 times) and the JSON is quite small, so it might be something else.
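For what it's worth, os error 11 on Linux is EAGAIN ("Resource temporarily unavailable") — a transient would-block/busy condition that FUSE-style mounts like blobfuse can surface on ordinary file operations. You can confirm the mapping from Python:

```python
import errno
import os

# On Linux, errno 11 is EAGAIN: the operation would block or the resource
# is temporarily busy -- transient by definition, which fits a flaky mount.
print(errno.errorcode[11], "->", os.strerror(11))
```

That it happens on every run suggests the mount consistently refuses the directory-removal pattern rather than an intermittent network drop.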
Code:
import urllib.request
from datetime import datetime
from urllib.parse import urlparse

import flatterer

print("Current Time =", datetime.now().strftime("%H:%M:%S"))

# Download the gzipped JSON directly to the ADLS mount
url = 'https://uhc-tic-mrf.azureedge.net/public-mrf/2023-10-01/2023-10-01_UnitedHealthcare-Insurance-Company_Insurer_OHPH-ST_30_in-network-rates.json.gz'
split = urlparse(url)
filename = "/dbfs/mnt/test/" + split.path.split("/")[-1]
urllib.request.urlretrieve(url, filename)

# Flatten to Parquet only, writing output to the mount
inputPath = filename
outputPath = "/dbfs/mnt/test/output/"
output = flatterer.flatten(inputPath, outputPath, csv=False, parquet=True, force=True)

print("Current Time =", datetime.now().strftime("%H:%M:%S"))
Here, /dbfs/mnt is the mount point for ADLS, which I can access in Databricks using dbutils and the os module. What module are you using to write the parquet files?
@alevites Mounts of object stores are, in my experience, always flaky and do not behave the same as a real file system; e.g., after deleting a file, it is not actually deleted straight away.
I tried S3 mounts with flatterer, and they did not work very well. That is why I made a specific S3 output and input: https://flatterer.opendata.coop/s3.html which works much better and does retries if certain chunks do not get saved for some reason. Sadly, Azure does not have an S3-compatible API, so this will not work there.
It should be possible to do the same with Azure Blob Storage, but it would require a lot of work.
Sounds good, no worries. Please let me know if you get around to implementing the customizable queue size. Thanks again!