cadence-client icon indicating copy to clipboard operation
cadence-client copied to clipboard

Chained Futures do not unblock selects if "source" future IsReady

Open Groxx opened this issue 7 years ago • 1 comments

package experiments

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.uber.org/cadence/activity"
	"go.uber.org/cadence/testsuite"
	"go.uber.org/cadence/workflow"
)

func init() {
	workflow.Register(selectorWork)
	activity.Register(selectorAct)
}

func selectorWork(ctx workflow.Context, sync bool) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
	})

	fut, set := workflow.NewFuture(ctx)
	orig := workflow.ExecuteActivity(ctx, selectorAct, 5)

	if sync {
		orig.Get(ctx, nil)
	}
	set.Chain(orig)

	s := workflow.NewSelector(ctx)
	s.AddFuture(fut, func(f workflow.Future) {
		// nothing to do, unblocking will pass the test
	})
	s.Select(ctx)
	return nil
}

func selectorAct(arg int) (int, error) {
	return arg, nil
}

func TestSelector(t *testing.T) {
	runner := func(block bool) func(t *testing.T) {
		return func(t *testing.T) {
			s := new(testsuite.WorkflowTestSuite)
			env := s.NewTestWorkflowEnvironment()

			env.ExecuteWorkflow(selectorWork, block)

			assert.True(t, env.IsWorkflowCompleted())
			assert.NoError(t, env.GetWorkflowError())
		}
	}

	t.Run("async chains work", runner(false))
	t.Run("sync chains never unblock", runner(true))
}

If you comment out one of those two, you can see that async passes ~immediately, while sync times out.

Cause appears to be that the IsReady branch doesn't close the related channel? https://github.com/uber-go/cadence-client/blob/9c4380588cc1452dc0e84d672cb1f3dd758ae67a/internal/internal_workflow.go#L329

func (f *futureImpl) Chain(future Future) {
	if f.ready {
		panic("already set")
	}

	ch, ok := future.(asyncFuture)
	if !ok {
		panic("cannot chain Future that wasn't created with workflow.NewFuture")
	}
	if !ch.IsReady() {
		ch.ChainFuture(f)
		return
	}
	val, err := ch.GetValueAndError()
	f.value = val
	f.err = err
	f.ready = true
	return
}

Compare to e.g. Set:

func (f *futureImpl) Set(value interface{}, err error) {
	if f.ready {
		panic("already set")
	}
	f.value = value
	f.err = err
	f.ready = true
	f.channel.Close()
	for _, ch := range f.chained {
		ch.Set(f.value, f.err)
	}
}

Groxx avatar May 31 '18 02:05 Groxx

I've faced with the same issue and came to the same conclusions. Workaround it with

newFuture, settable := workflow.NewFuture(ctx)
...........................
selector.AddFuture(newFuture, func(f workflow.Future) {.......})
...........................
var value interface{}
err := future.Get(ctx, &value)
if err != nil {
   settable.SetError(err)
} else {
  settable.SetValue(value)
}
...........................
selector.AddFuture()

Do you have plans to fix it? or if you are able to confirm that it is really source of problem I can prepare pull request that fixes the problem.

kotenko-anton avatar May 17 '21 12:05 kotenko-anton