solace-samples-python icon indicating copy to clipboard operation
solace-samples-python copied to clipboard

[Bug]: negative ack

Open sowjanya1117 opened this issue 7 months ago • 12 comments

Bug Description

i'm using the NACK here both fail and reject is working as same like both cases messages are going to be nack messages are in queue ,for fail condition when we run the redelivery script messages are consumed but it's deleting from the queue reject case messages deleted from the queue based on solace concept but my case messages are in queue

Expected Behavior

Fail and reject must work on differently

Steps to Reproduce

import os import platform import time import argparse from dotenv import load_dotenv

from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent from solace.messaging.resources.queue import Queue from solace.messaging.config.retry_strategy import RetryStrategy from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy from solace.messaging.config.transport_security_strategy import TLS from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication

Load environment variables from a .env file (if available)

load_dotenv()

Set up the environment variable for disabling stdout buffering on Windows

if platform.uname().system == 'Windows': os.environ["PYTHONUNBUFFERED"] = "1" # Disable stdout buffer

Parse command-line arguments using argparse

parser = argparse.ArgumentParser(description="BTP AEM consumer CLI") parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name') parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)') args = parser.parse_args()

QUEUE_NAME = args.queue # Get queue name from CLI argument ACK_MODE = args.ack # Get ack mode from CLI argument

Handle received messages

class MessageHandlerImpl(MessageHandler): def init(self, persistent_receiver: PersistentMessageReceiver): self.receiver: PersistentMessageReceiver = persistent_receiver

def on_message(self, message: InboundMessage):
    payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
    if isinstance(payload, bytearray):
        print(f"Received a message of type: {type(payload)}. Decoding to string")
        payload = payload.decode()

    topic = message.get_destination_name()
    print("\n" + f"Received message on: {topic}")
    print("\n" + f"Message payload: {payload} \n")
    
    # Acknowledge based on ack mode passed via CLI
    if ACK_MODE == 'accept':
        self.receiver.ack(message)
    elif ACK_MODE == 'reject':
        self.receiver.reject(message)
    elif ACK_MODE == 'fail':
        self.receiver.fail(message)

Inner classes for error handling

class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener): def on_reconnected(self, e: ServiceEvent): print("\non_reconnected") print(f"Error cause: {e.get_cause()}") print(f"Message: {e.get_message()}")

def on_reconnecting(self, e: "ServiceEvent"):
    print("\non_reconnecting")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

def on_service_interrupted(self, e: "ServiceEvent"):
    print("\non_service_interrupted")
    print(f"Error cause: {e.get_cause()}")
    print(f"Message: {e.get_message()}")

Broker Config from environment variables

broker_props = { "solace.messaging.transport.host": os.getenv("AEM_HOST"), "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"), }

Set up SSL Context for client certificate authentication

ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR") client_cert_file = os.getenv("AEM_CERT_FILE") client_key_file = os.getenv("AEM_CLIENT_KEY_FILE") client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")

Configure the transport security with client certificate authentication

transport_security = TLS.create()
.with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)

Configure client certificate authentication

client_cert_authentication = ClientCertificateAuthentication.of( certificate_file=client_cert_file, key_file=client_key_file, key_password=client_key_password )

Build MessagingService with authentication strategy and transport security

messaging_service = MessagingService.builder()
.from_properties(broker_props)
.with_transport_security_strategy(transport_security)
.with_authentication_strategy(client_cert_authentication)
.build()

Event Handler for the messaging service

service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) messaging_service.add_reconnection_attempt_listener(service_handler) messaging_service.add_service_interruption_listener(service_handler)

Blocking connect thread

messaging_service.connect() print(f'Messaging Service connected? {messaging_service.is_connected}')

Event Handling for the messaging service

Queue name from the command-line argument

durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)

try: # Build a receiver and bind it to the durable exclusive queue persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)
.build(durable_exclusive_queue) persistent_receiver.start()

# Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')

try: 
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('\nKeyboardInterrupt received')

except PubSubPlusClientError as exception: print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')

finally: if persistent_receiver and persistent_receiver.is_running(): print('\nTerminating receiver') persistent_receiver.terminate(grace_period=0) print('\nDisconnecting Messaging Service') messaging_service.disconnect()

Solace Product

Solace PubSub+ Event Portal

Solace Broker version

No response

Solace API

python

Solace API version

No response

sowjanya1117 avatar Apr 21 '25 17:04 sowjanya1117