
batchPoll with count=1 in TaskServiceImpl returns many tasks

Open · shlomibd opened this issue 11 months ago · 1 comment

Describe the bug

I am using client 4.0.4 with TaskRunner. It requests 1 task, but from time to time it receives more than 1 task, which crashes the client.

Details

- Conductor version: 3.21.11
- Persistence implementation: Postgres
- Queue implementation: Postgres
- Lock: ?
- Workflow definition:
- Task definition:
- Event handler definition:

To Reproduce

Steps to reproduce the behavior: just use the client with a task count of 1. The workflow has 1000 dynamically forked subworkflows, each containing the task being polled and one other task.

[Image: workflow]

Subworkflow:

[Image: subworkflow]

Additional context

Logs from the client:

2025-01-23 08:16:14.604 TRACE [TaskRunner 1] [] c.n.c.c.a.TaskRunner : Polling task of type: task.aluma.dferip.v2 in domain: 'null' with size 1
2025-01-23 08:16:14.604 TRACE [TaskRunner 1] [] c.n.c.c.a.TaskRunner : in memory queue size for tasks: 0
2025-01-23 08:16:14.604 DEBUG [TaskRunner 1] [] c.n.c.c.w.Worker : Setting worker id to worker-rip-deployment-996bbc489-fhlrz
2025-01-23 08:16:14.604 DEBUG [TaskRunner 1] [] c.n.c.c.a.TaskRunner : poll task.aluma.dferip.v2 in the domain null with batch size 1
2025-01-23 08:16:23.864 ERROR [TaskRunner 1] [] c.n.c.c.a.TaskRunner : null
java.lang.IllegalArgumentException: null
    at java.base/java.util.concurrent.Semaphore.release(Semaphore.java:618)
    at com.netflix.conductor.client.automator.TaskRunner.pollTasksForWorker(TaskRunner.java:242)
    at com.netflix.conductor.client.automator.TaskRunner.pollAndExecute(TaskRunner.java:152)
    at com.netflix.conductor.client.automator.TaskRunnerConfigurer.startWorker(TaskRunnerConfigurer.java:170)
    at com.netflix.conductor.client.automator.TaskRunnerConfigurer.lambda$init$0(TaskRunnerConfigurer.java:133)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
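The client-side crash is consistent with how `java.util.concurrent.Semaphore.release(int)` behaves: it throws an `IllegalArgumentException` with no message (hence the `null` in the log) when the permit count is negative. A plausible reading, not checked against the TaskRunner source, is that the runner hands back `requested - received` permits after each poll, which goes negative as soon as the server returns more tasks than were requested. The sketch below models only that failure mode; `SemaphoreOverReleaseSketch` and `pollOnce` are hypothetical names, not Conductor code.

```java
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical model of a poller that reserves one semaphore permit per requested task.
// It is NOT the Conductor TaskRunner source; it only reproduces the same failure mode.
final class SemaphoreOverReleaseSketch {

    // Reserves `requested` permits, then gives back the ones not used by the tasks received.
    // Returns the number of permits handed back.
    static int pollOnce(Semaphore slots, int requested, List<String> received) {
        slots.acquireUninterruptibly(requested);
        int unused = requested - received.size();
        // Semaphore.release(int) throws IllegalArgumentException if its argument is negative,
        // which is the case as soon as the server returns more tasks than were requested.
        slots.release(unused);
        return unused;
    }

    public static void main(String[] args) {
        // Normal case: 1 slot requested, no task returned, so the 1 permit is handed back.
        System.out.println(pollOnce(new Semaphore(1), 1, List.of()));
        // Faulty case: 1 slot requested but 2 tasks returned, so release(-1) is called.
        try {
            pollOnce(new Semaphore(1), 1, List.of("task-a", "task-b"));
        } catch (IllegalArgumentException e) {
            System.out.println("IllegalArgumentException: " + e.getMessage());
        }
    }
}
```

In this model the semaphore is doing its job: the exception is a symptom of the server violating the "at most `count` tasks" contract, not a client bug in itself.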

Server log for this request:

Jan 23, 2025 @ 10:16:23.792 DEBUG com.netflix.conductor.service.TaskServiceImpl The Tasks [8bdc01f1-0403-4e1f-a587-2a93edf7eb1c, 7e3b5351-5d92-4135-88f9-95a94c83fbe3, fb002c2d-d485-4b8a-8129-a12fabebb31e, 11bef6c1-3832-4094-bba1-a6b3b39300e6, f9b272a2-c23e-4173-9f75-c028247ee9e4, 5d47fba0-9f64-48a2-8d29-d8244aaf3913, a672d6f0-2768-4f12-8a0f-008067c960ad, 6700d66f-6765-44f6-a36e-f2310076ba3d, 0c95720a-74a3-46b3-9be3-ab0867168295, 12bf17ce-daa1-45a7-b15a-4d0904f0d038, eb177fb3-db37-415c-aa44-c846721c2bb9, 65f2f0cc-8c05-4426-9467-49a09e70fc64, 3c2f8865-c8d3-4f4f-9c19-ecbfea5b47ae, 92d13b88-8fb4-4672-90f4-a19e1378b49b, 9c33bd01-6e03-49fd-8859-da381f16393b, fce269c3-91c4-46e4-80f8-310ed87694a1, 1786ab62-8d8f-4015-b453-cf6a432c2e57, 1f547f18-c75d-4a71-aa5b-5c8e0c88de46, 4a244d13-84fa-4377-9df5-49dc7c2c3bce, d13e9a89-6828-4291-a677-03ff3749c0e5, 65cfb83b-364c-4b7b-be3c-2a2d4b5e1ebc, 5acbab12-d46b-4270-9a6e-fb6f12df19e1, 8cdd75fb-ecfb-4bec-a89e-aa74c31c428f, e79ca92c-f4ab-45bd-90f0-18dadf39ee14, 3c2aa4e0-d848-4935-be50-26943f2b6409, a90a9d71-cb31-4966-9668-ab83c494a2b0, 0bd8d906-b7a4-4cbe-b15f-95989a5dc9db, a4174c1e-ccd3-4892-bd3e-58e38a84988a, ac083c13-ff47-48a9-a68f-2ab41e521c88, ff22a671-f5f5-4579-992f-0f9db4375e15, aba84253-79b5-4d6e-a675-efca545d6ee4, 2f8ddb1a-b8a2-4db5-a45b-8ef9ef1597f4, 32d20d25-755e-4b8f-be15-47879b324845, 27a1f121-5ce2-4291-96b3-ce39411d4a46, 86549f64-3811-44ae-b322-eeeea4dab358, 222dbbda-5b6b-4218-ac1a-ab3a83bffc4b, 532f36c3-ee06-4789-adc3-6ee7d1d80b83, dab5ef2d-aa66-404b-a056-d5c37d0cc450, 45ef8416-76d1-4dbe-a372-a3c273abf97e, 46de4fd2-28c4-49f6-93b7-2c520548e9e0, 45cfcb33-453d-4caa-89f1-2c36c1805b1d, 2510f015-49c7-4dc4-916e-5b1faaca54bb, f2748e86-e41d-42d5-95c0-b86e57f62bd9, 2cc95790-0f9b-4d61-9a32-2f00cd46eafc, 0fab7fe4-ce21-4a3a-8570-066c0bfb4e37, 9bae14db-2b50-4971-bbbe-bf9968128d58, 071d69b5-c9b3-401d-beb3-85f60e56706b, ab40a415-0dda-437d-9a98-eef204ee7399, 8ade396a-b252-475a-98e9-131669fd9c19, 
3f82f286-2dd8-4f00-b564-555b110bd04b, c418f39a-f768-43a0-a9b7-90118a350477, 6824b166-4983-4f6c-9d16-829264c72391, 5c36a953-ea6d-4712-ac7a-b37784830b21, a9003f65-97e8-4f9d-9d6b-ae17c296582a, 726eace7-f7bc-49dc-93f1-6cd3d0673dd8, b3b1ae51-eae7-4db2-b318-f7cddb5e524f, 61ca1979-4636-453b-934d-3e07f12a0a50, ff5af1ee-7849-4b42-919c-0ac50eb8100b, 34a6fe91-6e96-4acb-80cf-d63dcab7c2ad, 1a56d0c9-c5cc-4bfd-98f4-38998223d402, 70b07d9f-4948-4328-8592-05506f4b4efa, 8cb8288f-b979-4529-9e8e-69b5e58891f8, 042757ef-406e-49f8-896a-d272b1aead1a, 9a6ea95b-a468-4ec9-b60e-d080019d2662, 3f513134-7831-4a37-a306-dd5551a616b2, 2c404583-3d9d-4660-9ebd-d398bfdd30b3, af1f3a5c-5463-40a2-b5e1-402af272b3e0, d5ca0935-ea7a-47fb-bb48-33b029a99c36, 06d5a7a5-72f0-463b-8ff0-3a54b24768a7, 9a7e2ce0-aefc-40ae-8cfa-331f6cc1ad48, 422069da-cfb2-4de4-a1b3-5fb9e8989828, 7c7339e7-5dbc-4d8d-808c-4d062e1e1868, 3e5e5cb1-9ada-4c5e-8268-91755722b570, 9b328f39-1d33-4aff-b3e4-d3140d4465ae, 9523aa90-52cb-4e1d-a3a9-1bf94ca15985, 6167ebbb-0c6c-4071-9df5-f0b23be58a86, f661642a-e7f3-4113-a7dd-ff806649c56a, 94e4aa36-faf2-438a-9192-51eddda41b4a, 459463ce-7d73-4341-8997-b4a1396f3511, 9b9a3435-d76d-4ddc-aca3-7a3acc5fa6d3, 88e55a24-c468-40e1-8707-42b6e4ad7d25, b2736d62-83a1-42a5-872b-36a00e8338ac, 77b73f5d-479b-4b84-807f-cd62ce4105f8, d04ce79a-41a7-4507-8c9c-7827ec6de49f, ee43ea08-f279-4b40-9ce2-597aa4f6c370, de527983-ac1c-418f-856b-ce89e9f45b92, 57fa47b2-9364-40a3-bd2e-bf97656f0ca5, c9a67d5e-74db-4acc-ab86-2021e628b997, 44e04314-3f1f-4573-8b25-d93e021263a0, a6242d25-b820-4bc2-a662-9d33208c5c51, 9e8029b4-2456-4124-a778-40d0f549c32d, 97ffaaf6-34bc-436b-bc56-ff2d998d883f, 42480445-bbae-4581-afe6-6532d268a7e1, 0f7fb66c-163d-43c3-8b88-54fbdd9527e3, 644b2438-4088-4cdf-88be-c22006e49fb1, ce6d925c-39e2-4a26-9b25-ea339b2bc329, a8020883-cc98-4ecc-b2c3-6ce793e36ad4, 25dc7ea9-240f-4109-af5f-67225cb93621, f3a01ef9-4cad-43e5-9e85-ab2475b3c975, 79b966a9-18f1-48ee-84a2-59fa27ef400c, a548f833-e83b-45f2-b815-73c4682b758c, 
aef2b526-efdc-448f-87bc-796673cf918a, f9aecbcd-b0ba-4f63-a378-f0b85498755c, aff3cf83-480f-4779-9da1-0dae8da43ddc, 3de94477-d26a-4043-a1ba-01a0ef97fbdd, 2513b309-be21-409f-ac2b-d66674a5c084, ff43b13b-927a-4e60-9160-ee22183eae1b, 56c4bb36-5f5a-42b8-ba00-5012db002d86, 9c0b48da-4b9f-4a0d-b208-8c6e8d5e356d, 31898328-f7a6-450f-891f-cc13b22dabcd, 19a71ada-a1c5-4d75-a321-aa88cd08f70e, 9030bfdd-b15e-472b-98ac-7112c269489b, 55262cb7-0e36-4478-9709-62a7226bf94e, 677d3ed2-f0b8-44e2-ad8b-e8076f1fbcf8, 70ebab07-497e-4838-9cf6-33429dd26160] being returned for /tasks/poll/task.aluma.dferip.v2?worker-rip-deployment-996bbc489-fhlrz&null

shlomibd · Jan 23 '25 10:01

I pinned it down to this method in PostgresQueueDAO.java:

private List<Message> popMessages(
        Connection connection, String queueName, int count, int timeout) {

    if (this.queueListener != null) {
        if (!this.queueListener.hasMessagesReady(queueName)) {
            return new ArrayList<>();
        }
    }

    String POP_QUERY =
            "UPDATE queue_message SET popped = true WHERE message_id IN (SELECT message_id FROM"
                    + " queue_message WHERE queue_name = ? AND popped = false AND deliver_on <="
                    + " (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority"
                    + " DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED) RETURNING"
                    + " message_id, priority, payload";

    List<Message> result =
            query(
                    connection,
                    POP_QUERY,
                    p ->
                            p.addParameter(queueName)
                                    .addParameter(count)
                                    .executeAndFetch(
                                            rs -> {
                                                List<Message> results = new ArrayList<>();
                                                while (rs.next()) {
                                                    Message m = new Message();
                                                    m.setId(rs.getString("message_id"));
                                                    m.setPriority(rs.getInt("priority"));
                                                    m.setPayload(rs.getString("payload"));
                                                    results.add(m);
                                                }
                                                return results;
                                            }));
    // I added this log message:
    if (result != null && result.size() > count) {
        logger.error(
                "------------------ popMessages: result.size() > count --- size is {} count is"
                        + " {}",
                result.size(),
                count);
    }
    return result;
}
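One explanation often given for an `UPDATE ... WHERE message_id IN (SELECT ... LIMIT ? FOR UPDATE SKIP LOCKED)` touching more rows than its LIMIT is that PostgreSQL does not promise to evaluate the `IN (...)` subquery exactly once; if it is re-evaluated while other pollers hold row locks (plausible here, with 1000 forked subworkflows scheduling tasks at once), `SKIP LOCKED` can hand back a different batch each time and the UPDATE marks all of them. This is a hypothesis, not something confirmed in this thread. A frequently suggested workaround is to select and lock the candidate rows once in a CTE and join the UPDATE against that fixed set. The sketch below assumes PostgreSQL 12 or later (for `AS MATERIALIZED`); `PopQuerySketch`, `POP_QUERY_CTE` and `placeholderCount` are hypothetical names, and the query keeps the original two parameters (queue name, then count) in the same order so the existing binding code would not change.

```java
// Hypothetical sketch, not the Conductor implementation: select and lock the candidate
// rows once in a MATERIALIZED CTE (PostgreSQL 12+), then update only that fixed set.
final class PopQuerySketch {

    static final String POP_QUERY_CTE =
            "WITH candidates AS MATERIALIZED ("
                    + "SELECT message_id FROM queue_message"
                    + " WHERE queue_name = ? AND popped = false AND deliver_on <="
                    + " (current_timestamp + (1000 ||' microseconds')::interval)"
                    + " ORDER BY priority DESC, deliver_on, created_on"
                    + " LIMIT ? FOR UPDATE SKIP LOCKED)"
                    + " UPDATE queue_message q SET popped = true"
                    + " FROM candidates c"
                    + " WHERE q.message_id = c.message_id"
                    + " RETURNING q.message_id, q.priority, q.payload";

    // Counts JDBC '?' placeholders so the binding order (queueName, count) can be checked.
    static int placeholderCount(String sql) {
        return (int) sql.chars().filter(ch -> ch == '?').count();
    }

    public static void main(String[] args) {
        System.out.println(POP_QUERY_CTE);
        System.out.println("placeholders: " + placeholderCount(POP_QUERY_CTE));
    }
}
```

Because the CTE is computed once before the UPDATE runs, the UPDATE can only mark rows from that set of at most `count` messages, which is the invariant the added log line checks. Whether this removes the behaviour reported here would still need to be confirmed with the same reproduction.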

I reproduced it and got this log message:

------------------ popMessages: result.size() > count --- size is 109 count is 1

shlomibd · Jan 26 '25 11:01