conductor-community

Are MySQL Events working? (Also PostgreSQL)

Open TwoUnderscorez opened this issue 4 years ago • 21 comments

Hi,

Are MySQL events working? When following the instructions here, everything works as expected, but when adding:

{
  "name": "test_workflow_for_eventHandler2",
  "description": "A test workflow to start another workflow with EventHandler",
  "version": 1,
  "tasks": [
    {
      "name": "test_start_workflow_event2",
      "taskReferenceName": "start_workflow_with_event2",
      "type": "EVENT",
      "sink": "conductor"
    },
    {
      "name": "test_task_tobe_completed_by_eventHandler2",
      "taskReferenceName": "test_task_tobe_completed_by_eventHandler2",
      "type": "WAIT"
    }
  ]
}

{
  "name": "test_workflow_startedBy_eventHandler2",
  "description": "A test workflow which is started by EventHandler, and then goes on to complete task in another workflow.",
  "version": 1,
  "tasks": [
    {
      "name": "test_complete_task_event2",
      "taskReferenceName": "complete_task_with_event2",
      "inputParameters": {
        "sourceWorkflowId": "${workflow.input.sourceWorkflowId}"
      },
      "type": "EVENT",
      "sink": "conductor"
    }
  ]
}

{
  "name": "test_start_workflow2",
  "event": "conductor:test_workflow_for_eventHandler2:start_workflow_with_event2",
  "actions": [
    {
      "action": "start_workflow",
      "start_workflow": {
        "name": "test_workflow_startedBy_eventHandler2",
        "input": {
          "sourceWorkflowId": "${workflowInstanceId}"
        }
      }
    }
  ],
  "active": true
}

{
  "name": "test_complete_task_event2",
  "event": "conductor:test_workflow_startedBy_eventHandler2:complete_task_with_event2",
  "actions": [
    {
      "action": "complete_task",
      "complete_task": {
        "workflowId": "${sourceWorkflowId}",
        "taskRefName": "test_task_tobe_completed_by_eventHandler"
      }
    }
  ],
  "active": true
}

and then starting test_workflow_for_eventHandler2, the EVENT task works (it turns green), but test_workflow_startedBy_eventHandler2 never starts. Am I missing something?

Thanks.

TwoUnderscorez, Jan 04 '21

In the example, there is no workflow called test_workflow_startedBy_eventHandler2. Do you have the right workflow name in the "actions" of the event handler?

aravindanr, Jan 05 '21
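For the internal conductor sink, every event handler in this thread subscribes to an event named conductor:<workflow name>:<task reference name>. One quick way to rule out the kind of naming mismatch asked about above is to build that string from the EVENT task's workflow name and taskReferenceName and compare it with the handler's "event" field. This is a minimal sketch that assumes that naming pattern, as used in the examples here; ConductorSinkEventName and eventName are hypothetical helpers, not part of Conductor's API.

```java
final class ConductorSinkEventName {

    /**
     * Hypothetical helper: the event name an EVENT task with "sink": "conductor" is
     * assumed to publish to, following the pattern used by the handlers in this thread:
     * conductor:<workflow name>:<task reference name>.
     */
    static String eventName(String workflowName, String taskReferenceName) {
        return "conductor:" + workflowName + ":" + taskReferenceName;
    }

    public static void main(String[] args) {
        // Values copied from test_workflow_for_eventHandler2 and the test_start_workflow2 handler.
        String expected = eventName("test_workflow_for_eventHandler2", "start_workflow_with_event2");
        String handlerEvent = "conductor:test_workflow_for_eventHandler2:start_workflow_with_event2";
        System.out.println(expected.equals(handlerEvent) ? "handler matches" : "handler does NOT match");
    }
}
```

For the definitions in the original question the names line up, which is consistent with the reply that the configuration itself looks correct.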

@aravindanr You might have misunderstood me. I know the example has no test_workflow_startedBy_eventHandler2, but I've defined one just like the workflow in the example, so it should work. Shouldn't it?

TwoUnderscorez, Jan 05 '21

Using Postgres, I am facing a similar issue: the messages are present in the queue_message table, with the popped column set to false.

sanyamsidana, Jan 05 '21

Same here, using Postgres and Conductor internal events.

I'm using Postgres 11 persistence and also tried the example provided at https://netflix.github.io/conductor/labs/eventhandlers/, but I still couldn't get it to work: the second workflow never starts. The database looks fine to me (queue messages are written, etc.).

However, I have another version of an event handler that completes the WAIT task, and that mechanism is working.

As a workaround, I'm using an HTTP task to call Conductor's /workflow endpoint and start the other workflow (I'm using Conductor version 2.30.3 with ES 6).

morbeleg, Jan 05 '21
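The HTTP-task workaround above boils down to making the start-workflow REST call directly. Here is a minimal Java sketch of that call, assuming a Conductor server reachable at http://localhost:8080/api (a hypothetical base URL) and the POST /workflow endpoint with a StartWorkflowRequest-style JSON body containing name and input. It only builds the request with java.net.http; sending it is left as a comment. StartWorkflowRequestSketch and startWorkflowRequest are illustrative names.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class StartWorkflowRequestSketch {

    /** Builds (but does not send) a POST {baseUrl}/workflow request that starts workflowName. */
    static HttpRequest startWorkflowRequest(String baseUrl, String workflowName, String sourceWorkflowId) {
        // Hand-written JSON for brevity; real code should use a JSON library to escape values.
        String body = "{\"name\":\"" + workflowName + "\","
                + "\"input\":{\"sourceWorkflowId\":\"" + sourceWorkflowId + "\"}}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/workflow"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = startWorkflowRequest("http://localhost:8080/api",
                "test_workflow_startedBy_eventHandler2", "some-source-workflow-id");
        System.out.println(req.method() + " " + req.uri());
        // To send it against a running server:
        // HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```

An HTTP task in the first workflow would issue the same request, passing ${workflow.workflowId} as the source ID.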

We're also using @morbeleg's workaround, but it's kind of ugly, so we would like this to be marked as a bug so it can eventually be fixed.

TwoUnderscorez, Jan 06 '21

@TwoUnderscorez Please ~~create~~ mark this as a bug. Note that we (Netflix) do not use or maintain the mysql-persistence and postgres-persistence modules; they are contributions from the community. We do our best to answer questions about them, but it's unlikely that we will address this anytime soon.

aravindanr, Jan 08 '21

@aravindanr ~~How do I create a bug? What do you mean?~~ Thanks.

TwoUnderscorez, Jan 08 '21

This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.

github-actions[bot], Apr 07 '21

Recently, after updating from Conductor v2.28.0 to the latest version, v2.31.2, events other than “conductor:test_workflow_startedBy_eventHandler:complete_task_with_event” no longer work.

I tried the definitions below.

Workflow with a WAIT task:

{
  "name": "test_workflow_with_wait_task",
  "description": "A test workflow with wait task",
  "version": 1,
  "ownerEmail": "[email protected]",
  "tasks": [
    {
      "name": "wait_for_external_execution_completion",
      "taskReferenceName": "wait_for_external_execution_completion",
      "type": "WAIT"
    }
  ]
}

Workflow with an EVENT task:

{
  "name": "test_workflow_with_event_task",
  "description": "A test workflow to complete task in another workflow.",
  "version": 1,
  "ownerEmail": "[email protected]",
  "tasks": [
    {
      "name": "choose_task",
      "taskReferenceName": "choose_task",
      "type": "LAMBDA",
      "inputParameters": {
        "lambdaValue": "${workflow.input.taskRefName}",
        "scriptExpression": "if ($.lambdaValue) {return $.lambdaValue} else {return 'wait_for_external_execution_completion'}"
      }
    },
    {
      "name": "complete_external_task",
      "taskReferenceName": "complete_task_with_event",
      "inputParameters": {
        "workflowId": "${workflow.input.callbackId}",
        "status": "${workflow.input.status}",
        "output": "${workflow.input.output}",
        "logs": "${workflow.input.logs}",
        "taskRefName": "${choose_task.output.result}"
      },
      "type": "EVENT",
      "sink": "conductor"
    }
  ],
  "inputParameters": [
    "taskRefName",
    "status",
    "callbackId",
    "output",
    "logs"
  ]
}

Event handler:

{
  "name": "test_event_with_complete_task_action",
  "event": "conductor:test_workflow_with_event_task:complete_task_with_event",
  "actions": [
    {
      "action": "complete_task",
      "complete_task": {
        "workflowId": "${workflowId}",
        "taskRefName": "${taskRefName}",
        "output": {
          "status": "${status}",
          "output": "${output}",
          "logs": "${logs}"
        }
      }
    }
  ],
  "active": true
}

Started the workflow with the WAIT task using POST /workflow:

[Screenshot] workflowId: **00dff09e-b4f1-4380-a86a-61cd761348ec**

Then started the workflow with the EVENT task using POST /workflow:

[Screenshot] It completed. Input/output: [Screenshot]

But the waiting workflow is still RUNNING. [Screenshot]

PostgreSQL 11 database: queue table [Screenshot], queue_message table [Screenshot]. Message payload:

{
  "output": "THIS IS o/p",
  "taskRefName": "wait_for_external_execution_completion",
  "asyncComplete": false,
  "sink": "conductor",
  "workflowType": "test_workflow_with_event_task",
  "correlationId": null,
  "workflowVersion": 1,
  "logs": "sample logs",
  "workflowInstanceId": "248ffbf2-09e1-45d8-88f3-767b2930629f",
  "workflowId": "00dff09e-b4f1-4380-a86a-61cd761348ec",
  "status": "SUCCESS"
}

This message always stays at popped = false.

sanyamsidana, Apr 07 '21

However, the same event and workflow definitions work in v2.28.0.

[Screenshot from v2.28.0]

sanyamsidana, Apr 07 '21

@sanyamsidana I looked at the configuration and everything seems to be configured correctly. Unfortunately, we (Netflix) do not use or maintain the mysql-persistence and postgres-persistence modules; they are contributions from the community. I am tagging some users who might be able to help with this one.

@rickfish @mactaggart @picaron

aravindanr, Apr 12 '21

Not sure if this is the same problem, but see issue Netflix/conductor#1806 (it is closed now). If you are running on Kubernetes, your pod might have only one logical core. A change made in PR Netflix/conductor#1748 defaulted the number of threads in the event-processing scheduler to the number of cores on the box. As a result, if you keep the default value of the new 'workflow.event.queue.scheduler.poll.thread.count' property and have only one core, only one type of event will ever be processed. If you are not running on Kubernetes, or you only have one event type, this probably won't solve your problem.

The solution is to change DynoObservableQueue's OnSubscribe() method from this:

    Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS, scheduler);

to this:

    Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS, Schedulers.io());

I thought this had been fixed, i.e. reverted to the code from before that PR, because the person who made the change indicated that it was going to be reverted. Maybe that never happened.

rickfish, Apr 13 '21
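To make the starvation described above concrete, here is a hedged illustration using only java.util.concurrent. It does not reproduce Conductor's or RxJava's code; it simply assumes, as the comment above implies, that each event queue's polling work ends up holding a scheduler thread. With a fixed pool of one thread (what availableProcessors() would yield in a one-core pod), only one of three simulated queues is ever polled; a growing pool, roughly analogous to RxJava's Schedulers.io(), polls all three. PollerStarvationDemo and its methods are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class PollerStarvationDemo {

    /** Sleeps without a checked exception; returns false (and keeps the flag) if interrupted. */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs one long-lived poll loop per simulated queue; returns how many queues were ever polled. */
    static int queuesPolled(ExecutorService pool, int queueCount) {
        AtomicBoolean stop = new AtomicBoolean(false);
        AtomicBoolean[] polled = new AtomicBoolean[queueCount];
        for (int i = 0; i < queueCount; i++) {
            polled[i] = new AtomicBoolean(false);
            final int queue = i;
            pool.execute(() -> {
                // Each loop holds its worker thread until stop is set; queued loops never get a turn.
                while (!stop.get()) {
                    polled[queue].set(true);
                    if (!pause(5)) {
                        return;
                    }
                }
            });
        }
        pause(200); // give every poller that has a thread a chance to run
        stop.set(true);
        pool.shutdown(); // loops still queued now start, see stop == true, and exit without polling
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int count = 0;
        for (AtomicBoolean p : polled) {
            if (p.get()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int queues = 3;
        // Fixed pool sized like a one-core default.
        System.out.println("fixed(1): " + queuesPolled(Executors.newFixedThreadPool(1), queues) + " of " + queues);
        // Growing pool, roughly analogous to Schedulers.io().
        System.out.println("cached:   " + queuesPolled(Executors.newCachedThreadPool(), queues) + " of " + queues);
    }
}
```

On a normal run this should report 1 of 3 for the fixed pool and 3 of 3 for the cached pool, which is the "only one type of event is ever processed" symptom in miniature.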

Yeah, @rickfish is right. Unfortunately, that PR was "closed" because it just went stale :( so the change probably still needs to be reverted.

Another option, which we've used temporarily until the core issue can be addressed, is simply to tweak a setting:

            workflow_event_queue_scheduler_poll_thread_count: "300",

Of course, if you have more than 300 event queues, you'll be in the same boat. That's fine for us because we don't have anywhere near that many, but this setting should help in the short term.

james-deee, Apr 15 '21
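The thread-count workaround above only holds while the configured value is at least the number of distinct event queues. A small startup check can make that explicit. This is a sketch under assumptions: the property key is the one quoted earlier in this thread (workflow.event.queue.scheduler.poll.thread.count), the fallback mirrors the availableProcessors() default, and PollThreadCountCheck, resolvePollThreadCount, and coversAllQueues are hypothetical helpers, not Conductor settings or APIs.

```java
import java.util.Map;

final class PollThreadCountCheck {

    // Property name as quoted in this thread (assumption: your Conductor version uses it).
    static final String KEY = "workflow.event.queue.scheduler.poll.thread.count";

    /** Returns the configured thread count, or the fallback (e.g. the core count) when unset. */
    static int resolvePollThreadCount(Map<String, String> props, int fallback) {
        String value = props.get(KEY);
        return (value == null || value.isBlank()) ? fallback : Integer.parseInt(value.trim());
    }

    /** Per the discussion above: fewer poll threads than event queues leaves some queues unpolled. */
    static boolean coversAllQueues(int pollThreads, int eventQueueCount) {
        return pollThreads >= eventQueueCount;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int threads = resolvePollThreadCount(Map.of(KEY, "300"), cores);
        int eventQueues = 12; // hypothetical number of distinct event queues
        System.out.println(threads + " poll threads for " + eventQueues + " queues, covered: "
                + coversAllQueues(threads, eventQueues));
    }
}
```

Failing fast (or logging a warning) when coversAllQueues returns false would surface the problem at startup instead of as silently unprocessed queue_message rows.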

@rickfish This fix worked for us. Should I open a PR to change it back to Schedulers.io()? I assume the code now lives in https://github.com/Netflix/conductor/blob/d32ad1622740221398b48589a44079febbcdc5fb/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java.

TwoUnderscorez, Apr 30 '21

See Netflix/conductor#1987 , @mdepak can you please comment on the changes and how you are attempting to fix this? Thanks

apanicker-nflx, May 10 '21

@apanicker-nflx I noticed you merged Netflix/conductor#1987 which fixed the issue for v2. Any updates on plans to fix v3?

TwoUnderscorez, Jul 25 '21

@TwoUnderscorez we would want to pull this fix into v3. If you can get to it soon, we would appreciate a pull request for it ;). Thanks

apanicker-nflx, Jul 27 '21

@apanicker-nflx @aravindanr Is there an update on V3? It looks like it still has the same bug?

In ConductorProperties:

    /**
     * The number of threads to be used in Scheduler used for polling events from multiple event
     * queues. By default, a thread count equal to the number of CPU cores is chosen.
     */
    private int eventQueueSchedulerPollThreadCount = Runtime.getRuntime().availableProcessors();

Defaulting to availableProcessors() is the bug that was introduced. If you run in an environment like k8s and don't specify a thread count, event processing is currently broken.

That's why https://github.com/Netflix/conductor/pull/1987 was made.

james-deee, Jun 01 '22

@james-deee could you elaborate on how it's broken? Conductor is not run in K8s at Netflix, so we don't have an environment to replicate the issue. Does Runtime.getRuntime().availableProcessors() return 0 in k8s?

aravindanr, Jun 06 '22

@aravindanr yeah, we're hitting this same issue. I'm not sure why this got reverted to the old (wrong) way of doing it. See the comment here: https://github.com/Netflix/conductor/pull/1987#issuecomment-737420051

In our case, that value was 1 or 2, so only 1 or 2 event queues ever got processed. We now override eventQueueSchedulerPollThreadCount via an env var, which got us past this, but it's 100% definitely still a bug.

Whatever Runtime.getRuntime().availableProcessors() returns is the maximum number of event queues you'll be able to process.

james-deee, Aug 10 '22

Hi,

I'm experiencing the same issue with Postgres. When following the instructions provided, the event in "test_workflow_for_eventHandler2" works as expected and turns green. However, "test_workflow_startedBy_eventHandler2" does not start in Postgres. I'm wondering if I'm missing something.

Thanks.

yohanyflores, May 26 '23