kotlinx.coroutines
kotlinx.coroutines copied to clipboard
With version 1.9.0-RC, the test task compileDemoDebugUnitTestKotlin fails with the error: "Suspension functions can only be called within coroutine body."
I do not encounter this issue when using version 1.8.1.
` /**
 * Repro test: fills a queue with 150 delayed jobs and hands it to
 * [CoroutineRunBlockUtil.asynchronousBatchProcessing] with a batch size of five,
 * printing progress and completion to stdout.
 *
 * NOTE(review): the batch runs in a separate [CoroutineScope] built from `testScheduler`
 * and the job returned by `launch` is never joined, so `runTest` presumably returns
 * without waiting for the batch to finish — confirm whether that is intended.
 */
@Test
fun concurrentExecution1() = runTest {
    println("当前线程=${Thread.currentThread().name}")

    val jobCount = 150
    val pendingJobs = ArrayBlockingQueue<doOnBackground<String>>(jobCount)
    val batchScope = CoroutineScope(testScheduler)

    // The random delay argument is evaluated when a job is invoked, not when it is queued.
    (1..jobCount).forEach { index ->
        pendingJobs.add {
            getDataDelayedTime(RandomUtil.getRandomNum(1, 2000).toLong(), index)
        }
    }

    batchScope.launch {
        CoroutineRunBlockUtil.asynchronousBatchProcessing(
            coroutineScope = batchScope,
            jobList = pendingJobs,
            onErrorIsIgnore = { false },
            onComplete = { println("完成") },
            onProgress = { total, progress, error ->
                println("total=$total, progress=$progress, error=$error")
            },
            togetherSize = 5,
        )
        // The returned kotlin.Result is not inspected; only reaching this point is logged.
        println("获取到最后结果")
    }
}
` /**
 * Simulated background job: logs its start, suspends via `delay(delayedTime)`,
 * logs its completion together with the current thread, and reports success.
 *
 * @param delayedTime value passed straight to `delay`.
 * @param index job number; used only for the log output.
 * @return always `Result.Success("")`. The logged `index.mod(7) != 0` flag is informational
 * only and does not influence the returned value.
 */
private suspend fun getDataDelayedTime(delayedTime: Long, index: Int): Result<String> {
    println("第${index} 睡眠时长=${delayedTime}")
    delay(delayedTime)

    val finishedAt = DateUtil.testNowTimeString
    val thread = Thread.currentThread()
    val passesModCheck = index.mod(7) != 0
    println(
        "第${index}项完成->${finishedAt},thread.name=${thread.name},thread.id=${thread.id},返回结果=${passesModCheck}"
    )

    // Disabled alternative outcome: fail every job whose index is a multiple of seven.
    // return if (index.mod(7) != 0) Result.Success("") else Result.Failure(Exception())
    return Result.Success("")
}
`
` /**
 * Drains [jobList] in batches: up to [togetherSize] jobs are started with `async` on a worker
 * coroutine launched in [coroutineScope] on `Dispatchers.IO`, then awaited one by one before
 * the next batch is started.
 *
 * [onProgress] and [onComplete] are invoked inside `withContext(Dispatchers.Main)`.
 *
 * The caller is always resumed exactly once:
 * - with `success("完成")` after the loop finishes and `onComplete(true)` has run;
 * - with `failure(exception)` when a job returns [Result.Failure] and [onErrorIsIgnore]
 *   returns `false` (the remaining started jobs and the worker are cancelled);
 * - with `failure(cause)` when the worker itself fails or is cancelled before producing a result
 *   (for example a job threw, or the scope was cancelled);
 * - with `failure(CancellationException)` when [coroutineScope] is already inactive on entry.
 * Cancelling the caller cancels the worker.
 *
 * @param coroutineScope scope the worker coroutine is launched in.
 * @param jobList queue of jobs; it is consumed (polled) by this function.
 * @param onErrorIsIgnore returns `true` to continue after a failed job, `false` to abort.
 * @param togetherSize maximum number of jobs started per batch; also the capacity of the
 * internal queues.
 * @param onProgress receives (total, completed so far, failed so far) after every awaited job.
 * @param onComplete receives `true` on normal completion, `false` when aborted by a failure.
 * @return the outcome wrapped in [kotlin.Result] as described above.
 */
