Task Completion Pipe
WhenAll
static method of Task waits until all passed tasks will be completed. Thus, the maximum possible execution time of such task is equal to the slowest task in the group. It means that there is no way to obtain results from the faster tasks in the group. From other side, WhenAny
static method allows to obtain the result of the fastest task in the group. However, it doesn't allow to wait for all tasks. In some use cases, responsiveness of the program is a critical requirement so we need to have an ability to obtain and process tasks asynchronously as they complete.
TaskCompletionPipe<T> specially designed to process tasks as they complete using asynchronous streams. It provides the following API surface:
- Producer side:
Add
method to add asynchronous tasksComplete
method to inform the pipe that newly tasks will not be addedSubmit
method to add a group of tasks
- Consumer side:
- IAsyncEnumerable<T> interface implementation allows to use await foreach statement in C# programming language to consume tasks asynchronously as they complete
TryRead
method attempts to obtain the completed task synchronouslyWaitToReadAsync
allows to wait to the completed task submitted to the pipe. It should be used in combination withTryRead
to optimize memory allocations by consumer
Note
Performance tip: TryRead
and WaitToReadAsync
methods are preferred way if consumer processes completed tasks slower than producer submits new tasks. Otherwise, await foreach is preferred.
The pipe is thread-safe for both consumer and producer. Moreover, multiple consumers and multiple producers are allowed, no need to specify configuration properties like SingleReader as for Channel<T> class.
TaskCompletionPipe static class exposes one-shot helper methods to obtain async stream over the tasks:
Task<int> task1 = DoAsync();
Task<int> task2 = DoAsync();
int sum = 0;
await foreach (int result in TaskCompletionPipe.GetConsumer([task1, task2]))
{
sum += result;
}