kafka-connect-storage-cloud
Support stream-based rotation interval with FieldPartitioner
If I'm not mistaken, the current behavior of the TopicPartitionWriter makes it such that you cannot use rotate.interval.ms with a FieldPartitioner. This makes it impossible to have exactly-once semantics for short-lived partitions (without setting flush.size to 1).
This happens for mainly two reasons:
1. rotate.interval.ms requires a TimeBasedPartitioner. This is a bit artificial, though, since it really only needs a TimestampExtractor (sketched below).
2. Due to (1), the assumption is that only a single partition is being written to at a time, and so TopicPartitionWriter rotates whenever the encoded partition changes.
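Regarding (1): all the rotation logic really needs from the partitioner is a per-record timestamp, and that is exactly what the existing extractor abstraction provides. Paraphrased roughly (this is not the actual source, and I've narrowed the record type to SinkRecord for brevity):

import org.apache.kafka.connect.sink.SinkRecord
import java.util

// Rough paraphrase of io.confluent.connect.storage.partitioner.TimestampExtractor;
// the real interface takes a ConnectRecord and may differ slightly between versions.
trait TimestampExtractorSketch {
  def configure(config: util.Map[String, AnyRef]): Unit
  // Epoch-millis value that rotate.interval.ms would be evaluated against.
  def extract(record: SinkRecord): java.lang.Long
}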
However, we have a use case that could benefit from exactly-once semantics with a FieldPartitioner, rotate.interval.ms enabled, and multiple writes. The scenario is a stream of logs/events that are tied to a particular session ID. We can use FieldPartitioner to extract the session ID from the events. However, since the session eventually ends, we commit the last file, which potentially has fewer than flush.size records. We could use scheduled rotation, but that would break exactly-once semantics. Since the event stream as a whole doesn't stop, stream time (Record or RecordField) could be used to enable exactly-once semantics.
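Concretely, the configuration we would like to run looks something like the following (field names and sizes are hypothetical, and it's written as a Scala map purely for illustration). Today this combination is effectively unusable, because rotate.interval.ms is only honoured with a TimeBasedPartitioner:

// Hypothetical S3 sink settings for the session scenario described above.
val desiredSinkConfig: Map[String, String] = Map(
  "partitioner.class"    -> "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name" -> "session_id",   // hypothetical payload field
  "timestamp.extractor"  -> "RecordField",  // stream time taken from the payload
  "timestamp.field"      -> "event_time",   // hypothetical payload field
  "rotate.interval.ms"   -> "600000",       // commit once stream time advances 10 minutes
  "flush.size"           -> "100000"
)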
So far, the scenario above can be accomplished by extending a TimeBasedPartitioner and simply overriding the encodePartition and partitionFields methods with FieldPartitioner logic. However, you can end up with a lot of small files if a Kafka partition has interspersed messages, due to the second issue mentioned above. Having multiple open partitions in this scenario would not hurt exactly-once semantics, since session events will always route to the same session partition.
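For reference, a minimal version of that workaround looks roughly like this (single top-level Struct field, no error handling, and the class name is made up):

import io.confluent.connect.storage.partitioner.{ PartitionerConfig, TimeBasedPartitioner }
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

import java.util

// Partition by a single configured field while inheriting timestamp extraction
// (and therefore rotate.interval.ms support) from TimeBasedPartitioner.
class SessionFieldPartitioner[T] extends TimeBasedPartitioner[T] {
  private var fieldName: String = _

  override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    // partition.field.name is a list config; only the first entry is used here.
    fieldName = config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG)
      .asInstanceOf[util.List[String]].get(0)
  }

  // Encode the partition from the field value instead of the record timestamp.
  override def encodePartition(sinkRecord: SinkRecord): String = {
    val struct = sinkRecord.value().asInstanceOf[Struct]
    fieldName + "=" + String.valueOf(struct.get(fieldName))
  }
}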
Looking through the source code, I think it could be meaningful to separate out timestamp extraction from the TimeBasedPartitioner so that FieldPartitioner can use it as well, and allow the user to control if multiple writers are allowed.
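To make that concrete, the split could look something like the following. The names here are made up and nothing like this exists in the connector today; it's only meant to show the shape of the idea:

import io.confluent.connect.storage.partitioner.TimestampExtractor

// Hypothetical contract a partitioner could expose so that TopicPartitionWriter
// can honour rotate.interval.ms without requiring a TimeBasedPartitioner.
trait TimestampAwarePartitioner {
  // Extractor used for stream-time rotation, independent of partition encoding.
  def timestampExtractor: TimestampExtractor

  // Whether the writer may keep multiple encoded partitions open instead of
  // rotating whenever the encoded partition changes.
  def allowMultipleOpenPartitions: Boolean
}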
Will try to get a PR up soon, but also looking for feedback.
We are hitting a similar problem: we use a custom partitioner to partition by 1) a custom payload field and 2) time (year, month, day), like this one: https://github.com/canelmas/kafka-connect-field-and-time-partitioner

Since rotation currently always kicks in when the connector sees a record from a different encoded partition, we end up with many small files, usually with 1-3 events per file. This makes the connector (and subsequent AWS Athena queries on the written data) painfully slow.
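To illustrate with made-up values: every time the field value flips between consecutive records, the encoded partition changes and the open file is committed, so interleaved values produce one tiny file per run of records.

// Hypothetical encoded partitions for consecutive records in one Kafka partition;
// each change of encoded partition closes the currently open file.
val encodedPartitions = Seq(
  "device=a/year=2020/month=01/day=02", // file 1 opened
  "device=b/year=2020/month=01/day=02", // changed -> file 1 committed, file 2 opened
  "device=a/year=2020/month=01/day=02"  // changed again -> file 2 committed, file 3 opened
)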
@jalaziz did you ever open a PR for this? We ran into the exact same issue and noticed it might be a great chance to contribute back.
Specifically this bit:
... I think it could be meaningful to separate out timestamp extraction from the TimeBasedPartitioner so that FieldPartitioner can use it as well and allow the user to control if multiple writers are allowed.
#380 has a solution, but requires rebuilding the connector.
Super sorry for the late reply here.
I did not open a PR because it turned out the changes we needed were slightly different and, if I recall correctly, solving the exactly-once problem wasn't so straightforward.
However, for anyone coming back to this, here are some examples of what we did:
import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.{ PartitionerConfig, TimeBasedPartitioner }
import io.confluent.connect.storage.util.DataUtils
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

import java.util

import scala.collection.JavaConverters._
import scala.util.Try

/**
 * Based on [[io.confluent.connect.storage.partitioner.FieldPartitioner]], but supports
 * Maps and nested fields and timestamp extraction.
 */
class FieldPartitioner[T] extends TimeBasedPartitioner[T] {
  protected var fieldNames: Seq[String] = Seq.empty
  protected var partitionFieldSchemas: util.List[T] = _

  override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    fieldNames = config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG).asInstanceOf[java.util.List[String]].asScala
  }

  override def encodePartition(sinkRecord: SinkRecord): String = {
    sinkRecord.value() match {
      case value @ (_: Struct | _: util.Map[_, _]) ⇒
        val partitions = fieldNames.map({ field ⇒
          val partitionKey = Try(DataUtils.getNestedFieldValue(value, field)).getOrElse(null)
          field + "=" + String.valueOf(partitionKey)
        })
        partitions.mkString(delim)
      case _ ⇒
        throw new PartitionException("Record is not a Struct or Map")
    }
  }

  override def partitionFields(): util.List[T] = {
    if (partitionFieldSchemas == null) {
      partitionFieldSchemas = newSchemaGenerator(config).newPartitionFields(fieldNames.mkString(","))
    }
    partitionFieldSchemas
  }
}
and
import io.confluent.connect.storage.common.StorageCommonConfig
import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.{ DefaultPartitioner, PartitionerConfig }
import io.confluent.connect.storage.util.DataUtils
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

import java.util

import scala.util.Try

trait FieldPartitionerMixin[T] extends DefaultPartitioner[T] {
  private var dirDelim: String = _
  protected var fieldNames: java.util.List[String] = _

  abstract override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    fieldNames = config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG).asInstanceOf[java.util.List[String]]
    dirDelim = config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG).asInstanceOf[String]
  }

  abstract override def encodePartition(sinkRecord: SinkRecord): String = {
    val baseEncodedPartition = super.encodePartition(sinkRecord)
    sinkRecord.value() match {
      case value @ (_: Struct | _: util.Map[_, _]) ⇒
        val builder = new StringBuilder(baseEncodedPartition)
        fieldNames.forEach { field ⇒
          builder.append(dirDelim)
          val partitionKey = Try(DataUtils.getNestedFieldValue(value, field)).getOrElse(null)
          builder.append(field + "=" + String.valueOf(partitionKey))
        }
        builder.toString
      case _ ⇒
        throw new PartitionException("Record is not a Struct or Map")
    }
  }

  abstract override def partitionFields(): util.List[T] = {
    // Implementing this to append our fields requires pulling in Hive as a dependency.
    // We don't use this, but even if we did, it's not clear the extra fields should actually be partitions.
    // For now, just use the base method.
    super.partitionFields()
  }
}
These are then used by defining partitioners as follows:
class DailyRegionPartitioner[T] extends DailyPartitioner[T] with RegionPrefixPartitioner[T]
class HourlyRegionPartitioner[T] extends HourlyPartitioner[T] with RegionPrefixPartitioner[T]
class DailyFieldPartitioner[T] extends DailyPartitioner[T] with FieldPartitionerMixin[T] with RegionPrefixPartitioner[T]
class HourlyFieldPartitioner[T] extends HourlyPartitioner[T] with FieldPartitionerMixin[T] with RegionPrefixPartitioner[T]
Here, RegionPrefixPartitioner is a partitioner that inherits from DefaultPartitioner and simply prepends the current region to the partitions. It's defined in that order to ensure we always prefix by region.
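For completeness, here's a sketch of what RegionPrefixPartitioner looks like; the region lookup below is a placeholder rather than our actual implementation:

import io.confluent.connect.storage.common.StorageCommonConfig
import io.confluent.connect.storage.partitioner.DefaultPartitioner
import org.apache.kafka.connect.sink.SinkRecord

import java.util

// Stackable trait that namespaces every encoded partition under the current region.
trait RegionPrefixPartitioner[T] extends DefaultPartitioner[T] {
  private var dirDelim: String = _
  // Placeholder: how the region is determined is deployment-specific.
  private lazy val region: String = sys.env.getOrElse("AWS_REGION", "unknown")

  abstract override def configure(config: util.Map[String, AnyRef]): Unit = {
    super.configure(config)
    dirDelim = config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG).asInstanceOf[String]
  }

  // Prepend the region so it always comes first in the path, regardless of the
  // partitioners mixed in before this one.
  abstract override def encodePartition(sinkRecord: SinkRecord): String =
    "region=" + region + dirDelim + super.encodePartition(sinkRecord)
}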