Raft
Raft is a consensus algorithm suitable for building master-replica clusters with the following features:
- Linearizability of operations
- Data consistency (weak or strong)
- Election of the leader node responsible for processing write operations
- Replication
- Cluster configuration management
The core of the Raft implementation is the RaftCluster<TMember> class, which contains a transport-agnostic implementation of the Raft algorithm. First-class support of Raft in ASP.NET Core, as well as other features, is based on this class. In addition to the original Raft algorithm, the library provides the following augmentations and extensions:
- Pre-voting phase
- Standby state
- Automatic failure detection and reconfiguration of membership list
Consensus
Correctness of the consensus algorithm is tightly coupled with the Write-Ahead Log defined via the AuditTrail
property of the IPersistentState interface or via Dependency Injection. If your application requires only consensus without replication of real data, then the ConsensusOnlyState implementation is used. Note that this implementation is also the default. It is lightweight and fast; however, it doesn't store state on disk. Consider using the persistent WAL as a fully-featured persistent log for Raft.
State Recovery
The underlying state machine can be reconstructed at application startup using the InitializeAsync
method provided by the implementation of the IPersistentState interface. Usually, this method is called by the .NEXT infrastructure automatically. The MemoryBasedStateMachine class exposes the ReplayAsync
method to do this manually. Read more about the persistent Write-Ahead Log for Raft here.
Client Interaction
Chapter 6 of Diego Ongaro's dissertation contains recommendations about the interaction between external clients and cluster nodes. The Raft implementation provided by .NEXT doesn't implement client session control as described in the paper. However, it offers all the necessary tools for that:
- IPersistentState.EnsureConsistencyAsync method waits until the last committed entry is from the leader's term
- IReplicationCluster.ForceReplicationAsync method initiates a new round of heartbeats and waits for a reply from the majority of nodes
- IRaftCluster.Lease property gets the lease that can be used for linearizable reads
- IRaftCluster.ReplicateAsync method appends, replicates, and commits a log entry; useful for implementing write operations
- IRaftCluster.ApplyReadBarrierAsync method inserts a barrier to achieve linearizable reads
- IRaftCluster.LeadershipToken property provides a CancellationToken that represents the leadership state. If the local node is the leader, the token is in the non-signaled state; if the local node is a follower, the token is in the canceled state. When the local node is downgrading from leader to follower, the token moves to the canceled state. This token is useful when implementing write operations because it allows aborting an asynchronous operation in case of a downgrade
Elimination of duplicate commands received from clients should be implemented manually because the basic framework is not aware of the underlying network transport.
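For example, a write operation can combine these tools. The following sketch guards a write with the leadership token; MyLogEntry is a hypothetical log entry type, and error handling is omitted:

```csharp
using System;
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft;

IRaftCluster cluster = ...; // resolved from DI or created manually

// the token is canceled as soon as the local node loses leadership
var leadershipToken = cluster.LeadershipToken;
if (leadershipToken.IsCancellationRequested)
    throw new InvalidOperationException("The local node is not the leader");

// append, replicate, and commit the entry; aborts on downgrade
await cluster.ReplicateAsync(new MyLogEntry(cluster.Term), Timeout.InfiniteTimeSpan, leadershipToken);
```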
Linearizability
Linearizability requires the results of a read to reflect a state of the system sometime after the read was initiated; each read must return at least the results of the latest committed write. For instance, if the client performs a Write operation on variable A and immediately requests variable A back, then A must have the value provided by the Write operation or a more recent value. Without linearizability, the client can see a stale value of A. In other words, there is no guarantee that the client will be able to see the result of its own Write operation. A system that allowed stale reads would provide only serializability, which is a weaker form of consistency.
Linearizable reads can be achieved in Raft naturally. A read operation can be performed on leader or follower nodes.
The IRaftCluster.Lease
property exposes the leadership lease that guarantees that the leader cannot be changed during the lease. This method of providing linearizability doesn't require an extra round of heartbeats. As a result, it is the most performant way to process read-only queries. However, the duration of the lease depends on clockDriftBound. Here's the citation from the Raft paper:
The lease approach assumes a bound on clock drift across servers (over a given time period, no server’s clock increases more than this bound times any other). Discovering and maintaining this bound might present operational challenges (e.g., due to scheduling and garbage collection pauses, virtual machine migrations, or clock rate adjustments for time synchronization). If the assumptions are violated, the system could return arbitrarily stale information.
The lease approach can be used only if all read-only queries are processed by the leader node.
Another approach is to use a read barrier provided by the IRaftCluster.ApplyReadBarrierAsync
method. It allows read-only queries to be processed by follower nodes. In the case of a follower node, the method instructs the leader node to execute a new round of heartbeats (with the help of the ForceReplicationAsync
method). The follower then waits for its state machine to advance at least as far as the index of the last committed log entry on the leader node. These actions are enough to satisfy linearizability. As you can see, this approach incurs extra overhead caused by network communication.
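A minimal sketch of a linearizable read on any node, assuming ApplyReadBarrierAsync accepts a CancellationToken; ReadFromLocalStateMachine is a hypothetical accessor for your own state machine:

```csharp
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft;

IRaftCluster cluster = ...;

// on a follower, this forces a round of heartbeats and waits until the local
// state machine catches up with the leader's commit index
await cluster.ApplyReadBarrierAsync(CancellationToken.None);

// now the local state machine is recent enough for a linearizable read
var result = ReadFromLocalStateMachine();
```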
Lease and read barrier are the mechanisms for linearizable reads provided out of the box. However, it's possible to use any other approach. For instance, the server can respond with the commit index for each Write request. The client can remember this value locally and provide it with each read-only query. When a Read request is received, the server may call IPersistentState.WaitForCommitAsync
to wait until the local log is committed at least up to the index provided by the client.
Node Bootstrapping
The node can be started in two modes:
- Cold Start means that the starting node is the initial node in the cluster. In that case, the node adds itself to the cluster configuration in committed state.
- Announcement means that the starting node must be announced through the leader, added to the cluster configuration, and committed by the majority of nodes. In that case, the node starts in Standby mode and waits until it is added to the configuration by the leader node and the configuration is replicated to that node
A successful call to the StartAsync
method of the RaftCluster<TMember> class doesn't mean that the node is ready to serve client requests. To ensure that the node is bootstrapped correctly, use the Readiness Probe. The probe is provided through the Readiness
property of the IRaftCluster interface.
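A minimal bootstrap sketch, assuming the Readiness property is awaitable as a Task (WaitAsync from .NET 6+ is used here to bound the wait):

```csharp
using System;
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft;

IRaftCluster cluster = ...;
await cluster.StartAsync(CancellationToken.None);

// wait until the node is announced, added to the cluster configuration,
// and the configuration is committed by the majority of nodes
await cluster.Readiness.WaitAsync(TimeSpan.FromMinutes(1));
```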
Another way of cluster bootstrapping is to pre-populate the list of cluster members with 3 or more members and start them in parallel. In this case, Cold Start mode is unnecessary.
Cluster Configuration Management
Raft supports cluster configuration management out-of-the-box. The cluster configuration is a set of cluster members consistently stored on the nodes. The leader node is responsible for processing amendments to the configuration and replicating the modified configuration to follower nodes. Thus, the list of cluster members is always in a consistent state.
The configuration can be in two states:
- Active configuration, which is used by the leader node for sending heartbeats. This type of configuration is always acknowledged by the majority of nodes and, as a result, is the same on every node in the cluster
- Proposed configuration, which is created by the leader node in response to a configuration change. This type of configuration must be replicated and confirmed by the majority of nodes to be transformed into the Active configuration.
The proposed configuration is similar to uncommitted log entries in the Raft log. For simplicity, the proposed configuration can be created using only the following operations:
- Add a new member
- Remove the existing member
It's not possible to add or remove multiple members at a time. Instead, you need to add or remove a single member and replicate that change. When the proposed configuration is accepted by the majority of nodes, the leader node turns it into the active configuration.
The IClusterConfigurationStorage interface is responsible for maintaining the cluster configuration. There are two possible storages:
- In-memory storage that keeps the configuration in memory. Restarting the node leads to configuration loss
- Persistent storage that keeps the configuration in the file system
Warning
In-memory configuration storage is not recommended for production use. In case of node failures, the configuration will not survive the node restart.
When a new node is added, it passes through a warmup procedure. The leader node attempts to replicate as many log entries as possible to the added node. The number of catch-up rounds can be configured via the WarmupRounds
configuration property. When the leader node decides that the new node is in sync, it adds the address of that node to the proposed configuration. When the proposed configuration becomes the active configuration, the readiness probe of the added node turns into the signaled state.
A new member can be proposed using the IRaftHttpCluster.AddMemberAsync method for an ASP.NET Core application, or the RaftCluster.AddMemberAsync method for an application without DI support.
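A hedged sketch of announcing a new node from a deployment script or admin endpoint; the exact AddMemberAsync overload (address type, extra arguments) depends on the transport and library version, so treat the shape below as an assumption:

```csharp
using System;
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft.Http;

IRaftHttpCluster cluster = ...; // typically invoked on the leader node

// propose the new member; the task completes when the proposed configuration
// becomes the active configuration (or the proposal is rejected)
bool committed = await cluster.AddMemberAsync(new Uri("https://node4:3262/"), CancellationToken.None);
```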
Network Transport
.NEXT supports the following network transports:
- HTTP 1.1, HTTP 2.0, and HTTP 3.0
- TCP transport
- Generic transport on top of ASP.NET Core Connections abstractions. See CustomTransportConfiguration class for more information.
The TCP transport ships with the DotNext.Net.Cluster
library without heavyweight dependencies such as ASP.NET Core or DotNetty. The library provides a specialized application protocol on top of this transport: a binary protocol, highly optimized for Raft purposes, that provides maximum bandwidth in contrast to HTTP. However, additional features for cluster programming are limited:
- General-purpose messaging between nodes is not supported via IMessageBus interface
The cluster programming model for the TCP and generic transports is unified and exposed via the RaftCluster class. The following example demonstrates usage of this class:
```csharp
using System.Net;
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft;

RaftCluster.NodeConfiguration config = ...; // configuration of the local node

// configuring members in the cluster
config.Members.Add(new IPEndPoint(IPAddress.Loopback, 3262));
config.Members.Add(new IPEndPoint(IPAddress.Loopback, 3263));

using var cluster = new RaftCluster(config);
await cluster.StartAsync(CancellationToken.None); // starts hosting of the local node
// the code for working with the cluster instance
await cluster.StopAsync(CancellationToken.None); // stops hosting of the local node
```
The configuration of the local node depends on the chosen network transport. The NodeConfiguration abstract class exposes the common properties for both transports:
Configuration parameter | Required | Default Value | Description |
---|---|---|---|
Metrics | No | null | Allows specifying a custom metrics collector |
PublicEndPoint | No | The same as HostEndPoint | The real IP address of the host where the cluster node is launched. Usually needed when the node is executed inside a Docker container: if this parameter is not specified, the cluster node may fail to detect itself because network interfaces inside the container have different addresses than the real host network interfaces |
HeartbeatThreshold | No | 0.5 | Specifies the frequency of heartbeat messages generated by the leader node to inform follower nodes about its leadership. The range is (0, 1). Lower values cause heartbeats to be generated more frequently, and vice versa |
LowerElectionTimeout, UpperElectionTimeout | No | 150 | Defines the range of the election timeout (in milliseconds) from which a value is picked randomly for each cluster member. If a cluster node doesn't receive a heartbeat from the leader during this timeout, it becomes a candidate and starts an election. The recommended value of UpperElectionTimeout is 2 × LowerElectionTimeout |
MemoryAllocator | No | Memory pool from PipeConfig property | Memory allocator used to allocate memory for network packets |
Metadata | No | Empty dictionary | A set of metadata properties associated with the local node |
RequestTimeout | No | UpperElectionTimeout | Defines the request timeout for accessing cluster members across the network |
LoggerFactory | No | NullLoggerFactory.Instance | The logger factory |
Standby | No | false | true to prevent election of the cluster member as a leader. Useful for configuring nodes that serve read-only operations |
ConfigurationStorage | Yes | N/A | Represents the storage for the list of cluster members. You can use the UseInMemoryConfigurationStorage method for testing purposes |
Announcer | No | null | A delegate of type ClusterMemberAnnouncer<TAddress> that can be used to announce a new node to the leader |
WarmupRounds | No | 10 | The number of rounds used to warm up a fresh node that wants to join the cluster |
ColdStart | No | true | true to start the initial node in the cluster; in case of cold start, the node doesn't announce itself. false to start the node in standby mode and wait for an announcement |
By default, all transport bindings for Raft use in-memory configuration storage.
Cluster configuration management is represented by the following methods declared in the RaftCluster class or the IRaftHttpCluster interface (in case of the HTTP transport working on top of the ASP.NET Core infrastructure):
- AddMemberAsync to add and catch up a new node
- RemoveMemberAsync to remove an existing node

AddMemberAsync can be called by a deployment script, manually by the administrator, or through the Announcer. RemoveMemberAsync can be called as a part of a graceful shutdown (planned deprovisioning) or manually by the administrator.
HTTP transport and ASP.NET Core
The DotNext.AspNetCore.Cluster
library is an extension for ASP.NET Core aimed at writing microservices and supports the following features:
- Messaging is fully supported and organized through the HTTP 1.1, HTTP 2.0, or HTTP 3.0 protocol, including TLS
- Replication is fully supported
- Consensus is fully supported and based on Raft algorithm
- Tight integration with ASP.NET Core ecosystem such as Dependency Injection and Configuration Object Model
- Compatible with Kestrel or any other third-party web host
- Detection of changes in the list of cluster nodes via configuration
These extensions are located in the DotNext.Net.Cluster.Consensus.Raft.Http namespace.
This implementation is WAN-friendly because it uses a reliable network transport and supports TLS. It is a good choice if your cluster nodes communicate over the Internet or any other unreliable network. However, HTTP leads to performance and traffic overhead. Moreover, the library depends on ASP.NET Core.
A web application is treated as a cluster node. The following example demonstrates how to turn an ASP.NET Core application into a cluster node:
```csharp
using DotNext.Net.Cluster.Consensus.Raft.Http;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

sealed class Startup
{
    public void Configure(IApplicationBuilder app)
    {
        app.UseConsensusProtocolHandler(); // informs that the processing pipeline should handle Raft-specific requests
    }

    public void ConfigureServices(IServiceCollection services)
    {
        services.UsePersistentConfigurationStorage("/path/to/folder");
    }
}

IHost host = new HostBuilder()
    .ConfigureWebHost(webHost => webHost
        .UseKestrel(options => options.ListenLocalhost(80))
        .UseStartup<Startup>()
    )
    .JoinCluster() // registers all services required for normal cluster node operation
    .Build();
```
Note that the JoinCluster method should be called after ConfigureWebHost; otherwise, the behavior of this method is undefined. The JoinCluster method has overloads that allow specifying a custom configuration section containing the configuration of the local node.

The UseConsensusProtocolHandler method should be called before registration of any authentication/authorization middleware.

UsePersistentConfigurationStorage configures a persistent storage for the cluster configuration. Alternatively, you can use the UseInMemoryConfigurationStorage method and keep the configuration in memory.
Dependency Injection
The application may request the following services from ASP.NET Core DI container:
- ICluster
- IRaftCluster represents the Raft-specific version of the ICluster interface
- IMessageBus for point-to-point messaging between nodes
- IPeerMesh<IRaftClusterMember> for tracking changes in cluster membership
- IReplicationCluster<IRaftLogEntry> to work with the audit trail used for replication. IRaftLogEntry is the Raft-specific representation of a record in the audit trail
- IReplicationCluster to work with the audit trail in a simplified manner
- IRaftHttpCluster provides HTTP-specific extensions to the IRaftCluster interface plus cluster management methods
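For instance, a service can receive the cluster instance through constructor injection. LeaderInfoService below is a hypothetical consumer, not part of the library:

```csharp
using DotNext.Net.Cluster.Consensus.Raft;

// a hypothetical application service that inspects the cluster state
public sealed class LeaderInfoService
{
    private readonly IRaftCluster cluster;

    public LeaderInfoService(IRaftCluster cluster) => this.cluster = cluster;

    // Leader is null when the cluster (or its partition) has no elected leader
    public string? LeaderAddress => cluster.Leader?.EndPoint.ToString();
}
```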
Configuration
The application should be configured properly to work as a cluster node. The following JSON represents an example of the configuration:
```json
{
  "lowerElectionTimeout" : 150,
  "upperElectionTimeout" : 300,
  "metadata" : {
    "key": "value"
  },
  "requestJournal" : {
    "memoryLimit": 5,
    "expiration": "00:00:10",
    "pollingInterval" : "00:01:00"
  },
  "clientHandlerName" : "raftClient",
  "port" : 3262,
  "heartbeatThreshold" : 0.5,
  "requestTimeout" : "00:01:00",
  "rpcTimeout" : "00:00:00.150",
  "keepAliveTimeout": "00:02:00",
  "openConnectionForEachRequest" : false,
  "clockDriftBound" : 1.0,
  "coldStart" : true,
  "standby" : false,
  "warmupRounds" : 10,
  "protocolVersion" : "auto",
  "protocolVersionPolicy" : "RequestVersionOrLower"
}
```
Configuration parameter | Required | Default Value | Description |
---|---|---|---|
lowerElectionTimeout, upperElectionTimeout | No | 150, 300 | Defines the range of the election timeout (in milliseconds) from which a value is picked randomly for each cluster member. If a cluster node doesn't receive a heartbeat from the leader during this timeout, it becomes a candidate and starts an election. The recommended value of upperElectionTimeout is 2 × lowerElectionTimeout |
metadata | No | empty dictionary | A set of key/value pairs to be associated with the cluster node. The metadata is queryable through the IClusterMember interface |
openConnectionForEachRequest | No | false | true to create a new TCP connection for each outbound request. false to use HTTP KeepAlive |
clientHandlerName | No | raftClient | The name to be passed into IHttpMessageHandlerFactory to create the HttpMessageInvoker used by the Raft client code |
requestJournal:memoryLimit | No | 10 | The maximum amount of memory (in MB) utilized by the internal buffer used to track duplicate messages |
requestJournal:expiration | No | 00:00:10 | The eviction time of the record containing a unique request identifier |
requestJournal:pollingInterval | No | 00:01:00 | The maximum time after which the buffer updates its memory statistics |
heartbeatThreshold | No | 0.5 | Specifies the frequency of heartbeat messages generated by the leader node to inform follower nodes about its leadership. The range is (0, 1). Lower values cause heartbeats to be generated more frequently, and vice versa |
protocolVersion | No | auto | HTTP protocol version to be used for communication between members. Possible values are auto, http1, http2, http3 |
protocolVersionPolicy | No | RequestVersionOrLower | Specifies behaviors for selecting and negotiating the HTTP version for a request. Possible values are RequestVersionExact, RequestVersionOrHigher, RequestVersionOrLower |
requestTimeout | No | upperElectionTimeout | Request timeout used to access cluster members across the network using the HTTP client |
rpcTimeout | No | upperElectionTimeout / 2 | Request timeout used to send Raft-specific messages to cluster members. Must be less than or equal to requestTimeout |
standby | No | false | true to prevent election of the cluster member as a leader. Useful for configuring nodes that serve read-only operations |
coldStart | No | true | true to start the initial node in the cluster; in case of cold start, the node doesn't announce itself. false to start the node in standby mode and wait for an announcement |
clockDriftBound | No | 1.0 | A bound on clock drift across servers. This value is used to calculate the leader lease duration. The lease can be obtained via the IRaftCluster.Lease property. The lease approach assumes a bound on clock drift across servers: over a given time period, no server's clock increases more than this bound times any other |
warmupRounds | No | 10 | The number of rounds used to warm up a fresh node that wants to join the cluster |
The requestJournal configuration section is rarely used and is useful for high-load scenarios only.

Choose lowerElectionTimeout and upperElectionTimeout according to the quality of your network. If these values are too small, you'll get frequent leader re-elections.
Controlling node lifetime
The service implementing IRaftCluster is registered as a singleton service. It starts receiving Raft-specific messages immediately. Therefore, you can lose some events raised by the service, such as LeaderChanged, at the starting point. To avoid that, you can implement the IClusterMemberLifetime interface and register the implementation as a singleton.
```csharp
using System.Collections.Generic;
using DotNext.Net.Cluster.Consensus.Raft;

internal sealed class MemberLifetime : IClusterMemberLifetime
{
    private static void LeaderChanged(ICluster cluster, IClusterMember leader) {}

    void IClusterMemberLifetime.OnStart(IRaftCluster cluster, IDictionary<string, string> metadata)
    {
        // attach event handlers before the node starts processing Raft messages
        metadata["key"] = "value";
        cluster.LeaderChanged += LeaderChanged;
    }

    void IClusterMemberLifetime.OnStop(IRaftCluster cluster)
    {
        cluster.LeaderChanged -= LeaderChanged;
    }
}
```
Additionally, the hook can be used to modify metadata of the local cluster member.
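The implementation is then registered as a singleton in the DI container (a one-line sketch for the standard ASP.NET Core container):

```csharp
services.AddSingleton<IClusterMemberLifetime, MemberLifetime>();
```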
HTTP Client Behavior
The HTTP binding for Raft uses HttpClient for communication between cluster nodes. The client itself delegates all operations to HttpMessageHandler. It's not recommended to use HttpClientHandler because it has inconsistent behavior on different platforms. For instance, on Linux it invokes libcurl. The Raft implementation uses the Timeout
property of HttpClient
to establish the request timeout, which is always defined as upperElectionTimeout
by the .NEXT infrastructure. To demonstrate the inconsistent behavior, let's introduce three cluster nodes: A, B, and C. A and B have been started, but C has not:
- On Windows, the leader will not be elected even though the majority is present (2 of 3 nodes are available). This happens because the Connection Timeout is equal to the Response Timeout, which is equal to upperElectionTimeout.
- On Linux, everything is fine because the Connection Timeout is less than the Response Timeout.
By default, the Raft implementation uses SocketsHttpHandler. However, the handler can be overridden using IHttpMessageHandlerFactory. You can implement this interface manually and register the implementation as a singleton. .NEXT tries to use this interface, if it is registered, as a factory of custom HttpMessageHandler. The following example demonstrates how to implement this interface and create a platform-independent version of the message invoker:
```csharp
using System;
using System.Net.Http;

internal sealed class RaftClientHandlerFactory : IHttpMessageHandlerFactory
{
    public HttpMessageHandler CreateHandler(string name) => new SocketsHttpHandler { ConnectTimeout = TimeSpan.FromMilliseconds(100) };
}
```
In practice, ConnectTimeout should be equal to or less than the lowerElectionTimeout configuration property. Note that the name parameter is equal to the clientHandlerName configuration property when handler creation is requested by the Raft implementation.
Redirection to Leader
Client interaction requires automatic detection of the leader node. The Cluster Development Suite provides a way to automatically redirect requests to the leader node if they were originally received by a follower node. The redirection is organized with the help of the 307 Temporary Redirect status code. Every follower node knows the actual address of the leader node. If the cluster or its partition doesn't have a leader, the node returns 503 Service Unavailable.
Automatic redirection can be configured using the RedirectToLeader extension method.
```csharp
using DotNext.Net.Cluster.Consensus.Raft.Http;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

sealed class Startup
{
    private readonly IConfiguration configuration;

    public Startup(IConfiguration configuration) => this.configuration = configuration;

    public void Configure(IApplicationBuilder app)
    {
        app.UseConsensusProtocolHandler()
            .RedirectToLeader("/endpoint1")
            .RedirectToLeader("/endpoint2");
    }

    public void ConfigureServices(IServiceCollection services)
    {
    }
}
```
This redirection can be transparent to the actual client if you use a reverse proxy server such as NGINX. The reverse proxy can automatically handle the redirection without returning control to the client.

It is possible to change the default redirection behavior, where the 307 Temporary Redirect status code is used, by passing a custom implementation into the optional parameter of the RedirectToLeader method. The following example demonstrates how to return 404 Not Found with the location of the leader node as its body:
```csharp
private static Task CustomRedirection(HttpResponse response, Uri leaderUri)
{
    response.StatusCode = StatusCodes.Status404NotFound;
    return response.WriteAsync(leaderUri.AbsoluteUri);
}

public void Configure(IApplicationBuilder app)
{
    app.UseConsensusProtocolHandler()
        .RedirectToLeader("/endpoint1", redirection: CustomRedirection);
}
```
The customized redirection should be as fast as possible and must not block the caller.
Port mapping
The redirection mechanism tries to construct a valid URI of the leader node based on its actual IP address. Identifying the address is not a problem, unlike the port number. The infrastructure cannot use the port of its WebHost because of Hosted Mode, nor the port from the incoming Host
header because it can be rewritten by a reverse proxy. The only way is to use the inbound port of the TCP listener responsible for handling all incoming HTTP requests. This is valid for a non-containerized environment. Inside a container, the ASP.NET Core application port is mapped to an externally visible port, which is not always the same. In this case, you can specify the port for redirections explicitly as follows:
```csharp
public void Configure(IApplicationBuilder app)
{
    app.UseConsensusProtocolHandler()
        .RedirectToLeader("/endpoint1", applicationPortHint: 3265);
}
```
Messaging
The Cluster Programming Suite supports messaging between nodes through HTTP out of the box. However, the infrastructure doesn't know how to handle custom messages. Therefore, if you want to utilize this functionality, you need to implement the IInputChannel interface.
Messaging inside the cluster supports redirection to the leader, as for external clients. But this mechanism is implemented differently and is exposed as an IInputChannel
via the LeaderRouter
property of the IMessageBus interface.
Replication
The Raft algorithm requires additional persistent state in order to maintain the basic audit trail. This state is represented by the IPersistentState interface. By default, it is implemented as ConsensusOnlyState, which is suitable only for applications that don't have replicated state. If your application has one, then use the MemoryBasedStateMachine class or implement the interface from scratch. The implementation can be injected explicitly via the AuditTrail
property of the IRaftCluster interface or implicitly via Dependency Injection. The explicit registration should be done inside a user-defined implementation of the IClusterMemberLifetime interface registered as a singleton service in the ASP.NET Core application. The implicit injection requires registration of a singleton service implementing the IPersistentState interface. The UsePersistenceEngine
extension method of the RaftClusterConfiguration class can be used for that purpose.
Information about the reliable persistent state that uses non-volatile storage is located in a separate article. Note that its usage turns your microservice into a stateful service because its state must be persisted on disk. Consider this fact if you are using containerization technologies such as Docker or LXC.
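A hedged registration sketch for implicit injection; MyStateMachine is a hypothetical engine derived from MemoryBasedStateMachine, and the exact generic overload of UsePersistenceEngine may differ between library versions:

```csharp
public void ConfigureServices(IServiceCollection services)
{
    // registers MyStateMachine as the persistence engine (IPersistentState)
    // consumed by the Raft infrastructure
    services.UsePersistenceEngine<MyStateMachine>();
}
```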
Metrics
Raft node internals can be measured using any OpenTelemetry-compliant tool or dotnet-counters. .NEXT is instrumented with the modern System.Diagnostics.Metrics API, so you can use the following meters:
- DotNext.Net.Cluster.Consensus.Raft.Server - for server-side metrics
- DotNext.Net.Cluster.Consensus.Raft.Client - for client-side metrics
TCP Transport
The TCP transport is used as the bottom layer for a specialized application protocol aimed at efficient transmission of Raft messages. This transport can be configured using the TcpConfiguration class:
```csharp
using System.Net;
using System.Threading;
using DotNext.Net.Cluster.Consensus.Raft;

RaftCluster.NodeConfiguration config = new RaftCluster.TcpConfiguration(new IPEndPoint(IPAddress.Loopback, 3262));

using var cluster = new RaftCluster(config);
await cluster.StartAsync(CancellationToken.None); // starts hosting of the local node
// the code for working with the cluster instance
await cluster.StopAsync(CancellationToken.None); // stops hosting of the local node
```
The constructor expects the address and port used for hosting the local node.
The following table describes configuration properties applicable to TCP transport:
Configuration parameter | Required | Default Value | Description |
---|---|---|---|
ServerBacklog | No | Equal to the number of cluster members | The number of active incoming connections allowed by the local node |
LingerOption | No | Not enabled | The configuration that specifies whether a TCP socket will delay its closing in an attempt to send all pending data |
GracefulShutdownTimeout | No | LowerElectionTimeout | The timeout of graceful shutdown of active incoming connections |
TransmissionBlockSize | No | 65535 | The size, in bytes, of internal memory block used for sending packets. If your network has high packet loss then you can decrease this value to avoid retransmission of large blocks. |
RequestTimeout | No | LowerElectionTimeout / 2 | A timeout used for Raft RPC call. Must be less than or equal to LowerElectionTimeout |
ConnectTimeout | No | LowerElectionTimeout / 2 | TCP connection timeout. Must be less than or equal to RequestTimeout |
SslOptions | No | N/A | Allows enabling and configuring transport-level encryption using SSL and X.509 certificates |
TimeToLive | No | 64 | Time To Live (TTL) value of Internet Protocol (IP) packets |
The TCP transport is WAN-friendly and supports transport-level encryption. However, the underlying application-level protocol is binary, which can be a problem for corporate firewalls.
The recommended relationship between the timeouts is ConnectTimeout < RequestTimeout < LowerElectionTimeout. In practice, ConnectTimeout should be as small as possible to avoid the impact of disconnected nodes on the cluster.
Example
There is a Raft playground represented by the RaftNode application. You can find this app here. The playground allows you to test the Raft consensus protocol in the real world using one of the supported transports: http, tcp, or tcp+ssl.
Each instance of the launched application represents a cluster node. All the nodes can be started using the following script:
```bash
cd <dotnext>/src/examples/RaftNode
dotnet run -- http 3262
dotnet run -- http 3263
dotnet run -- http 3264
```
Every instance should be launched in a separate terminal session. After that, you will see diagnostic messages in stdout about the election process. Press Ctrl+C in the window related to the leader node and ensure that a new leader is elected.
Optionally, you can test replication powered by the persistent WAL. To do that, you need to specify the name of the folder used to store the Write-Ahead Log files:
```bash
cd <dotnext>/src/examples/RaftNode
dotnet run -- http 3262 node1
dotnet run -- http 3263 node2
dotnet run -- http 3264 node3
```
Now you can see the replication messages in each terminal window. The replicated state is stored in the node1, node2, and node3 folders. You can restart one of the nodes and make sure that its state is recovered correctly.
Extensions
The Raft implementation provided by the .NEXT library contains some extensions to the original algorithm. For instance, the extra Standby state is added in addition to the Follower, Candidate, and Leader states. All these extensions are grouped into interfaces that can be found in a separate namespace.
Automatic failure detection
Automatic Failure Detection is another extension to the original Raft algorithm that allows the cluster leader to detect unresponsive followers and remove them from the cluster configuration. This behavior makes it possible to detect and tolerate permanent failures of a particular cluster node and remove it from the majority calculation, so the cluster remains available for writes. For instance, suppose we have 7 nodes in the cluster. The cluster remains available if at least 4 nodes are alive. With the failure detector, we can remove 3 faulty nodes and reconfigure the cluster dynamically to indicate that the cluster has only 4 nodes. In that case, the cluster remains available even with 3 nodes.

However, the current implementation needs to inform the rest of the cluster about the faulty node. In other words, the cluster must be available for writes. If 4 of 7 nodes are detected as faulty at the same time, it is not possible to reconfigure the cluster because there is no majority to keep the leader working as expected.
The IFailureDetector interface is an extension point that provides the failure detection algorithm. The library ships the φ Accrual Failure Detector as an efficient implementation based on anomalies in response time. By default, automatic failure detection is disabled, but the caller code can specify a factory of failure detectors. In that case, the Raft implementation instantiates a failure detector for each cluster member automatically on the leader's side. See the corresponding RaftCluster<TMember> property for more information. In a DI environment (ASP.NET Core), the factory can be registered as a singleton service.
Development and Debugging
It may be hard to reproduce a real cluster on a developer's machine. You may want to run your node in Debug mode and ensure that the node you're running is the leader node. To do that, start the node in Cold Start mode.
Performance
The wire format is highly optimized for transferring log entries over the wire during the replication process. Most performance optimizations should be done when configuring the persistent Write-Ahead Log.
MemoryBasedStateMachine supports several log compaction modes. Some of them allow compaction in parallel with appending new log entries. Read this article for more information about the available modes. Background compaction provides precise control over the compaction. There are a few ways to control it:
- If you're using the UsePersistenceEngine extension method for registering your engine based on MemoryBasedStateMachine, then the .NEXT infrastructure automatically detects the configured compaction mode. If it is Background, the infrastructure registers a compaction worker as a background service in ASP.NET Core. This worker provides incremental background compaction. You can override this behavior by implementing ILogCompactionSupport in your persistence engine.
- If you're registering the persistence engine in the DI container manually, you need to implement the background compaction worker yourself using the BackgroundService class and call the ForceCompactionAsync method in the overridden ExecuteAsync method (see the sketch below).
Incremental background compaction is the default strategy when Background compaction is enabled. The worker just waits for a commit and checks whether the MemoryBasedStateMachine.CompactionCount
property is greater than zero. If so, it calls ForceCompactionAsync
with a compaction factor equal to 1, which provides minimal compaction of the log. As a result, the contention between the compaction worker and the readers is minimal or close to zero.
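A minimal sketch of a manually implemented compaction worker; MyStateMachine is a hypothetical MemoryBasedStateMachine-derived engine, and simple polling is assumed instead of commit notifications:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

internal sealed class CompactionWorker : BackgroundService
{
    private readonly MyStateMachine engine; // hypothetical persistence engine from DI

    public CompactionWorker(MyStateMachine engine) => this.engine = engine;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            if (engine.CompactionCount > 0L)
            {
                // compaction factor 1 provides minimal, incremental compaction
                await engine.ForceCompactionAsync(1L, stoppingToken);
            }
            else
            {
                // assumption: poll periodically instead of waiting for the commit event
                await Task.Delay(100, stoppingToken);
            }
        }
    }
}
```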
Guide: How To Implement Database
This section contains recommendations about implementing your own database or distributed service based on the .NEXT Cluster programming model. It can be a K/V database, a distributed UUID generator, a distributed lock, or anything else.
For a memory-based state machine:
- Derive from the MemoryBasedStateMachine class to implement the core logic related to manipulation of the state machine
  - Override the ApplyAsync method, which contains the interpretation of commands contained in log entries
  - Override the CreateSnapshotBuilder method, which is responsible for log compaction
  - Expose high-level data operations declared in the derived class in the form of an interface. Let's assume that its name is IDataEngine
- Declare a class that is responsible for communication with the leader node using custom messages
  - This class aggregates a reference to IDataEngine
  - This class encapsulates the logic for messaging with the leader node
  - This class acts as a controller for the API exposed to external clients
  - Use IRaftCluster.ApplyReadBarrierAsync to ensure that the node is fully synchronized with the leader node
  - Use IRaftCluster.ReplicateAsync for write operations
- Expose the data manipulation methods from the class described above to clients using the selected network transport
- Implement duplicate-elimination logic for write requests from clients
- Call the ReplayAsync method, which is inherited from the MemoryBasedStateMachine class, at application startup. This step is not needed if you're using the Raft implementation for ASP.NET Core.
The ForceReplicationAsync
method doesn't provide strong guarantees that the log entry at the specified index will be replicated and committed on return. Typical code for processing a new log entry from the client might look like this:
```csharp
IRaftCluster cluster = ...;
var term = cluster.Term;

// MyLogEntry is a custom log entry type; the call appends, replicates, and commits the entry
await cluster.ReplicateAsync(new MyLogEntry(term), Timeout.InfiniteTimeSpan, token);
```
The same pattern is applicable to a disk-based state machine, except for snapshotting.
Designing a binary format for custom log entries and an interpreter for them may be hard. Examine this article to learn how to use the Interpreter Framework shipped with the library.
Guide: Custom Transport
The transport- and serialization-agnostic implementation of Raft is represented by the RaftCluster<TMember> class. It contains the core consensus and replication logic, but it's not aware of network-specific details. You can use this class as a foundation for your own Raft implementation for a particular network protocol. All you need is to implement the protocol-specific communication logic. This chapter will guide you through all the necessary steps.
Note
The easiest way to support a new network protocol (e.g., Bluetooth) is to use the CustomTransportConfiguration class. However, it doesn't provide control over the serialization format of Raft messages. If you're looking for a way to provide a custom application-level protocol for Raft, follow this guide.
Existing Implementations
.NEXT library ships multiple network transports:
- RaftHttpCluster, as a part of the DotNext.AspNetCore.Cluster library, offers HTTP 1.1/HTTP 2/HTTP 3 implementations adopted for the ASP.NET Core framework
- TransportServices, as a part of the DotNext.Net.Cluster library, contains a reusable network transport layer
- TCP transport shipped as a part of this library
All these implementations can be used as examples of transport for Raft messages.
Architecture
RaftCluster contains the implementation of consensus and replication logic, so your focus is network-specific programming. First of all, you need to derive from this class. There are two main extensibility points where network-specific programming is needed:
- The TMember generic parameter, which should be replaced with an actual type argument by the derived class. The actual type argument should be a class implementing the IRaftClusterMember interface and other generic constraints. This part of the implementation contains the code necessary for sending Raft-specific messages over the wire.
- The body of the derived class itself. This part of the implementation contains the code necessary for receiving Raft-specific messages over the wire.

From an architectural point of view, these two parts are separated. However, the actual implementation may require a bridge between them.
Cluster Member
IRaftClusterMember declares the methods that are equivalent to Raft-specific message types. The NextIndex
property should return a reference to the location in memory of the index of the next log entry to be replicated to the current member. It doesn't contain any logic.
```csharp
using DotNext;
using DotNext.Net.Cluster.Consensus.Raft;

sealed class ClusterMember : Disposable, IRaftClusterMember
{
    private long nextIndex;

    ref long IRaftClusterMember.NextIndex => ref nextIndex;

    // the remaining IRaftClusterMember members are omitted for brevity
}
```
VoteAsync, PreVoteAsync, AppendEntriesAsync, and InstallSnapshotAsync are the methods for sending Raft-specific messages over the wire. They are called automatically by the core logic located in the RaftCluster
class. Implementations of these methods should throw MemberUnavailableException if any network-related problem occurs.
The last two methods are responsible for serializing log entries to the underlying network connection. IRaftLogEntry is inherited from IDataTransferObject, which represents an abstraction for a Data Transfer Object. A DTO is an object that can be serialized to or deserialized from binary form. However, the serialization/deserialization process and the binary layout are fully controlled by the DTO itself, in contrast to classic .NET serialization. You need to wrap the underlying network stream into an IAsyncBinaryWriter and pass it to the IDataTransferObject.WriteAsync
method for each log entry. The IAsyncBinaryWriter
interface has built-in static factory methods for wrapping streams and pipes. Note that IDataTransferObject.Length
may return null, in which case you will not be able to identify the log record size (in bytes) during serialization. This behavior depends on the underlying implementation of the Write-Ahead Log. You can examine the value of the IAuditTrail.IsLogEntryLengthAlwaysPresented
property to apply the necessary optimizations to the transmission process:
- If it's true, then all log entries retrieved from such a log have a known size in bytes, and IDataTransferObject.Length is not null.
- If it's false, then some or all log entries retrieved from such a log have an unknown size in bytes, and IDataTransferObject.Length may be null. Thus, you need to provide special logic that allows writing binary data of undefined size to the underlying connection.
The default implementation for ASP.NET Core covers both cases. It uses the multipart content type, where log records are separated from each other by a special boundary, if IAuditTrail.IsLogEntryLengthAlwaysPresented
returns false. Otherwise, a more optimized transfer over the wire is applied; in this case, the overhead is comparable to a raw TCP connection.
The ResignAsync
method sends a message to the leader node, whose receiver should downgrade itself to the follower state. This is a service message type not related to Raft, but it can be useful to force leader election.

You can use this code as an example of an HTTP-specific implementation.
Derivation from RaftCluster
The RaftCluster
class contains all the necessary methods for handling deserialized Raft messages:
- AppendEntriesAsync method handles the AppendEntries Raft message type sent by another node
- ResignAsync method handles the leadership revocation procedure
- InstallSnapshotAsync method handles the InstallSnapshot Raft message type sent by another node
- VoteAsync method handles the Vote Raft message type sent by another node
- PreVoteAsync method handles the PreVote message introduced as an extension to the original Raft model to avoid inflation of the Term value
The underlying code responsible for listening to network requests must restore Raft messages from their transport-specific representation and call the necessary handler for the particular message type.
It is recommended to use the partial class feature of the C# language to separate the different parts of the derived class. The recommended layout is:
- The main part with StartAsync and StopAsync methods containing initialization logic, configuration, and other infrastructure-related aspects. The example is here
- Raft-related messaging. The example is here
- General-purpose messaging (if you need it)
AppendEntriesAsync
and InstallSnapshotAsync
expect access to the log entries deserialized from the underlying transport. This is where the IRaftLogEntry interface comes into play. A transport-specific implementation of IRaftLogEntry
should be present on the receiver side. All you need is to wrap a section of the underlying stream into an instance of IAsyncBinaryReader and pass the reader to the Transformation that comes through the parameter of the TransformAsync
method. The example is here. IAsyncBinaryReader
has static factory methods for wrapping streams and pipes.
Other important extensibility points are the StartAsync
and StopAsync
virtual methods. They are responsible for lifecycle management of the RaftCluster
instance. You can override them for the following reasons:
- Opening and closing sockets
- Sending announcement to other nodes
- Detection of local cluster member
- Initialization of a list of cluster members
- Enforcement of configuration
Input/Output
Low-level code related to network communication requires a choice of I/O core framework. There are two standard approaches:
- Stream
- Pipe

Pipe is the preferred way because of its asynchronous nature and the shared memory buffer between consumer and producer. As a result, it gives you a small memory footprint during intense I/O operations. Read this article to learn more.
.NEXT has broad support of I/O pipelines:
- IAsyncBinaryReader.Create static factory method can wrap PipeReader to enable high-level decoding operations
- IAsyncBinaryWriter.Create static factory method can wrap PipeWriter to enable high-level encoding operations
- Various I/O enhancements aimed at simplifying programming with pipes
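A small sketch of wrapping a pipe for Raft message encoding and decoding; only the factory methods named above are used, and the single-argument overloads are an assumption (additional configuration parameters may exist):

```csharp
using System.IO.Pipelines;
using DotNext.IO;

var pipe = new Pipe();

// producer side: encode outgoing Raft messages
IAsyncBinaryWriter writer = IAsyncBinaryWriter.Create(pipe.Writer);

// consumer side: decode incoming Raft messages
IAsyncBinaryReader reader = IAsyncBinaryReader.Create(pipe.Reader);
```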
Network programming
The most important configuration parameter of a Raft cluster member is the election timeout. Your transport-specific implementation should align socket timeouts correctly with it. For instance, the connection timeout should not be greater than the lower election timeout. Otherwise, you will have an unstable cluster with frequent re-elections.
Another important aspect is deduplication of Raft messages, which is a normal situation for the TCP protocol. Vote, PreVote, and InstallSnapshot are idempotent messages and can be handled twice by the receiver. However, AppendEntries is not.
Hosting Model
The shape of your API for a transport-specific Raft implementation depends on how the potential users will host it. There are a few possible situations:
- Using Dependency Injection container:
- Generic application host from Microsoft.Extensions.Hosting
- Web host from ASP.NET Core
- Third-party Dependency Injection container
- Standalone application without DI container
In case of a DI container from Microsoft, you need to implement IHostedService in your derived class. The signatures of the StartAsync
and StopAsync
methods from the RaftCluster
class are fully compatible with this interface, so you don't need to implement the interface methods manually. As a result, you get automatic lifecycle management and configuration infrastructure at low cost. The instance of your class derived from RaftCluster
should be registered as a singleton service, and all its interfaces should be registered separately.
A different DI container requires the corresponding adaptation of your implementation.
If support of a DI container is not a concern for you, then an appropriate configuration API and lifecycle management should be provided to the potential users.
The configuration of a cluster member is represented by the IClusterMemberConfiguration interface. Your configuration model should be based on this interface because it must be passed to the constructor of the RaftCluster
class. The concrete implementation of the configuration model depends on the hosting model.