dolphinscheduler

[Bug] [TaskPlugin] Loss of logs in AbstractCommandExecutor#parseProcessOutput

Open · JorringHsiao opened this issue 1 month ago · 1 comment

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

The relevant source code is excerpted below (abridged).

public abstract class AbstractCommandExecutor {

    private void parseProcessOutput(Process process) {
        // ...
        getOutputLogService.execute(() -> {
            TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
            try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                String line;
                while ((line = inReader.readLine()) != null) {
                    // 👉 1. Continuously append log line to logBuffer
                    logBuffer.add(line);
                    taskOutputParameterParser.appendParseLog(line);
                }
                processLogOutputIsSuccess = true;
            } catch (Exception e) {
                log.error("Parse var pool error", e);
                processLogOutputIsSuccess = true;
            } finally {
                LogUtils.removeTaskInstanceLogFullPathMDC();
            }
            taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
        });

        // ...
        taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
            try {
                LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
                while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
                    if (logBuffer.size() > 1) {
                        // 👉 2. Consume logBuffer 
                        // (Please look at the following method AbstractTask#logHandle)
                        logHandler.accept(logBuffer);
                        // 👉 3. Why is the logBuffer being cleared here?
                        logBuffer.clear();
                        logBuffer.add(EMPTY_STRING);
                    } else {
                        Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
                    }
                }
            } catch (Exception e) {
                log.error("Output task log error", e);
            } finally {
                LogUtils.removeTaskInstanceLogFullPathMDC();
            }
        });

        //...
    }
}

public abstract class AbstractTask {

    public void logHandle(LinkedBlockingQueue<String> logs) {

        StringJoiner joiner = new StringJoiner("\n\t");
        while (!logs.isEmpty()) {
            joiner.add(logs.poll());
        }
        // 👉 The logback configuration does not use an AsyncAppender,
        // 👉 so log.info() writes to disk synchronously and may block for a while.
        log.info(" -> {}", joiner);
        // 👉 Suppose that during or right after log.info() (the poll() loop above has already
        // 👉 emptied the queue), the reader thread performs step (👆1) `logBuffer.add(line)`.
        // 👉 Back in parseProcessOutput, step (👆3) `logBuffer.clear()` then discards that line before it is ever logged 🤯
    }
}
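To make the interleaving concrete, here is a deterministic, single-threaded replay of the steps marked 1 to 3 above. It is a sketch, not DolphinScheduler code: the class `ClearRaceReplay` is hypothetical, and the second "reader" line is added by hand at the moment the reader thread could run, i.e. after `logHandle` has drained the queue but before `logBuffer.clear()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical replay of the interleaving described above; not DolphinScheduler code.
public class ClearRaceReplay {

    /** Returns the lines that actually reached the "log" after one drain-then-clear cycle. */
    public static List<String> replay() {
        LinkedBlockingQueue<String> logBuffer = new LinkedBlockingQueue<>();
        List<String> logged = new ArrayList<>();

        logBuffer.add("");          // sentinel, as in parseProcessOutput
        logBuffer.add("line-1");    // step 1: the reader appends a line

        // step 2: logHandle drains the queue with poll() and "logs" each element
        while (!logBuffer.isEmpty()) {
            logged.add(logBuffer.poll());
        }

        // while log.info() is still writing, the reader appends another line (step 1 again)
        logBuffer.add("line-2");

        // step 3: back in parseProcessOutput, the buffer is cleared and the sentinel re-added
        logBuffer.clear();
        logBuffer.add("");

        // "line-2" was neither logged nor kept in the buffer
        return logged;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "[, line-1]"
    }
}
```

In the real executor the two threads decide the timing, but any `add()` that lands in that window is lost the same way.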

What you expected to happen

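Every line read from the process output should reach the task log. `AbstractTask#logHandle` already empties `logBuffer` with `poll()`, so the `logBuffer.clear()` that follows `logHandler.accept(logBuffer)` can only discard lines appended in the meantime. Why is it there?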

How to reproduce

The following simplified unit test reproduces the bug:

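import java.util.StringJoiner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test; // JUnit 5 assumed

public class AsyncLogBufferTest {

    @Test
    public void testAsyncLog() throws InterruptedException {
        // Shared buffer with the same "" sentinel that parseProcessOutput uses
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        buffer.add("");
        AtomicBoolean success = new AtomicBoolean(false);
        final int num = 1000;

        // Producer: plays the thread that reads the process output (step 1)
        new Thread(() -> {
            for (int i = 0; i < num; i++) {
                try { Thread.sleep(2); } catch (InterruptedException ignore) { }
                buffer.add("" + i);
            }
            success.set(true);
        }).start();

        // Consumer: plays the parseProcessOutputExecutorService task
        Thread t = new Thread(() -> {
            int count = 0;
            while (buffer.size() > 1 || !success.get()) {
                if (buffer.size() > 1) {
                    // Same as AbstractTask#logHandle: drain the queue with poll() (step 2)
                    StringJoiner joiner = new StringJoiner("\n\t");
                    while (!buffer.isEmpty()) {
                        String s = buffer.poll();
                        if (!s.isEmpty()) {
                            count++;
                        }
                        joiner.add(s);
                    }
                    // Simulate the synchronous log.info() write blocking on disk I/O
                    try { Thread.sleep(3); } catch (InterruptedException ignore) { }
                    // Step 3: any line added during the sleep above is discarded here
                    buffer.clear();
                    buffer.add("");
                }
            }
            System.out.printf("expect: %s, actual: %s%n", num, count);
        });
        t.start();
        t.join();
    }
}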
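For comparison, here is one way to write the consumer so that no lines are lost, sketched under the assumption that the consumer is the only thread removing elements from the queue. Instead of polling in a helper and then calling `clear()`, it moves the current contents into a local batch with `BlockingQueue#drainTo`, so lines appended while the handler runs stay in the queue for the next round. `DrainingLogConsumer`, `consume` and `run` are hypothetical names; this is not the change that was made upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical loss-free consumer; names are illustrative, not DolphinScheduler APIs.
public class DrainingLogConsumer {

    /**
     * Repeatedly moves everything currently in buffer into a local batch and hands it to
     * handler. The shared queue is never cleared, so lines appended while the handler runs
     * are picked up by the next iteration. Returns once the producer is done and the queue is empty.
     */
    public static void consume(BlockingQueue<String> buffer, AtomicBoolean producerDone,
                               Consumer<List<String>> handler) throws InterruptedException {
        while (true) {
            // Read the flag BEFORE draining: if it is already true, every add() has happened,
            // so an empty drain really means nothing is left.
            boolean done = producerDone.get();
            List<String> batch = new ArrayList<>();
            buffer.drainTo(batch);
            if (!batch.isEmpty()) {
                handler.accept(batch);
            } else if (done) {
                return;
            } else {
                Thread.sleep(1); // nothing yet; back off briefly
            }
        }
    }

    /** Producer like the test above (without the per-line sleep); returns lines delivered. */
    public static int run(int num) throws InterruptedException {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        AtomicBoolean success = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < num; i++) {
                buffer.add("" + i);
            }
            success.set(true);
        });
        producer.start();

        int[] count = {0};
        consume(buffer, success, batch -> {
            count[0] += batch.size();
            // simulate slow, synchronous log I/O while the producer keeps adding lines
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        producer.join();
        return count[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.printf("expect: %d, actual: %d%n", 1000, run(1000));
    }
}
```

Reading `producerDone` before `drainTo` matters: checking it afterwards could observe `true` after a drain that missed the last few lines.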

Anything else

No response

Version

dev

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

JorringHsiao, Dec 11 '25 13:12

We can simply remove the parseProcessOutputExecutorService here.

ruanwenjun, Dec 11 '25 15:12
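One possible reading of the suggestion above, sketched under assumptions: if the thread that reads the process output also writes the log, there is no shared `logBuffer`, and therefore nothing for a second thread to clear. `SingleThreadLogReader`, `readAndLog` and the `maxBatch` parameter are hypothetical; the sketch groups lines into batches so each log call can still cover several lines, and hands each batch to a `Consumer` standing in for `log.info()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: the reading thread logs its own lines, so no buffer is shared.
public class SingleThreadLogReader {

    /**
     * Reads source line by line and passes batches of at most maxBatch lines to logSink.
     * Returns the total number of lines read.
     */
    public static int readAndLog(Reader source, int maxBatch, Consumer<List<String>> logSink)
            throws IOException {
        int total = 0;
        List<String> batch = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(source)) {
            String line;
            while ((line = in.readLine()) != null) {
                batch.add(line);
                total++;
                if (batch.size() >= maxBatch) {
                    logSink.accept(batch);
                    batch = new ArrayList<>(); // fresh list; the sink may keep the old one
                }
            }
        }
        if (!batch.isEmpty()) {
            logSink.accept(batch); // flush the remaining lines at EOF
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> batches = new ArrayList<>();
        int n = readAndLog(new StringReader("a\nb\nc\nd\ne\n"), 2, batches::add);
        System.out.println(n + " " + batches); // prints "5 [[a, b], [c, d], [e]]"
    }
}
```

One trade-off of this design: while the process is silent, a partially filled batch is only written when the next line arrives or at EOF, whereas the original two-thread design flushes on a timer.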