spark-deep-learning
Reading millions of images from s3
When attempting to read millions of images from s3 (all in a single bucket) with readImages, the command just hangs for several hours. Is this expected? Are there any best practices for how to use readImages with millions of images in s3?
Hello @mdagost , can you provide more information about how you store images in that S3 bucket? Is it in a flat directory or a hierarchy?
Right now there’s no hierarchy at all—everything is flat at the top level of the bucket.
I'm wondering if it's related to this forum question:
In fact, your job will appear stalled as the list() call collects the data from the single Driver node. No actual task processing is occurring, so progress appears stalled.
I could potentially construct the list of s3 file paths directly in spark in a faster way (by pulling from a DB, for example). But it looks like readImages takes a single directory string and not a list of file paths...
Ah interesting. It wasn't clear from the docs, but digging through the source code I found this:
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
And in a small test, it does indeed work to pass a string with comma-separated full image paths like
paths = "s3://my_bucket/image1,s3://my_bucket/image2"
image_df = readImages(paths)
I'll try it at scale tomorrow constructing that large string directly from the DB rather than listing from s3...
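A minimal sketch of what I mean, assuming the DB query lands in a dataframe with a single string column called image_url (the same shape used in the snippets further down):

from sparkdl import readImages

# collect the paths onto the driver and join them into one comma-separated string;
# this skips any S3 listing since the paths come straight from the DB
paths = ",".join(row.image_url for row in rows_df.select("image_url").collect())
image_df = readImages(paths)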
There is still something odd about the performance and scaling of this. Calling readImages on 100k images in s3 (with the paths specified as a comma-separated list like I posted above), on a cluster of 8 c4.2xlarge's, and just writing the resulting dataframe back out as parquet, took an hour.
Trying to read 1m images on a cluster of 40 c4.2xlarge's just spins (it doesn't even get to the Databricks Spark progress bar) even after 4 hours. Based on that 100k run, I would have expected the job to finish in about 2 hours.
Any ideas @thunterdb?
@mdagost the issue may be related to listing files on S3: when you have millions of files, the cost (in time and money) of listing all these files can be significant. This is a known issue with Spark and Hadoop in general:
https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html
http://apache-spark-user-list.1001560.n3.nabble.com/Strategies-for-reading-large-numbers-of-files-td15644.html
Most solutions suggest concatenating files together, which is not going to work with images. One strategy is to maintain a list of all the image files (for example in a CSV), which you can generate once. Then you can read the images using a map job in Spark that reads each file with a user-defined function. You can see how this UDF is defined in this file:
https://github.com/databricks/spark-deep-learning/blob/master/python/sparkdl/image/imageIO.py#L235
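A minimal sketch of that strategy, assuming the pre-generated list lives in a one-column CSV of full s3a:// paths (the file name and column name here are just placeholders):

# read the pre-generated list of image paths; no S3 listing happens at this point
rows_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("s3://my_bucket/image_paths.csv")
)
# each row now carries one full path in its image_url column, ready to be
# mapped over with a per-file read function like the one further down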
@thunterdb Yep. That's the blog post and issue that I mentioned above in the thread. I do have the file names in a dataframe already, and I concatenated the names into a comma-separated string to pass to readImages. That does indeed work (rather than just passing a bucket path), and I thought it would avoid the listing issue. That's not solving things, though.
Monitoring the s3 bucket, it seems like what's happening during the spinning is a large number of HEAD requests against the bucket. I wonder why it's not just issuing GET requests right off the bat?
I dug into this more. For posterity and for anyone who comes across this issue, here's the explanation and code for a workaround that appears to be at least 20x faster.
Internally in readImages, filesToDF calls the spark context's binaryFiles function. That, in turn, makes a new BinaryFileRDD. That code has this comment:
// setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
// traversing a large number of directories and files. Parallelize it.
Crucially, when that parallelization happens, it uses
Runtime.getRuntime.availableProcessors().toString
which means all of that listing happens on the driver, just parallelized across the cores available to the driver's JVM. That's why this is so unbelievably slow: the driver collects the status of millions of files solely to calculate the number of partitions, and only then is the work farmed out to the workers to actually read them. The driver listing the files is the bottleneck.
Instead, I wrote code to do the following. I have a dataframe with the s3 paths. I run a python function in a map which uses boto3 to directly grab the file from s3 on the worker, decode the image data, and assemble the same type of dataframe as readImages.
Here's the code, more or less in its entirety, to read and decode the images and then just write them to parquet. It ran over 100k images in s3 on 40 nodes in 2.6 minutes instead of the 50 minutes that the vanilla readImages took.
from sparkdl.image.imageIO import _decodeImage, imageSchema
from pyspark.sql.types import StructType, StructField, StringType, BinaryType
from pyspark.sql.functions import udf

# this function will use boto3 on the workers directly to pull the image
def readFileFromS3(row):
    import boto3
    import os

    s3 = boto3.client('s3')

    filePath = row.image_url
    # strip the leading s3a:// off the dirname to get the bucket name
    bucket = os.path.dirname(str(filePath))[6:]
    key = os.path.basename(str(filePath))

    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    contents = bytearray(body.read())
    body.close()

    if len(contents):
        return (filePath, contents)

# rows_df is a dataframe with a single string column called "image_url" that has the full s3a filePath
# Running rows_df.rdd.take(2) gives the output
# [Row(image_url=u's3a://mybucket/14f89051-26b3-4bd9-88ad-805002e9a7c5'),
#  Row(image_url=u's3a://mybucket/a47a9b32-a16e-4d04-bba0-cdc842c06052')]

# farm out our images to the workers with a map
images_rdd = (
    rows_df
    .rdd
    .map(readFileFromS3)
)

# convert our rdd to a dataframe and then
# use a udf to decode the image; the schema comes from sparkdl.image.imageIO
schema = StructType([StructField("filePath", StringType(), False),
                     StructField("fileData", BinaryType(), False)])
decodeImage = udf(_decodeImage, imageSchema)

image_df = (
    images_rdd
    .toDF(schema)
    .select("filePath", decodeImage("fileData").alias("image"))
)

(
    image_df
    .write
    .format("parquet")
    .mode("overwrite")
    .option("compression", "gzip")
    .save("s3://my_bucket/images.parquet")
)
@mdagost thank you for working out the details! We would like to add this functionality to the library eventually since this is an important use case. I am going to leave the ticket open for the time being.
Great! Glad this is useful! In the meantime, I reworked the code to make it simpler and even faster by using a single python function that reads from s3 and decodes in one step. New code is below. Let me know if you eventually want me to submit a PR somewhere.
from sparkdl.image.imageIO import _decodeImage, imageSchema
from pyspark.sql.types import StructType, StructField, StringType

# this function will use boto3 on the workers directly to pull the image
# and then decode it, all in this function
def readFileFromS3(row):
    import boto3
    import os

    s3 = boto3.client('s3')

    filePath = row.image_url
    # strip the leading s3a:// off the dirname to get the bucket name
    bucket = os.path.dirname(str(filePath))[6:]
    key = os.path.basename(str(filePath))

    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    contents = bytearray(body.read())
    body.close()

    if len(contents):
        try:
            decoded = _decodeImage(contents)
            return (filePath, decoded)
        except:
            # sentinel row for images that fail to decode
            return (filePath, {"mode": "RGB", "height": 378,
                               "width": 378, "nChannels": 3,
                               "data": bytearray(b"ERROR")})

# rows_df is a dataframe with a single string column called "image_url" that has the full s3a filePath
# Running rows_df.rdd.take(2) gives the output
# [Row(image_url=u's3a://mybucket/14f89051-26b3-4bd9-88ad-805002e9a7c5'),
#  Row(image_url=u's3a://mybucket/a47a9b32-a16e-4d04-bba0-cdc842c06052')]

# farm out our images to the workers with a map and get back a dataframe
schema = StructType([StructField("filePath", StringType(), False),
                     StructField("image", imageSchema)])
image_df = (
    rows_df
    .rdd
    .map(readFileFromS3)
    .toDF(schema)
)

(
    image_df
    .write
    .format("parquet")
    .mode("overwrite")
    .option("compression", "gzip")
    .save("s3://my_bucket/images.parquet")
)
@mdagost I am fairly new to using spark and was wondering: would it be possible to store the resulting dataframe in a non-compressed form? Also, what is the row parameter being passed to the function? Is it an image path from S3?
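For reference, compression is just a writer option in the snippets above, so a minimal sketch of the same write without compression (same placeholder output path) would be:

(
    image_df
    .write
    .format("parquet")
    .mode("overwrite")
    .option("compression", "none")
    .save("s3://my_bucket/images.parquet")
)

And the row argument to readFileFromS3 is a Row from rows_df, so it carries the full S3 path in its image_url field rather than the image data itself.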
Hi @mdagost and @thunterdb
I am very interested in the function that @mdagost provided for feeding image data from S3 to train a model, since the function above can construct image_df as an image data source.
However, with the current version of sparkdl, it seems the code no longer applies because the "_decodeImage" function is no longer in imageIO. It was there in the pysparkdl 0.2.0 documentation.
I have also tried to use imageArrayToStruct, but it did not work either.
decoded = imageArrayToStruct(bytearray(contents)).
...
schema = StructType([StructField("filePath", StringType(), False), StructField("image", ImageSchema)])
I received the following error.
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 406, in __init__
AssertionError: dataType should be DataType
I assume it was due to the way we are getting the image bytearray. I am planning to try the following code later tonight to read the bytearray from an S3 object (a jpg file):
img = image.load_img(BytesIO(obj.get()['Body'].read()), target_size=(224, 224))
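A self-contained sketch of that plan, assuming the Keras preprocessing API and a boto3 resource (the bucket and key are placeholders):

import boto3
from io import BytesIO
from keras.preprocessing import image

# fetch a single jpg from S3 and decode it into a fixed-size array
s3 = boto3.resource('s3')
obj = s3.Object('my_bucket', 'path/to/image.jpg')
img = image.load_img(BytesIO(obj.get()['Body'].read()), target_size=(224, 224))
arr = image.img_to_array(img)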
So I would like to confirm with @thunterdb that currently we can construct the image_df regardless of where the images are located, right? I am constructing the file paths from object keys that sit in different subfolders of my S3 bucket.
And how can I export the trained weights of an inception model after transfer learning?
Thank you, Heng