The whole point is for these to be familiar. It’s not about the topologies themselves, but how to take already-familiar arrangements and implement them in terms of the Actor model.

Keeping it simple with synchronous pairs

Perhaps the most basic non-trivial topology imaginable is that of the synchronous pair. The resulting barbell structure is quite simple and predictable, though the following depiction indicates only connection rather than directionality.

req rep

Merely a dyad, this topology only pretends to be synchronous by emulating synchronous semantics. Specifically, the requesting actor waits to send another message until it has received a reply to its original message. That is to say, if \(a_0\) sends a PING to \(a_1\), it won’t send another until it has received a corresponding PONG in return.

The following activity diagram depicts the exchange between \(a_0\) and \(a_1\). Note how in this topology, the responsibility for sending messages alternates between actors.

req rep activity

The above diagrams clearly show that the actors communicate directly with one another. This is a crucial detail, as there are several other request/reply patterns demonstrated by other kinds of systems. Consider the call-and-response semantics exhibited in queue-based architectures, the store-and-forward model evident in email systems, or even the publish/subscribe models depicted in later patterns such as Meshes. In contrast to the direct exchange shown here, those request/reply patterns all rely on an intermediary of some sort. For example, a queue-based architecture would require a queue, which would itself be another actor!

Where you’ve seen it elsewhere

You may have seen similarities in simple usage of Berkeley-style sockets common in POSIX systems, and the Windows Socket API (Winsock) [1]. You may also note that this topology appears nearly identical to ZeroMQ’s Request-Reply pattern [5].

Example code

This code example, written in Scala using the Akka framework, shows two actors that interact synchronously. The first is an actor defined by the Pinger class and the second is defined by the Ponger class. Note that Akka borrows Erlang’s notation for sending messages, using a bang (!) to separate the name of the receiving actor from the message being sent.

actor ! message

This particular example is rather simple, but it does show off a common theme in various actor implementations: the requirement that a program first initialize an actor system as a context in which actors are created. This is only necessary in certain implementations (such as Scala’s Akka, Rust’s Riker, and Python’s Thespian) but not in others (such as Elixir).

import akka.actor._

// Define message types
case object Ping
case object Pong
case object Go

// Define a "Pinger" actor (a0)
class Pinger(ponger: ActorRef) extends Actor {
  def receive = {
    case Go =>
      ponger ! Ping
    case Pong =>
      println("Pinger: received Pong")
  }
}

// Define a Ponger actor (a1)
class Ponger extends Actor {
  def receive = {
    case Ping =>
      println("Ponger: received Ping")
      sender() ! Pong
  }
}

// The application to be run
object PingPong {
  def main(args: Array[String]): Unit = {
    // Spin up actors inside an actor system
    val context = ActorSystem("PingPong")
    val ponger = context.actorOf(Props[Ponger], name = "ponger")
    val pinger = context.actorOf(Props(new Pinger(ponger)), name = "pinger")
    
    // Initiate an exchange
    pinger ! Go
  }
}

Message types are defined directly as empty case object definitions without bodies. These message types are matched in the body of each actor’s receive implementation, an approach which allows an actor to specify an "else" clause for handling unknown messages. Predictably, Scala’s syntax for this uses the _ wildcard.
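
For instance, the Ponger actor’s receive method could be extended with a catch-all clause like the following sketch (the unknown-message logging is our own illustration, not part of the example above):

  def receive = {
    case Ping =>
      println("Ponger: received Ping")
      sender() ! Pong
    case _ =>
      // The wildcard matches any message we don't otherwise recognize
      println("Ponger: received an unknown message")
  }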

The meat of our example begins in the PingPong object’s main method. This method creates the actors and then initiates the actor pair’s interaction by sending an initial Go message.

Centralizing resources with client-server

In the 1980s, as alternatives to the mainframe emerged, industry began adopting the client-server methodologies pioneered in academia years before. Both approaches sought to overcome the same obstacle in the same way: it was impractical to give every user the computing power needed to perform every task, so they paired resource concentration with remote access.

Since that time, we’ve seen trends oscillate unevenly between centralized and decentralized approaches. Though the popularity of many-user, single-machine installations has surely faded, the client-server architecture marketed as a mainframe alternative has not only endured but continued to evolve.

Usually, this topology involves a single server actor and several client actors. You might easily recognize it from the diagram below, and its general structure forms a basis for the fan-in and fan-out patterns later on.

client server

In the above diagram, any client actor may make requests of the server actor. Note that in this particular case, it remains unspecified whether or not the server is obligated to respond with a message of its own. One interesting implication of this unspecified aspect is that, in the event that a client expects no reply from a server, it may opt to "fire and forget" in a completely asynchronous fashion rather than to emulate synchronous semantics. We will address this pattern’s contrasting synchronous and asynchronous modes in our code example.

Moreover, the diagram does not preclude the initiation of an exchange by the server rather than a client. Such scenarios arise in practice with server push technologies like Server-Sent Events (SSE), part of the living HTML5 specification maintained by the WHATWG.

Note that (just as when they were introduced) the terms "client" and "server" have a good deal of overlap with one another. As such, this is a fairly abstract topology which becomes significantly more meaningful once applied to specific problems.

Where you’ve seen it elsewhere

Clearly, this strongly resembles many modern protocols such as HTTP, LDAP, mail protocols like IMAP, and even the hypermodern JMAP suite of protocols.

Example code

There is no more canonical example of actor-based client-server topologies than Erlang/OTP’s gen_server behavior. Luckily for you, dear reader, we have opted to write this example not in Erlang but in Elixir using the venerable GenServer module which wraps and elides the Erlang underpinnings.

In the world of OTP, a behavior is a special module whose contents are structured to enrich other modules through the use of callback functions.

Both gen_server and GenServer stand for Generic Server, so named on account of the generic nature of the functionality provided to implementors of the behavior. In this case, the callback functions are where the real logic lies, and the behavior’s internal bookkeeping manages details which would otherwise become a source of frustration and bugs. Pesky stuff no one wants to deal with, such as state management and compatibility with the supervision trees for which the ecosystem has become famous.

The below example has callback functions for handling both synchronous requests (calls) and asynchronous messages (casts), which you can identify by looking for handle_call and handle_cast definitions respectively.

# Define the server GenServer with callback implementations
defmodule Server do
  use GenServer

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call(:ping, _from, state) do
    IO.puts("Received a synchronous ping!")

    {:reply, :pong, state}
  end

  @impl true
  def handle_cast(:ping, state) do
    IO.puts("Received a ping asynchronously!")

    {:noreply, state}
  end
end

# Spin up the server actor
{:ok, server} = GenServer.start_link(Server, [])

# Send a synchronous ping
GenServer.call(server, :ping) # returns a :pong response from the server

# Send an asynchronous ping
GenServer.cast(server, :ping) # returns :ok immediately without waiting

Idiomatic Elixir rarely considers it good practice to use functions in the GenServer module directly. In our example, style would dictate that we instead define convenience functions in the Server module itself to present a tidier public interface.
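
A minimal sketch of what that tidier interface might look like follows; the wrapper function names here are our own invention, not part of the example above.

# Hypothetical convenience functions wrapping the GenServer machinery
defmodule Server do
  use GenServer

  # Public API: callers never touch the GenServer module directly
  def start_link(state \\ []), do: GenServer.start_link(__MODULE__, state)
  def ping(server), do: GenServer.call(server, :ping)
  def async_ping(server), do: GenServer.cast(server, :ping)

  # ... init/1, handle_call/3, and handle_cast/2 callbacks as defined earlier ...
end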

Fan out with publishers

By increasing the specificity of the client-server topology, we may articulate additional patterns for examination. In this chapter, we add directionality to derive a fan-out topology in which a single actor, the publisher, sends messages to a group of other actors, the listeners.

fan out

There are several use cases for such an arrangement, including selective publication, indiscriminate publication, and providing multiple listeners with a single facade suitable for presentation to additional actors. In all examples, the publisher is performing some service for the listeners. Let’s look into these use cases in depth and explore some of their strengths and weaknesses.

Selective publication

Routing

The ability of the publisher to direct specific messages to a specific (hopefully correct) listener requires the publishing actor to have some understanding of message contents. While the ability to route messages can be exceedingly useful in cases of actor specialization, it may make the router actor more complex and lead to poor separation of concerns due to oversharing of information.

Load balancing

Using a scheme as simple as round robin or as complex as deterministic hashing, the publisher may distribute messages to its listeners. This allows for partitioning some work by muxing tasks, in whole or in part, onto the pool of listening actors.
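
As a rough sketch of the round-robin case, written in Akka.NET to match this chapter’s example code (the RoundRobinPublisher class is our own illustration and assumes string messages):

// Hypothetical round-robin publisher cycling through a fixed set of listeners
public class RoundRobinPublisher : ReceiveActor
{
    private readonly IActorRef[] listeners;
    private int next = 0;

    public RoundRobinPublisher(IActorRef[] listeners)
    {
        this.listeners = listeners;

        Receive<string>(msg =>
        {
            // Forward the message to the next listener in the rotation
            listeners[next].Tell(msg, Self);
            next = (next + 1) % listeners.Length;
        });
    }
}

Note that Akka.NET also ships with built-in routers for exactly this purpose, but the sketch shows how little is needed to hand-roll the behavior.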

Publish/subscribe

In something approximating the Observer pattern, the publisher alerts interested listeners of a given happening.

Indiscriminate publication

Broadcasting

Sending or forwarding a message to the entire group of listeners is a form of broadcasting or repetition in which a publisher clearly mediates communication to its listeners. Provided that other actors do not separately know the names/addresses of the listeners, the publishing actor may even opt not to broadcast a message it receives. This refusal could be based on the message’s content, in which case declining to transmit the message onward equates to exerting a form of access control.

Translation/transformation

By knowingly proxying message contents, the publisher maps information from a form which is not understood by listeners to a form which is understood. This positions the publisher as an intermediary translator. Just know that this need not be a one-to-one mapping. Consider an example wherein a publisher receives a single message of type \(p\), which causes more than one message of type \(q\) to be sent to each listener. Such a case is easily conceivable where a publisher learns of some task but wishes to break it down into discrete chunks (tasklets, perhaps) for distribution amongst the listening group.

Example code

Let’s consider an example supporting translation/transformation of data. In this case, there are two different receivers of messages, and those receivers understand two different versions of the message. The publisher takes a message and converts it appropriately for each of the consuming child actors. In the main function, the ActorSystem is established and messages are published to the primary actor. The message types are also shown below.

using System;
using Akka.Actor;
using Distributor.Actors;
using Distributor.Types;

namespace Distributor
{
    class Program
    {
        public static ActorSystem DistributorSystem;

        static void Main(string[] args)
        {
            DistributorSystem = ActorSystem.Create("DistributorActorSystem");
            IActorRef publisher = DistributorSystem.ActorOf<Publisher>("publisher");

            // String messages
            publisher.Tell("v2blah");
            publisher.Tell("v1blah");
            
            // Object messages
            MessageV1 v1msg = new MessageV1(Guid.NewGuid().ToString(), "V1 Message");
            MessageV2 v2msg = new MessageV2(Guid.NewGuid().ToString(), "V2 Message", "original message");

            publisher.Tell(v1msg);
            publisher.Tell(v2msg);

            // Terminate the publisher process
            publisher.Tell("terminate");

            DistributorSystem.WhenTerminated.Wait();
        }
    }
}
namespace Distributor.Types {
    public class MessageV1 {
        public MessageV1(string identifier, string message) {
            Identifier = identifier;
            Message = message;
        }

        public string Identifier { get; set; }
        public string Message { get; set; }
    }

    public class MessageV2 : MessageV1 {
        public MessageV2(string identifier, string message, string ctx) : base(identifier, message) {
            Context = ctx;
        }

        public string Context { get; set; }
    }
}

The actors in this scenario are shown below. The Publisher actor receives different types of messages and maps them to either a MessageV1 or a MessageV2. Note how in Akka.NET one is able to define different Receive<T> handlers based on the type of message being received. Once the mapping from various formats is done, the message is sent to each of the V1Processor and V2Processor actors, which then perform their functions. Finally, the Publisher actor also listens for a "terminate" message and then terminates the ActorSystem.

using System;
using Akka.Actor;
using Distributor;
using Distributor.Types;

namespace Distributor.Actors {
    // Receive messages, convert to either v1 or v2 and send to those message processors
    public class Publisher : ReceiveActor {
        public static IActorRef v1Proc;
        public static IActorRef v2Proc;

        public Publisher() 
        {
            v1Proc = Program.DistributorSystem.ActorOf<V1Processor>("v1Proc");
            v2Proc = Program.DistributorSystem.ActorOf<V2Processor>("v2Proc");            

            // Convert a string to a Message type
            Receive<string>(msg => 
            {
                // Handle the "terminate" control message before converting anything
                if (msg == "terminate") {
                    Context.System.Terminate();
                    return;
                }

                MessageV1 newv1Msg = new MessageV1(Guid.NewGuid().ToString(), msg.Replace("v1", ""));
                MessageV2 newv2Msg = new MessageV2(Guid.NewGuid().ToString(), msg.Replace("v2", ""), "converted by Publisher - v2");

                v1Proc.Tell(newv1Msg, Self);
                v2Proc.Tell(newv2Msg, Self);
            });

            // Send a v1 message and convert to a v2 message
            Receive<MessageV1>(msg => 
            {
                MessageV2 v2Msg = new MessageV2(msg.Identifier, msg.Message, "converted by Publisher - v2");
                v1Proc.Tell(msg, Self);
                v2Proc.Tell(v2Msg, Self);
            });

            // Send a v2 message and convert to a v1 message
            Receive<MessageV2>(msg => 
            {
                MessageV1 v1Msg = new MessageV1(msg.Identifier, msg.Message);
                v1Proc.Tell(v1Msg, Self);
                v2Proc.Tell(msg, Self);
            });
        }
    }

    // V1 Message Processor
    public class V1Processor : ReceiveActor {
        public V1Processor() {
            Receive<MessageV1>(msg => {
                Console.WriteLine("Got a message {0} with ID {1}", msg.Message, msg.Identifier);
            });
        }
    }

    // V2 Message Processor
    public class V2Processor : ReceiveActor {
        public V2Processor() {
            Receive<MessageV2>(msg => {
                Console.WriteLine("Got a message {0} in context {1} with ID {2}", msg.Message, msg.Context, msg.Identifier);
            });
        }
    }
}

Fan in with consolidators

In the previous chapter we looked at building the fan-out concepts of publisher and listeners on client-server topologies by adding directionality. In this chapter, we’ll reverse that directionality to produce a fan-in topology in which a group of producers send messages to a single consolidator.

fan in

Notice how the above is the direct inverse of the previous chapter’s diagram. Even so, the two are not without similarity: both lone actors act as focal points in their respective topologies. Here, the consolidator is the message sink for its producers, a situation we could put to use in a variety of ways.

Assume that every producer actor computes a value at regular intervals, perhaps based on message traffic received from its neighbors. Hypothetically, the overall system has to have some way of collecting those values for later analysis. In this case, each producer functions as a sensor and regularly "phones home" with its observations.

Now let us extend this example with an additional constraint: the system wishes to determine the order of received observations as part of its analysis. In that case, the consolidator would serve to linearize the stream of incoming messages, establishing a global ordering.
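
To make that linearizing role concrete, the consolidator’s handler might stamp each observation with a monotonically increasing sequence number, roughly as sketched below (this handler is our own illustration and is not part of the example that follows):

// Hypothetical linearizing consolidator behaviour: assigns a global
// sequence number to each observation in the order it arrives
let sequence = 0;

const consolidatorBehaviour = {
  consume: (observation: number) => {
    const ordered = { seq: sequence++, observation };
    console.log(JSON.stringify(ordered));
  }
};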

Example code

Our example, written in TypeScript, will refrain from imposing any particular domain challenges. Instead, our three producers send integers to our single consumer. All the action takes place in an asynchronous function called main which makes heavy use of the async and await syntax now common throughout ECMAScript’s ecosystem.

import { createSystem, ActorRef } from 'comedy';

const main = async () => {
  const system = createSystem({});

  const rootActor = await system.rootActor();

  const consolidator = await rootActor.createChild({
    consume: (num: number) => {
      console.log(`${num}`);
    }
  });

  const makeProducer = async () =>
    await rootActor.createChild({
      produce: (i: number, consolidator: ActorRef) => {
        consolidator.send('consume', i + 1);
      }
    });

  const producers = [
    await makeProducer(),
    await makeProducer(),
    await makeProducer()
  ];

  await Promise.all(
    producers.map((producer, index) => producer.send('produce', index, consolidator))
  );

  return system;
};

main().then((system) => system.destroy());

The first thing you may notice is that the only function directly imported from the Comedy library is createSystem. Once we’ve constructed an ActorSystem we are able to get a handle on the root Actor which we use to create the actors in our system. We create the consolidator Actor with a single handler for consume messages. The three producer Actors created by the makeProducer function respond only to produce messages which contain an integer to increment and a reference to the consolidating Actor to whom producers should fan in. Finally we iterate over the producers, sending each the produce message with the appropriate data and the rest falls into place.

Note that we must finally signal that no more activity should be expected by destroying the ActorSystem we created in order to allow the runtime to terminate. Were we to skip this step, the program would simply hang until killed manually.

Linear composition with pipelines

As we learned from the last two topologies, directionality can have a tremendous impact on the way we understand and use actor patterns. However, let’s take a step back from \(n\)-to-\(m\) models and go back to our roots. Namely, the humble pair.

In this pattern, we’ll learn about an extension to the pair, the \(n\)-ary pipeline, and how it enables the linear composition of actors.

A pipeline consists of actors sending messages to one another, one at a time, in simple sequence. A pipeline with 5 actors could look like the following diagram.

pipeline

In accordance with the depicted pipeline, we might be tempted to imagine that messages originate with \(a_0\) as the source and travel through the three intermediate actors (\(a_1\), \(a_2\), \(a_3\)) until reaching the sink at \(a_4\). This is a seductive idea, but beware its beguiling simplicity! In practice, pipelines are rarely chains of mere repetition but more often sequences of stepwise computation.

The pipeline topology really shines when it’s used to break up computation into discrete steps which can be cordoned off in different actor definitions. This allows for segregation of responsibilities without leaking abstractions across the entire group. For example, all that actor \(a_1\) knows is that it gets a certain kind of message from \(a_0\) which it knows how to process before sending the result along to \(a_2\). Actor \(a_1\) has no idea about its place in the larger system or the overall topology of which it is a part. This is a good thing because it allows for the application of the Single Responsibility Principle (SRP) to actor design.

Example code

In this example we use a Python actor library called Thespian to construct our pipeline. We specifically write the pipeline to be self-constructing, insofar as each actor spins up the next actor in the sequence. This showcases three related things:

  1. Actors should be cheap to spin up, an idea which is strongly dependent upon the implementation being used.

  2. If actors are cheap, they should spawn other actors rather liberally.

  3. Actors can multiply rather rapidly.

With these principles in mind, we see the definite possibility for our actors to rapidly multiply in number. In fact, if a Source always creates a Worker, which always creates a Sink, then every time the Source gets a "go" message, two additional actors are created. This could be a significant flaw in our design unless the underlying implementation has some sort of garbage collection mechanism for stale or dormant actors.

from thespian.actors import *

from dataclasses import dataclass
import hashlib
import uuid


@dataclass
class Data:
    data: dict

    def __str__(self) -> str:
        return str(self.data)


class Sink(Actor):
    def receiveMessage(self, message, sender):
        print(f"Got hashed value: {message.data['contents']}")


class Worker(Actor):
    def receiveMessage(self, message, sender):
        contents = message.data["contents"]
        sha = hashlib.sha256(contents.encode()).hexdigest()
        data = {
            "kind": "finished",
            "contents": sha
        }
        msg = Data(data)
        sink = self.createActor(Sink)
        self.send(sink, msg)


class Source(Actor):
    def receiveMessage(self, message, sender):
        if "go" == message.data["kind"]:
            data = {
                "kind": "process",
                "contents": uuid.uuid4()
            }
            msg = Data(data)
            worker = self.createActor(Worker)
            self.send(worker, msg)


if __name__ == "__main__":
    system = ActorSystem()
    source = system.createActor(Source)
    # Wrap the payload in Data so Source can read message.data["kind"]
    system.tell(source, Data({"kind": "go"}))

Our example’s notion of work is rather trivial. The Source generates a UUID which gets passed to the Worker for hashing with the SHA-256 algorithm, using the implementation from the hashlib module in Python’s standard library.

Yet, the self-assembling nature of our pipeline’s design raises a fascinating question. Namely, what are the alternatives? Since actors need to know the name of another actor in order to be able to send it messages, if each actor didn’t create its subsequent actor, we would need some method to define and communicate the overall structure of the pipeline. This is certainly possible, and perhaps even desirable.

In our case, the pipeline’s structure could simply be created by the Source and included in the message structure as a property.

In this case, it’s a simple list.

from thespian.actors import Actor
from dataclasses import dataclass
from typing import List


@dataclass
class Data:
    msg: dict
    pipeline: List[Actor]

    def __str__(self) -> str:
        return str(self.msg)
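
A stage in such an externally defined pipeline might then pop the next actor off the list and forward the remainder, roughly as in the following sketch (the Stage class is our own illustration and assumes the Data definition above):

class Stage(Actor):
    def receiveMessage(self, message, sender):
        # Ignore anything that isn't one of our Data messages
        if not isinstance(message, Data):
            return
        if message.pipeline:
            # Take the next actor in the pipeline and forward the rest of it
            next_actor, *rest = message.pipeline
            self.send(next_actor, Data(message.msg, rest))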

Meshes

Like the previous pattern of pipelines, meshes eschew traditional notions of client and server. Instead, a mesh topology usually opts for direct connections between actors which must interact. The most famous applications of mesh topologies on the internet were certainly the peer-to-peer (P2P) file-sharing networks of the early 2000s. In fact, the network known as Gnutella [2] was the subject of much inquiry by researchers who wished to observe a large mesh topology (called a swarm) in the wild.

Meshes are often characterized by direct connectivity between components, called peers, and new connections become possible following the introduction of peers to one another via intermediaries. In the Actor model, peers are simply actors and introductions take place by sending names to one another via messages.

mesh

Later iterations of file-sharing networks introduced a few refinements worth mentioning, such as the superpeer/supernode model popularized by KaZaA, which was also the subject of much research [4]. In the following diagram, most actors would not speak to one another directly, but coordinate through their "local" superpeers (represented by hexagons) who speak with the larger network on their behalf.

mesh supernode
Figure 1. Hexagons are superpeers

The introduction of superpeers spurred some degree of selective centralization, with superpeers becoming like the "servers" of their neighborhood. This innovation combined the benefits of a client-server architecture with the robustness of a decentralized network. Introducing the superpeer concept partitioned clusters of peers into "localities" and effectively organized all peers into two layers: leaf peers and superpeers. The model proved so effective that networks were designed to preserve the superpeer concept in the face of peer failure: when a superpeer left the swarm, a leaf peer would be "promoted" to superpeer dynamically using a variety of algorithms [3].

Example code

While the implementation of a superpeer network, complete with dynamic promotion, is a bit beyond the scope of this book, we do give an implementation of a mesh peer actor. The purpose of this example mesh is to return search results from the network’s collection of files. It does this using a simplified set of semantics modeled loosely on the original Gnutella design. In particular, this actor, written using the GenServer behavior, must be initialized with a list of peers already in the swarm and a list of filenames to which it has access.

defmodule Peer do
  use GenServer

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:result, peer}, _from, state) do
    IO.puts("Got result from #{peer}!")

    {:reply, :thanks, state}
  end

  @impl true
  def handle_cast({:search, filename, origin, ttl}, state) do
    IO.puts("Received a search for '#{filename}' from #{origin}")

    # If filename exists locally
    if Enum.member?(state[:files], filename) do
      # Tell the originator of the query that we have it
      GenServer.call(origin, {:result, self()})
    else
      # Otherwise, forward the query to our peers if TTL > 0
      if ttl > 0 do
        state[:peers]
        |> Enum.each(fn peer ->
          GenServer.cast(peer, {:search, filename, origin, ttl - 1})
        end)
      end
    end

    {:noreply, state}
  end
end
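
A hypothetical session with this module might wire two peers together and issue a search as follows; the peer and file names are our own, and the state shape (a map with :peers and :files keys) is inferred from the Access syntax used in the callbacks above.

# Start a peer holding a file and a second peer that knows about the first
{:ok, peer_b} = GenServer.start_link(Peer, %{peers: [], files: ["song.mp3"]})
{:ok, peer_a} = GenServer.start_link(Peer, %{peers: [peer_b], files: []})

# peer_a lacks the file, so it forwards the search to peer_b,
# which replies to the originator (peer_a) directly
GenServer.cast(peer_a, {:search, "song.mp3", peer_a, 3})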

Bibliography

[1] Microsoft, “Winsock — Windows Dev Center.” 2018, [Online]. Available: https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-reference.

[2] A. Oram, “Gnutella,” Peer-to-Peer: Harnessing the power of disruptive technologies, 2001.

[3] L. Xiao, Z. Zhuang, and Y. Liu, “Dynamic layer management in superpeer architectures,” IEEE Transactions on parallel and distributed systems, vol. 16, no. 11, pp. 1078–1091, 2005.

[4] B. B. Yang and H. Garcia-Molina, “Designing a super-peer network,” in Proceedings 19th international conference on data engineering (Cat. No. 03CH37405), 2003, pp. 49–60.

[5] ZeroMQ, “ZGuide — Ask and Ye Shall Receive.” 2013, [Online]. Available: http://zguide.zeromq.org/page:chapter1#Ask-and-Ye-Shall-Receive.