hadoop-connectors

How to avoid mid-write of a BLOB in GCS?

Open: findinpath opened this issue 2 years ago • 1 comment

Let's take the following code snippet as context:

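    GoogleHadoopFileSystem ghfs = ...; // initialization elided
    ....
    // The second argument is overwrite = false: fail if the file already exists.
    try (FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false)) {
        fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
        // Simulate a failure between the two writes.
        if (true) throw new IOException("Unexpected I/O");
        fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
    }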

The try-with-resources block closes the FSDataOutputStream even though the exception is thrown, and closing the stream finalizes the upload. As a result, the file lines.txt appears on GCS containing only:

first line

I'm looking for a way to get all-or-nothing behavior: the file should be visible on GCS only once it has been fully written, and should not appear at all if the write was interrupted midway.

One way to do this would be to close the stream if and only if ALL of the file's content has been written successfully:

    FSDataOutputStream fs = ghfs.create(new Path("gs://tiny-world/tiny/lines.txt"), false);
    fs.write("first line\n".getBytes(StandardCharsets.UTF_8));
    fs.write("second line\n".getBytes(StandardCharsets.UTF_8));
    // Reached only if both writes succeeded.
    fs.close();

In that case, however, if an exception occurs while writing the file, the stream is never closed, and the application leaks the resources associated with it.

Is there a recipe for writing a BLOB to GCS under the following constraints?

  • the destination BLOB must not already exist on GCS; if a concurrent process later tries to write to the same path, that write fails
  • the BLOB becomes visible on GCS if and only if it has been fully written
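
The two constraints can be illustrated with a minimal, JDK-only sketch. It is not GCS code: InMemoryBucket, createIfAbsent and AllOrNothingWriteSketch are hypothetical names, and an in-memory map stands in for the bucket. The content is buffered locally and published in a single step, with putIfAbsent playing the role of a create-only precondition on the real store.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for a bucket; not a GCS or hadoop-connectors API. */
class InMemoryBucket {
    private final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();

    /** Publishes the blob in one step; fails if the name is already taken. */
    void createIfAbsent(String name, byte[] content) throws IOException {
        if (blobs.putIfAbsent(name, content.clone()) != null) {
            throw new IOException("Blob already exists: " + name);
        }
    }

    boolean exists(String name) {
        return blobs.containsKey(name);
    }

    /** Returns a copy of the stored content, or null if the blob does not exist. */
    byte[] read(String name) {
        byte[] content = blobs.get(name);
        return content == null ? null : content.clone();
    }
}

class AllOrNothingWriteSketch {
    /** Buffers both lines locally and publishes only after every write has succeeded. */
    static void writeLines(InMemoryBucket bucket, String name, boolean failMidWrite)
            throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        buffer.write("first line\n".getBytes(StandardCharsets.UTF_8));
        if (failMidWrite) {
            // Nothing has been published yet, so the bucket stays untouched.
            throw new IOException("Unexpected I/O");
        }
        buffer.write("second line\n".getBytes(StandardCharsets.UTF_8));
        bucket.createIfAbsent(name, buffer.toByteArray());
    }

    public static void main(String[] args) {
        InMemoryBucket bucket = new InMemoryBucket();
        try {
            writeLines(bucket, "tiny/lines.txt", true);
        } catch (IOException e) {
            System.out.println("write failed, blob visible: " + bucket.exists("tiny/lines.txt"));
        }
    }
}
```

The trade-off in this sketch is that the whole content must fit in memory (or in some local staging area) before it is published.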

findinpath, May 20 '22 20:05

I investigated the hadoop-connectors project code and opted to use reflection to get access to the com.google.api.services.storage.Storage instance held by the GoogleHadoopFileSystem:

    GoogleCloudStorage googleCloudStorage =  ghfs.getGcsFs().getGcs();
    Field gcsField = googleCloudStorage.getClass().getDeclaredField("gcs");
    gcsField.setAccessible(true);
    Storage gcs = (Storage) gcsField.get(googleCloudStorage);
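
One caveat with this kind of lookup: getDeclaredField only inspects the exact runtime class, so it throws NoSuchFieldException if the field is declared in a superclass of the object that is returned. A slightly more defensive variant walks up the class hierarchy. The following is a JDK-only sketch; ReflectionLookupSketch, readField, Base and Derived are hypothetical names used for illustration, not part of hadoop-connectors.

```java
import java.lang.reflect.Field;

class ReflectionLookupSketch {
    /** Hypothetical class holding a private field, standing in for the real implementation. */
    static class Base {
        private final String gcs = "client";
    }

    /** Hypothetical subclass; the field is not declared here. */
    static class Derived extends Base {
    }

    /** Reads a (possibly private) field, searching the class and then its superclasses. */
    static Object readField(Object target, String fieldName)
            throws ReflectiveOperationException {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException ignored) {
                // Not declared on this class; continue with the superclass.
            }
        }
        throw new NoSuchFieldException(fieldName);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(readField(new Derived(), "gcs"));
    }
}
```

Either way, the lookup depends on a private field name and can break silently between library versions.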

I then wrote my own version of the Insert call, based on the template found in com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel#createRequest, and this solved my specific problem:

private static StorageObject createBlob(URI blobPath, byte[] content, GoogleHadoopFileSystem ghfs, Storage gcs)
        throws IOException
{
    CreateFileOptions createFileOptions = new CreateFileOptions(false);
    CreateObjectOptions createObjectOptions = objectOptionsFromFileOptions(createFileOptions);
    PathCodec pathCodec = ghfs.getGcsFs().getOptions().getPathCodec();
    StorageResourceId storageResourceId = pathCodec.validatePathAndGetId(blobPath, false);

    StorageObject object =
            new StorageObject()
                    .setContentEncoding(createObjectOptions.getContentEncoding())
                    .setMetadata(encodeMetadata(createObjectOptions.getMetadata()))
                    .setName(storageResourceId.getObjectName());

    InputStream inputStream = new ByteArrayInputStream(content, 0, content.length);
    Storage.Objects.Insert insert = gcs.objects().insert(
            storageResourceId.getBucketName(),
            object,
            new InputStreamContent(createObjectOptions.getContentType(), inputStream));
    // The operation succeeds only if there are no live versions of the blob. 
    insert.setIfGenerationMatch(0L);
    insert.getMediaHttpUploader().setDirectUploadEnabled(true);
    insert.setName(storageResourceId.getObjectName());
    return insert.execute();
}

/**
 * Converts a Map<String, byte[]> metadata map, as returned by CreateObjectOptions#getMetadata(),
 * into the Map<String, String> form that StorageObject#setMetadata expects, base64-encoding
 * each value.
 */
@VisibleForTesting
static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
    return Maps.transformValues(metadata, QuickstartParallelApiWriteExample::encodeMetadataValues);
}

// A function to encode metadata map values
private static String encodeMetadataValues(byte[] bytes) {
    return bytes == null ? Data.NULL_STRING : BaseEncoding.base64().encode(bytes);
}
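
For reference, the effect of the two helpers above can be approximated with the JDK alone. This is only a sketch: MetadataEncodingSketch and encode are hypothetical names, the result is an eager copy rather than the lazy view that Maps.transformValues returns, and null values are passed through as plain null here instead of Data.NULL_STRING.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

class MetadataEncodingSketch {
    /** Base64-encodes every metadata value; null values stay null in this sketch. */
    static Map<String, String> encode(Map<String, byte[]> metadata) {
        Map<String, String> encoded = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
            byte[] value = entry.getValue();
            encoded.put(
                    entry.getKey(),
                    value == null ? null : Base64.getEncoder().encodeToString(value));
        }
        return encoded;
    }

    public static void main(String[] args) {
        Map<String, byte[]> metadata = new LinkedHashMap<>();
        metadata.put("greeting", "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(encode(metadata));
    }
}
```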

Would it be possible to expose the gcs field through a getter in GoogleCloudStorageImpl, to avoid the rather hacky reflection trick?

findinpath, May 21 '22 21:05