kafka-connect-storage-common

How to modify the filename of the S3 object?

Open matias-dls opened this issue 5 years ago • 2 comments

Hi, I've been using the S3 connector for a couple of weeks now, and I want to change how the connector names each file. I am using the HourlyBasedPartition, so the path alone is already enough for me to locate each file. I would like every file to have the same generic name, such as 'Data.json.gzip', under the path produced by the partitioner.

For example, I want to go from this: <prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

To this: <prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

The goal is to download each file later with a single call to S3, instead of first having to look up the filename and then download it.

matias-dls avatar Oct 07 '20 13:10 matias-dls

Searching through the 'kafka-connect-s3' module of the kafka-connect-storage-cloud repo, I found this file: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java which, near the end, contains the following methods:

  private RecordWriter getWriter(SinkRecord record, String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",
        encodedPartition,
        commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
    writers.put(encodedPartition, writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix, startOffset);
      commitFiles.put(encodedPartition, commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix, String keyPrefix, String name) {
    String suffix = keyPrefix + dirDelim + name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix + dirDelim + suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix, long startOffset) {
    String name = tp.topic()
                      + fileDelim
                      + tp.partition()
                      + fileDelim
                      + String.format(zeroPadOffsetFormat, startOffset)
                      + extension;
    return fileKey(topicsDir, dirPrefix, name);
  }

I don't know whether this can be customized to do what I want, but it looks closely related: fileKeyToCommit is where the object name is assembled. Hope it helps.
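To make the naming logic above easier to follow, here is a minimal standalone sketch of how fileKey and fileKeyToCommit combine into an object key. It is not the connector's code: the class name KeySketch is hypothetical, the instance fields are turned into parameters or constants, and the values "/" for dirDelim, "+" for fileDelim, "%010d" for zeroPadOffsetFormat, "topics" for the prefix, and the year=/month=/day=/hour= path are assumptions chosen for illustration. StringUtils.isNotBlank is replaced by a plain null/blank check so the sketch needs no extra library.

```java
public class KeySketch {
    // Assumed stand-ins for values the connector reads from its configuration.
    static final String DIR_DELIM = "/";
    static final String FILE_DELIM = "+";
    static final String ZERO_PAD_OFFSET_FORMAT = "%010d";

    // Joins the optional top-level prefix, the partition directory, and the file name.
    static String fileKey(String topicsPrefix, String keyPrefix, String name) {
        String suffix = keyPrefix + DIR_DELIM + name;
        boolean hasPrefix = topicsPrefix != null && !topicsPrefix.trim().isEmpty();
        return hasPrefix ? topicsPrefix + DIR_DELIM + suffix : suffix;
    }

    // Builds <topic>+<kafkaPartition>+<zero-padded startOffset><extension> and prefixes it.
    static String fileKeyToCommit(String topicsDir, String dirPrefix, String topic,
                                  int partition, long startOffset, String extension) {
        String name = topic
            + FILE_DELIM
            + partition
            + FILE_DELIM
            + String.format(ZERO_PAD_OFFSET_FORMAT, startOffset)
            + extension;
        return fileKey(topicsDir, dirPrefix, name);
    }

    public static void main(String[] args) {
        String key = fileKeyToCommit(
            "topics",                                    // assumed prefix
            "events/year=2020/month=10/day=07/hour=13",  // assumed <topic>/<hourly path>
            "events", 0, 1500L, ".json.gz");
        System.out.println(key);
        // prints topics/events/year=2020/month=10/day=07/hour=13/events+0+0000001500.json.gz
    }
}
```

The Kafka partition and the start offset are the only parts of the name that differ between files written under the same hourly path.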

matias-dls avatar Oct 07 '20 14:10 matias-dls

Worth pointing out that the sink connector does not append to existing files, so you would be overwriting those files on every write to the same partition path if you were to change these lines:

- String name = tp.topic()
-                      + fileDelim
-                      + tp.partition()
-                      + fileDelim
-                      + String.format(zeroPadOffsetFormat, startOffset)
+ String name = "Data"
                      + extension;
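The overwrite problem can be illustrated with a small simulation. This is only a sketch under stated assumptions: a HashMap stands in for the bucket (a put to an existing key replaces the value, just as writing an S3 object to an existing key replaces it), the class name OverwriteSketch and the helper countObjects are hypothetical, and the path, delimiters, and offsets are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class OverwriteSketch {
    // Simulates two Kafka partitions, each flushing two files into the same hourly path,
    // and returns how many distinct objects remain in the stand-in bucket.
    static int countObjects(boolean fixedName) {
        Map<String, String> bucket = new HashMap<>(); // stand-in for an S3 bucket
        String dir = "topics/events/year=2020/month=10/day=07/hour=13"; // assumed path
        int[] partitions = {0, 1};
        long[] startOffsets = {0L, 1000L};
        for (int partition : partitions) {
            for (long startOffset : startOffsets) {
                String name = fixedName
                    ? "Data.json.gz"
                    : "events+" + partition + "+"
                        + String.format("%010d", startOffset) + ".json.gz";
                // Writing to an existing key replaces whatever was stored there.
                bucket.put(dir + "/" + name,
                    "records from partition " + partition + " at offset " + startOffset);
            }
        }
        return bucket.size();
    }

    public static void main(String[] args) {
        System.out.println(countObjects(false)); // prints 4: every flush keeps its own object
        System.out.println(countObjects(true));  // prints 1: only the last write survives
    }
}
```

With the fixed name, three of the four files in this example are silently lost, which is why the topic, partition, and start offset are part of the default filename.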

OneCricketeer avatar Jun 22 '22 21:06 OneCricketeer