ipfixcol
Runtime Error with json kafka output
When we run ipfixcol with 'ipfixcol -v 3', with input IPFIX messages arriving over UDP and JSON output going to Kafka, we get this error on the console after roughly 12-24 hours:
terminate called after throwing an instance of 'std::runtime_error'
  what():  Requested 'partition' is unknown in the Kafka cluster.
Aborted
The partition never changes, and Spark as a consumer keeps running against the same partition and topic, but receives no data.
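This librdkafka error typically appears when the producer targets a partition index the topic does not actually have. A minimal sketch of that failure mode, assuming (hypothetically) that the JSON plugin round-robins messages across the configured `<partitions>` while the topic was created with fewer partitions than configured; the real plugin's partitioner may differ:

```shell
configured=36   # value of <partitions> in startup.xml
actual=4        # hypothetical: the topic was auto-created with fewer partitions

# Partition chosen for the i-th message under simple round-robin.
pick_partition() {
  echo $(( $1 % configured ))
}

for i in 0 3 4 35; do
  p=$(pick_partition "$i")
  if [ "$p" -ge "$actual" ]; then
    echo "message $i -> partition $p: unknown in the Kafka cluster"
  else
    echo "message $i -> partition $p: ok"
  fi
done
```

Under these assumptions, any message routed to partition 4 or higher would be rejected by the broker, even though partitions 0-3 keep working.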
The Kafka code is not well tested. Unfortunately, I'm not an expert on Kafka, so the code is very basic and some error handling may be missing. However, unless there is a way to replicate the error in a reasonable time, I'm not able to fix it at this point. Would you be able to provide a way (a set of commands) to replicate the issue?
We ran the 'ipfixcol -v 3' command after configuring startup.xml as follows:
<collectingProcess>
    <name>UDP collector</name>
    <udpCollector>
        <name>Listening port 4739</name>
        <localPort>4739</localPort>
        <templateLifeTime>1800</templateLifeTime>
        <optionsTemplateLifeTime>1800</optionsTemplateLifeTime>
        <localIPAddress>192.168.135.70</localIPAddress>
    </udpCollector>
    <exportingProcess>json writer</exportingProcess>
</collectingProcess>
<exportingProcess>
    <!--## Name of the exporting process, must match <exportingProcess> element
         in <collectingProcess> -->
    <name>json writer</name>
    <!-- JSON data writer configuration -->
    <destination>
        <name>JSON storage plugin</name>
        <fileWriter>
            <fileFormat>json</fileFormat>
            <metadata>no</metadata>
            <tcpFlags>formatted</tcpFlags>
            <timestamp>unix</timestamp>
            <protocol>raw</protocol>
            <ignoreUnknown>yes</ignoreUnknown>
            <nonPrintableChar>no</nonPrintableChar>
            <output>
                <type>kafka</type>
                <ip>192.168.135.62</ip>
                <port>9092</port>
                <ip>192.168.135.63</ip>
                <port>9092</port>
                <!-- <ip>192.168.135.64</ip>
                <port>9092</port> -->
                <partitions>36</partitions>
                <topic>ipfix36</topic>
            </output>
        </fileWriter>
    </destination>
    <singleManager>yes</singleManager>
</exportingProcess>
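With `<partitions>36</partitions>` configured, it is worth confirming that the topic actually has at least 36 partitions on the brokers. A minimal check, assuming the `PartitionCount:` field printed by `kafka-topics.sh --describe`; the sample line below is hypothetical and the exact layout varies between Kafka versions:

```shell
# Hypothetical sample of `kafka-topics.sh --describe --topic ipfix36` output.
describe='Topic:ipfix36  PartitionCount:36  ReplicationFactor:1  Configs:'

# Extract the partition count from the describe line.
count=$(printf '%s\n' "$describe" | grep -o 'PartitionCount:[0-9]*' | cut -d: -f2)

if [ "$count" -ge 36 ]; then
  echo "topic has $count partitions: matches <partitions>36</partitions>"
else
  echo "mismatch: the plugin may request a partition the broker does not have"
fi
```

On a live cluster, the describe line would come from `/path/to/kafka-topics.sh --zookeeper localhost:2181 --describe --topic ipfix36` instead of the sample string.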
I was hoping for a bit more detailed information, something similar to the following:
I'm trying to reproduce the issue now on Ubuntu 18.04 LTS with 2.1.0.
I've set up the system as follows:
apt-get update
apt-get -y install build-essential autoconf libtool flex bison libxml2-dev libssl1.0-dev librdkafka-dev openjdk-8-jre
cd /home/vagrant
sudo -u vagrant git clone --recursive https://github.com/CESNET/ipfixcol.git
cd ipfixcol/base
sudo -u vagrant git checkout devel
sudo -u vagrant autoreconf -i
sudo -u vagrant ./configure --disable-doc --disable-sctp
sudo -u vagrant make
make install
ldconfig
cd ../plugins/storage/json/
sudo -u vagrant autoreconf -i
sudo -u vagrant ./configure --disable-doc --enable-kafka
sudo -u vagrant make
sudo make install
useradd kafka -m -s /bin/bash
cd /home/kafka
sudo -u kafka wget https://www-eu.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz -O kafka.tgz
sudo -u kafka mkdir kafka && cd kafka
sudo -u kafka tar -xvzf /home/kafka/kafka.tgz --strip 1
sudo -u kafka sh -c "printf '\ndelete.topic.enable = true\n' >> /home/kafka/kafka/config/server.properties"
cat >/etc/systemd/system/zookeeper.service <<EOL
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
EOL
cat >/etc/systemd/system/kafka.service <<EOL
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
EOL
The IPFIXcol configuration is the following:
<?xml version="1.0" encoding="UTF-8"?>
<ipfix xmlns="urn:ietf:params:xml:ns:yang:ietf-ipfix-psamp">
    <collectingProcess>
        <name>UDP collector</name>
        <udpCollector>
            <name>Listening port 4739</name>
            <localPort>4739</localPort>
            <localIPAddress></localIPAddress>
            <templateLifeTime>1800</templateLifeTime>
            <optionsTemplateLifeTime>1800</optionsTemplateLifeTime>
        </udpCollector>
        <exportingProcess>json</exportingProcess>
    </collectingProcess>
    <!--## Exporting process configuration -->
    <exportingProcess>
        <name>json</name>
        <destination>
            <name>JSON storage plugin</name>
            <fileWriter>
                <fileFormat>json</fileFormat>
                <metadata>no</metadata>
                <tcpFlags>formatted</tcpFlags>
                <timestamp>unix</timestamp>
                <protocol>raw</protocol>
                <ignoreUnknown>yes</ignoreUnknown>
                <nonPrintableChar>no</nonPrintableChar>
                <output>
                    <type>kafka</type>
                    <ip>127.0.0.1</ip>
                    <port>9092</port>
                    <partitions>36</partitions>
                    <topic>ipfix36</topic>
                </output>
            </fileWriter>
        </destination>
        <singleManager>yes</singleManager>
    </exportingProcess>
</ipfix>
The following is done to start Kafka and IPFIXcol:
systemctl start kafka
/usr/local/bin/ipfixcol -c /vagrant/startup-json.xml -d
I'm sending the same data repeatedly from a file to ipfixcol, and a consumer is running as follows:
/usr/local/bin/ipfixsend -i /vagrant/data.ipfix &
/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ipfix36 --from-beginning > /dev/null
Also, I needed to create enough partitions (not done by the broker by default):
/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic ipfix36 --partitions 36
I'll check the VM tomorrow to see whether it crashed or not.
So, what crashed was the kafka process. Could you start with the above configuration and provide additional steps to reproduce the issue?
In our case the Kafka process didn't crash.
On 12/6/18, Petr Velan [email protected] wrote:
> So, what crashed was the kafka process. Could you start with the above configuration and provide additional steps to reproduce the issue?