Orleans Streams
Orleans v.1.0.0 added support for streaming extensions to the programming model. Streaming extensions provide a set of abstractions and APIs that make thinking about and working with streams simpler and more robust. They allow developers to write reactive applications that operate on a sequence of events in a structured way. The extensibility model of stream providers makes the programming model compatible with and portable across a wide range of existing queuing technologies, such as Event Hubs, Service Bus, Azure Queues, and Apache Kafka. There is no need to write special code or run dedicated processes to interact with such queues.
Why should I care?
If you already know all about stream processing and are familiar with technologies like Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Apache Spark Streaming, and Reactive Extensions (Rx) in .NET, you may be asking why you should care. Why do we need yet another stream processing system, and how are actors related to streams? "Why Orleans Streams?" is meant to answer that question.
Programming Model
There are a number of principles behind the Orleans Streams programming model:
- Orleans streams are virtual. That is, a stream always exists. It is not explicitly created or destroyed, and it can never fail.
- Streams are identified by stream IDs, which are just logical names composed of GUIDs and strings.
- Orleans Streams allow you to decouple generation of data from its processing, both in time and space. That means that the stream producer and the stream consumer may be on different servers or in different time zones, and that the stream will withstand failures of either.
- Orleans streams are lightweight and dynamic. Orleans Streaming Runtime is designed to handle a large number of streams that come and go at a high rate.
- Orleans stream bindings are dynamic. Orleans Streaming Runtime is designed to handle cases where grains connect to and disconnect from streams at a high rate.
- Orleans Streaming Runtime transparently manages the lifecycle of stream consumption. After an application subscribes to a stream, it will then receive the stream's events, even in the presence of failures.
- Orleans streams work uniformly across grains and Orleans clients.
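The first two principles can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ToyStreamRegistry<T>` is a hypothetical class invented for illustration, it is not part of Orleans, and it does not reflect how the Orleans Streaming Runtime is implemented. It shows only that a stream is addressed by a GUID plus a namespace string and that it can be used without ever being explicitly created.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy model, NOT part of Orleans. It only illustrates that a
// stream is addressed by (GUID, namespace) and exists as soon as it is named.
public sealed class ToyStreamRegistry<T>
{
    private readonly Dictionary<(Guid, string), List<Action<T>>> _subscribers =
        new Dictionary<(Guid, string), List<Action<T>>>();

    // Subscribing never requires the stream to have been created beforehand.
    public void Subscribe(Guid id, string ns, Action<T> handler)
    {
        GetOrAdd(id, ns).Add(handler);
    }

    // Publishing to a stream with no subscribers is valid. In this toy the
    // event is simply dropped. Returns the number of handlers invoked.
    public int Publish(Guid id, string ns, T item)
    {
        List<Action<T>> handlers = GetOrAdd(id, ns);
        foreach (Action<T> handler in handlers)
        {
            handler(item);
        }
        return handlers.Count;
    }

    // The stream's bookkeeping is created lazily on first use.
    private List<Action<T>> GetOrAdd(Guid id, string ns)
    {
        var key = (id, ns);
        if (!_subscribers.TryGetValue(key, out List<Action<T>> handlers))
        {
            handlers = new List<Action<T>>();
            _subscribers[key] = handlers;
        }
        return handlers;
    }
}
```

Neither the producer nor the consumer in this model holds a reference to the other; both know only the stream ID, which is the decoupling the principles above describe.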
Programming APIs
Applications interact with streams via APIs that are very similar to the well-known Reactive Extensions (Rx) in .NET, by using Orleans.Streams.IAsyncStream<T>, which implements the Orleans.Streams.IAsyncObserver<T> and Orleans.Streams.IAsyncObservable<T> interfaces.
In a typical example below, a device generates some data, which is sent as an HTTP request to the service running in the Cloud. The Orleans client running in the front-end server receives this HTTP call and publishes the data into a matching device stream:
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
    // Post data directly into the device's stream.
    IStreamProvider streamProvider = GrainClient.GetStreamProvider("MyStreamProvider");
    IAsyncStream<DeviceEventData> deviceStream = streamProvider.GetStream<DeviceEventData>(deviceEvent.DeviceId, "MyNamespace");
    await deviceStream.OnNextAsync(deviceEvent.Data);
}
In another example below, a chat user (implemented as an Orleans grain) joins a chat room, gets a handle to a stream of chat messages generated by all other users in this room, and subscribes to it. Notice that the chat user does not need to know about the chat room grain itself (there might not be such a grain in our system) or about other users in that group that produce messages. Likewise, to publish to the chat stream, users don't need to know who is currently subscribed to the stream. This demonstrates how chat users can be completely decoupled in time and space.
public class ChatUser : Grain
{
    public async Task JoinChat(Guid chatGroupId)
    {
        // Get a handle to the chat room's stream and subscribe to its messages.
        IStreamProvider streamProvider = base.GetStreamProvider("MyStreamProvider");
        IAsyncStream<string> chatStream = streamProvider.GetStream<string>(chatGroupId, "MyNamespace");
        await chatStream.SubscribeAsync(async (message, token) => Console.WriteLine(message));
    }
}
Quick Start Sample
The Quick Start Sample gives a quick overview of the overall workflow of using streams in an application. After reading it, you should read the Streams Programming APIs to get a deeper understanding of the concepts.
Streams Programming APIs
The Streams Programming APIs document provides a detailed description of the programming APIs.
Stream Providers
Streams can come via physical channels of various shapes and forms and can have different semantics. Orleans Streaming is designed to support this diversity via the concept of Stream Providers, which is an extensibility point in the system. Orleans currently has implementations of two stream providers: the TCP-based Simple Message Stream Provider and the Azure Queue-based Azure Queue Stream Provider. More details on Stream Providers can be found at Stream Providers.
Stream Semantics
Stream Subscription Semantics:
Orleans Streams guarantee Sequential Consistency for Stream Subscription operations. Specifically, when a consumer subscribes to a stream, once the Task representing the subscription operation has been successfully resolved, the consumer will see all events that were generated after it subscribed. In addition, Rewindable streams allow you to subscribe from an arbitrary point in time in the past by using StreamSequenceToken (more details can be found here).
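The difference between a plain subscription and a rewound one can be sketched with a toy in-memory log. Everything here is an assumption made for illustration: `ToyRewindableStream<T>` is hypothetical, is not an Orleans type, and uses a plain `long` position as a stand-in for a StreamSequenceToken.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy model, NOT part of Orleans. A long position stands in
// for a StreamSequenceToken.
public sealed class ToyRewindableStream<T>
{
    private readonly List<T> _log = new List<T>();
    private readonly List<Action<long, T>> _handlers = new List<Action<long, T>>();

    // Appends the event to the log, delivers it to current subscribers,
    // and returns the position assigned to it.
    public long Publish(T item)
    {
        _log.Add(item);
        long sequence = _log.Count - 1;
        foreach (Action<long, T> handler in _handlers)
        {
            handler(sequence, item);
        }
        return sequence;
    }

    // With no starting position, the subscriber sees only events published
    // after this call. With a starting position, retained events from that
    // position onward are replayed first, then new events follow.
    public void Subscribe(Action<long, T> handler, long? fromSequence = null)
    {
        if (fromSequence.HasValue)
        {
            for (long i = Math.Max(0, fromSequence.Value); i < _log.Count; i++)
            {
                handler(i, _log[(int)i]);
            }
        }
        _handlers.Add(handler);
    }
}
```

In this toy, a subscriber that joins after "a" and "b" were published and passes position 1 receives "b" and then every later event, while a subscriber that passes no position receives only later events.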
Individual Stream Events Delivery Guarantees:
Individual event delivery guarantees depend on individual stream providers. Some provide only best-effort at-most-once delivery (such as Simple Message Streams (SMS)), while others provide at-least-once delivery (such as Azure Queue Streams). It is even possible to build a stream provider that will guarantee exactly-once delivery (we don't have such a provider yet, but it is possible to build one).
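With an at-least-once provider, a consumer may see the same event more than once, so consumers are commonly written to be idempotent. The following is a minimal sketch of one way to do that, assuming each event carries a unique ID chosen by the producer. `DeduplicatingConsumer<T>` and the event ID are hypothetical and not part of the Orleans API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, NOT part of Orleans. Assumes every event carries a
// producer-assigned unique ID.
public sealed class DeduplicatingConsumer<T>
{
    private readonly HashSet<Guid> _seen = new HashSet<Guid>();
    private readonly Action<T> _process;

    public DeduplicatingConsumer(Action<T> process)
    {
        _process = process ?? throw new ArgumentNullException(nameof(process));
    }

    // Processes the payload only the first time a given event ID is seen.
    // Returns true if the event was processed, false if it was a duplicate.
    public bool Handle(Guid eventId, T payload)
    {
        if (!_seen.Add(eventId))
        {
            return false; // Redelivery of an event that was already handled.
        }
        _process(payload);
        return true;
    }
}
```

The set of seen IDs grows without bound in this sketch and is lost on restart; a real consumer would need to bound it or keep it in persistent state.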
Events Delivery Order:
Event order also depends on a particular stream provider. In SMS streams, the producer explicitly controls the order of events seen by the consumer by controlling the way it publishes them. Azure Queue streams do not guarantee FIFO order, since the underlying Azure Queues do not guarantee order in failure cases. Applications can also control their own stream delivery ordering by using StreamSequenceToken.
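One way an application might restore order on the consumer side is to buffer events until the next expected one arrives. The sketch below assumes each event carries a gap-free sequence number that starts at a known value. That is a simplification: `ReorderingBuffer<T>` is hypothetical, and the `long` sequence number is only a stand-in for whatever ordering information the application derives from its sequence tokens.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, NOT part of Orleans. Assumes gap-free sequence
// numbers starting at firstSequence.
public sealed class ReorderingBuffer<T>
{
    private readonly SortedDictionary<long, T> _pending = new SortedDictionary<long, T>();
    private long _next;

    public ReorderingBuffer(long firstSequence = 0)
    {
        _next = firstSequence;
    }

    // Accepts an event that may arrive out of order and returns the events
    // that can now be released in sequence order (possibly none).
    public IReadOnlyList<T> Accept(long sequence, T item)
    {
        var ready = new List<T>();
        if (sequence < _next)
        {
            return ready; // Already released: a stale or duplicate delivery.
        }
        _pending[sequence] = item;
        while (_pending.TryGetValue(_next, out T nextItem))
        {
            ready.Add(nextItem);
            _pending.Remove(_next);
            _next++;
        }
        return ready;
    }
}
```

If an event is lost for good, this buffer would wait for it indefinitely, so a real consumer would also need a timeout or a skip policy.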
Streams Implementation
The Orleans Streams Implementation provides a high-level overview of the internal implementation.
Code Samples
More examples of how to use streaming APIs within a grain can be found here. We plan to create more samples in the future.