kafka-connect-storage-cloud

Support stream-based rotation interval with FieldPartitioner

Open · jalaziz opened this issue 5 years ago · 4 comments

If I'm not mistaken, the current behavior of the TopicPartitionWriter makes it impossible to use rotate.interval.ms with a FieldPartitioner. As a result, you cannot get exactly-once semantics for short-lived partitions (without setting flush.size to 1).

This happens mainly for two reasons:

  1. rotate.interval.ms requires TimeBasedPartitioner. This is a bit artificial though, since it really only needs a TimestampExtractor.
  2. Due to (1), the assumption is that only a single partition is being written to at a time, so TopicPartitionWriter rotates whenever the encoded partition changes (sketched below).
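
To make the interaction concrete, the rotation decision described above behaves roughly like this (a paraphrased Scala sketch, not the connector's actual code; all names are illustrative):

// Paraphrased sketch of the rotation behavior described in (1) and (2). Illustrative
// only; the real logic lives in TopicPartitionWriter.
def shouldRotate(
    rotateIntervalMs: Long,
    recordTimestamp: Option[Long], // only present when a TimestampExtractor exists, i.e. TimeBasedPartitioner
    baseRecordTimestamp: Long,
    currentEncodedPartition: String,
    recordEncodedPartition: String
): Boolean = {
  // (2) Any change in the encoded partition forces a rotation, because only one
  //     encoded partition is assumed to be open at a time.
  val partitionChanged = currentEncodedPartition != recordEncodedPartition
  // (1) Stream-time rotation needs an extracted record timestamp, which today is tied
  //     to the partitioner being a TimeBasedPartitioner.
  val intervalElapsed = rotateIntervalMs > 0 &&
    recordTimestamp.exists(ts => ts - baseRecordTimestamp >= rotateIntervalMs)
  partitionChanged || intervalElapsed
}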

However, we have a use case that could benefit from exactly-once semantics with a FieldPartitioner, rotate.interval.ms enabled, and multiple open writers.

The scenario is a stream of logs/events that are tied to a particular session ID. We can use FieldPartitioner to extract the session ID from the events. However, since the session eventually ends, we commit the last file, which potentially has fewer than flush.size records. We could use scheduled rotation, but that would break exactly-once semantics. Since the event stream as a whole doesn't stop, stream time (Record or RecordField) could be used to enable exactly-once semantics.

So far, the scenario above can be accomplished by extending TimeBasedPartitioner and simply overriding the encodePartition and partitionFields methods with FieldPartitioner logic. However, because of issue (2) above, you can end up with a lot of small files if a Kafka partition has interspersed messages from different sessions. Allowing multiple open writers in this scenario would not hurt exactly-once semantics, since session events always route to the same session partition.
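
A minimal sketch of that workaround, assuming a placeholder field name of "session_id" (illustrative only, not our exact implementation):

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import io.confluent.connect.storage.util.DataUtils
import org.apache.kafka.connect.sink.SinkRecord

import java.util

// Illustrative workaround: keep TimeBasedPartitioner (so rotate.interval.ms and its
// timestamp extraction still apply) but encode partitions from a record field instead
// of from the timestamp. "session_id" is a placeholder field name.
class SessionFieldPartitioner[T] extends TimeBasedPartitioner[T] {
  override def encodePartition(sinkRecord: SinkRecord): String = {
    val sessionId = DataUtils.getNestedFieldValue(sinkRecord.value(), "session_id")
    "session_id=" + String.valueOf(sessionId)
  }

  override def partitionFields(): util.List[T] =
    newSchemaGenerator(config).newPartitionFields("session_id")
}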

Looking through the source code, I think it could be meaningful to separate timestamp extraction out of TimeBasedPartitioner so that FieldPartitioner can use it as well, and to let the user control whether multiple writers are allowed.
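
Roughly, the shape of that split might look like the following (a hypothetical interface, not something that exists in the connector today):

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner.TimestampExtractor

// Hypothetical sketch of the proposed refactor: timestamp extraction becomes a
// standalone capability any partitioner can advertise, and partitioners can declare
// whether multiple encoded partitions may stay open at once.
trait TimestampAwarePartitioner {
  // Extractor the writer would use for rotate.interval.ms, independent of how
  // partitions are encoded.
  def timestampExtractor: TimestampExtractor
  // Whether TopicPartitionWriter may keep several encoded partitions open instead of
  // rotating on every partition change.
  def allowsMultipleOpenPartitions: Boolean
}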

Will try to get a PR up soon, but also looking for feedback.

jalaziz · Jun 11 '19 01:06

We are hitting a similar problem: we use a custom partitioner to partition by 1) a custom payload field and 2) time (year, month, day), like the one in https://github.com/canelmas/kafka-connect-field-and-time-partitioner. Since rotation currently always kicks in when the connector sees a record from a different encoded partition, we get many small files, usually with 1-3 events per file. This makes the connector (and subsequent AWS Athena queries on the written data) painfully slow.
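
To illustrate (with a made-up "device" field and layout), the encoded partitions such a combined partitioner produces look roughly like this, and every switch between them currently forces a rotation:

// Illustrative only: consecutive records in one Kafka partition mapping to alternating
// encoded partitions. Each change commits the open file, hence many files with 1-3 events.
val encodedPartitions = Seq(
  "device=alpha/year=2019/month=12/day=10", // file 1 opened
  "device=beta/year=2019/month=12/day=10",  // partition changed: file 1 committed, file 2 opened
  "device=alpha/year=2019/month=12/day=10"  // changed again: file 2 committed, file 3 opened
)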

jurgispods · Dec 10 '19 16:12

@jalaziz did you ever open a PR for this? We ran into the exact same issue and noticed it might be a great chance to contribute back.

Specifically this bit:

... I think it could be meaningful to separate out timestamp extraction from the TimeBasedPartitioner so that FieldPartitioner can use it as well and allow the user to control if multiple writers are allowed.

grantatspothero · Apr 10 '20 20:04

#380 has a solution, but requires rebuilding the connector.

maxstepanov · Mar 24 '21 15:03

@jalaziz did you ever open a PR for this? We ran into the exact same issue and noticed it might be a great chance to contribute back.

Specifically this bit:

... I think it could be meaningful to separate out timestamp extraction from the TimeBasedPartitioner so that FieldPartitioner can use it as well and allow the user to control if multiple writers are allowed.

Super sorry for the late reply here.

I did not open a PR because it turned out the changes we needed were slightly different and, if I recall correctly, solving the exactly-once problem wasn't so straightforward.

However, for anyone coming back to this, here are some examples of what we did:

import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.{ PartitionerConfig, TimeBasedPartitioner }
import io.confluent.connect.storage.util.DataUtils
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

import java.util
import scala.collection.JavaConverters._
import scala.util.Try

/**
  * Based on [[io.confluent.connect.storage.partitioner.FieldPartitioner]], but supports
  * Maps and nested fields and timestamp extraction.
  */
class FieldPartitioner[T] extends TimeBasedPartitioner[T] {
  protected var fieldNames: Seq[String] = Seq.empty
  protected var partitionFieldSchemas: util.List[T] = _

  override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    fieldNames = config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG).asInstanceOf[java.util.List[String]].asScala
  }

  override def encodePartition(sinkRecord: SinkRecord): String = {
    sinkRecord.value() match {
      case value @ (_: Struct | _: util.Map[_, _]) ⇒
        val partitions = fieldNames.map({ field ⇒
          val partitionKey = Try(DataUtils.getNestedFieldValue(value, field)).getOrElse(null)
          field + "=" + String.valueOf(partitionKey)
        })
        partitions.mkString(delim)
      case _ ⇒
        throw new PartitionException("Record is not a Struct or Map")
    }
  }

  override def partitionFields(): util.List[T] = {
    if (partitionFieldSchemas == null) {
      partitionFieldSchemas = newSchemaGenerator(config).newPartitionFields(fieldNames.mkString(","))
    }
    partitionFieldSchemas
  }
}
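
For reference, wiring this up by hand would look roughly like the following. The keys are the usual partitioner settings, but the concrete values (field name, formats, locale, timezone) are placeholders; in a real deployment the connector builds this map from its parsed configuration when you set partitioner.class:

import java.lang.{Long => JLong}
import scala.collection.JavaConverters._

// Hypothetical wiring sketch for the FieldPartitioner above. Values are placeholders;
// in practice the connector passes the parsed configuration to configure().
val partitioner = new FieldPartitioner[String]
partitioner.configure(Map[String, AnyRef](
  "partition.field.name"  -> List("session_id").asJava, // field(s) to partition on
  "directory.delim"       -> "/",
  "timestamp.extractor"   -> "RecordField",             // stream time taken from the record
  "timestamp.field"       -> "event_time",
  "partition.duration.ms" -> JLong.valueOf(3600000L),
  "path.format"           -> "'dt'=YYYY-MM-dd",         // unused by the overridden encodePartition
  "locale"                -> "en-US",
  "timezone"              -> "UTC"
).asJava)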

And here is a mixin variant of the same idea:

import io.confluent.connect.storage.common.StorageCommonConfig
import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.{ DefaultPartitioner, PartitionerConfig }
import io.confluent.connect.storage.util.DataUtils
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

import java.util
import scala.util.Try

trait FieldPartitionerMixin[T] extends DefaultPartitioner[T] {
  private var dirDelim: String = _
  protected var fieldNames: java.util.List[String] = _

  abstract override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    fieldNames = config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG).asInstanceOf[java.util.List[String]]
    dirDelim = config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG).asInstanceOf[String]
  }

  abstract override def encodePartition(sinkRecord: SinkRecord): String = {
    val baseEncodedPartition = super.encodePartition(sinkRecord)
    sinkRecord.value() match {
      case value @ (_: Struct | _: util.Map[_, _]) ⇒
        val builder = new StringBuilder(baseEncodedPartition)
        fieldNames.forEach { field ⇒
          builder.append(dirDelim)
          val partitionKey = Try(DataUtils.getNestedFieldValue(value, field)).getOrElse(null)
          builder.append(field + "=" + String.valueOf(partitionKey))
        }
        builder.toString
      case _ ⇒
        throw new PartitionException("Record is not a Struct or Map")
    }
  }

  abstract override def partitionFields(): util.List[T] = {
    // Implementing this to append our fields requires pulling in Hive as a dependency.
    // We don't use this, but even if we did, it's not clear the extra fields should actually be partitions.
    // For now, just use the base method.
    super.partitionFields()
  }
}

These are then used by defining partitioners as follows:

class DailyRegionPartitioner[T] extends DailyPartitioner[T] with RegionPrefixPartitioner[T]
class HourlyRegionPartitioner[T] extends HourlyPartitioner[T] with RegionPrefixPartitioner[T]
class DailyFieldPartitioner[T] extends DailyPartitioner[T] with FieldPartitionerMixin[T] with RegionPrefixPartitioner[T]
class HourlyFieldPartitioner[T] extends HourlyPartitioner[T] with FieldPartitionerMixin[T] with RegionPrefixPartitioner[T]

Here, RegionPrefixPartitioner is a partitioner trait that inherits from DefaultPartitioner and simply prepends the current region to the partitions. It's listed last in the mixin order to ensure the region prefix always comes first.
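
RegionPrefixPartitioner itself isn't shown above; a minimal sketch of what it could look like (how the region is obtained, here an environment variable with a fallback, is an assumption for illustration):

import io.confluent.connect.storage.partitioner.DefaultPartitioner
import org.apache.kafka.connect.sink.SinkRecord

// Minimal sketch of a region-prefixing mixin. The region lookup (environment variable
// with a fallback) is an assumption; the real implementation may differ.
trait RegionPrefixPartitioner[T] extends DefaultPartitioner[T] {
  private lazy val region: String = sys.env.getOrElse("AWS_REGION", "unknown-region")

  abstract override def encodePartition(sinkRecord: SinkRecord): String =
    "region=" + region + delim + super.encodePartition(sinkRecord)
}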

jalaziz · Mar 24 '21 21:03