cadence-client
cadence-client copied to clipboard
Chained Futures do not unblock selects if "source" future IsReady
package experiments
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/testsuite"
"go.uber.org/cadence/workflow"
)
func init() {
workflow.Register(selectorWork)
activity.Register(selectorAct)
}
func selectorWork(ctx workflow.Context, sync bool) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
})
fut, set := workflow.NewFuture(ctx)
orig := workflow.ExecuteActivity(ctx, selectorAct, 5)
if sync {
orig.Get(ctx, nil)
}
set.Chain(orig)
s := workflow.NewSelector(ctx)
s.AddFuture(fut, func(f workflow.Future) {
// nothing to do, unblocking will pass the test
})
s.Select(ctx)
return nil
}
func selectorAct(arg int) (int, error) {
return arg, nil
}
func TestSelector(t *testing.T) {
runner := func(block bool) func(t *testing.T) {
return func(t *testing.T) {
s := new(testsuite.WorkflowTestSuite)
env := s.NewTestWorkflowEnvironment()
env.ExecuteWorkflow(selectorWork, block)
assert.True(t, env.IsWorkflowCompleted())
assert.NoError(t, env.GetWorkflowError())
}
}
t.Run("async chains work", runner(false))
t.Run("sync chains never unblock", runner(true))
}
If you comment out one of those two, you can see that async passes ~immediately, while sync times out.
Cause appears to be that the IsReady branch doesn't close the related channel?
https://github.com/uber-go/cadence-client/blob/9c4380588cc1452dc0e84d672cb1f3dd758ae67a/internal/internal_workflow.go#L329
func (f *futureImpl) Chain(future Future) {
if f.ready {
panic("already set")
}
ch, ok := future.(asyncFuture)
if !ok {
panic("cannot chain Future that wasn't created with workflow.NewFuture")
}
if !ch.IsReady() {
ch.ChainFuture(f)
return
}
val, err := ch.GetValueAndError()
f.value = val
f.err = err
f.ready = true
return
}
Compare to e.g. Set:
func (f *futureImpl) Set(value interface{}, err error) {
if f.ready {
panic("already set")
}
f.value = value
f.err = err
f.ready = true
f.channel.Close()
for _, ch := range f.chained {
ch.Set(f.value, f.err)
}
}
I've faced with the same issue and came to the same conclusions. Workaround it with
newFuture, settable := workflow.NewFuture(ctx)
...........................
selector.AddFuture(newFuture, func(f workflow.Future) {.......})
...........................
var value interface{}
err := future.Get(ctx, &value)
if err != nil {
settable.SetError(err)
} else {
settable.SetValue(value)
}
...........................
selector.AddFuture()
Do you have plans to fix it? or if you are able to confirm that it is really source of problem I can prepare pull request that fixes the problem.