Dapr pubsub with rabbitmq publishing works but subscription not working
I am trying to establish a basic pubsub between two microservices. The publishing seem to be working but subscription is not working.
docker compose:
version: '3.4'
services: ordersapi: image: ${DOCKER_REGISTRY-}cefaloworkshopdaprorderapi build: context: . dockerfile: Source/Services/Order/Cefalo.Workshop.Dapr.Order.Api/Dockerfile ports: - "5100:80" - "63403:443" depends_on: - sqldata2
ordersapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "ordersapi", "-app-port", "80" ] depends_on: - ordersapi network_mode: "service:ordersapi"
paymentsapi: image: ${DOCKER_REGISTRY-}cefaloworkshopdaprpaymentapi build: context: . dockerfile: Source/Services/Payment/Cefalo.Workshop.Dapr.Payment.Api/Dockerfile ports: - "5101:80" - "63404:443" depends_on: - sqldata2
paymentsapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "paymentsapi", "-app-port", "80" ] depends_on: - paymentsapi network_mode: "service:paymentsapi"
aggregatorapi: image: ${DOCKER_REGISTRY-}cefaloworkshopaggregatorapi build: context: . dockerfile: Source/ApiGateways/Aggregator/Cefalo.Workshop.Dapr.HttpAggregator/Dockerfile ports: - "5102:80" - "63405:443"
aggregatorapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "aggregatorapi", "-app-port", "80" ] depends_on: - aggregatorapi network_mode: "service:aggregatorapi"
sqldata2: image: mcr.microsoft.com/mssql/server:2017-latest environment: - SA_PASSWORD=Pass@word - ACCEPT_EULA=Y ports: - "5435:1433"
rabbitmq: image: rabbitmq:3-management-alpine ports: # AMQP protocol port - '5673:5672' # HTTP management UI - '15673:15672'
redis: image: redis:alpine
seq: image: datalust/seq:latest
zipkin: image: openzipkin/zipkin-slim
dapr-placement: image: "daprio/dapr:1.7.2"
Docker Compose Override:
version: '3.4'
services: ordersapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
ordersapi-dapr: command: ["./daprd", "-app-id", "ordersapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"
paymentsapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
paymentsapi-dapr: command: ["./daprd", "-app-id", "paymentsapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"
aggregatorapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
aggregatorapi-dapr: command: ["./daprd", "-app-id", "aggregatorapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"
dapr-placement: command: ["./placement", "-port", "50000", "-log-level", "debug"] ports: - "50000:50000"
zipkin: ports: - "5411:9411"
redis: image: redis:alpine ports: - "5379:6379"
Porgram.cs :
using Cefalo.Workshop.Dapr.Payment.Api.Configuration; using Cefalo.Workshop.Dapr.Payment.Api.EventHandling; using Cefalo.Workshop.Dapr.Payment.ApplicationService.PaymentService; using Cefalo.Workshop.Dapr.Payment.Data;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container. builder.Services.AddData( builder.Environment, builder.Configuration.GetConnectionString("DatabaseConnectionString")); //builder.Services.AddDaprClient(); builder.Services.AddEventBus(); builder.Services.AddTransient<IPaymentService, PaymentService>(); builder.Services.AddScoped<OnOrderPaymentCreationIntegrationEventHandler>();
//Add Automapper builder.Services.AddAutoMapper(AppDomain.CurrentDomain.GetAssemblies());
builder.Services.AddControllers() .AddDapr();
//builder.Services.AddEventBus();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen();
// Call UseServiceProviderFactory on the Host sub property //builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());
// Call ConfigureContainer on the Host sub property //builder.Host.ConfigureContainer<ContainerBuilder>(builder => //{ // //builder.RegisterType<PaymentService>().As<IPaymentService>().InstancePerDependency();
// builder.RegisterAssemblyTypes(Assembly.Load("Cefalo.Workshop.Payment.ApplicationService")) // .Where(t => t.Name.EndsWith("Service")) // .AsImplementedInterfaces() // .InstancePerDependency(); //});
var app = builder.Build();
var pathBase = builder.Configuration["PATH_BASE"]; if (!string.IsNullOrEmpty(pathBase)) { app.UsePathBase(pathBase); }
app.UseCloudEvents();
// Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); }
app.UseHttpsRedirection();
app.UseAuthorization(); app.MapDefaultControllerRoute(); app.MapControllers(); app.MapSubscribeHandler();
app.ApplyDatabaseMigration();
app.Run();
Endpoint:
[Route("api/[controller]")]
[ApiController]
public class IntegrationEventController : ControllerBase
{
private const string DAPR_PUBSUB_NAME = "pubsub";
public Task HandleAsync(
OnOrderPaymentCreationIntegrationEvent @event,
[FromServices] OnOrderPaymentCreationIntegrationEventHandler handler) =>
handler.Handle(@event);
}
pubsub.yaml:
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub namespace: cefalo spec: type: pubsub.rabbitmq version: v1 metadata:
- name: host value: "amqp://rabbitmq:5672"
- name: durable value: "false"
- name: deletedWhenUnused value: "false"
- name: autoAck value: "false"
- name: reconnectWait value: "0"
- name: concurrency value: parallel
To me things seem fine. Not sure what went wrong