Building a Better Receive Loop

Written by Pete Corey on Jul 23, 2018.

I’ve been putting quite a bit of time this past week into overhauling and refactoring my in-progress Elixir-based Bitcoin node.

As a part of that overhaul, I turned my attention to how we’re receiving packets from connected peers. The way we’ve been handling incoming packets is overly complicated and can be greatly simplified by taking advantage of the Bitcoin protocol’s packet structure.

Let’s go over our old solution and dig into how it can be improved.

The Original Receive Loop

Our Bitcoin node uses Erlang’s :gen_tcp module to manage peer-to-peer communications. Originally, we were using :gen_tcp in “active mode”, which means that incoming packets are delivered to our node’s Elixir process in the form of :tcp messages:


def handle_info({:tcp, _port, data}, state) do
  ...
end

Because TCP is a streaming protocol, no guarantees can be made about the contents of these messages. A single message may contain a complete Bitcoin packet, a partial packet, multiple packets, or any combination of the above. To handle this ambiguity, the Bitcoin protocol delimits each packet with a sequence of “magic bytes”. Once we reach this magic sequence, we know that everything we’ve received up until that point constitutes a single packet.

My previous receive loop worked by maintaining a backlog of all incoming bytes up until the most recently received sequence of magic bytes. Every time a new message was received, it would append those incoming bytes to the backlog and chunk that binary into a sequence of packets, which could then be handled individually:


{messages, rest} = chunk(state.rest <> data)

case handle_messages(messages, state) do
  {:error, reason, state} -> {:disconnect, reason, %{state | rest: rest}}
  state -> {:noreply, %{state | rest: rest}}
end

This solution works, but there are quite a few moving pieces. Not only do we have to maintain a backlog of all recently received bytes, we also have to build out the functionality to split that stream of bytes into individual packets:


defp chunk(binary, messages \\ []) do
  case Message.parse(binary) do
    {:ok, message, rest} ->
      chunk(rest, messages ++ [message])

    nil ->
      {messages, binary}
  end
end

Thankfully, there’s a better way.

Taking Advantage of Payload Length

Every message sent through the Bitcoin protocol follows a specific format.

The first four bytes of every packet are reserved for the network’s magic bytes. Next, twelve bytes are reserved for the name of the command being sent across the network. The next four bytes hold the length of the payload being sent, followed by a four-byte partial checksum of that payload.

These twenty-four bytes can be found at the head of every message sent across the Bitcoin peer-to-peer network, followed by the variable-length binary payload representing the meat and potatoes of the command being carried out. Relying on this structure can greatly simplify our receive loop.
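
To make that structure concrete, here’s a rough sketch of how those twenty-four header bytes might be destructured with Elixir’s binary pattern matching. The HeaderSketch module and its field names are illustrations of my own, not the node’s actual Message module:


defmodule HeaderSketch do
  # The payload length is a little-endian unsigned 32-bit integer in the
  # Bitcoin protocol; the other three fields are raw binaries.
  def parse_header(<<
        magic::binary-size(4),
        command::binary-size(12),
        size::little-unsigned-integer-size(32),
        checksum::binary-size(4)
      >>) do
    %{magic: magic, command: command, size: size, checksum: checksum}
  end
end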

By using :gen_tcp in “passive mode” (setting active: false), incoming TCP packets won’t be delivered to our current process as messages. Instead, we can ask for packets using a blocking call to :gen_tcp.recv/2. When requesting packets, we can even specify the number of bytes we want to receive from the incoming TCP stream.
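
For reference, opening a connection in passive mode might look something like this. The address and the binary mode option here are illustrative assumptions, not the node’s exact configuration:


# Connect to a peer in passive, binary mode. The address here is just a
# placeholder; 8333 is Bitcoin's default peer-to-peer port.
{:ok, socket} = :gen_tcp.connect({127, 0, 0, 1}, 8333, [:binary, active: false])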

Instead of receiving partial messages of unknown size, we can ask :gen_tcp for the next 24 bytes in the stream:


{:ok, message} <- :gen_tcp.recv(socket, 24)

Next, we can parse the received message bytes and request the payload’s size in bytes from our socket:


{:ok, %{size: size}} <- Message.parse(message),
{:ok, payload} <- :gen_tcp.recv(socket, size)

And now we can parse and handle our payload, knowing that it’s guaranteed to be a single, complete Bitcoin command sent across the peer-to-peer network.
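
Putting those pieces together, the heart of the new receive loop might look something like the following sketch, ignoring for a moment the zero-length payload wrinkle discussed below. The recv_message/2 and handle_message/3 names are placeholders of my own; Message.parse/1 is the node’s own parser:


# Read the fixed 24-byte header, parse it to learn the payload's size, then
# read exactly that many payload bytes before handing them off.
defp recv_message(socket, state) do
  with {:ok, header} <- :gen_tcp.recv(socket, 24),
       {:ok, %{size: size} = message} <- Message.parse(header),
       {:ok, payload} <- :gen_tcp.recv(socket, size) do
    handle_message(message, payload, state)
  end
end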

Final Thoughts

There’s more that goes into the solution I outlined above. For example, if we’re receiving a command like "verack", which has a zero-byte payload, asking for zero bytes from :gen_tcp.recv/2 will actually return all of the bytes currently available in the TCP stream.
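
One way to sidestep that wrinkle is to skip the recv call entirely whenever the header reports an empty payload. This is just a sketch of the idea, not necessarily the node’s exact code:


# A zero-length payload means there's nothing left to read for this message.
defp recv_payload(_socket, 0), do: {:ok, <<>>}
defp recv_payload(socket, size), do: :gen_tcp.recv(socket, size)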

Even with these complications, I still think this new solution is superior to our old approach of maintaining and continually chunking an ongoing stream of bytes pulled off the network.

If you’re eager to see the full details of the new receive loop, check it out on GitHub!

I’d also like to thank Karl Seguin for inspiring me to improve our Bitcoin node using this technique. He posted a message on the Elixir Slack group about prefixing TCP messages with their length to easily determine how many bytes to receive:

I’d length prefix every message with 4 bytes and do two recvs:

{:ok, <<length::big-32>>} = recv(socket, 4, TIMEOUT)
{:ok, message} = recv(socket, length, TIMEOUT)

This one-line comment opened my mind to the realization that the Bitcoin protocol was already doing this, and that I was overcomplicating the process of receiving messages.

Thanks Karl!

Golfing for FizzBuzz in Clojure and Elixir

Written by Pete Corey on Jul 9, 2018.

I recently came across this riff on the FizzBuzz problem written in Clojure. While it’s admittedly not terribly obvious what’s going on, I thought it was a novel solution to the FizzBuzz problem.

How could we recreate this solution using Elixir? There are some obvious similarities between Clojure’s cycle and Elixir’s Stream.cycle/1. As someone who’s always been a fanboy of Lisp syntax, which solution would I prefer?

There’s only one way to find out…

But First, an Explanation

Before we dive into our Elixir solution, we should work out what exactly this Clojure solution is doing:


(clojure.pprint/pprint
  (map vector
    (range 25)
    (cycle [:fizz :_ :_])
    (cycle [:buzz :_ :_ :_ :_])))

Clojure’s clojure.pprint/pprint obviously just prints whatever’s passed into it. In this case, we’re printing the result of this expression:


(map vector
  (range 25)
  (cycle [:fizz :_ :_])
  (cycle [:buzz :_ :_ :_ :_]))

But what exactly’s happening here? Clojure’s map function is interesting. It lets you map a function over any number of collections. The result of the map expression is the result of applying the function to the first values of each collection, followed by the result of applying the mapped function to the second values, and so on.

In this case, we’re mapping the vector function over three collections: the range of numbers from zero to twenty-four ((range 25)), the infinite cycle of :fizz, :_, and :_ ((cycle [:fizz :_ :_])), and the infinite cycle of :buzz, :_, :_, :_, and :_ ((cycle [:buzz :_ :_ :_ :_])).

Mapping vector over each of these collections creates, for each index, a vector pairing that index with whether it should display Fizz, Buzz, or both at that particular position.

The result looks just like we’d expect:


([0 :fizz :buzz]
 [1 :_ :_]
 [2 :_ :_]
 [3 :fizz :_]
 [4 :_ :_]
 [5 :_ :buzz]
 ...
 [24 :fizz :_])

An Elixir Solution

So how would we implement this style of FizzBuzz solution using Elixir? As we mentioned earlier, Elixir’s Stream.cycle/1 function is almost identical to Clojure’s cycle. Let’s start there.

We’ll make two cycles of our Fizz and Buzz sequences:


Stream.cycle([:fizz, :_, :_])
Stream.cycle([:buzz, :_, :_, :_, :_])

On their own, these two cycles don’t do much.

Let’s use Stream.zip/2 to effectively perform the same operation as Clojure’s map vector:


Stream.zip(Stream.cycle([:fizz, :_, :_]), Stream.cycle([:buzz, :_, :_, :_, :_])) 

Now we can print the first twenty-five pairs by piping our zipped stream into Enum.take/2 and printing the result with IO.inspect/1:


Stream.zip(Stream.cycle([:fizz, :_, :_]), Stream.cycle([:buzz, :_, :_, :_, :_])) 
|> Enum.take(25)
|> IO.inspect

Our result looks similar, though unlike the Clojure version, each pair no longer includes its index:


[
  fizz: :buzz,
  _: :_,
  _: :_,
  fizz: :_,
  _: :_,
  _: :buzz,
  ...
  fizz: :_
]

While our solution works, I’m not completely happy with it.

Polishing Our Solution

For purely aesthetic reasons, let’s import the functions we’re using from Stream, Enum, and IO:


import Stream, only: [cycle: 1, zip: 2]
import Enum, only: [take: 2]
import IO, only: [inspect: 1]

This reduces the visual complexity of our solution:


zip(cycle([:fizz, :_, :_]), cycle([:buzz, :_, :_, :_, :_]))
|> take(25)
|> inspect

But we can take it one step further.

Rather than using Stream.zip/2, which expects a left and right argument, let’s use Stream.zip/1, which expects to be passed an enumerable of streams:


[
  cycle([:fizz, :_, :_]),
  cycle([:buzz, :_, :_, :_, :_])
]
|> zip
|> take(25)
|> inspect

And that’s our final solution.
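
As an aside, if we wanted our output to carry the index like the Clojure version does, we could zip a range in as a third enumerable. This is just a quick sketch using fully qualified names; it isn’t part of the solution above:


# Zipping in 0..24 mirrors Clojure's (range 25), producing
# {index, fizz_or_blank, buzz_or_blank} tuples.
[
  0..24,
  Stream.cycle([:fizz, :_, :_]),
  Stream.cycle([:buzz, :_, :_, :_, :_])
]
|> Stream.zip()
|> Enum.take(25)
|> IO.inspect()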

Final Thoughts

To be honest, I’ve been having trouble lately coming to terms with some of Elixir’s aesthetic choices. As someone who’s always admired the simplicity of Lisp syntax, I fully expected myself to prefer the Clojure solution over the Elixir solution.

That being said, I hugely prefer the Elixir solution we came up with!

The overall attack plan of the algorithm is much more apparent. It’s immediately clear that we start with two cycles of :fizz/:buzz and some number of empty atoms. From there, we zip the streams together and take the first twenty-five results. Lastly, we inspect the result.

Which solution do you prefer?

Ping, Pong, and Unresponsive Bitcoin Nodes

Written by Pete Corey on Jul 9, 2018.

The last piece of low-hanging fruit required to finish up the connectivity of our in-progress, Elixir-powered Bitcoin node is to implement a system to detect unresponsive peer connections and prune them from our list of active peers.

Once an inactive peer is removed, our current system will automatically connect to a new peer to take its place.

There are several potential solutions for building out this kind of timeout system, and I’ve been weighing their pros and cons in the back of my mind for several weeks. I think I’ve come to a relatively simple and elegant solution that tackles the problem with minimal technical and mental overhead.

Let’s dive in!

Who Cares About Unresponsive Nodes?

In its current state, our Bitcoin node will connect to up to one hundred twenty-five peer nodes. We assume that each of these nodes is a fully functioning and active part of the Bitcoin peer-to-peer network. If we don’t receive any messages from them, or if messages dwindle over time, we just assume that the network doesn’t have much to tell us.

This assumption can lead to trouble. If we continue to persist our connections to unresponsive nodes, it’s conceivable that eventually every node we’re connected to will become unresponsive for some reason or another.

At that point, our Bitcoin node is dead in the water. It’s unable to send or receive any information, and it’s unable to fetch any additional peers to reestablish its place in the peer-to-peer network. Our only course of action would be to restart the node and try again.

And that’s not a very robust solution…

Detecting Slow Connections

Instead, we should be proactive about pruning unresponsive nodes from our set of peers. The first piece of low-hanging fruit we can go after is adding a timeout to our :gen_tcp.connect/4 call:


:gen_tcp.connect(
  IP.to_tuple(state.ip),
  state.port,
  options,
  Application.get_env(:bitcoin_network, :timeout)
)

If a node takes too long to respond to our initial connection request (in this case, :timeout is set to thirty seconds), we’ll retry the connection a few times and then ultimately remove the node from our set of peers.

Detecting Unresponsive Nodes

The next step in aggressively pruning our peer list is to watch for unresponsive nodes. We’ll do this by setting up a timeout between every message we receive from our peer. If we don’t receive another message before a certain cutoff time, we deem the peer unresponsive and break our connection.

We’ll start by adding a call to a new refresh_timeout/1 helper function in our :tcp info handler:


def handle_info({:tcp, _port, data}, state) do
  state = refresh_timeout(state)
  ...
end

The first time refresh_timeout/1 is called, it schedules a :timeout message to be sent to the current process after a certain amount of time. A reference to that timer is stored in the process’ current state:


defp refresh_timeout(state) do
  timer = Process.send_after(self(), :timeout, Application.get_env(:bitcoin_network, :timeout))
  Map.put_new(state, :timer, timer)
end

Subsequent calls to refresh_timeout/1 cancel the existing timer and create a new one. Note that in the actual module this more specific clause needs to be defined before the catch-all clause above, so that it matches first:


defp refresh_timeout(state = %{timer: timer}) do
  Process.cancel_timer(timer)
  refresh_timeout(Map.delete(state, :timer))
end

Now we need to add a callback to handle the scheduled :timeout message:


def handle_info(:timeout, state) do
  {:disconnect, :timeout, state}
end

Whenever we receive a :timeout message, we simply kill the current process, effectively disconnecting the associated peer.
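
For what it’s worth, that {:disconnect, reason, state} tuple suggests our process is built on the Connection behaviour, whose disconnect/2 callback gives us a place to close the socket and stop the process. Here’s a rough sketch of what that might look like, not necessarily the node’s exact implementation:


# Close the peer's socket and stop the process once a disconnect is triggered.
def disconnect(reason, state) do
  :ok = :gen_tcp.close(state.socket)
  {:stop, reason, state}
end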

Ensuring A Constant Stream of Messages

So now we’re disconnecting peers if we don’t receive a message from them within a certain period of time (thirty seconds in my case), but we have no way of guaranteeing that we should receive messages this frequently. What if there are no new blocks or transactions on the network?

To guarantee that we receive regular, periodic messages, we need to set up a ping/pong loop.

Every so often we’ll send our peer node a “ping” message. If they’re still responsive, they’ll immediately respond with a “pong”. The peer will ensure our responsiveness by sending their own “pings”, which we’re already responding to.
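
For reference, answering a peer’s incoming “ping” might look something like the sketch below, echoing the peer’s nonce back in a “pong”. The %Ping{} and %Pong{} field names and the "pong" serialization call are assumptions modeled on the other payload handlers in this post:


# Answer an incoming "ping" by echoing its nonce back in a "pong".
defp handle_payload(%Ping{nonce: nonce}, state) do
  with :ok <-
         Message.serialize("pong", %Pong{nonce: nonce})
         |> send_message(state.socket) do
    {:ok, state}
  else
    {:error, reason} -> {:error, reason, state}
  end
end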

According to the woefully under-documented Bitcoin protocol, we can’t send our first “ping” until we send back our “verack” message. Any messages sent prior to our “verack” will mark our node as “misbehaving” and risk a disconnection.


defp handle_payload(%Version{}, state) do
  with :ok <- Message.serialize("verack") |> send_message(state.socket),
       :ok <- Message.serialize("getaddr") |> send_message(state.socket),
       :ok <-
         Message.serialize("ping", %Ping{
           nonce: :crypto.strong_rand_bytes(8)
         })
         |> send_message(state.socket) do
    {:ok, state}
  else
    {:error, reason} -> {:error, reason, state}
  end
end

Now that we’ve sent our “ping”, we can expect to receive a “pong” in reply. When we receive the peer’s “pong” response, we want to schedule another “ping” to be sent a short time in the future. We do this by scheduling a :send_ping message to be sent to the current process after a short interval:


defp handle_payload(%Pong{}, state) do
  Process.send_after(self(), :send_ping, Application.get_env(:bitcoin_network, :ping_time))
  {:ok, state}
end

Our :send_ping handler sends another “ping” message, completing the ping/pong cycle:


def handle_info(:send_ping, state) do
  with :ok <-
         Message.serialize("ping", %Ping{
           nonce: :crypto.strong_rand_bytes(8)
         })
         |> send_message(state.socket) do
    {:noreply, state}
  else
    {:error, reason} -> {:error, reason, state}
  end
end

And that’s all there is to it!

As long as :ping_time is comfortably shorter than our :timeout, we should always have a constant stream of “ping” messages to keep our timeout timer from firing. If one of our peers ever fails to send its “pong”, we kill its corresponding Node process.
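
For illustration, that relationship might live in the application’s configuration along these lines. The thirty-second timeout matches the value mentioned earlier, but the :ping_time value here is just a plausible placeholder, not the project’s actual number:


# config/config.exs
use Mix.Config

config :bitcoin_network,
  # Drop a peer after thirty seconds of silence.
  timeout: 30_000,
  # Send a fresh "ping" ten seconds after each "pong".
  ping_time: 10_000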

Final Thoughts

As far as I’m concerned, that wraps up the networking portion of our in-progress Elixir-based Bitcoin node project. In the future we’ll turn our attention to the actual guts of a Bitcoin node: processing blocks and transactions.

At some point we might also slap a fancy user interface on top of our node. Everything’s better with a great UI.

Stay tuned!