divolte-collector
Avro schema validations are bypassed in Divolte JS
Hi @friso @asnare
Can you please clarify something for me? As I understand it, Avro fields that have no default value are effectively required.
However, when I send a custom event from the front end with divolte.signal("myevent", {eventType: "buy"}),
the event is still flushed to my Kafka sink (the tracking topic), even though the schema declares other fields (userId, productId) that are equally required. Those fields are simply absent from the divolte.signal payload, yet the record is pushed to the Kafka topic without any validation error.
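To make the gap concrete, this is the call I currently make compared with one that would supply every event parameter the schema expects (the userId/productId values below are just placeholders; timeField is filled on the collector side by the timestamp() mapping):

    // Current call: only eventType is supplied.
    divolte.signal("myevent", { eventType: "buy" });

    // A call covering the remaining schema fields (placeholder values).
    divolte.signal("myevent", {
      eventType: "buy",
      userId: "user-123",
      productId: "product-456"
    });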
Here is my Avro schema:
Avro Schema (MyEventRecord.avsc)
{
  "type": "record",
  "name": "MyClass",
  "namespace": "com.test.avro",
  "fields": [
    { "name": "eventType", "type": "string" },
    { "name": "userId", "type": "string" },
    { "name": "productId", "type": "string" },
    { "name": "timeField", "type": "long" }
  ]
}
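(As far as I understand Avro, a field only becomes optional when it is declared with a default, typically as a nullable union such as { "name": "productId", "type": ["null", "string"], "default": null }; none of the four fields above has a default, so I expect all of them to be required.)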
Groovy mapping script content (mapping.groovy)
mapping {
  map eventParameters().value('eventType') onto 'eventType'
  map eventParameters().value('userId') onto 'userId'
  map eventParameters().value('productId') onto 'productId'
  map timestamp() onto 'timeField'
}
divolte.conf content
divolte {
  global {
    kafka {
      // Enable Kafka flushing
      enabled = true
      // Number of threads to use for flushing events to Kafka
      threads = 2
      // The maximum queue of mapped events to buffer before
      // starting to drop new ones. Note that when this buffer is full,
      // events are dropped and a warning is logged. No errors are reported
      // to the source of the events. A single buffer is shared between all
      // threads, and its size will be rounded up to the nearest power of 2.
      buffer_size = 1048576
      // The properties under the producer key in this
      // configuration are used to create a Properties object
      // which is passed to Kafka as is. At the very least,
      // configure the broker list here. For more options
      // that can be passed to a Kafka producer, see this link:
      // http://kafka.apache.org/082/documentation.html#newproducerconfigs
      producer = {
        bootstrap.servers = "localhost:9092"
        acks = 1
        retries = 0
        compression.type = lz4
        max.in.flight.requests.per.connection = 1
        group.id = "KafkaExampleConsumer"
      }
    }
  }
  sources {
    browser {
      type = browser
      javascript.name = myproj.js
    }
  }
  mappings {
    a_mapping = {
      schema_file = "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord.avsc"
      mapping_script_file = "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/mapping.groovy"
      discard_corrupted = true
      discard_duplicates = true
      sources = [browser]
      sinks = [kafka]
    }
    # a_mapping = {
    #   schema_file = "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord2.avsc"
    #   mapping_script_file = "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/mapping2.groovy"
    #   discard_corrupted = true
    #   discard_duplicates = true
    #   sources = [browser]
    #   sinks = [kafka]
    # }
  }
  sinks {
    // The name of the sink. (It's referred to by the mapping.)
    kafka {
      type = kafka
      // This is the name of the topic that data will be produced on
      topic = tracking
    }
  }
}