logstash-input-rethinkdb
RethinkDB - Logstash -> Elasticsearch config
Hi, I had a lot of trouble configuring Logstash, RethinkDB and Elasticsearch to work together. Since I saw lots of people asking how to do it, I want to share my solution:
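input {
  rethinkdb {
    host => "10.71.34.101"
    port => 28015
    auth_key => ""
    # Databases to watch in full (the database here is named "table_name")
    watch_dbs => ["table_name"]
    # Individual tables to watch, written as "database.table"
    watch_tables => ["table_name.countries"]
    # Emit the existing rows first, then follow the changefeed
    backfill => true
  }
}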
filter {
  # Only old_val is present: the row was deleted
  if ![new_val] and [old_val] {
    mutate {
      add_field => {
        "[@metadata][action]" => "delete"
        "[@metadata][id]" => "%{[old_val][id]}"
        "[@metadata][index]" => "%{[db]}"
        "[@metadata][document_type]" => "%{[table]}"
      }
    }
  }
  # Both old_val and new_val are present: the row was updated
  if [new_val] and [old_val] {
    ruby {
      init => "require 'json'"
      # Copy every field of new_val to the top level of the event
      code => "
        e = event['new_val']
        e.each_with_index do |(key, value), index|
          event[key] = value
        end
      "
    }
    mutate {
      add_field => {
        "[@metadata][action]" => "update"
        "[@metadata][id]" => "%{[new_val][id]}"
        "[@metadata][index]" => "%{[db]}"
        "[@metadata][document_type]" => "%{[table]}"
      }
      remove_field => [ "old_val", "new_val" ]
    }
  }
  # Only new_val is present: the row was inserted
  if [new_val] and ![old_val] {
    ruby {
      init => "require 'json'"
      # Copy every field of new_val to the top level of the event
      code => "
        e = event['new_val']
        e.each_with_index do |(key, value), index|
          event[key] = value
        end
      "
    }
    mutate {
      add_field => {
        "[@metadata][action]" => "index"
        "[@metadata][id]" => "%{[new_val][id]}"
        "[@metadata][index]" => "%{[db]}"
        "[@metadata][document_type]" => "%{[table]}"
      }
      remove_field => [ "old_val", "new_val" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.0.101:9200"]
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][id]}"
    index => "%{[@metadata][index]}"
    document_type => "%{[@metadata][document_type]}"
  }
  stdout {
    codec => json_lines
  }
}
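The three branches of the filter key off which of old_val and new_val are present in the change event. As a rough illustration in plain Ruby, outside Logstash, the mapping looks something like the sketch below. The helper names action_for and document_id_for and the sample hashes are hypothetical, and the sketch assumes a missing value shows up as nil.

```ruby
# Hypothetical helpers mirroring the filter's three conditionals.
# `change` is a plain Hash standing in for a changefeed event.
def action_for(change)
  old_val = change['old_val']
  new_val = change['new_val']

  if new_val.nil? && !old_val.nil?
    'delete'   # only old_val: the row was removed
  elsif !new_val.nil? && !old_val.nil?
    'update'   # both present: the row was changed
  elsif !new_val.nil? && old_val.nil?
    'index'    # only new_val: the row was inserted
  end          # nil when neither value is present
end

# The document id comes from new_val when it exists, otherwise from old_val,
# which matches the "[@metadata][id]" choices in the config.
def document_id_for(change)
  source = change['new_val'] || change['old_val']
  source && source['id']
end

deleted = { 'old_val' => { 'id' => 1 }, 'new_val' => nil }
puts "#{action_for(deleted)} #{document_id_for(deleted)}"
```

The Elasticsearch output then only has to interpolate the action and id that were stored in @metadata, so no per-action output blocks are needed.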
Hope this helps
@AtnNn Perhaps we should move this to a wiki or something?
@AtnNn We should, but I don't know how.
@rzvavram Wouldn't this suffice? https://github.com/rethinkdb/logstash-input-rethinkdb/wiki/Plugin-config-example
I'm guessing a simplified version should be added to the main README in place of what is there now. If this project starts getting some love, we can open a pull request.
Thanks @rzvavram @sagivf for sharing. You might want to update this config for the Logstash 5.x breaking changes to the event API. In the ruby filter,
  e = event['new_val']
  e.each_with_index do |(key, value), index|
    event[key] = value
  end
changes to
  e = event.get('new_val')
  e.each_with_index do |(key, value), index|
    event.set(key, value)
  end
Btw, you probably just need each instead of each_with_index; the index isn't used anywhere in that block.
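Putting both suggestions together, the body of the ruby filter could look roughly like the sketch below. To make it runnable outside Logstash it uses a tiny stand-in class, FakeEvent, which is hypothetical and implements only get and set. Inside Logstash 5.x the event object is supplied by the filter, and only the lines in flatten_new_val (also a made-up name) would go into the code string.

```ruby
# Hypothetical stand-in for the Logstash 5.x event, for illustration only.
# It supports just the two calls the filter needs: get and set.
class FakeEvent
  def initialize(data)
    @data = data
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end
end

# Copy every field of new_val to the top level of the event.
# Uses each rather than each_with_index, since the index is never needed.
def flatten_new_val(event)
  new_val = event.get('new_val')
  return event if new_val.nil?   # nothing to copy for delete events

  new_val.each do |key, value|
    event.set(key, value)
  end
  event
end

event = FakeEvent.new('new_val' => { 'id' => 7, 'name' => 'France' })
flatten_new_val(event)
puts event.get('name')
```

The nil guard is an addition for the standalone sketch; in the config above the ruby filter only runs inside branches where new_val is already known to be present.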