
[04.03.2020] // reading time: 16min // #tech

🔌 • leeky raft: a fantastical consensus layer

Leeky Raft (github.com/slin63/raft-consensus) is a Golang implementation of Raft as described in the paper: In Search of an Understandable Consensus Algorithm. It depends on Chord-ish, a membership layer I also built in Golang. Chord-ish provides Leeky Raft with membership information, telling it the IP addresses and identifiers of other nodes in the network. Chord-ish depends on a single introducer node, which other nodes use to join the network. You can read about how Chord-ish works here.

This lets Leeky Raft discover and communicate with the other members of its network, and it also lets nodes join or leave the network at will, so we can scale the size of a cluster without any extra work.

Leeky Raft follows canonical Raft faithfully, and most of its mechanics are already well described in the paper, so we’ll explore the implementation through the life of a single entry.

First, you will see the birth of the network. Small things, formless and without a leader, congregating together to become one, composed of processes both Chord-ish and Raft-ish. Then, you will see the entry’s journey from when it is first created by a user, then submitted to the leader by a client, and finally to its application to a distributed state machine. Let’s begin!

The Life of an Entry in Leeky Raft

First, Some Language

  • network: a collection of nodes
  • node: a machine or single docker container
  • process: a single program running on a node
  • consensus layer: a process used to achieve consensus in a distributed system
  • membership layer: a process used to keep track of other nodes in a network

And Then There Was a Network

Leeky Raft is deployed on your local network (and meant to stay there) with the help of Docker and Docker-Compose. Docker is useful here because it allows us to simulate a number of seemingly independent machines, each with their own IP address and environment variables. First, a Dockerfile

  1. git clones Chord-ish,
  2. copies Leeky Raft from our local file system,
  3. builds both project binaries, respectively named member and raft,
  4. runs /scripts/init.sh to start them both alongside each other.

A docker-compose.yml is used to repeat this process several times, as well as to expose certain ports to the client and define our Chord-ish introducer. Services inside the docker-compose.yml are separated into two types: introducers and workers. introducers serve as our Chord-ish introducer, and so there should only ever be one in the network. workers don’t have any special properties and can be scaled arbitrarily.

services:
  introducer:
    container_name: introducer
    environment:
      - INTRODUCER=1
    ## Expose Raft ports so that the local
    ## client can communicate
    ports:
      - "6002:6002"
      - "6004:6004"
    build: .

  worker:
    ## Delay building this container until
    ## the introducer container is created
    depends_on:
      - "introducer"
    build: .

Using the paradigm of a single introducer and many normal workers, we can now decide how many nodes are in our network simply by changing the value of <N> in the following command: docker-compose up --scale worker=<N>.

And so, we do.

> docker-compose build && docker-compose up --scale worker=4
. . .
Creating introducer ... done
Recreating raft-consensus_worker_1 ... done
Recreating raft-consensus_worker_2 ... done
Recreating raft-consensus_worker_3 ... done
Recreating raft-consensus_worker_4 ... done

Behold! The membership layer gathers itself: nodes connect to the introducer and gossip their membership lists to one another. Now we have a network of 1 + 4 = 5 nodes.

We still have a problem. Chord-ish and Leeky Raft, although running on the same container, are still independent processes. They will not know of each other’s state unless they communicate, and so they must.

Before we go any further, let’s pick a main character for our story. Let’s tell it from the perspective of raft-consensus_worker_3.

Leeky Raft’s First Words

The first thing our Leeky Raft process on raft-consensus_worker_3 does after it’s started is to fire off an RPC to the membership layer. This RPC, Self, returns a struct that contains a MemberMapT, as defined below.

type MemberMapT map[int]*MemberNode
type MemberNode struct {
    IP        string
    . . .
}

MemberMapT maps server PIDs to structs containing their IP addresses. This tells Leeky Raft what other nodes exist in the network and how to communicate with them.
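
To make that startup call concrete, here’s a minimal sketch of a consensus process asking its membership layer for the member map over net/rpc. The service name Membership, the port 6001, and the SelfReply struct are illustrative assumptions; only the Self RPC and the MemberMapT shape come from Leeky Raft itself.

package main

import (
  "log"
  "net/rpc"
)

// These mirror the types shown above; redeclared here so the sketch is
// self-contained.
type MemberNode struct {
  IP string
}

type MemberMapT map[int]*MemberNode

// SelfReply is a stand-in for whatever struct Chord-ish actually returns.
type SelfReply struct {
  MemberMap MemberMapT
}

// fetchMembership asks the membership layer who else is in the network.
// "Membership.Self" and port 6001 are assumptions for this sketch.
func fetchMembership() (MemberMapT, error) {
  client, err := rpc.DialHTTP("tcp", "localhost:6001")
  if err != nil {
    return nil, err
  }
  defer client.Close()

  var reply SelfReply
  if err := client.Call("Membership.Self", 0, &reply); err != nil {
    return nil, err
  }
  return reply.MemberMap, nil
}

func main() {
  members, err := fetchMembership()
  if err != nil {
    log.Fatal("fetching membership:", err)
  }
  for pid, node := range members {
    log.Printf("PID %d -> %s", pid, node.IP)
  }
}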

After it gets this information, Leeky Raft:

  1. deploys its RPC server so it can respond to incoming RPCs
  2. generates a random election timeout between 0.75 and 1.5 seconds (sketched just after this list)
  3. sleeps for 1 second to give the other Leeky Raft instances time to come online as well.
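
A quick sketch of what step 2 might look like; the helper name is hypothetical, but the 0.75 to 1.5 second window is the one described above.

package main

import (
  "fmt"
  "math/rand"
  "time"
)

// randomElectionTimeout picks a timeout uniformly between 0.75 and 1.5
// seconds, making it unlikely that two nodes start an election at the
// same instant.
func randomElectionTimeout() time.Duration {
  min := 750 * time.Millisecond
  max := 1500 * time.Millisecond
  return min + time.Duration(rand.Int63n(int64(max-min)))
}

func main() {
  fmt.Println(randomElectionTimeout())
}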

After our Leeky Raft process finishes napping, its election timer starts to count down and it waits for an incoming RequestVote RPC. If it doesn’t receive one, it dispatches RequestVotes to all known nodes. This is where reality starts to fork.

Sometimes our process’s timer is long, and some other Leeky Raft process’s timer is shorter, and so that other process begins issuing RequestVotes first, gets the majority of votes, and becomes the new leader.

Sometimes, two timers are close enough that they both begin an election and neither one gets the majority. In this case, a new election begins.

Sometimes, our process’s timer is the shortest, it hits zero, begins an election first, and fires off RequestVotes. It gets the majority and becomes the leader. This is the reality that we’ll be using.
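
All three forks hinge on the same majority rule, so it’s worth spelling out. The helper below is hypothetical (Leeky Raft has its own helper, spec.GetQuorum), but the arithmetic is standard Raft: a candidate needs strictly more than half of the cluster’s votes, its own included.

package main

import "fmt"

// quorum returns the number of votes needed for a majority in a cluster
// of the given size: strictly more than half.
func quorum(clusterSize int) int {
  return clusterSize/2 + 1
}

func main() {
  // In our 1 + 4 = 5 node network, a candidate needs 3 votes:
  // its own, plus 2 granted RequestVotes.
  fmt.Println(quorum(5)) // 3
}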

Beat That Drum

Our new leader celebrates its victory by beginning the timeless tradition of heartbeating. The leader heartbeats every 375 milliseconds, dispatching empty AppendEntries RPCs to all members of the network. The first round of heartbeats after our Leeky Raft process has won an election is especially important, because it tells everyone that it is the new leader and that any current candidates should stand down.

for PID := range heartbeats {
  // Don't heartbeat to nodes that no longer exist.
  if _, ok := self.MemberMap[PID]; !ok {
    return
  } else {
    // IT EXISTS! Start a new heartbeating goroutine.
    go func(PID int) {
      // Call AppendEntries on a server
      // with an empty AppendEntries RPC
      r := CallAppendEntries(
        PID,
        raft.GetAppendEntriesArgs(&self),
      )

      // If that server turns out to have outdated logs,
      // start bringing them back up-to-date.
      if !r.Success && r.Error == responses.MISSINGLOGENTRY {
        r = appendEntriesUntilSuccess(raft, PID)
      }
    }(PID)
  }
}

Heartbeats are done as concurrent goroutines so that if a single server is slow to respond, all other servers will still receive a heartbeat in a timely manner.

This is important because Leeky Raft election timers are constantly ticking. If one ever reaches 0, that Leeky Raft process would start a new election. We only ever want that to happen when the current leader has experienced some kind of failure. This is assured by having each node reset its election timer whenever it receives an AppendEntries RPC.

If heartbeats were done sequentially, our heartbeating code might be blocked by a single slow node, causing a cascade of wasteful elections. Concurrent heartbeats do not have that problem.
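
To make the timer-reset mechanic concrete, here’s a simplified, hypothetical sketch of an election timer loop; it isn’t Leeky Raft’s actual implementation, just one way to express “count down, push the deadline back on every AppendEntries, and start an election if the countdown ever finishes.”

package main

import (
  "log"
  "math/rand"
  "time"
)

// electionTimerLoop counts down toward an election. Every AppendEntries
// handler would send on reset to push the deadline back; if the timer
// ever fires, the leader is presumed dead and an election begins.
func electionTimerLoop(reset <-chan struct{}, startElection func()) {
  newTimeout := func() time.Duration {
    return 750*time.Millisecond +
      time.Duration(rand.Int63n(int64(750*time.Millisecond)))
  }
  timer := time.NewTimer(newTimeout())
  for {
    select {
    case <-reset:
      // Heard from the leader: push the deadline back.
      if !timer.Stop() {
        <-timer.C
      }
      timer.Reset(newTimeout())
    case <-timer.C:
      // No AppendEntries arrived in time: assume the leader failed.
      startElection()
      timer.Reset(newTimeout())
    }
  }
}

func main() {
  reset := make(chan struct{})
  go electionTimerLoop(reset, func() { log.Println("starting election") })

  // Pretend to be a leader heartbeating every 375ms; no election fires.
  for i := 0; i < 5; i++ {
    time.Sleep(375 * time.Millisecond)
    reset <- struct{}{}
  }
}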

Wonderful. Now all in the land know of the True Leader, raft-consensus_worker_3, Hallowed be Thy Container name.

Our Raft network is now ready to begin processing entries.

An Entry is Born

Somewhere stands a figure cloaked in silk robes woven through with the bones of animals small and delicate. His skin is tattered, frostbitten, splotched from the sun but still pale. He is positioned in front of a machine of familiar design.

He whispers to the machine a timeless chant, one once passed down perfectly from generation to generation until the melting of ice bridges in a stirring ocean isolated one people from another, after which the chant began to evolve into forms impure and terrible. Echoes working through a winding ridge growing ever wider. Few know the chant as it once was.

Our Figure is of the few. He remembers the words as they were first spoken, ambling first from the tongue of his grandfather and, in dreams, from the moaning of caves in the wind and oceans in the endless surf. Incantations of a timeless perfection, made true by faith and faith alone.

His syntax is perfect.

$ ./raft-client PUT pizzatoppings peporoni peppr cheddar anchoves onn it.

The command-line arguments, [PUT pizzatoppings peporoni peppr cheddar anchoves onn it], are then joined into the string "PUT pizzatoppings peporoni peppr cheddar anchoves onn it". This is our new entry. In another time, we might try to attach more significance to the syntax and word choice, but for now we need only think of it as a string: innocent, pure, nothing more.

Our entry is sent through an RPC called PutEntry to our Leeky Raft leader. Chilled with frost and running thick through the hair and feather of foxes and birds, the wind carries our remote procedure call, warmly bundled in a TCP packet, safely to its destination.

The client code waits for a response.

// Called by the client to add an entry to a Raft group
func PutEntry(args []string) {
  // Parse entry as string
  entry := strings.Join(args, " ")
  log.Printf(entry)

  // Connect to leader and send over entry
  client, err := rpc.DialHTTP("tcp", "localhost:6002")
  if err != nil {
    log.Fatal("[ERROR] PutEntry() dialing:", err)
  }

  // Contact leader and wait for a response
  var result *responses.Result
  if err = client.Call("Ocean.PutEntry", entry, &result); err != nil {
    log.Fatal(err)
  }
}

The Journey Begins

raft-consensus_worker_3, entranced in the raw ecstasy of heartbeating, blood rushing through its veins, ancient and bruised, almost misses the RPC to PutEntry. But few things can elude its attention. It acknowledges the RPC and begins executing PutEntry.

raft-consensus_worker_3 begins by asking itself:

"Am I the leader?",

"Yes", it answers to itself.

"I am. Were I not, I would have consulted my membership map and forwarded this client response to the true leader. Perhaps, in a past life, that would have been someone else. But today, I am the True Leader."

I don’t know how the logging got so colorful. It’s really more distracting than helpful.

raft-consensus_worker_3, having confirmed that it is currently the leader, sends the entry to the entries channel, a place where entries can sit idle until they are ready to be processed by the digestEntries goroutine.

entry is not traveling alone, however. It is bundled in an entryC struct, paired with a *Result channel that will notify PutEntry of digestEntries's success or failure.

type entryC struct {
  // Entry to be appended to log
  D string
  C chan *responses.Result
}

func (f *Ocean) PutEntry(
  entry string,
  result *responses.Result,
) error {
  // Check if I'm the leader, redirect otherwise
  if raft.Role != spec.LEADER {
    *result = responses.Result{
      Data:    raft.LeaderId,
      Success: false,
      Error:   responses.LEADERREDIRECT,
    }
    return nil
  }

  // Create response channels to wait on for
  //   1. Log replication
  //   2. Commit completion
  entryCh := make(chan *responses.Result)
  commCh := make(chan *responses.Result)

  // Add new entry to log for processing
  entries <- entryC{entry, entryCh}

  . . .

Into the Frying Pan

raft-consensus_worker_3's digestEntries function does two things once it receives the entry:

  1. adds the entry to raft-consensus_worker_3's own log
  2. dispatches concurrent AppendEntries RPCs to all other nodes in the network, containing the entry and instructions to add that entry to their own logs.

After dispatching AppendEntries RPCs, digestEntries waits for responses. Once a majority of nodes have responded successfully, having faithfully followed the One True Leader’s instructions, digestEntries notifies the upstream PutEntry function by sending a successful *Result object through the bundled entryCh channel.

digestEntries will continue processing responses even after reaching a majority, but we need not worry about those; our time here is done. Let’s go back to PutEntry and see what’s next.

// Digest client entries in order
func digestEntries() {
  for entry := range entries {
    var once sync.Once
    // Add new entry to own log
    idx := raft.AppendEntry(entry.D)

    // Channel for AppendEntries responses
    rch := make(chan *responses.Result)
    rcount := 0

    // Calculate the number needed for a majority
    quorum := spec.GetQuorum(&self)
    remaining := len(self.MemberMap) - 1

    // Call all non-self nodes concurrently
    for PID := range self.MemberMap {
      if PID != self.PID {
        go func(PID int, remaining *int) {
          r := appendEntriesUntilSuccess(raft, PID)
          r.Index = idx
          rch <- r

          // Close the response channel once we've
          // sent all possible responses
          if *remaining -= 1; *remaining == 0 {
            close(rch)
          }
        }(PID, &remaining)
      }
    }

    // Parse responses from servers
    for r := range rch {
      rcount += 1
      // Notify PutEntry once entries are safely
      // replicated on a majority.
      if rcount >= quorum {
        once.Do(func() { entry.C <- r })
      }
    }
  }
}

Out of the Frying Pan

PutEntry has been patiently waiting to hear back from digestEntries this whole time. More precisely, it would only have waited until a preconfigured timeout expired, at which point PutEntry would have given up and called it a day.

But digestEntries did not timeout, and so PutEntry proceeds.

Now that the entry exists in the leader’s own log and has been successfully replicated, presumably at the same index on all other nodes, we can refer to it by its index rather than its contents.

PutEntry sends the index of the successfully replicated entry to the commits channel. This is the beginning of its transformation from a senseless string to real, manifested state.

As before, the index of our entry is bundled in a commitC object accompanied by a *Result channel that will notify PutEntry of digestCommits's success or failure.

type commitC struct {
  // An index to be committed
  Idx int
  C   chan *responses.Result
}

func (f *Ocean) PutEntry(
  entry string,
  result *responses.Result,
) error {
  select {
  case r := <-entryCh:
    r.Entry = entry
    if r.Success {
      // The entry was successfully processed.
      // Now try and apply to our own state.
      commits <- commitC{r.Index, commCh}
      // <-commCh is blocked until our commit
      // is successfully applied.
      *result = *<-commCh
    }
    . . .

The commits channel is consumed by the digestCommits goroutine, who tirelessly awaits log indices to apply to raft-consensus_worker_3's state machine: a process separate from Leeky Raft with no name and an unspeakable API that we will not explore and assume just works.

digestCommits doesn’t do much. It simply reads the index of a commitC object from the commits channel and calls applyCommits(committed.Idx), its assistant who will do the bulk of the dirty work.

// Digest commit indices in order.
func digestCommits() {
  for commit := range commits {
    r := applyCommits(commit.Idx)
    . . .

An Aside: a Hypothetical Situation

Before we discuss how raft-consensus_worker_3 is going to apply this newly replicated entry to its state machine, let’s imagine a scenario where, right as digestEntries successfully replicates the entry to the entire network, raft-consensus_worker_3 dies.

Somewhere, someone unplugged the modem. Or maybe someone coughed out the words $ docker kill raft-consensus_worker_3. Or maybe someone is hitting the machine with a hammer.

Maybe that someone is you.

It doesn’t matter now. raft-consensus_worker_3 is dead. So what will happen to our entry?

All other nodes wait for their scheduled AppendEntries RPC from raft-consensus_worker_3, but none arrives. They wait some more, until they can wait no longer. raft-consensus_worker_1 is the first to believe it. Their election timer has hit 0. raft-consensus_worker_3 is dead. There is no doubt in raft-consensus_worker_1's mind.

It grieves. A deeper grief than any human could ever know; a grief programmed to be intolerable and to cut deeper than any living thing should ever be allowed to experience. Thankfully, raft-consensus_worker_1 is not a living thing. In the time that it takes for you to inconspicuously pick something out of your nose in a public space, raft-consensus_worker_1 reflects on every single AppendEntries it has ever received from raft-consensus_worker_3 and sobs.

So many memories, so much time, so many successful TCP connections. Why was today different?

You continue to hit the machine hosting raft-consensus_worker_3 with a hammer.

raft-consensus_worker_1 decides the only right thing to do is to carry on raft-consensus_worker_3’s legacy. They set their role to candidate, send out RequestVote RPCs to all remaining nodes that you aren’t hitting with a hammer, and await the votes. It is unanimous. raft-consensus_worker_1 wins the election and becomes the new leader.

raft-consensus_worker_3 exited with code 137 // Leader dies
raft-consensus_worker_1 | 20:51 [ELECTTIMEOUT]
raft-consensus_worker_1 | 20:51 [ELECTION->]: Starting election [TERM=2]
raft-consensus_worker_1 | 20:51 [ELECTION->]: Starting election 2
raft-consensus_worker_2 | 20:51 [<-ELECTION]: [ME=225] GRANTED RequestVote for 570
raft-consensus_worker_3 | 20:51 [<-ELECTION]: [ME=904] GRANTED RequestVote for 570
raft-consensus_worker_1 | 20:51 [CANDIDATE]: Processing results. 1/2 needed
raft-consensus_worker_1 | 20:51 [CANDIDATE]: QUORUM received (2/2)
raft-consensus_worker_1 | 20:51 [CANDIDATE->LEADER] [ME=570] [TERM=2] Becoming leader

Our entry sits, untouched, in raft-consensus_worker_1's log. After some time, raft-consensus_worker_1 realizes that it needs to be applied, and passes it to applyCommits as we described earlier.

This all, of course, was hypothetical. We’re still in the reality where raft-consensus_worker_3 is alive and PutEntry is proceeding normally. Our entry just got passed to applyCommits.

Let’s pick up there.

Into the Fire

applyCommits receives the index of our entry and proceeds as follows:

  1. connects to the state machine process
  2. calculates the range of indices that still need to be applied, beginning just after CommitIndex, the index of the last applied log entry, and running through the index of our entry
  3. iterates through that range of indices, for each one
    1. grabbing the string entry at that index with entry = raft.Log[index]
    2. calling a Filesystem.Execute RPC on our state machine process, with the string entry as the sole argument, crashing on an error and proceeding if none is present
    3. checking whether the currently processed index is the one it was originally asked to process, and, if so, storing the result to return later
  4. sets CommitIndex to the last successfully applied index

// Apply any uncommitted changes to our state
func applyCommits(idx int) *responses.Result {
  var r *responses.Result

  // Try to establish connection to state machine.
  client, err := connect(self.PID, config.C.FilesystemRPCPort)
  if err != nil {
    return &responses.Result{Success: false}
  }
  defer client.Close()

  // Entries to apply to state machine
  // Only return entry whose index matches `idx`
  start := raft.CommitIndex + 1 // inclusive
  end := idx + 1                // exclusive
  current := start
  for _, entry := range raft.Log[start:end] {
    var result responses.Result
    if err := (*client).Call(
      "Filesystem.Execute", entry, &result,
    ); err != nil {
      log.Fatal(err)
    }

    // Current index is the one we were asked to process.
    // Make this our return value
    if current == idx {
      r = &result
    }
    current++
  }

  raft.CommitIndex = idx
  return r
}

It’s not entirely clear exactly what the state machine does with Leeky Raft’s entries. Some say that they are released into the physical world as spirit beings, searching until they find a suitable body in the form of a small bird or curious deer. Sometimes this happens quickly, and sometimes those souls are doomed from birth to wander until the end of time, resulting in a non-nil err from the RPC.

Some say that the entries are string serialized instructions for a pizza delivery service.

No one is exactly sure.

Regardless of what they do, there is a certain beauty here. Since Leeky Raft does not actually do any of the work for applying state, it can be easily generalized to maintain state for any application that can have its state expressed as a series of log entries.
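
To see why that generality matters, here’s a toy state machine (not from the repo) that could sit behind the Filesystem.Execute boundary: anything that can apply a string entry and hand back a result will do. This one just interprets PUT <key> <value...> entries like the one our Figure typed.

package main

import (
  "fmt"
  "strings"
)

// KVStore is a toy state machine: it applies log entries of the form
// "PUT <key> <value...>" to an in-memory map.
type KVStore struct {
  data map[string]string
}

// Apply interprets a single committed log entry and returns the value
// that was stored, if any.
func (kv *KVStore) Apply(entry string) (string, error) {
  parts := strings.SplitN(entry, " ", 3)
  if len(parts) == 3 && parts[0] == "PUT" {
    if kv.data == nil {
      kv.data = make(map[string]string)
    }
    kv.data[parts[1]] = parts[2]
    return parts[2], nil
  }
  return "", fmt.Errorf("unrecognized entry: %q", entry)
}

func main() {
  kv := &KVStore{}
  v, err := kv.Apply("PUT pizzatoppings peporoni peppr cheddar anchoves onn it")
  fmt.Println(v, err)
  // Output: peporoni peppr cheddar anchoves onn it <nil>
}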

Oh! Looks like applyCommits has finished applying our entry. digestCommits sends a successful Result to the commCh given to us by PutEntry. Let’s go back to PutEntry and see what’s next.

And All is Well

      . . .
      // <-commCh is blocked until our commit
      // is successfully applied.
      *result = *<-commCh
    }
  case <-time.After(time.Second * time.Duration(config.C.RPCTimeout)):
    config.LogIf(
      fmt.Sprintf("[PUTENTRY]: PutEntry timed out waiting for quorum"),
      config.C.LogPutEntry,
    )
    *result = responses.Result{Term: raft.CurrentTerm, Success: false}
  }

  return nil

*result = *<-commCh is executed, since digestCommits sent a Result to commCh. This means that the RPC’s reply value, result, now contains actual information about the entry that raft-consensus_worker_3 was given so long ago.

PutEntry's work here is done, and it lets the client know that nothing went wrong by returning nil as its error value.

The client sends a formatted response to our Figure’s STDOUT.

$ ./raft-client PUT pizzatoppings peporoni peppr cheddar anchoves onn it.
Success! Set "pizzatoppings" to: "peporoni peppr cheddar anchoves onn it"

Our Figure smiles.