
Concurrent processing

Open · Deleplace opened this issue 6 years ago · 7 comments

It would make sense to simplify the use of concurrency for some methods, in order to improve performance and throughput, and to provide users with correct concurrency out of the box, reducing the risk of subtle bugs.

Two strategies come to mind:

  • 1 goroutine per element to process. This is usually fine for the scheduler and makes sense when each element is expensive to process, but it's usually not optimal.
  • Bounded concurrency: at most M elements being processed concurrently. This is best achieved with the worker pattern (exactly M worker goroutines) and makes sense for various bottlenecks (CPU cores, network, filesystem, etc.); see the sketch just below.
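
To make the second strategy concrete, here is a rough sketch of a bounded map over a string slice (names are hypothetical, not a proposed pie API): it runs exactly m worker goroutines that pull indices from a channel, using the standard sync package.

func boundedMap(in []string, m int, fn func(string) int) []int {
    out := make([]int, len(in))
    jobs := make(chan int) // indices still to be processed

    var wg sync.WaitGroup
    for w := 0; w < m; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := range jobs {
                // Each index is written by exactly one worker,
                // so out needs no extra locking.
                out[i] = fn(in[i])
            }
        }()
    }

    for i := range in {
        jobs <- i
    }
    close(jobs)
    wg.Wait()

    return out
}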

Examples of concurrency-enabled Functions:

ConcurrentAll(fn func(value ElementType) bool, M int) bool
ConcurrentAny(fn func(value ElementType) bool, M int) bool
ConcurrentEach(fn func(ElementType), M int) SliceType
ConcurrentFilter(condition func(ElementType) bool, M int) SliceType
ConcurrentMap(fn func(ElementType) ElementType, M int) SliceType
ConcurrentReduce(reducer func(ElementType, ElementType) ElementType, M int) ElementType // only if reducer is associative

Note that this is most useful for Functions taking a user-provided, potentially long-running, function as a parameter. It would probably not be very useful for other Functions (e.g. Product, Shuffle), even for very large slices.
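
To illustrate why the predicate-based Functions are interesting candidates, here is a rough sketch of a ConcurrentAny over a string slice (again hypothetical, not a proposed implementation): it stops feeding work as soon as one predicate call returns true, although it still waits for in-flight calls to finish.

func concurrentAny(in []string, m int, fn func(string) bool) bool {
    jobs := make(chan string)
    found := make(chan struct{})
    var once sync.Once

    var wg sync.WaitGroup
    for w := 0; w < m; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for s := range jobs {
                if fn(s) {
                    once.Do(func() { close(found) }) // signal a match, once
                    return
                }
            }
        }()
    }

    // Feed elements until exhausted, or stop early on a match.
    go func() {
    feed:
        for _, s := range in {
            select {
            case jobs <- s:
            case <-found:
                break feed
            }
        }
        close(jobs)
    }()

    wg.Wait()

    select {
    case <-found:
        return true
    default:
        return false
    }
}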

This issue is for discussion before implementing anything.

Deleplace · May 09 '19 12:05

I certainly see your point of having the ability to make some functions concurrent. I'm thinking aloud as I type this...

If the number of goroutines is specified on the call, then other goroutines doing the same thing will fan out. Not necessarily a good or bad thing, but it makes things harder to control overall. It may make more sense to create worker pools and then attach workloads to them, so that if there were 100 server threads that all needed to fan out to do a read, they could go through the same preconfigured workers with a max concurrency of, say, 4.

Of course, you could just use new wait groups that allow unlimited concurrent jobs if you're lazy or you understand the situation better than the scheduler might. This works similarly to how context is used.
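
(For reference, that unlimited WaitGroup version would be roughly one goroutine per element; countLines here is a hypothetical helper:)

// Unlimited fan-out: one goroutine per file, no concurrency cap.
var wg sync.WaitGroup
lines := make([]int, len(files))
for i, file := range files {
    wg.Add(1)
    go func(i int, file string) {
        defer wg.Done()
        lines[i] = countLines(file) // countLines is hypothetical
    }(i, file)
}
wg.Wait()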

I'm thinking of a practical example:

// Count all of the lines in a bunch of files.

workers := pie.NewWorkers(2)
files := pie.Strings{"file1.csv", "file2.csv", "file3.csv"}
totalLines := files.ConcurrentToInts(workers, func(file string) int {
    // Read the file, count the lines...
    return numberOfLines
}).Sum()
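
A minimal sketch of what such a Workers type could be under the hood, assuming it's just a counting semaphore (hypothetical; nothing like this exists in pie yet):

// Workers caps how many jobs may run at once, using a buffered
// channel as a counting semaphore.
type Workers struct {
    sem chan struct{}
}

func NewWorkers(max int) *Workers {
    return &Workers{sem: make(chan struct{}, max)}
}

// Do blocks until a slot is free, then runs job.
func (w *Workers) Do(job func()) {
    w.sem <- struct{}{}        // acquire a slot
    defer func() { <-w.sem }() // release it
    job()
}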

elliotchance · May 09 '19 23:05

Hi everyone. I think it might be worth keeping the same interface for the functions.

But I can see that it would add overhead. Perhaps this is a better way, which I've found:

// Count all of the lines in a bunch of files.

workers := pie.NewWorkers(2)
files := pie.Strings{"file1.csv", "file2.csv", "file3.csv"}
workers.Concurrent(ctx, files.IntsUsing(func(file string) int {
    // Read the file, count the lines...
    return numberOfLines
})).(Ints).Sum()

This could be implemented using reflect (to route to the needed functions), or typed workers could be generated for each type, in which case it would look like this:

// Count all of the lines in a bunch of files.

files := pie.Strings{"file1.csv", "file2.csv", "file3.csv"}
files.Concurrent(2).IntsUsing(ctx, func(file string) int {
    // Read the file, count the lines...
    return numberOfLines
}).Sum()

Here, func Concurrent returns workers that will carry out the next function.

PS. I've been thinking about that for a long time, so I've just left these thoughts here.

zhiburt · May 10 '19 19:05

@zhiburt For me, neither of those solutions align with the goals of pie.

The first suggestion looks very choppy to me. That is, it looks difficult to understand and chain/control. But really my main objection is the use of the reflect package. From the start, I wanted all of the pie code to be statically correct, or the build to break. There are several similar libraries out there that use reflect to different degrees - flexibility at the expense of potential bugs.

The second suggestion is very similar to one of the first ideas that I had. However, I didn't suggest it because I realised it requires one of two things:

  1. files would have to be a struct so that we can attach metadata to be carried along, such as the 2. This breaks the fluidity of being able to easily go between native types and slices (ie. []string <-> pie.Strings).

  2. I'm always wary of chaining or nesting actions that inherit values from previous stages. It means that, to be safe, one would have to be careful to switch off the concurrency when it's not wanted/needed, such as files.Concurrent(2).IntsUsing().Concurrent(0).... I much prefer that if a stage needs to be concurrent, you must be explicit about it and pass in the necessary options for that concurrency.

elliotchance · May 11 '19 06:05

One other thing that is important to mention is how these will be context aware. It's totally valid both to want a worker pool that is shared amongst goroutines, and to have different contexts for individual goroutines.

It might be better explained with an example. Let's say you have a simple web server that returns the number of lines for the files requested. Silly in practice but it's a good way to demonstrate the scenario.

Individual requests will need to be able to inherit or otherwise cancel a context based on the request context, whereas all requests might share the same worker pool to ensure that too many requests are not made in parallel to the storage layer. It might look something like this (crude and incomplete):

var workers = pie.NewWorkers(context.Background(), 32) // package-level, shared by all requests

func handler(w http.ResponseWriter, r *http.Request) {
    files := pie.Strings(/* from r.Body */)

    // This ensures that if the HTTP request is cancelled so will any
    // further processing of the files.
    ctx := workers.WithContext(r.Context())

    totalLines := files.ConcurrentIntsUsing(ctx, func(file string) int {
        // Read the file, count the lines...
        return numberOfLines
    }).Sum()

    fmt.Fprintf(w, "%d", totalLines)
}

func main() {
    http.HandleFunc("/", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

In a lot of cases the context.Background() is totally fine as well:

workers := pie.NewWorkers(context.Background(), 32)
ctx := workers.Context() // Returns the context set for the workers
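
To sketch how WithContext could behave, assuming the Workers type carries a base context plus a shared semaphore (all names and shapes hypothetical):

type Workers struct {
    ctx context.Context
    sem chan struct{}
}

// WithContext derives a child pool whose context is cancelled when
// either the base context or the given request context is done. The
// semaphore is shared, so the concurrency cap still applies across
// all requests.
func (w *Workers) WithContext(ctx context.Context) *Workers {
    merged, cancel := context.WithCancel(ctx)
    go func() {
        defer cancel()
        select {
        case <-w.ctx.Done():  // base pool cancelled
        case <-merged.Done(): // request finished or cancelled
        }
    }()
    return &Workers{ctx: merged, sem: w.sem}
}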

elliotchance · May 11 '19 07:05

My thoughts in no specific order:

  • It's a very good idea to use Context
  • Let's make sure not to create any global state (i.e. not try to globally enforce the number of goroutines doing something)
  • Having a Workers type is certainly a possibility; I'm not sure it's necessary, though. It's not like, say, Java's ThreadPool or WorkerPool, which want to be reused because threads are expensive to create.

Deleplace · May 11 '19 08:05

@Deleplace I absolutely agree with your first two points.

I like having a Workers type, not because threads are expensive, but because it's critical to be able to control how much an algorithm can fan out for various reasons. In any other case, somebody would use a WaitGroup or some other mechanism.

If you truly didn't care about how much each operation fanned out then the package could provide an unlimited placeholder, such as:

var Unlimited = NewWorkers(context.Background(), 1e6)

Used as:

totalLines := files.ConcurrentIntsUsing(pie.Unlimited, func(file string) int {
    // Read the file, count the lines...
    return numberOfLines
}).Sum()

elliotchance · May 18 '19 07:05

A related idea: how about a "custom function limiter" returning a function with the same signature? With code generation we're not constrained on genericity, and we don't have to deal with reflection headaches.

e.g.

func (e ElementType) myPredicate() bool { ... }

// Not limited
b := s.ExpensiveConcurrentComputation(ElementType.myPredicate)

// Limited to N concurrent invocations of myPredicate
b := s.ExpensiveConcurrentComputation(ElementType.myPredicate_limit(N))

where the generated myPredicate_limit implementation would look something like

func (ElementType) myPredicate_limit(N int) func(ElementType) bool {
    // Create a semaphore with N tokens
    sem := make(chan struct{}, N)

    // Return a new limited predicate
    return func(e ElementType) bool {
        sem <- struct{}{}        // Take 1 token
        defer func() { <-sem }() // Give back 1 token
        return e.myPredicate()
    }
}

Deleplace · May 18 '19 08:05