
Unable to deliver event to Kafka

Open filgiuff opened this issue 3 years ago • 6 comments

I've installed Fiware Orion (v 3.0.0), Fiware Cygnus (2.8.0) and Kafka (2.7.0) in order to send data from Orion to Kafka via Cygnus.

When the notification arrives from Orion to Cygnus, this error appears:

cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception (begin() called when transaction is OPEN!) ...

Here other details.
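
For readers unfamiliar with the two messages in the log ("close() called when transaction is OPEN" and "begin() called when transaction is OPEN!"), here is a toy Python model of the transaction lifecycle they imply. It is only a sketch inferred from the exception text, not Flume or Cygnus code: the class `ToyTransaction`, the state names and the helper `deliver` are all hypothetical. The point is that a transaction left OPEN by a failure in the middle of processing can be neither closed nor begun again until it is committed or rolled back.

```python
from enum import Enum


class State(Enum):
    NEW = "NEW"
    OPEN = "OPEN"
    COMPLETED = "COMPLETED"
    CLOSED = "CLOSED"


class ToyTransaction:
    """Toy lifecycle model (hypothetical, inferred from the log messages)."""

    def __init__(self):
        self.state = State.NEW

    def _require(self, ok, message):
        if not ok:
            raise RuntimeError(message)

    def begin(self):
        self._require(self.state is State.NEW,
                      f"begin() called when transaction is {self.state.value}!")
        self.state = State.OPEN

    def commit(self):
        self._require(self.state is State.OPEN,
                      f"commit() called when transaction is {self.state.value}!")
        self.state = State.COMPLETED

    def rollback(self):
        self._require(self.state is State.OPEN,
                      f"rollback() called when transaction is {self.state.value}!")
        self.state = State.COMPLETED

    def close(self):
        self._require(self.state in (State.NEW, State.COMPLETED),
                      f"close() called when transaction is {self.state.value}"
                      " - you must either commit or rollback first")
        self.state = State.CLOSED


def deliver(txn, send):
    """Run send() inside txn; always commit or roll back before closing."""
    txn.begin()
    try:
        send()
        txn.commit()
        return True
    except Exception:
        txn.rollback()
        return False
    finally:
        txn.close()
```

In this model, calling `close()` right after `begin()` raises the same message as in the stack trace, while `deliver` always ends in the CLOSED state even when `send` raises.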

filgiuff avatar Apr 20 '21 14:04 filgiuff

Hi, I've got the same issue. Apparently the error appears when Cygnus is checking whether a specific Kafka topic exists.

cygnus_1  | time=2021-04-23T09:04:54.013Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=topicExists | msg=com.telefonica.iot.cygnus.backends.kafka.KafkaBackendImpl[60] : Checking if topic 'libraryxffffx002fcatalogxffffBook1xffffBook' already exists.
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=sendResponse | msg=org.eclipse.jetty.server.HttpChannel[693] : sendResponse info=null content=HeapByteBuffer@587c2b08[p=0,l=0,c=0,r=0]={<<<>>>} complete=true committing=false callback=Blocker@4fec5d4c{null}
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: CONTINUE (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: NEED_CHUNK (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: FLUSH (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=write | msg=org.eclipse.jetty.io.WriteFlusher[315] : write: WriteFlusher@76adc9a1{IDLE}->null [HeapByteBuffer@2ee8a74[p=0,l=5,c=1024,r=5]={<<<0\r\n\r\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}]
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=updateState | msg=org.eclipse.jetty.io.WriteFlusher[119] : update WriteFlusher@76adc9a1{WRITING}->null:IDLE-->WRITING
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=flush | msg=org.eclipse.jetty.io.ChannelEndPoint[288] : flushed 5 SocketChannelEndPoint@376222c4{/172.25.0.3:58186<->/172.25.0.4:5050,OPEN,fill=-,flush=W,to=4/30000}{io=0/0,kio=0,kro=1}->HttpConnection@210f4b41[p=HttpParser{s=END,383 of 383},g=HttpGenerator@59427309{s=COMPLETING}]=>HttpChannelOverHttp@f59a675{r=1,c=true,a=COMPLETING,uri=//cygnus:5050/notify}
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=ERROR | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_1  | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_1  |     at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_1  |     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_1  |     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_1  |     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_1  |     at java.lang.Thread.run(Thread.java:748)
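
The topic name in the first DEBUG line looks like it is built from the service, service path, entity ID and entity type, with `/` written as `x002f` and the parts joined by `xffff`. The Python sketch below reproduces that one name so it is easier to find the topic on the broker. The encoding rule is an assumption inferred from this single log line, not taken from the Cygnus source, and the function names `encode_part` and `build_topic` are made up for illustration.

```python
def encode_part(text):
    """Replace each non-alphanumeric character with 'x' + 4-digit hex code point.

    Assumed rule, inferred from the log line only; the real encoding may
    treat some characters differently.
    """
    return "".join(
        ch if ch.isascii() and ch.isalnum() else f"x{ord(ch):04x}"
        for ch in text
    )


def build_topic(service, service_path, entity_id, entity_type, sep="xffff"):
    """Join the encoded parts with the separator seen in the log."""
    parts = (service, service_path, entity_id, entity_type)
    return sep.join(encode_part(part) for part in parts)


if __name__ == "__main__":
    print(build_topic("library", "/catalog", "Book1", "Book"))
```

With the values from the log (`srv=library`, `subsrv=/catalog`, entity `Book1` of type `Book`), this gives the same string that `topicExists` reports.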

damianhorna avatar Apr 28 '21 12:04 damianhorna

Possible steps to reproduce:

docker-compose.yml:

version: "3.9"
services:
  mongo:
    image: mongo:4.4
    command: --nojournal
  orion:
    image: fiware/orion
    links:
      - mongo
    ports:
      - "1026:1026"
    command: -dbhost mongo
  cygnus:
    image: fiware/cygnus-ngsi
    volumes:
      - ./agent.conf:/opt/apache-flume/conf/agent.conf
    depends_on:
      - orion
    expose:
      - "5050"
      - "5080"
    ports:
      - "5050:5050"
      - "5080:5080"
    environment:
      - CYGNUS_LOG_LEVEL=DEBUG
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false

agent.conf:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = kafka-sink
cygnus-ngsi.channels = kafka-channel

cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf

cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.transactionCapacity = 100

cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 172.17.0.1:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 172.17.0.1:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10

And then requests as in the tutorial (https://fiware-cygnus.readthedocs.io/en/latest/cygnus-ngsi/integration/orion_cygnus_kafka/index.html):

(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "entities": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1"
        }
    ],
    "attributes": [
    ],
    "reference": "http://cygnus:5050/notify",
    "duration": "P1M",
    "notifyConditions": [
        {
            "type": "ONCHANGE",
            "condValues": [
                "title",
                "pages",
                "price"
            ]
        }
    ],
    "throttling": "PT5S"
}
EOF
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "contextElements": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1",
            "attributes": [
                {
                    "name": "title",
                    "type": "text",
                    "value": "Game of Thrones: Book 1"
                },
                {
                    "name": "pages",
                    "type": "integer",
                    "value": "927"
                },
                {
                    "name": "price",
                    "type": "float",
                    "value": "18.50"
                }
            ]
        }
    ],
    "updateAction": "APPEND"
}
EOF

After that the error appears in the console.
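
One thing worth ruling out before digging into the sink itself is whether the `broker_list` and `zookeeper_endpoint` addresses in agent.conf are reachable from where Cygnus runs. The sketch below is a plain TCP check in Python using only the standard library. It does not speak the Kafka or ZooKeeper protocols, and a successful connection does not prove the sink will work. The function names are hypothetical.

```python
import socket


def parse_endpoints(value):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    endpoints = []
    for item in value.split(","):
        host, _, port = item.strip().rpartition(":")
        endpoints.append((host, int(port)))
    return endpoints


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_sink_endpoints(settings, timeout=3.0):
    """Map each 'name host:port' string to its reachability (True/False)."""
    results = {}
    for name, value in settings.items():
        for host, port in parse_endpoints(value):
            results[f"{name} {host}:{port}"] = can_connect(host, port, timeout)
    return results
```

For the configuration above, this could be called as `check_sink_endpoints({"broker_list": "172.17.0.1:9092", "zookeeper_endpoint": "172.17.0.1:2181"})`, ideally from inside the Cygnus container, since that is the network the sink sees.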

damianhorna avatar Apr 28 '21 12:04 damianhorna

Thanks for adding the information, Damian. The problem seems to be specific to the Kafka sink, which is not currently a high priority on the Cygnus development team's maintenance list, so we cannot offer a quick answer. However, we are open to contributions to get the Kafka sink working properly.

mapedraza avatar Apr 28 '21 13:04 mapedraza

Could you also describe how you start the Kafka broker?

AlvaroVega avatar Apr 30 '21 09:04 AlvaroVega

I've tried different versions of Kafka and docker-compose. The following is one that I used:

version: "3"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  
      
  kafka:
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.142:9092
    depends_on:
      - zookeeper
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  
      
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Started using docker stack deploy -c docker-compose.yml kafka

The result was always the same error.

filgiuff avatar Apr 30 '21 09:04 filgiuff

Hey, I'm having the same issue. Is there any update on this? Thanks.

jaimeventura avatar Dec 22 '21 16:12 jaimeventura