newrelic-python-agent

Error in NewRelic on publishing to kafka in Python

Open mishikaraj opened this issue 1 year ago • 3 comments

Description

While publishing a message to Apache Kafka, the New Relic agent raises an error from its message transaction instrumentation: 'MessageTransaction' object has no attribute 'destination_name'.

Code

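import json
from kafka import KafkaProducer

# bootstrap_servers, topic, and event_data are defined elsewhere in the application.
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
# .get() blocks until the send completes or raises.
producer.send(topic, value=event_data).get()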

Error Stack Trace

File "/Users/mishika/supply-metrics/app/kafka_manager/base_consumer_v2.py", line 55, in produce
    self.producer.send(topic, value=event_data, key=key).get()
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 68, in wrap_KafkaProducer_send
    return wrapped(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 581, in send
    value_bytes = self._serialize(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 714, in _serialize
    return f(data)
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 203, in _wrap_serializer
    topic = transaction.destination_name
AttributeError: 'MessageTransaction' object has no attribute 'destination_name'
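
The traceback shows the serializer wrapper reading `transaction.destination_name` from a `MessageTransaction` that does not carry that attribute. A minimal sketch of that failure mode follows, using a stand-in class rather than the agent's own code. `FakeMessageTransaction`, `topic_strict`, and `topic_lenient` are hypothetical names introduced only for illustration, and the "Unknown" default is an assumption.

```python
# Stand-in class for illustration only; this is NOT the New Relic agent's class.
class FakeMessageTransaction:
    """Mimics a transaction object created without a destination_name."""

    def __init__(self, name):
        self.name = name


def topic_strict(transaction):
    # Same access pattern as the failing line in the traceback.
    return transaction.destination_name


def topic_lenient(transaction, default="Unknown"):
    # Defensive variant: fall back to a default when the attribute is absent.
    return getattr(transaction, "destination_name", default)


txn = FakeMessageTransaction("consume-loop")

try:
    topic_strict(txn)
    strict_result = None
except AttributeError as exc:
    # Direct attribute access fails the same way as in the report.
    strict_result = str(exc)

lenient_result = topic_lenient(txn)
print(strict_result)
print(lenient_result)
```

A lenient lookup of this kind only avoids the exception. It does not explain why the attribute is missing on the transaction in the first place.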

newrelic.ini file

[newrelic]
log_level = debug
high_security = false
transaction_tracer.enabled = true
transaction_tracer.transaction_threshold = apdex_f
transaction_tracer.record_sql = obfuscated
transaction_tracer.stack_trace_threshold = 0.5
transaction_tracer.explain_enabled = true
transaction_tracer.explain_threshold = 0.5
transaction_tracer.function_trace =
error_collector.enabled = true
error_collector.ignore_errors = pycommon.exceptions.common_exceptions:DuplicateError rest_framework.exceptions:ValidationError rest_framework.exceptions:NotFound rest_framework.exceptions:ParseError
browser_monitoring.auto_instrument = true
thread_profiler.enabled = true
distributed_tracing.enabled = false
app_name = 
monitor_mode = true
license_key =  

Expected Behavior

The message should have been sent to the Kafka topic.

Steps to Reproduce

Your Environment

  • Application Python version : 3.10
  • kafka-python==2.0.2
  • newrelic==9.4.0

mishikaraj, Jan 24 '24 04:01

Hi @mishikaraj, out of curiosity, how are you setting up the consumer? My concern with the proposed solution is that it masks some other issue: destination_name should be set to the topic from the ConsumerRecord (which is returned from the consumer iterator).

This is the sample app I am using, so I want to make sure I am not missing an obvious implementation:

import json
import time
from datetime import datetime
from threading import Thread

import kafka

TOPIC = "test-topic-%d" % datetime.now().timestamp()
BROKERS = ["localhost:9092"]


consumer = kafka.KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    client_id="whatsup",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
    fetch_max_wait_ms=304999,
)
producer = kafka.KafkaProducer(
    bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

def consume():
    print("Starting consumer...")
    for message in consumer:
        print(f"Received {message.value}")
    print("Consumer finished.")

def produce():
    print("Starting producer...")
    for json_message in [
        {"foo": "bar"},
        {"baz": "bat"},
        {"user1": "Hello!"},
        {"user2": "Hola!"},
    ]:  
        time.sleep(1)
        producer.send(TOPIC, value=json_message).get()
    producer.flush()
    print("Producer finished.")

def main():
    t1 = Thread(target=produce)
    t2 = Thread(target=consume)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finished.")
    consumer.close()
    producer.close()


if __name__ == "__main__":
    main()

lrafeei, Jan 24 '24 20:01

Hi @lrafeei, as a temporary fix I disabled the Kafka producer instrumentation in New Relic by adding the following to the newrelic.ini file, after which I was able to send the message to Kafka:

[import-hook:kafka.producer.kafka]
enabled = false

StackOverFlow Link
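
In the ini file the section header and the setting go on separate lines. As a quick sanity check, the fragment can be parsed with Python's standard `configparser`. This is only a sketch: it confirms the fragment is well-formed ini, and it assumes the agent reads the section and key in the same way. `INI_TEXT`, `section`, and `hook_enabled` are names chosen for this example.

```python
import configparser

# The workaround as it would appear in newrelic.ini: the section header and
# the setting are on separate lines.
INI_TEXT = """
[import-hook:kafka.producer.kafka]
enabled = false
"""

parser = configparser.ConfigParser()
parser.read_string(INI_TEXT)

section = "import-hook:kafka.producer.kafka"
# getboolean accepts values such as "false", "no", "off", and "0".
hook_enabled = parser.getboolean(section, "enabled")
print(section, "enabled =", hook_enabled)
```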

To answer your question above: yes, I have initialized my consumer in the same way.

self.consumer = KafkaConsumer(
    self.topic,
    group_id=self.group_id,
    bootstrap_servers=bootstrap_servers,
    # Decode each value from UTF-8 encoded JSON.
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=False,
    auto_offset_reset="latest",
    max_poll_records=500,
    **consumer_kwargs,
)

Please let me know if you need anything else from me, and what fix would let us re-enable the New Relic instrumentation for the Kafka producer.

mishikaraj, Feb 09 '24 11:02

Sorry about the delay, I thought I'd get an alert when you replied! Looking at this now.

lrafeei, Mar 28 '24 16:03

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot], Apr 26 '25 12:04

https://new-relic.atlassian.net/browse/NR-407818

Looks similar to https://github.com/newrelic/newrelic-python-agent/issues/1363

JestemStefan, Jun 14 '25 11:06