Class AsyncCorrelationSource<TKey, TValue>

Represents a pub/sub synchronization primitive in which each event has a unique identifier.

Inheritance
object
AsyncCorrelationSource<TKey, TValue>
Inherited Members
object.Equals(object)
object.Equals(object, object)
object.GetHashCode()
object.GetType()
object.MemberwiseClone()
object.ReferenceEquals(object, object)
object.ToString()
Namespace: DotNext.Threading
Assembly: DotNext.Threading.dll
Syntax
public class AsyncCorrelationSource<TKey, TValue> where TKey : notnull
Type Parameters
Name Description
TKey

The type of the event identifier.

TValue

The type of the event payload.

Remarks

This synchronization primitive is useful when you need to correlate two events across process boundaries. For instance, you can send an asynchronous message to another process or machine on the network and wait for the response. With message brokers, message passing is not a duplex operation, so you need to wait for another incoming message and identify it as the response. The two messages can be correlated by key. The consumer and producer of the event must be ordered by happens-before semantics: the call to WaitAsync(TKey, TimeSpan, CancellationToken) by the consumer must happen before the call to Pulse(TKey, in Result<TValue>) by the producer for the same key.
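
The request/response flow described in these remarks can be sketched as follows. This is a minimal illustration, assuming the DotNext.Threading package is referenced; the Guid key, the string payload, the concurrency level of 16, and the in-process stand-in for the remote reply are hypothetical choices made for the example, not something prescribed by the library.

```csharp
using System;
using System.Threading.Tasks;
using DotNext;
using DotNext.Threading;

// Keys are request identifiers; values are response payloads (illustrative types).
var source = new AsyncCorrelationSource<Guid, string>(concurrencyLevel: 16);
var requestId = Guid.NewGuid();

// Consumer: register the listener BEFORE the request leaves the process,
// so that WaitAsync happens before the matching Pulse.
ValueTask<string> pending = source.WaitAsync(requestId, TimeSpan.FromSeconds(5));

// ... send the request carrying requestId through the broker here (omitted) ...

// Producer: normally called by the code that receives the reply message.
// It runs in-process here only to keep the sketch self-contained.
bool delivered = source.Pulse(requestId, new Result<string>("response payload"));

string response = await pending;
Console.WriteLine($"delivered: {delivered}, response: {response}");
```

Registering the listener first matters because, as noted for Pulse, a signal with no active listener is dropped rather than queued.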

Constructors

AsyncCorrelationSource(int, IEqualityComparer<TKey>?)

Initializes a new event correlation source.

Declaration
public AsyncCorrelationSource(int concurrencyLevel, IEqualityComparer<TKey>? comparer = null)
Parameters
Type Name Description
int concurrencyLevel

The number of events that can be processed concurrently without blocking.

IEqualityComparer<TKey> comparer

The comparer to be used for comparison of the keys.

Exceptions
Type Condition
ArgumentOutOfRangeException

concurrencyLevel is less than or equal to zero.

Methods

Pulse(TKey, in Result<TValue>)

Signals that the event has occurred.

Declaration
public bool Pulse(TKey eventId, in Result<TValue> value)
Parameters
Type Name Description
TKey eventId

The unique identifier of the event.

Result<TValue> value

The value to be passed to the listener.

Returns
Type Description
bool

true if there is an active listener for this event; otherwise, false.

Remarks

If no listener is present for eventId, the signal is dropped.
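
The dropped-signal behavior can be observed through the return value. The following is a small sketch, assuming the DotNext.Threading package is referenced; the key 42 and the payload are arbitrary illustrative values.

```csharp
using System;
using DotNext;
using DotNext.Threading;

var source = new AsyncCorrelationSource<int, string>(concurrencyLevel: 4);

// No WaitAsync call was made for key 42, so there is no active listener.
bool delivered = source.Pulse(42, new Result<string>("too early"));

if (!delivered)
{
    // The signal was dropped; a later WaitAsync(42, ...) will not observe it.
    Console.WriteLine("No listener registered for event 42; signal dropped.");
}
```

Checking the result lets the producer log or otherwise handle replies that arrive for requests nobody is waiting on, for example after the consumer has already timed out.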

Pulse(TKey, in Result<TValue>, out object?)

Signals that the event has occurred.

Declaration
public bool Pulse(TKey eventId, in Result<TValue> value, out object? userData)
Parameters
Type Name Description
TKey eventId

The unique identifier of the event.

Result<TValue> value

The value to be passed to the listener.

object userData

Custom data associated with an event.

Returns
Type Description
bool

true if there is an active listener for this event; otherwise, false.

Remarks

If no listener is present for eventId, the signal is dropped.
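
A possible use of this overload, paired with WaitAsync(TKey, object?, TimeSpan, CancellationToken), is sketched below. It assumes the DotNext.Threading package is referenced and, importantly, that the userData output is the object the listener supplied to WaitAsync; this page does not state that explicitly, so treat it as an assumption. The key and the label string are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using DotNext;
using DotNext.Threading;

var source = new AsyncCorrelationSource<int, string>(concurrencyLevel: 4);

// Attach arbitrary context (here, a hypothetical label) to the listener.
ValueTask<string> pending = source.WaitAsync(7, "request sent to node A", TimeSpan.FromSeconds(5));

// Assumption: userData receives the object supplied to WaitAsync above.
if (source.Pulse(7, new Result<string>("ok"), out object? userData))
{
    Console.WriteLine($"Listener context: {userData}");
}

Console.WriteLine(await pending);
```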

PulseAll(Exception)

Raises the exception on all active listeners.

Declaration
public void PulseAll(Exception e)
Parameters
Type Name Description
Exception e

The exception to be passed to all active listeners.
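
One situation where this overload may help is failing every outstanding request at once, for example when the underlying connection is lost. The sketch below assumes the DotNext.Threading package is referenced; the keys, the timeout, and the choice of InvalidOperationException are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using DotNext.Threading;

var source = new AsyncCorrelationSource<int, string>(concurrencyLevel: 4);

// Two independent listeners waiting on different event identifiers.
ValueTask<string> first = source.WaitAsync(1, TimeSpan.FromSeconds(5));
ValueTask<string> second = source.WaitAsync(2, TimeSpan.FromSeconds(5));

// Fail every outstanding listener at once.
source.PulseAll(new InvalidOperationException("connection lost"));

foreach (var pending in new[] { first, second })
{
    try
    {
        await pending; // each ValueTask is awaited exactly once
    }
    catch (InvalidOperationException e)
    {
        Console.WriteLine($"Listener failed: {e.Message}");
    }
}
```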

PulseAll(CancellationToken)

Cancels all active listeners.

Declaration
public void PulseAll(CancellationToken token)
Parameters
Type Name Description
CancellationToken token

The token in the canceled state.

PulseAll(TValue)

Notifies all active listeners.

Declaration
public void PulseAll(TValue value)
Parameters
Type Name Description
TValue value

The value to be passed to all active listeners.

WaitAsync(TKey, object?, TimeSpan, CancellationToken)

Returns the task linked with the specified event identifier.

Declaration
public ValueTask<TValue> WaitAsync(TKey eventId, object? userData, TimeSpan timeout, CancellationToken token = default)
Parameters
Type Name Description
TKey eventId

The unique identifier of the event.

object userData

Custom data associated with the event.

TimeSpan timeout

The time to wait for Pulse(TKey, in Result<TValue>, out object?).

CancellationToken token

The token that can be used to cancel the operation.

Returns
Type Description
ValueTask<TValue>

The task representing the event arrival.

Exceptions
Type Condition
TimeoutException

The operation has timed out.

OperationCanceledException

The operation has been canceled.

WaitAsync(TKey, CancellationToken)

Returns the task linked with the specified event identifier.

Declaration
public ValueTask<TValue> WaitAsync(TKey eventId, CancellationToken token = default)
Parameters
Type Name Description
TKey eventId

The unique identifier of the event.

CancellationToken token

The token that can be used to cancel the operation.

Returns
Type Description
ValueTask<TValue>

The task representing the event arrival.

Exceptions
Type Condition
OperationCanceledException

The operation has been canceled.

WaitAsync(TKey, TimeSpan, CancellationToken)

Returns the task linked with the specified event identifier.

Declaration
public ValueTask<TValue> WaitAsync(TKey eventId, TimeSpan timeout, CancellationToken token = default)
Parameters
Type Name Description
TKey eventId

The unique identifier of the event.

TimeSpan timeout

The time to wait for Pulse(TKey, in Result<TValue>).

CancellationToken token

The token that can be used to cancel the operation.

Returns
Type Description
ValueTask<TValue>

The task representing the event arrival.

Exceptions
Type Condition
TimeoutException

The operation has timed out.

OperationCanceledException

The operation has been canceled.
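
The two documented failure modes can be handled separately by the caller. The following sketch assumes the DotNext.Threading package is referenced; the key, the 100 ms timeout, and the messages are arbitrary values chosen for illustration.

```csharp
using System;
using System.Threading;
using DotNext.Threading;

var source = new AsyncCorrelationSource<int, string>(concurrencyLevel: 4);
using var cts = new CancellationTokenSource();

try
{
    // No Pulse is issued for key 1, so the wait is expected to time out.
    string value = await source.WaitAsync(1, TimeSpan.FromMilliseconds(100), cts.Token);
    Console.WriteLine(value);
}
catch (TimeoutException)
{
    Console.WriteLine("No response within the timeout.");
}
catch (OperationCanceledException)
{
    Console.WriteLine("The wait was canceled.");
}
```

Keeping the two handlers apart allows a caller to retry on timeout while treating cancellation as a deliberate shutdown signal.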

Extension Methods

BasicExtensions.As<T>(T)
BasicExtensions.GetUserData<T>(T)
BasicExtensions.IsBetween<T, TLowerBound, TUpperBound>(T, TLowerBound, TUpperBound)
BasicExtensions.IsOneOf<T>(T, ReadOnlySpan<T>)
ExpressionBuilder.Const<T>(T)
AsyncLockAcquisition.AcquireLockAsync<T>(T, CancellationToken)
AsyncLockAcquisition.AcquireLockAsync<T>(T, TimeSpan, CancellationToken)
AsyncLockAcquisition.AcquireReadLockAsync<T>(T, CancellationToken)
AsyncLockAcquisition.AcquireReadLockAsync<T>(T, TimeSpan, CancellationToken)
AsyncLockAcquisition.AcquireWriteLockAsync<T>(T, bool, CancellationToken)
AsyncLockAcquisition.AcquireWriteLockAsync<T>(T, bool, TimeSpan, CancellationToken)
AsyncLockAcquisition.AcquireWriteLockAsync<T>(T, CancellationToken)
AsyncLockAcquisition.AcquireWriteLockAsync<T>(T, TimeSpan, CancellationToken)
LockAcquisition.AcquireReadLock<T>(T)
LockAcquisition.AcquireReadLock<T>(T, TimeSpan)
LockAcquisition.AcquireUpgradeableReadLock<T>(T)
LockAcquisition.AcquireUpgradeableReadLock<T>(T, TimeSpan)
LockAcquisition.AcquireWriteLock<T>(T)
LockAcquisition.AcquireWriteLock<T>(T, TimeSpan)