docs icon indicating copy to clipboard operation
docs copied to clipboard

Dapr pubsub with rabbitmq publishing works but subscription not working

Open mnabi1691 opened this issue 3 years ago • 0 comments

I am trying to establish a basic pubsub between two microservices. The publishing seem to be working but subscription is not working.

docker compose:

version: '3.4'

services: ordersapi: image: ${DOCKER_REGISTRY-}cefaloworkshopdaprorderapi build: context: . dockerfile: Source/Services/Order/Cefalo.Workshop.Dapr.Order.Api/Dockerfile ports: - "5100:80" - "63403:443" depends_on: - sqldata2

ordersapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "ordersapi", "-app-port", "80" ] depends_on: - ordersapi network_mode: "service:ordersapi"

paymentsapi: image: ${DOCKER_REGISTRY-}cefaloworkshopdaprpaymentapi build: context: . dockerfile: Source/Services/Payment/Cefalo.Workshop.Dapr.Payment.Api/Dockerfile ports: - "5101:80" - "63404:443" depends_on: - sqldata2

paymentsapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "paymentsapi", "-app-port", "80" ] depends_on: - paymentsapi network_mode: "service:paymentsapi"

aggregatorapi: image: ${DOCKER_REGISTRY-}cefaloworkshopaggregatorapi build: context: . dockerfile: Source/ApiGateways/Aggregator/Cefalo.Workshop.Dapr.HttpAggregator/Dockerfile ports: - "5102:80" - "63405:443"

aggregatorapi-dapr: image: "daprio/daprd:1.7.2" command: [ "./daprd", "-app-id", "aggregatorapi", "-app-port", "80" ] depends_on: - aggregatorapi network_mode: "service:aggregatorapi"

sqldata2: image: mcr.microsoft.com/mssql/server:2017-latest environment: - SA_PASSWORD=Pass@word - ACCEPT_EULA=Y ports: - "5435:1433"

rabbitmq: image: rabbitmq:3-management-alpine ports: # AMQP protocol port - '5673:5672' # HTTP management UI - '15673:15672'

redis: image: redis:alpine

seq: image: datalust/seq:latest

zipkin: image: openzipkin/zipkin-slim

dapr-placement: image: "daprio/dapr:1.7.2"

Docker Compose Override:

version: '3.4'

services: ordersapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro

ordersapi-dapr: command: ["./daprd", "-app-id", "ordersapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"

paymentsapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro

paymentsapi-dapr: command: ["./daprd", "-app-id", "paymentsapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"

aggregatorapi: environment: - ASPNETCORE_ENVIRONMENT=Development - ASPNETCORE_URLS=https://+:443;http://+:80 ports: - "80" - "443" volumes: - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro

aggregatorapi-dapr: command: ["./daprd", "-app-id", "aggregatorapi", "-app-port", "80", "-log-level", "debug", "-placement-host-address", "dapr-placement:50000", "-components-path", "/components", "-config", "/configuration/cefalo-config.yaml" ] volumes: - "./dapr/components/:/components" - "./dapr/configuration/:/configuration"

dapr-placement: command: ["./placement", "-port", "50000", "-log-level", "debug"] ports: - "50000:50000"

zipkin: ports: - "5411:9411"

redis: image: redis:alpine ports: - "5379:6379"

Porgram.cs :

using Cefalo.Workshop.Dapr.Payment.Api.Configuration; using Cefalo.Workshop.Dapr.Payment.Api.EventHandling; using Cefalo.Workshop.Dapr.Payment.ApplicationService.PaymentService; using Cefalo.Workshop.Dapr.Payment.Data;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container. builder.Services.AddData( builder.Environment, builder.Configuration.GetConnectionString("DatabaseConnectionString")); //builder.Services.AddDaprClient(); builder.Services.AddEventBus(); builder.Services.AddTransient<IPaymentService, PaymentService>(); builder.Services.AddScoped<OnOrderPaymentCreationIntegrationEventHandler>();

//Add Automapper builder.Services.AddAutoMapper(AppDomain.CurrentDomain.GetAssemblies());

builder.Services.AddControllers() .AddDapr();

//builder.Services.AddEventBus();

// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen();

// Call UseServiceProviderFactory on the Host sub property //builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());

// Call ConfigureContainer on the Host sub property //builder.Host.ConfigureContainer<ContainerBuilder>(builder => //{ // //builder.RegisterType<PaymentService>().As<IPaymentService>().InstancePerDependency();

// builder.RegisterAssemblyTypes(Assembly.Load("Cefalo.Workshop.Payment.ApplicationService")) // .Where(t => t.Name.EndsWith("Service")) // .AsImplementedInterfaces() // .InstancePerDependency(); //});

var app = builder.Build();

var pathBase = builder.Configuration["PATH_BASE"]; if (!string.IsNullOrEmpty(pathBase)) { app.UsePathBase(pathBase); }

app.UseCloudEvents();

// Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); }

app.UseHttpsRedirection();

app.UseAuthorization(); app.MapDefaultControllerRoute(); app.MapControllers(); app.MapSubscribeHandler();

app.ApplyDatabaseMigration();

app.Run();

Endpoint:

[Route("api/[controller]")]
[ApiController]
public class IntegrationEventController : ControllerBase
{
    private const string DAPR_PUBSUB_NAME = "pubsub";

    public Task HandleAsync(
    OnOrderPaymentCreationIntegrationEvent @event,
    [FromServices] OnOrderPaymentCreationIntegrationEventHandler handler) =>
    handler.Handle(@event);
 }

pubsub.yaml:

apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: pubsub namespace: cefalo spec: type: pubsub.rabbitmq version: v1 metadata:

  • name: host value: "amqp://rabbitmq:5672"
  • name: durable value: "false"
  • name: deletedWhenUnused value: "false"
  • name: autoAck value: "false"
  • name: reconnectWait value: "0"
  • name: concurrency value: parallel

To me things seem fine. Not sure what went wrong

mnabi1691 avatar Jul 06 '22 04:07 mnabi1691