Add StepExecution parameter to StoppableTasklet.stop()
Currently, StoppableTasklet.stop() doesn't take any parameters. So, from within tasklet it is impossible to distinguish which exactly step execution is requested to stop out of multiple job executions running. This is important when some external request needs to be executed from StoppableTasklet.stop().
Looking into SimpleJobOperator.stop(long executionId) implementation, it looks very straightforward to implement this request - just pass stepExecution to the tasklet stop() method:
@Override
public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
JobExecution jobExecution = findExecutionById(executionId);
// Indicate the execution should be stopped by setting it's status to
// 'STOPPING'. It is assumed that
// the step implementation will check this status at chunk boundaries.
BatchStatus status = jobExecution.getStatus();
if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
throw new JobExecutionNotRunningException(
"JobExecution must be running so that it can be stopped: " + jobExecution);
}
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
try {
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
if (job instanceof StepLocator) {// can only process as StepLocator is the
// only way to get the step object
// get the current stepExecution
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isRunning()) {
try {
// have the step execution that's running -> need to 'stop' it
Step step = ((StepLocator) job).getStep(stepExecution.getStepName());
if (step instanceof TaskletStep) {
Tasklet tasklet = ((TaskletStep) step).getTasklet();
if (tasklet instanceof StoppableTasklet) {
StepSynchronizationManager.register(stepExecution);
((StoppableTasklet) tasklet).stop(**stepExecution**);
StepSynchronizationManager.release();
}
}
}
Thank you in advance!
Can I work on this issue?
I have submitted a PR!: https://github.com/spring-projects/spring-batch/pull/4715
Thank you for opening this feature request. I am not sure I fully understand the need for this, so would like to make sure the requirement is clear enough to me.
From the Javadocs of StoppableTasklet, the idea is that stopping a given job execution would stop all stoppable tasklets, which makes sense to me since we want to stop the overall job execution (ie it would not make sense to be selective to which tasklet to stop). For example, if I have three instances of the same stoppable tasklet or even different stoppable tasklets running in parallel, then if I request to stop the job execution, I am expecting all tasklets to get the stop signal, and not a selected subset. Is my understanding correct?
I am also not sure, from an API design point of view, about the added value of having the step execution in the stop method:
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class MyStoppableTasklet implements StoppableTasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// do some work
return RepeatStatus.FINISHED;
}
@Override
public void stop(StepExecution stepExecution) {
// We have just received a signal to stop the execution at this point,
// what is the added value of stepExecution here?
}
}
Again, I might be missing a use case, so please do not hesitate to elaborate on this.
What you've explained would be fully sufficient if there were no external async components involved in the tasklet execution.
We use tasklet to wrap JMS call to external system, and the tasklet waits for response completion. For simplicity, let's say one job has just one tasklet.
Now, let's say there 3 job instances running, each of which is waiting for JMS completion. They have send JMS messages with correlation IDs: A, B, C. Now we want to stop only second job (waiting for correlation ID B). If we call stop() we don't have enough context to send cancellation message with correlation ID B to cancel it in external system. On the other hand, having step execution ID (as part of stepExecution parameter), we are able to find respective correlation ID in our map created before the message was sent for processing.
Thank you for the feedback. The step execution is already available to any tasklet through the chunk context (or even the step contribution):
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.StoppableTasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class MyStoppableTasklet implements StoppableTasklet {
private long stepExecutionId;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
this.stepExecutionId = chunkContext.getStepContext().getStepExecution().getId();
// do some work
return RepeatStatus.FINISHED;
}
@Override
public void stop() {
// use this.stepExecutionId to send cancellation message with correlation id
}
}
Is that what you are looking for? If not, and for efficiency, please share your code and show the limitation with the current API as a code comment in order to correctly understand your point.
Your suggestion could solve the problem, although, it requires extra boilerplate on top of business logic:
- have extra field for every tasklet implementation who needs this capability
- don't forget it must be thread-safe (stop is called from a different thread)
Why I suggested this improvement is that org.springframework.batch.core.launch.support.SimpleJobOperator#stop already iterates through stepExecutions trying to find steps to be stopped, why not just pass it to the stop() method to provide enough context (as execute() does).
After all, the new stop() method parameter would be optional - who needs it, use it, similar to contribution and chunkContext in execute().
BTW, that's good that you've mentioned that stepExecution can be taken from contribution and chunkContext. Just an additional consideration for better consistency, why not pass one or both execute() methods parameters to stop() method instead of stepExecution?
Just an additional consideration for better consistency, why not pass one or both execute() methods parameters to stop() method instead of stepExecution?
StepContribution and ChunkContext are really about chunk oriented tasklets, but not all tasklets are chunk-oriented. I think this is a wrong method signature (see #4687). Tasklet#execute should really take the StepExecution as parameter. Chunk-oriented tasklets could be then specified in a sub-interface that uses StepContribution and ChunkContext. That said, I think stop should accept a step execution (I believe it's a bad idea to try to be consistent with a wrong design), so I will proceed and merge #4715 to resolve this.
BTW, take a look at #4965 which generalizes this feature to any kind of steps.