suspend fun <T : Any> asynchronousBatchProcessing(
    coroutineScope: CoroutineScope,
    jobList: ArrayBlockingQueue<doOnBackground<T>>,
    onErrorIsIgnore: (Result<T>) -> Boolean = { false },
    togetherSize: Int = 5,
    onProgress: (total: Int, progress: Int, error: Int) -> Unit,
    onComplete: (Boolean) -> Unit,
): kotlin.Result<String> {
    /** Tasks of the current batch that have been started but not yet awaited. */
    val currentWaitDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)
    /** Tasks that are currently running (removed once awaited). */
    val currentDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)
    /** `false` while processing should continue; `true` once finished or aborted. */
    val currentTaskState = AtomicBoolean(false)
    /** Number of jobs queued when the function was called. */
    val taskTotal: Int = jobList.size
    /** Number of jobs awaited so far. */
    val currentCompleteTotal = AtomicInteger(0)
    /** Number of jobs that returned a failure so far. */
    val currentErrorTotal = AtomicInteger(0)

    return suspendCancellableCoroutine { continuation ->
        if (!coroutineScope.isActive) {
            // Previously nothing resumed the continuation here and the caller hung forever.
            continuation.resume(
                kotlin.Result.failure(
                    java.util.concurrent.CancellationException("coroutineScope is not active")
                )
            )
            return@suspendCancellableCoroutine
        }

        val worker = coroutineScope.launch(Dispatchers.IO) {
            // The worker's own job, used to cancel itself (and its async children) on abort.
            val workerJob = coroutineContext[Job]

            while (coroutineScope.isActive && jobList.isNotEmpty() && !currentTaskState.get()) {
                println("开始进入循环准备执行---->>>>>>,${currentWaitDoingTaskList.size}")
                if (taskTotal == currentCompleteTotal.get()) {
                    // Everything has been processed.
                    currentTaskState.set(true)
                }

                // Start one more job while the current batch still has room.
                if (currentWaitDoingTaskList.size < togetherSize && currentDoingTaskList.size < togetherSize) {
                    jobList.poll()?.let { job ->
                        val task = async {
                            println("开启异步执行=${DateUtil.testNowTimeString}")
                            job.invoke()
                        }
                        if (coroutineScope.isActive) {
                            currentWaitDoingTaskList.add(task)
                            if (currentDoingTaskList.size < togetherSize) {
                                currentDoingTaskList.add(task)
                            }
                        }
                    }
                }

                // Drain the batch once it is full, or once no more jobs are left to start.
                val batchIsFull = currentWaitDoingTaskList.size == togetherSize
                val noMoreJobs = jobList.isEmpty() && currentWaitDoingTaskList.size < togetherSize
                if (batchIsFull || noMoreJobs) {
                    while (
                        currentWaitDoingTaskList.isNotEmpty() &&
                        !currentTaskState.get() &&
                        currentDoingTaskList.size <= togetherSize
                    ) {
                        val task = currentWaitDoingTaskList.poll() ?: continue
                        val result = task.await()
                        currentDoingTaskList.remove(task)
                        println("移出已完成的任务=${currentDoingTaskList.size},${DateUtil.testNowTimeString}")
                        currentCompleteTotal.incrementAndGet()
                        if (taskTotal == currentCompleteTotal.get()) {
                            currentTaskState.set(true)
                        }

                        if (result is Result.Failure) {
                            println("当前为失败-----》")
                            currentErrorTotal.incrementAndGet()
                            withContext(Dispatchers.Main) {
                                onProgress.invoke(
                                    taskTotal,
                                    currentCompleteTotal.get(),
                                    currentErrorTotal.get()
                                )
                            }
                            if (!onErrorIsIgnore(result)) {
                                // The failure is not ignorable: abort the whole run.
                                currentTaskState.set(true)
                                withContext(Dispatchers.Main) {
                                    onComplete.invoke(false)
                                }
                                println("正在执行的任务总数:${currentDoingTaskList.size}")
                                currentDoingTaskList.forEach { doJob ->
                                    println("终止执行=${doJob.isActive},${doJob.isCancelled},${doJob.isCompleted}")
                                    doJob.cancel()
                                }
                                continuation.resume(kotlin.Result.failure(result.exception))
                                workerJob?.cancel()
                                return@launch
                            }
                        } else {
                            withContext(Dispatchers.Main) {
                                onProgress.invoke(
                                    taskTotal,
                                    currentCompleteTotal.get(),
                                    currentErrorTotal.get()
                                )
                            }
                        }
                    }
                }
                println("循环底部----->>>>>>>>${jobList.size},${currentTaskState.get()}")
            }

            withContext(Dispatchers.Main) {
                onComplete.invoke(true)
            }
            println("返回结果-----------》》》》》》》")
            continuation.resume(kotlin.Result.success("完成"))
        }

        // If the worker ends abnormally without having resumed the caller, report the cause
        // instead of leaving the caller suspended. The worker is the only resumer and has
        // finished by the time this handler runs, so the isActive check cannot race with it.
        worker.invokeOnCompletion { cause ->
            if (cause != null && continuation.isActive) {
                continuation.resume(kotlin.Result.failure(cause))
            }
        }

        // Propagate cancellation of the caller to the worker and its child tasks.
        continuation.invokeOnCancellation { worker.cancel() }
    }
}`
Closing as "can't reproduce".
If the issue still persists, please consider filing a new issue with a self-contained sample that clearly highlights the problem.