This chapter contains a few patterns which we, the authors, have found to be useful over the years. In particular, we have worked hard to bring concepts from mainstream functional programming into the world of the Actor model.
Isolating side effects with an Entity and Effect Machine
Our notion of an Entity and Effect Machine (EEM) stems from problems we encountered when designing systems centered around asynchronous work. In object-oriented programming, the ability to delay work may come from various language-neutral design patterns such as the Command Pattern. In functional programming, it may come from an effect system. The EEM concept leans heavily on the idea of unevaluated functions, called thunks, and therefore descends from the FP effect approach.
The whole principle can be summed up in one idea, namely that an effect machine is a data entity transformer which emits effects:
\((Operation, Entity) \mapsto (Effects, New Entity)\).
Or, more succinctly: \(Operation(Entity) = (Effects, Entity')\).
In other words, it’s a Mealy machine for data.
This implies that the core goal of a processor is to map an input \(Entity\) to an output \(Entity'\) by means of an \(Operation\). Additionally, applying the operation yields a set of algebraic effects, called \(Effects\), which may impact global state. The handling of such effects becomes an implementation detail, where an effect may be:
- Executed synchronously
- Executed asynchronously
- Deferred
- Placed on a queue
- Sent elsewhere for handling
- Discarded
Note that since we have no way of knowing if, when, or how an effect will be applied, effects cannot be allowed to influence the machine's own transitions in any way. Therefore, the set of all operations is disjoint from the set of all effects. A failure to keep them separate would introduce the possibility of self-modification.
| In this document, the term "effect" is a stand-in for "algebraic effect", with the intent that it is something incidental to the mapping \(p: Entity \mapsto Entity'\) for any processor \(p\). |
We may conceive of a processor as little more than function application, should we wish to do so.
Consider a hypothetical data system which receives records. Upon receipt of a record, we want the system to both transform it and react to its content. The record is the input state, and the transformed record is the new state. The "reaction", in this case, is a set of effects to be executed in response to the input record's content.
Composition
Such processes can be chained, or composed, in a functional fashion.
The functional nature of this approach permeates the entire concept. It imposes constraints such as statelessness and computational purity. These constraints enable some interesting properties, however.
- Atomic (interruptible and resumable): Since entity transitions are pure and based solely on direct input, the application of an operation isn't complete until it produces a new entity. Each application may therefore be interrupted at any point, since effects are not executed until after the processor has produced a new entity.
- Distributable (non-local): As no locality requirement has been expressed for any of the above, computation could easily be distributed among different nodes in a cluster. In this case, entities likely become messages between Actors.
- Composable: As depicted above, processors can be composed functionally.
Illustrative code examples
The following are intended to illustrate the above and not to implement it entirely. For example, effects are largely modeled as simple functions.
Scala example
| The following does not use Higher-Kinded Types (HKTs). |
object EffectMachine {
  trait Entity[T] {
    def get(key: String): T
    def set(key: String, value: T): T
  }

  type Effect = () => Unit

  case class ComputationResult(effects: List[Effect], newEntity: Entity[_])

  type Operation = Entity[_] => ComputationResult

  def processor(op: Operation, entity: Entity[_]): ComputationResult = op(entity)
}
Rust example
trait Entity {
    fn get<T>(&self, key: &str) -> T;
    fn set<T>(&mut self, key: &str, val: T) -> T;
}

struct ComputationResult<S: Entity> {
    // Vector of effect thunks: functions from () to ()
    effects: Vec<Box<dyn Fn()>>,
    new_entity: S,
}

type Operation<S> = Box<dyn Fn(S) -> ComputationResult<S>>;

fn processor<S: Entity>(op: Operation<S>, entity: S) -> ComputationResult<S> {
    op(entity)
}
Elixir example
defmodule VmEx do
  @moduledoc """
  Documentation for VmEx, which represents a tiny little entity effect processor.
  """
  require Logger

  # An entity can be represented by a simple `Map`
  @type entity :: %{String.t() => any()}

  # Any function which takes an entity and returns a computation result is an operation
  @type operation :: (entity() -> computation_result())

  # Effects are functions which take no arguments and always return :ok
  @type effect :: (() -> :ok)

  # Defining the computation result
  @type computation_result :: %{new_entity: entity(), effects: list(effect())}

  @doc "Processes an operation on an entity into a `computation_result`"
  @spec processor(operation(), entity()) :: computation_result()
  def processor(op, entity) do
    op.(entity)
  end
end
Example application: Payment requests handler
Assume a payment processing preparation system with ill-defined responsibility boundaries. Raw payment requests are input to a handler which formats the requests for subsequent processes to consume. If, during processing, the handler encounters a request for an amount exceeding \(n\) dollars, it emits a notification. There should be only a single notification for every request whose amount exceeds the threshold. The following string diagram depicts such a rudimentary system.
Consider what a full scope of responsibility would look like for the handler, as it grows to encompass many concerns.
| The list gets more granular as we drill down into specifics. |
- Handling payment requests by:
  - Formatting them for downstream consumers.
  - Examining their amounts.
- Transmitting notifications to an external system, which includes:
  - Tolerating connection and disconnection.
  - Tolerating delivery failures.
  - Connection or delivery retries.
  - Connection or delivery timeouts.
The above raises a variety of questions, many of which remain in play regardless of the chosen design or architectural framing.
- How does the handler ensure that it doesn't take in data which it doesn't understand?
- How does the handler ensure exactly-once delivery?
- Could an invalid, or otherwise-unformattable, payment request still trigger a notification?
  - Is this even desirable, merely tolerable, or considered an error?
- How do you test such a system?
  - Is such a system sufficiently observable and controllable to test it effectively?
  - Do you need to mock the systems receiving notifications, in addition to input and output payment requests?
- What happens if the handler receives the same payment request twice?
A more traditional take
Traditionally, the approach would be to have a synchronous monolith with a large scope of responsibility. This implies high coupling between internal subsystems, as the list of concerns lacks separation. The monolithic nature of the payment request handler isn’t necessarily or inherently bad. It’s the coupling, however, which poisons the well.
Traditionally, such systems have often been designed database-first. This narrows the focus almost solely to data-at-rest and encourages high coupling by limiting modularity.
A take influenced by Mealy Machines
In this model, the payment request handler is a memoryless processor which maps a raw payment request to a formatted payment request. If it were a function, \(h\), its signature would be something like
\(h: RawRequests \mapsto{} FormattedRequests\)
and, given the likely presence of unique identifiers, would likely be bijective. However, coincidental to the payment request formatting, the handler may also interact with the outside world by firing off a notification. These impure operations get modeled as effects.
\((Handle, RawRequest) \mapsto (Notifications, FormattedRequests)\)
Or, more succinctly:
\(Handle(Request) = (Notifications, Request')\)
In this way, it becomes an implementation detail as to how the notifications are, themselves, handled. Notifications might be dispatched immediately after each incoming request, or they might be deferred until a period of relative quiet. Perhaps they could be muxed onto a pool of workers, in which case they could easily be sent out of order. Or, in a testing environment, the notifications might simply be analyzed for correctness before being discarded entirely.
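As a hedged sketch, each dispatch strategy above might simply be a different function over the same list of effect thunks; the names here are illustrative and not part of any framework:

```elixir
# Each strategy consumes the same list of zero-argument effect thunks.

# Immediate dispatch after each incoming request.
run_now = fn effects -> Enum.each(effects, fn effect -> effect.() end) end

# Deferral: stash the effects on an Erlang queue for a quieter moment.
defer = fn effects, queue -> Enum.reduce(effects, queue, &:queue.in/2) end

# Worker pool: fire each effect in its own process (order not guaranteed).
mux = fn effects -> Enum.each(effects, &spawn/1) end

# Testing: count (or assert on) the effects, then discard them entirely.
discard = fn effects -> length(effects) end
```

The processor never changes; only the effect-handling strategy does.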
A consideration aimed at modularity
A functional approach, one focused on finite states and effects, lends itself to a natural separation of concerns. Specifically, composing components through well-defined interfaces allows those components to be individually tested, deployed, and managed.
A real-world system architecture might actually be constructed as per the following string diagram.
In the above, each component of the system has a discrete, well-defined responsibility. Components interact through well-defined interfaces, and data flows solely in one direction, eliminating feedback loops. Moreover, how effects are handled remains an implementation detail, meaning that the formatting concern is cleanly separated from the notification concern.
Unfortunately, not all problems can be solved with such a restrictive design. What if the payment request handler is not referentially transparent or reentrant? For example, the handler might need to consult a database or ask an external system for help during its formatting operation. Such dependency on extra-function global state would break its compatibility with a purely-functional approach.
Let’s look to the Actor model for a more widely applicable, and certainly less strict, approach.
Looking to Actors
The Actor model allows us to structure our systems using the same concepts as before, but with more flexibility and no loss of generality. We maintain the core principles while providing a mechanism other than functions to express state, in conjunction with the temporal sensitivity and reactive nature of the Actor model itself. While we could certainly write a sample implementation in languages such as Rust (Actix) or Scala (Akka), the following example is complete in Elixir/OTP and uses third-party frameworks rather sparingly.
Since Actors cannot share state or communicate except by passing messages to one another, we must specify well-defined interfaces and data flows. We’ll rely on two kinds of messages, call (sync) and cast (async), to design our Actor communication systems.
In this case, let’s assume three Actors for right now:
- The source of payment requests
- The payment request handler as an EEM
- The effect handler
The communication between them is depicted in the following diagram.
- Raw payment requests (entities) flow from the Source into the Request Handler (an EEM) through calls.
- The Request Handler formats the payment requests and (optionally) casts notifications to the Effect Handler.
- The Request Handler responds to the Source with the formatted payment requests.
An implementation in Elixir/OTP
We can easily whip up an example using the venerable GenServer OTP behavior (generic server) found in Elixir. Note that we use GenServers only where needed to segregate runtime semantics and not for code organization (we merely need modules and functions for that). However, this does allow us to organize Actors into supervision trees where they can be managed (restarted, etc.) in case of failure.
| The notion of automatic restarts overlaps heavily with the topics of resiliency engineering and even the concept of crash-only software. |
In classic fashion, we’ll design our system with a data-first approach.
While we could eventually get fancy with something like Algae and use Algebraic Data Types (ADTs), we’ll stick with vanilla Elixir structs.
Our first module represents the system's primary data entity, a PaymentRequest, and encapsulates a payment request's representation and logic.
It currently has three attributes, a name, a list of recipients, and an integer amount.
defmodule PaymentRequest do
  @moduledoc """
  Represents a fake payment request in our example system.
  """

  @enforce_keys [:name, :recipients, :amount]
  defstruct @enforce_keys

  @type t :: %__MODULE__{
          name: String.t(),
          recipients: list(String.t()),
          amount: integer()
        }

  @doc "Formats a `PaymentRequest`, right now into a simple `String`."
  @spec format_payment_request(t()) :: String.t()
  def format_payment_request(payment_request) do
    inspect(payment_request)
  end

  @doc "Generate a fake `PaymentRequest` for demo purposes."
  @spec bogus_payment_request() :: t()
  def bogus_payment_request do
    %__MODULE__{
      name: "PaymentRequest #{Faker.Name.name()}",
      recipients: [Faker.Code.isbn(), Faker.Code.isbn(), Faker.Code.isbn()],
      amount: :rand.uniform(1000)
    }
  end
end
We also put payment-request-related logic in this module, such as formatting, and even a helper to generate a fake payment request for us to use in demos and testing.
Next, we write our payment request handler as an Actor implementing the GenServer behavior, called PaymentRequestHandler. It has a minimal public API.
defmodule PaymentRequestHandler do
  @moduledoc """
  An example Entity Effect Machine (EEM) Actor which handles `PaymentRequest`s.
  """
  use GenServer
  require Logger

  # Client

  @doc "Start the Actor with an initial state."
  @spec start_link(map()) :: {atom(), pid()}
  def start_link(local_state \\ %{requests_count: 0}) when is_map(local_state) do
    Logger.info(
      "Starting #{inspect(__MODULE__)} with #{local_state[:requests_count]} payment requests in history"
    )

    GenServer.start_link(__MODULE__, local_state, name: __MODULE__)
  end

  @doc "Handle a `PaymentRequest` by dispatching a message to the default `PaymentRequestHandler` Actor."
  @spec handle_payment_request(PaymentRequest.t()) :: String.t()
  def handle_payment_request(payment_request) do
    Logger.info("Handling payment_request: #{payment_request.name}")
    GenServer.call(__MODULE__, {:entity, payment_request})
  end

  # Server (callbacks)

  @impl true
  def init(local_state) do
    {:ok, local_state}
  end

  @impl true
  def handle_call({:entity, payment_request}, _from, local_state) do
    formatted_payment_request = PaymentRequest.format_payment_request(payment_request)

    if payment_request.amount > 500 do
      effect = fn -> Logger.info("Notification: payment_request amount exceeds threshold") end
      EffectHandler.handle_effect(effect)
    end

    new_state = Map.put(local_state, :requests_count, local_state[:requests_count] + 1)

    {:reply, formatted_payment_request, new_state}
  end

  @impl true
  def handle_call(:status, _from, local_state) do
    {:reply, local_state[:requests_count], local_state}
  end
end
Since PaymentRequestHandler only implements handle_call, other Actors may only interact with it synchronously.
It has a very rudimentary local state for simple counting of requests it has seen since startup.
In fact, you may ask it for the requests count by sending the :status message.
Finally, we implement our third module, EffectHandler. It, too, has a minimal public API.
defmodule EffectHandler do
  @moduledoc """
  Serves only to handle effects implemented as functions.
  """
  use GenServer
  require Logger

  # Client

  @doc "Start the Actor with an initial state."
  @spec start_link(any()) :: {atom(), pid()}
  def start_link(local_state \\ nil) do
    Logger.info("Starting #{inspect(__MODULE__)}")
    GenServer.start_link(__MODULE__, local_state, name: __MODULE__)
  end

  @doc "Handle an effect by dispatching a message to the default `EffectHandler` Actor."
  @spec handle_effect(function()) :: :ok
  def handle_effect(effect) when is_function(effect) do
    Logger.info("Received effect!")
    GenServer.cast(__MODULE__, {:effect, effect})
  end

  # Server (callbacks)

  @impl true
  def init(local_state) do
    {:ok, local_state}
  end

  @impl true
  def handle_cast({:effect, effect}, local_state) do
    # Execute the effect
    effect.()
    {:noreply, local_state}
  end
end
However, since it only implements handle_cast, its interaction pattern is purely async.
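A quick, hypothetical session exercising the two Actors together might look like the following; it assumes nothing beyond the modules defined above.

```elixir
# Start both Actors manually (the application supervisor normally does this).
{:ok, _handler} = PaymentRequestHandler.start_link()
{:ok, _effects} = EffectHandler.start_link()

# Handle a fake request; the formatted request comes back synchronously,
# while any notification effect is cast to the EffectHandler asynchronously.
formatted = PaymentRequestHandler.handle_payment_request(PaymentRequest.bogus_payment_request())
IO.puts(formatted)

# Ask the handler how many requests it has seen since startup.
1 = GenServer.call(PaymentRequestHandler, :status)
```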
Finally, the Eem.Application module, which supervises both Actors, is pretty standard.
defmodule Eem.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      {PaymentRequestHandler, %{requests_count: 0}},
      EffectHandler
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Eem.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Analyzing the implementation
A great thing about Actors is that they are largely opaque to one another. Therefore, if the load grew too great to handle on any one component, that individual Actor could be distributed to another node (perhaps using Partisan) or even broken into a worker pool of multiple Actors working to cooperatively amortize message handling over the group.
For example, consider a case where the PaymentRequestHandler becomes a bottleneck due to its exclusive usage of calls.
The PaymentRequestHandler could be decomposed into a worker pool (perhaps managed by pg2) and calls routed to different worker Actors.
An allocation strategy such as round robin or random routing could be very successful.
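A sketch of such routing, with a hypothetical list of pre-started worker pids standing in for a real pool manager:

```elixir
defmodule PoolRouter do
  # Round-robin: pick a worker by rotating an index over the pool,
  # returning the reply along with the next index.
  def route_round_robin(request, workers, index) do
    worker = Enum.at(workers, rem(index, length(workers)))
    {GenServer.call(worker, {:entity, request}), index + 1}
  end

  # Random routing: any worker is as good as any other.
  def route_random(request, workers) do
    GenServer.call(Enum.random(workers), {:entity, request})
  end
end
```

Because each worker holds only a trivial local count, no coordination between workers is needed.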
The nature of an Elixir/OTP Application is such that it runs with individual components as child processes within the same BEAM.
Furthermore, child processes can be restarted in the event of crashes and have their entire lifecycles easily managed for maintenance purposes.
More can be learned by reading about the Supervisor and Application modules.
Enforcing boundary types with validating border sentinels
While there exist excellent formal definitions of data quality problems [4], for our purposes we will simply say that a system should not take in data which it doesn’t understand. This allows us to reason about a few different things.
- A system must have concrete definitions of what it can and cannot understand.
- A system should have the ability to refuse data which it does not, or cannot, understand.
- In an ideal situation, a system should have a mechanism for alerting sending systems as to its refusals (feedback).
We propose a special kind of actor topology which can sit at a system's receiving end and act as a filtering validator. The sentinel validates each incoming message. Valid messages continue on to the system's internal receiver, while invalid messages are "kicked out" into a quarantine zone. The quarantine zone is, itself, another actor. This quarantine actor notifies the sender about the kick-out but leaves it up to the sender to decide what to do from there. It is out of scope for the sentinel to fix bad data; it is only responsible for preventing its ingress into the system and (optionally) providing feedback.
By positioning the validator as the party responsible for determining the validity of incoming data, it becomes a decision point. This means that the actors comprising the system at large remain free of the knowledge required to validate information.
This is strongly reminiscent of the Programming by Contract work from the 80’s and 90’s [2]. The contract research has been applied to concurrent systems as well [3], including an implementation of contracts for actor-based systems in Erlang [1].
The Erlang contract implementation from the article specifies seven types of contract:
- Precondition contracts
- Postcondition contracts
- Decreasing-argument contracts (useful for recursive functions)
- Execution-time contracts (useful for timeouts)
- Purity contracts (useful for ensuring a lack of computational side effects)
- Invariant contracts
- Type contracts
While all of these contract types could be useful for implementing the border sentinel topology, they deal solely with the implementation of functions. That is to say that they offer no advice regarding the arrangement of actors themselves.
| Consider reading the paper, it’s both provocative and well-written. |
The authors cleverly chose a macro-based approach which allows contracts to be excluded from release builds, preventing performance degradation in production systems. This presents an interesting tension: we want well-tested code in advance of release, but we also want to prevent the intake of bad data in production.
This is where we feel the border sentinel topology really shines: it compartmentalizes validation logic in one place, at a system's boundaries, which helps to minimize program overhead. This seems preferable to allowing validation to invade each and every actor in the system.
| Practically speaking you have two choices. Either your validation logic is in one place, or it’s spread all over your application. |
As a cautionary note, we do acknowledge the need to balance validation and trust. We therefore suggest that the border sentinel topology be used at boundaries of a system’s logical components and not only at the system’s physical edge.
Example code
| We have resisted the temptation to author this example in Erlang. You have been spared. You are welcome. |
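Here, then, is a minimal Elixir sketch of the border sentinel topology. The validator predicate, message shapes, and module names are all illustrative assumptions; a production version would also supervise both Actors.

```elixir
defmodule BorderSentinel do
  @moduledoc "Validates messages at the system boundary (illustrative sketch)."
  use GenServer

  # state: %{validator: (msg -> boolean), receiver: pid, quarantine: pid}
  def start_link(state), do: GenServer.start_link(__MODULE__, state)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:ingress, sender, message}, state) do
    if state.validator.(message) do
      # Valid messages continue on to the system's internal receiver.
      send(state.receiver, {:message, message})
    else
      # Invalid messages are kicked out to the quarantine actor.
      GenServer.cast(state.quarantine, {:kicked_out, sender, message})
    end

    {:noreply, state}
  end
end

defmodule QuarantineZone do
  @moduledoc "Notifies senders of kick-outs; fixing bad data is out of scope."
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:kicked_out, sender, message}, state) do
    # Feedback only: what to do next is the sender's decision.
    send(sender, {:rejected, message})
    {:noreply, state}
  end
end
```

A sentinel guarding, say, a numeric-amount invariant could then be started with `validator: &is_integer/1` and pointed at the real receiver and quarantine pids. Note that the system's internal actors carry no validation knowledge at all.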
Bibliography
[1] L.-Å. Fredlund, J. Mariño, S. Pérez, and S. Tamarit, “Runtime verification in Erlang by using contracts,” in International Workshop on Functional and Constraint Logic Programming, 2018, pp. 56–73.
[2] J. C. McKim, “Programming by contract: Designing for correctness,” JOOP, vol. 9, no. 2, pp. 70–74, 1996.