How to avoid mid-write of a BLOB in GCS?
Let's take the following code snippet as context:
GoogleHadoopFileSystem ghfs
....
try (FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false)) {
  fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
  if (true) throw new IOException("Unexpected I/O");
  fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
}
When the FSDataOutputStream output stream is closed, the file lines.txt will appear on GCS containing only the content:
first line
I'm exploring how to make the file visible on GCS only when it has been fully written, and not visible at all when it has been only partially written.
One possibility would be to close the stream if and only if ALL of the file's content has been successfully written:
FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false);
fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
fs.close();
In such a case, however, when an exception occurs while writing the file, the stream is never closed, leaking resources within the application.
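One partial workaround, not connector-provided behavior but a sketch of what could be done today, is to keep the try-with-resources (so the stream is always closed) and delete the partially committed object when the write fails. It is only best effort: a reader can still observe the partial object between the close and the delete, and the delete itself can fail.

Path path = new Path("gs://tiny-world/tiny/lines.txt");
try {
  try (FSDataOutputStream out = ghfs.create(path, false)) {
    out.write("first line\n".getBytes(StandardCharsets.UTF_8));
    out.write("second line\n".getBytes(StandardCharsets.UTF_8));
  }
} catch (IOException e) {
  // Best-effort cleanup: remove whatever partial content close() may have committed.
  ghfs.delete(path, false);
  throw e;
}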
Is there a recipe for writing a BLOB to GCS under the following constraints:
- the destination BLOB must not already exist on GCS; if a concurrent process later tries to write to the same path, it fails
- the BLOB becomes visible on GCS only if its content has been fully written
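For comparison, outside the Hadoop connector the standalone google-cloud-storage client can express both constraints in a single request. The sketch below is only an illustration under assumptions, not part of the connector: Storage, StorageOptions, BlobInfo, BlobId and StorageException come from the com.google.cloud.storage package (a different Storage class than the one used further down), and the bucket/object names are the ones from the example above.

Storage storage = StorageOptions.getDefaultInstance().getService();
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("tiny-world", "tiny/lines.txt")).build();
byte[] content = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
try {
  // doesNotExist() corresponds to ifGenerationMatch(0): the request fails if a live
  // version of the object already exists, and the object only becomes visible once
  // the whole payload has been uploaded.
  storage.create(blobInfo, content, Storage.BlobTargetOption.doesNotExist());
} catch (StorageException e) {
  if (e.getCode() != 412) {
    throw e;
  }
  // HTTP 412 Precondition Failed: another writer created the object first.
}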
I investigated the hadoop-connectors project code and opted to use reflection in order to get access to the com.google.api.services.storage.Storage instance held by the GoogleHadoopFileSystem:
GoogleCloudStorage googleCloudStorage = ghfs.getGcsFs().getGcs();
Field gcsField = googleCloudStorage.getClass().getDeclaredField("gcs");
gcsField.setAccessible(true);
Storage gcs = (Storage) gcsField.get(googleCloudStorage);
I made my own version of the Insert call, based on the template found in com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel#createRequest, and this solved my specific problem:
private static StorageObject createBlob(URI blobPath, byte[] content, GoogleHadoopFileSystem ghfs, Storage gcs)
    throws IOException
{
  CreateFileOptions createFileOptions = new CreateFileOptions(false);
  CreateObjectOptions createObjectOptions = objectOptionsFromFileOptions(createFileOptions);

  PathCodec pathCodec = ghfs.getGcsFs().getOptions().getPathCodec();
  StorageResourceId storageResourceId = pathCodec.validatePathAndGetId(blobPath, false);

  StorageObject object =
      new StorageObject()
          .setContentEncoding(createObjectOptions.getContentEncoding())
          .setMetadata(encodeMetadata(createObjectOptions.getMetadata()))
          .setName(storageResourceId.getObjectName());

  InputStream inputStream = new ByteArrayInputStream(content, 0, content.length);
  Storage.Objects.Insert insert = gcs.objects().insert(
      storageResourceId.getBucketName(),
      object,
      new InputStreamContent(createObjectOptions.getContentType(), inputStream));
  // The operation succeeds only if there are no live versions of the blob.
  insert.setIfGenerationMatch(0L);
  insert.getMediaHttpUploader().setDirectUploadEnabled(true);
  insert.setName(storageResourceId.getObjectName());

  return insert.execute();
}
/**
* Helper for converting from a Map<String, byte[]> metadata map that may be in a
* StorageObject into a Map<String, String> suitable for placement inside a
* GoogleCloudStorageItemInfo.
*/
@VisibleForTesting
static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
  return Maps.transformValues(metadata, QuickstartParallelApiWriteExample::encodeMetadataValues);
}

// A function to encode metadata map values.
private static String encodeMetadataValues(byte[] bytes) {
  return bytes == null ? Data.NULL_STRING : BaseEncoding.base64().encode(bytes);
}
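For reference, a hypothetical call site for createBlob could look like the following; the path is made up, and it assumes that losing the ifGenerationMatch(0) race surfaces as a GoogleJsonResponseException with status code 412.

URI blobPath = URI.create("gs://tiny-world/tiny/lines.txt");
byte[] content = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
try {
  StorageObject created = createBlob(blobPath, content, ghfs, gcs);
  System.out.println("Created generation " + created.getGeneration());
} catch (GoogleJsonResponseException e) {
  if (e.getStatusCode() != 412) {
    throw e;
  }
  // The ifGenerationMatch(0) precondition failed: the object already exists.
}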
Would it be possible to expose the gcs field through a getter in GoogleCloudStorageImpl, to avoid the rather hacky reflection trick?