newrelic-python-agent
newrelic-python-agent copied to clipboard
Error in NewRelic on publishing to kafka in Python
Description While publishing message to apache kafka , getting error with Newrelic Message Transaction - Error 'MessageTransaction' object has no attribute 'destination_name' code
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
producer.send(topic, value=event_data).get()
Error Stack Trace
File "/Users/mishika/supply-metrics/app/kafka_manager/base_consumer_v2.py", line 55, in produce
self.producer.send(topic, value=event_data, key=key).get()
File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 68, in wrap_KafkaProducer_send
return wrapped(
File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 581, in send
value_bytes = self._serialize(
File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 714, in _serialize
return f(data)
File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 203, in _wrap_serializer
topic = transaction.destination_name
AttributeError: 'MessageTransaction' object has no attribute 'destination_name'
newrelic.ini file
[newrelic]
log_level = debug
high_security = false
transaction_tracer.enabled = true
transaction_tracer.transaction_threshold = apdex_f
transaction_tracer.record_sql = obfuscated
transaction_tracer.stack_trace_threshold = 0.5
transaction_tracer.explain_enabled = true
transaction_tracer.explain_threshold = 0.5
transaction_tracer.function_trace =
error_collector.enabled = true
error_collector.ignore_errors = pycommon.exceptions.common_exceptions:DuplicateError rest_framework.exceptions:ValidationError rest_framework.exceptions:NotFound rest_framework.exceptions:ParseError
browser_monitoring.auto_instrument = true
thread_profiler.enabled = true
distributed_tracing.enabled = false
app_name =
monitor_mode = true
license_key =
Expected Behavior Expected message should have sent to kafka topic
Steps to Reproduce
- kept newrelic.ini file locally and ran kafka consumer , will be able to above stack trace
- Converting topic = transaction.destination_name to topic = getattr(transaction, "destination_name","Default") in newrelic-python-agent lib seems to fix the issue
Your Environment
- Application Python version : 3.10
- kafka-python==2.0.2
- newrelic==9.4.0
Hi @mishikaraj --out of curiosity, how are you setting up the consumer? My concern with the proposed solution is that there is some other issue that is being masked; destination_name should get set as the topic from ConsumerRecord (which is returned from the consumer iterator)
This is the sample app I am using, so I want to make sure I am not missing an obvious implementation:
import json
import time
from datetime import datetime
from threading import Thread
import kafka
TOPIC = "test-topic-%d" % datetime.now().timestamp()
BROKERS = ["localhost:9092"]
consumer = kafka.KafkaConsumer(
TOPIC,
bootstrap_servers=BROKERS,
client_id="whatsup",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
auto_offset_reset="earliest",
consumer_timeout_ms=5000,
fetch_max_wait_ms=304999,
)
producer = kafka.KafkaProducer(
bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
def consume():
print("Starting consumer...")
for message in consumer:
print(f"Recieved {message.value}")
print("Consumer finished.")
def produce():
print("Starting producer...")
for json_message in [
{"foo": "bar"},
{"baz": "bat"},
{"user1": "Hello!"},
{"user2": "Hola!"},
]:
time.sleep(1)
producer.send(TOPIC, value=json_message).get()
producer.flush()
print("Producer finished.")
def main():
t1 = Thread(target=produce)
t2 = Thread(target=consume)
t1.start()
t2.start()
t1.join()
t2.join()
print("Finished.")
consumer.close()
producer.close()
if __name__ == "__main__":
main()
Hii @lrafeei , For Temporary Fix I had Disabled kafka producer transaction on newrelic by keeping below in newrelic.ini file and was able to send the message to kafka
[import-hook:kafka.producer.kafka] enabled = false
And for answering to your above question, Yes I have initialized my consumer in the same way
self.consumer = KafkaConsumer(
self.topic,
group_id=self.group_id,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda x: json.loads(v.decode("utf-8"))
enable_auto_commit=False,
auto_offset_reset="latest",
max_poll_records=500,
**consumer_kwargs,
)
Please let me know if you need any other help with this and what should be the fix we can look for enabling back the kafka producer transaction on newrelic for kafka producer
Sorry about the delay, I thought I'd get an alert when you replied! Looking at this now.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
https://new-relic.atlassian.net/browse/NR-407818
Looks similar to https://github.com/newrelic/newrelic-python-agent/issues/1363