solace-samples-python
solace-samples-python copied to clipboard
[Bug]: negative ack
Bug Description
i'm using the NACK here both fail and reject is working as same like both cases messages are going to be nack messages are in queue ,for fail condition when we run the redelivery script messages are consumed but it's deleting from the queue reject case messages deleted from the queue based on solace concept but my case messages are in queue
Expected Behavior
Fail and reject must work on differently
Steps to Reproduce
import os import platform import time import argparse from dotenv import load_dotenv
from solace.messaging.messaging_service import MessagingService, ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener, ServiceEvent from solace.messaging.resources.queue import Queue from solace.messaging.config.retry_strategy import RetryStrategy from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError from solace.messaging.config.missing_resources_creation_configuration import MissingResourcesCreationStrategy from solace.messaging.config.transport_security_strategy import TLS from solace.messaging.config.authentication_strategy import ClientCertificateAuthentication
Load environment variables from a .env file (if available)
load_dotenv()
Set up the environment variable for disabling stdout buffering on Windows
if platform.uname().system == 'Windows': os.environ["PYTHONUNBUFFERED"] = "1" # Disable stdout buffer
Parse command-line arguments using argparse
parser = argparse.ArgumentParser(description="BTP AEM consumer CLI") parser.add_argument('-q', '--queue', type=str, required=True, help='Valid AEM queue name') parser.add_argument('-a', '--ack', type=str, choices=['accept', 'reject', 'fail'], default='accept', help='Acknowledgement mode (accept, reject, fail)') args = parser.parse_args()
QUEUE_NAME = args.queue # Get queue name from CLI argument ACK_MODE = args.ack # Get ack mode from CLI argument
Handle received messages
class MessageHandlerImpl(MessageHandler): def init(self, persistent_receiver: PersistentMessageReceiver): self.receiver: PersistentMessageReceiver = persistent_receiver
def on_message(self, message: InboundMessage):
payload = message.get_payload_as_string() if message.get_payload_as_string() is not None else message.get_payload_as_bytes()
if isinstance(payload, bytearray):
print(f"Received a message of type: {type(payload)}. Decoding to string")
payload = payload.decode()
topic = message.get_destination_name()
print("\n" + f"Received message on: {topic}")
print("\n" + f"Message payload: {payload} \n")
# Acknowledge based on ack mode passed via CLI
if ACK_MODE == 'accept':
self.receiver.ack(message)
elif ACK_MODE == 'reject':
self.receiver.reject(message)
elif ACK_MODE == 'fail':
self.receiver.fail(message)
Inner classes for error handling
class ServiceEventHandler(ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener): def on_reconnected(self, e: ServiceEvent): print("\non_reconnected") print(f"Error cause: {e.get_cause()}") print(f"Message: {e.get_message()}")
def on_reconnecting(self, e: "ServiceEvent"):
print("\non_reconnecting")
print(f"Error cause: {e.get_cause()}")
print(f"Message: {e.get_message()}")
def on_service_interrupted(self, e: "ServiceEvent"):
print("\non_service_interrupted")
print(f"Error cause: {e.get_cause()}")
print(f"Message: {e.get_message()}")
Broker Config from environment variables
broker_props = { "solace.messaging.transport.host": os.getenv("AEM_HOST"), "solace.messaging.service.vpn-name": os.getenv("AEM_VPN_NAME"), }
Set up SSL Context for client certificate authentication
ssl_cert_dir = os.getenv("AEM_SSL_CERT_DIR") client_cert_file = os.getenv("AEM_CERT_FILE") client_key_file = os.getenv("AEM_CLIENT_KEY_FILE") client_key_password = os.getenv("AEM_CLIENT_KEY_PASSWORD")
Configure the transport security with client certificate authentication
transport_security = TLS.create()
.with_certificate_validation(True, validate_server_name=False, trust_store_file_path=ssl_cert_dir)
Configure client certificate authentication
client_cert_authentication = ClientCertificateAuthentication.of( certificate_file=client_cert_file, key_file=client_key_file, key_password=client_key_password )
Build MessagingService with authentication strategy and transport security
messaging_service = MessagingService.builder()
.from_properties(broker_props)
.with_transport_security_strategy(transport_security)
.with_authentication_strategy(client_cert_authentication)
.build()
Event Handler for the messaging service
service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) messaging_service.add_reconnection_attempt_listener(service_handler) messaging_service.add_service_interruption_listener(service_handler)
Blocking connect thread
messaging_service.connect() print(f'Messaging Service connected? {messaging_service.is_connected}')
Event Handling for the messaging service
Queue name from the command-line argument
durable_exclusive_queue = Queue.durable_exclusive_queue(QUEUE_NAME)
try:
# Build a receiver and bind it to the durable exclusive queue
persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.with_missing_resources_creation_strategy(MissingResourcesCreationStrategy.CREATE_ON_START)
.build(durable_exclusive_queue)
persistent_receiver.start()
# Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print('\nKeyboardInterrupt received')
except PubSubPlusClientError as exception: print(f'\nMake sure queue {QUEUE_NAME} exists on the broker!')
finally: if persistent_receiver and persistent_receiver.is_running(): print('\nTerminating receiver') persistent_receiver.terminate(grace_period=0) print('\nDisconnecting Messaging Service') messaging_service.disconnect()
Solace Product
Solace PubSub+ Event Portal
Solace Broker version
No response
Solace API
python
Solace API version
No response