
Runtime Error with json kafka output

Open ResearchIntern98 opened this issue 7 years ago • 5 comments

When we run ipfixcol with 'ipfixcol -v 3', receiving IPFIX messages over UDP and using the JSON output plugin with Kafka, we get the following error on the console, usually after 12-24 hours:

terminate called after throwing an instance of 'std::runtime_error'
  what():  Requested 'partition' is unknown in the Kafka cluster
Aborted

The partition never changes, and Spark as a consumer keeps running against the same partition and topic, but it no longer receives any data.

ResearchIntern98 avatar Oct 25 '18 05:10 ResearchIntern98

The kafka code is not well tested. Unfortunately, I'm not an expert on Kafka, so the code is very basic and maybe some error handling is missing. However, unless there is a way to replicate the error in a reasonable time, I'm not able to fix it at this point. Would you be able to provide a way (set of commands) to replicate the issue?

thorgrin avatar Nov 16 '18 22:11 thorgrin

We ran the 'ipfixcol -v 3' command after configuring startup.xml as follows:

<collectingProcess>
        <name>UDP collector</name>
        <udpCollector>
                <name>Listening port 4739</name>
                <localPort>4739</localPort>
                <templateLifeTime>1800</templateLifeTime>
                <optionsTemplateLifeTime>1800</optionsTemplateLifeTime>
                <localIPAddress>192.168.135.70</localIPAddress>
        </udpCollector>
        <exportingProcess>json writer</exportingProcess>
</collectingProcess>

<exportingProcess>
        <!--## Name of the exporting process, must match <exportingProcess> element
             in <collectingProcess> -->
        <name>json writer</name>
        <!-- JSON data writer configuration -->
        <destination>
                <name>JSON storage plugin</name>
                <fileWriter>
                        <fileFormat>json</fileFormat>
                        <metadata>no</metadata>
                        <tcpFlags>formatted</tcpFlags>
                        <timestamp>unix</timestamp>
                        <protocol>raw</protocol>
                        <ignoreUnknown>yes</ignoreUnknown>
                        <nonPrintableChar>no</nonPrintableChar>
                        <output>
                                <type>kafka</type>
                                <ip>192.168.135.62</ip>
                                <port>9092</port>
                                <ip>192.168.135.63</ip>
                                <port>9092</port>
                                <!-- <ip>192.168.135.64</ip>
                                <port>9092</port> -->
                                <partitions>36</partitions>
                                <topic>ipfix36</topic>
                        </output>
                </fileWriter>
        </destination>
        <singleManager>yes</singleManager>
</exportingProcess>

ResearchIntern98 avatar Nov 22 '18 05:11 ResearchIntern98

I was hoping for a bit more detailed information, something similar to the following:

I'm trying to reproduce the issue now on Ubuntu 18.04 LTS with 2.1.0.

I've set up the system as follows:

apt-get update
apt-get -y install build-essential autoconf libtool flex bison libxml2-dev libssl1.0-dev librdkafka-dev openjdk-8-jre

cd /home/vagrant
sudo -u vagrant git clone --recursive https://github.com/CESNET/ipfixcol.git
cd ipfixcol/base
sudo -u vagrant git checkout devel
sudo -u vagrant autoreconf -i
sudo -u vagrant ./configure --disable-doc --disable-sctp
sudo -u vagrant make
make install
ldconfig

cd ../plugins/storage/json/
sudo -u vagrant autoreconf -i
sudo -u vagrant ./configure --disable-doc --enable-kafka
sudo -u vagrant make
sudo make install

useradd kafka -m -s /bin/bash

cd /home/kafka
sudo -u kafka wget https://www-eu.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz -O kafka.tgz
sudo -u kafka mkdir kafka && cd kafka
sudo -u kafka tar -xvzf /home/kafka/kafka.tgz --strip 1
sudo -u kafka echo -e '\ndelete.topic.enable = true' >> /home/kafka/kafka/config/server.properties

cat >/etc/systemd/system/zookeeper.service <<EOL
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
EOL

cat >/etc/systemd/system/kafka.service <<EOL
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
EOL

Configuration of the IPFIXcol is the following:

<?xml version="1.0" encoding="UTF-8"?>
<ipfix xmlns="urn:ietf:params:xml:ns:yang:ietf-ipfix-psamp">
        
        <collectingProcess>
                <name>UDP collector</name>
                <udpCollector>
                        <name>Listening port 4739</name>
                        <localPort>4739</localPort>
                        <localIPAddress></localIPAddress>
                        <templateLifeTime>1800</templateLifeTime>
                        <optionsTemplateLifeTime>1800</optionsTemplateLifeTime>
                </udpCollector>
                <exportingProcess>json</exportingProcess>
        </collectingProcess>

        <!--## Exporting process configuration -->
        <exportingProcess>
                <name>json</name>
                <destination>
                        <name>JSON storage plugin</name>
                        <fileWriter>
                                <fileFormat>json</fileFormat>
                                <metadata>no</metadata>
                                <tcpFlags>formatted</tcpFlags>
                                <timestamp>unix</timestamp>
                                <protocol>raw</protocol>
                                <ignoreUnknown>yes</ignoreUnknown>
                                <nonPrintableChar>no</nonPrintableChar>
                                <output>
                                        <type>kafka</type>
                                        <ip>127.0.0.1</ip>
                                        <port>9092</port>
                                        <partitions>36</partitions>
                                        <topic>ipfix36</topic>
                                </output>
                        </fileWriter>
                </destination>
                <singleManager>yes</singleManager>
        </exportingProcess>
</ipfix>

The following is used to start Kafka and IPFIXcol:

systemctl start kafka
/usr/local/bin/ipfixcol -c /vagrant/startup-json.xml -d

I'm sending the same data repeatedly from a file to ipfixcol, and a consumer is running as follows:

/usr/local/bin/ipfixsend -i /vagrant/data.ipfix &
/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ipfix36 --from-beginning > /dev/null

Also, I needed to create enough partitions (not done by the broker by default):

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper localhost:2181  --alter --topic ipfix36 --partitions 36

I'll check the VM tomorrow to see whether it crashed or not.

thorgrin avatar Nov 26 '18 12:11 thorgrin

So, what crashed was the kafka process. Could you start with the above configuration and provide additional steps to reproduce the issue?

thorgrin avatar Dec 06 '18 12:12 thorgrin

In our case, the Kafka process didn't crash.


ResearchIntern98 avatar Dec 07 '18 06:12 ResearchIntern98