dolphinscheduler
dolphinscheduler copied to clipboard
[Bug] [TaskPlugin] Loss of logs in AbstractCommandExecutor#parseProcessOutput
Search before asking
- [x] I have searched the issues and found no similar issues.
What happened
The following is the main source code for this issue.
/**
 * Excerpt of the command executor, reduced to the two asynchronous tasks that share
 * {@code logBuffer}. Elided sections are marked with "// ...".
 */
public abstract class AbstractCommandExecutor {
/**
 * Submits two tasks for the given process: a producer that reads the process output line by
 * line into {@code logBuffer}, and a consumer that repeatedly hands {@code logBuffer} to
 * {@code logHandler} and then clears it.
 *
 * @param process the process whose output stream is read
 */
private void parseProcessOutput(Process process) {
// ...
// Producer task: reads the process output and feeds both logBuffer and the output-parameter parser.
getOutputLogService.execute(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
// 👉 1. Continuously append each log line to logBuffer
logBuffer.add(line);
taskOutputParameterParser.appendParseLog(line);
}
processLogOutputIsSuccess = true;
} catch (Exception e) {
log.error("Parse var pool error", e);
// The flag is also set on the error path, so the consumer loop below is not kept waiting on it.
processLogOutputIsSuccess = true;
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});
// ...
// Consumer task: flushes logBuffer through logHandler until both output sources are finished.
taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
try {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
// NOTE(review): the "> 1" threshold appears to account for the EMPTY_STRING placeholder that is
// re-added after every flush - confirm where the initial placeholder is added.
while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
if (logBuffer.size() > 1) {
// 👉 2. Consume logBuffer
// (Please look at the following method AbstractTask#logHandle)
logHandler.accept(logBuffer);
// 👉 3. Why is the logBuffer being cleared here?
// Any line the producer adds between the end of the handler's drain and this clear()
// is removed without ever having been passed to the handler.
logBuffer.clear();
logBuffer.add(EMPTY_STRING);
} else {
// Nothing to flush yet; wait before checking the buffer again.
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
log.error("Output task log error", e);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
});
//...
}
}
/**
 * Excerpt of the task base class, reduced to the log handler that consumes {@code logBuffer}.
 */
public abstract class AbstractTask {
/**
 * Drains {@code logs} by polling until it is empty, joins the polled entries with "\n\t",
 * and emits them as a single INFO log message.
 *
 * @param logs the queue to drain; entries are removed from it as they are read
 */
public void logHandle(LinkedBlockingQueue<String> logs) {
StringJoiner joiner = new StringJoiner("\n\t");
while (!logs.isEmpty()) {
joiner.add(logs.poll());
}
// 👉 According to the logback configuration, the AsyncAppender is not used.
// 👉 Because each log call is written to disk synchronously, it may take some time.
log.info(" -> {}", joiner);
// 👉 After `log.info()` returns:
// 👉 suppose that in the meantime step (👆1) `logBuffer.add(line)` has run,
// 👉 and then step (👆3) `logBuffer.clear()` runs - the newly added line is discarded. 😮oh my gosh!!!🤯
}
}
What you expected to happen
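Every line written by the process should reach the task log; none should be lost. Why is the logBuffer being cleared after logHandler.accept(logBuffer)? The handler has already drained the queue, so the clear() can only discard lines that were added in the meantime.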
How to reproduce
The following simplified unit test reproduces the bug.
/**
 * Reproducer: a producer thread adds {@code num} strings to a shared queue while a consumer
 * thread drains the queue and then clears it, the same drain-then-clear sequence used in
 * {@code parseProcessOutput}. The consumer prints how many non-empty strings it polled; a
 * value below {@code num} means elements were removed by {@code clear()} without being read.
 * Note that the test only prints the two numbers and contains no assertion.
 */
@Test
public void testasynclog() throws InterruptedException {
BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
// Placeholder element, so that "size() > 1" means at least one real entry is present.
buffer.add("");
AtomicBoolean success = new AtomicBoolean(false);
final int num = 1000;
// Producer: adds one entry roughly every 2 ms, then signals completion.
new Thread(() -> {
for (int i = 0; i < num; i++) {
try { Thread.sleep(2); } catch (InterruptedException ignore) { }
buffer.add("" + i);
}
success.set(true);
}).start();
// Consumer: loops without sleeping until the producer is done and the buffer holds at most the placeholder.
Thread t = new Thread(() -> {
int count = 0;
while (buffer.size() > 1 || !success.get()) {
if (buffer.size() > 1) {
{
// Stand-in for the log handler: drain the queue, counting every non-empty entry.
StringJoiner joiner = new StringJoiner("\n\t");
while (!buffer.isEmpty()) {
String s = buffer.poll();
if (!s.isEmpty()) {
count++;
}
joiner.add(s);
}
// System.out.println(" -> " + joiner);
// simulate IO blocking
try { Thread.sleep(3); } catch (InterruptedException ignore) { }
}
// Entries the producer added during the simulated IO delay are dropped here, uncounted.
buffer.clear();
buffer.add("");
}
}
System.out.printf("expect: %s, actual: %s\n", num, count);
});
t.start();
t.join();
}
Anything else
No response
Version
dev
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
We can directly remove the parseProcessOutputExecutorService here.