Book Review: Designing Distributed Systems

When developing distributed systems with containers, the same problems surface again and again. In software development, recurring problems are tackled using patterns, and Brendan Burns applies the same idea to distributed systems by introducing several patterns to use when designing and implementing them. The author is one of the co-founders of the Kubernetes project, so he writes from deep experience with distributed systems.

The book is only 149 pages long and is available for free from Microsoft. It features both generic explanations of the different patterns and hands-on examples of how to implement them using Kubernetes.

The patterns are split into single-node, serving, and batch patterns. I find the single-node patterns most interesting because I think they are more broadly applicable.

Single-Node Patterns

The single-node patterns split an application's capabilities across multiple containers running on the same node. In Kubernetes, this is usually implemented by running multiple containers in the same pod, but it can also be done with Docker Swarm using podlike.

Sidecar Pattern

This pattern splits responsibility between two containers, which allows different teams to own each of them. It also keeps the existing container from growing in complexity.

In some cases, we need to extend a container with a new capability, such as HTTPS support. For whatever reason, we might not want to build this capability into the existing container. With a sidecar, we can add the capability in a separate container and deploy the two together.

With Kubernetes, the two containers run inside the same pod and share resources, including the network namespace. This allows the sidecar to connect to the other container over the loopback interface, so as long as the application only listens on localhost, no unencrypted HTTP traffic is exposed to the network.

Another use is to make configuration changes. If the configuration is baked into the container image, the image must be rebuilt for every configuration change, which is not always a good idea.

If the container reads its configuration from a file, the sidecar can update the configuration file and signal the application to reload it. The sidecar can also expose an API for changing the configuration, all without changing the existing container, and that API can be reused across multiple services.
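A minimal sketch of the configuration sidecar's core step, assuming the two containers share the config file through a volume and share a PID namespace, and that the application reloads its configuration on SIGHUP (a common convention, not something every application does). The function name `push_config` is hypothetical.

```python
import os
import signal
import tempfile


def push_config(path: str, content: str, app_pid: int) -> None:
    """Atomically replace the shared config file, then ask the app to reload.

    Assumptions: the file lives on a volume both containers mount, the sidecar
    can see the app's PID, and the app treats SIGHUP as "reload config".
    """
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file in the same directory first, so the app never
    # reads a half-written config.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        f.write(content)
    # os.replace is atomic on POSIX when both paths are on the same filesystem.
    os.replace(tmp_path, path)
    # Signal the application to re-read its configuration.
    os.kill(app_pid, signal.SIGHUP)
```

Writing to a temporary file and renaming it means the application never sees a partially written file when it reloads.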

Since the containers in a pod can also be configured to share a process (PID) namespace, we can use the sidecar to create almost any augmentation we want to the existing container.

Ambassador Pattern

When applying the ambassador pattern, we introduce a container that intercepts requests from the application to the outside world. It can be used in many scenarios.

If we need to shard a service, we can introduce an ambassador that contains the logic to route requests to the correct shard. That frees the application from knowing how sharding is handled.
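Routing in a sharding ambassador can be as small as a stable hash of the request key. This is a sketch with hypothetical shard addresses; a real ambassador would also proxy the request itself to the chosen shard.

```python
import hashlib


class ShardingAmbassador:
    """Routes each key to one of a fixed list of shard addresses (hypothetical)."""

    def __init__(self, shards: list[str]):
        if not shards:
            raise ValueError("need at least one shard")
        self.shards = shards

    def shard_for(self, key: str) -> str:
        # Use a stable hash (not Python's per-process randomized hash()) so every
        # ambassador instance picks the same shard for the same key.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        index = int.from_bytes(digest[:8], "big") % len(self.shards)
        return self.shards[index]
```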

Another use is as a service broker. If the services we need depend on the environment the application is deployed to, an ambassador can be used to resolve the right service.

The example from the book is an ambassador that brokers a MySQL instance. The application always connects to the MySQL server on localhost, which is the ambassador. It is then the ambassador’s responsibility to find the correct MySQL server and forward requests to it.

Such a setup also allows us to run experiments. Let’s say we want to load test a new MySQL version with production load, but without affecting production. We can configure the ambassador to send each request to both the real production MySQL and the new instance, and ignore the responses from the new version.
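The shadowing experiment boils down to sending each query twice and discarding one answer. In this sketch the two backends are plain Python callables standing in for connections to the production and candidate MySQL servers; no real MySQL client is involved.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

Backend = Callable[[str], object]  # stand-in for a database connection


class ShadowingAmbassador:
    """Sends each query to production and, in the background, to a candidate.

    Only the production result reaches the application; the candidate's result
    (or error) is discarded, so it cannot affect production behavior.
    """

    def __init__(self, production: Backend, candidate: Backend):
        self.production = production
        self.candidate = candidate
        self._pool = ThreadPoolExecutor(max_workers=4)

    def _shadow(self, query: str) -> None:
        try:
            self.candidate(query)  # response deliberately ignored
        except Exception:
            logging.exception("candidate backend failed")  # logged, never surfaced

    def execute(self, query: str) -> object:
        self._pool.submit(self._shadow, query)  # fire and forget
        return self.production(query)

    def close(self) -> None:
        self._pool.shutdown(wait=True)
```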

It also works for gradually phasing in a new service by progressively routing more and more requests to it. This can easily be deployed using Nginx, with a weight on each service it routes requests to.
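Weighted routing can be sketched with smooth weighted round-robin, similar to the scheme Nginx uses for weighted upstream servers. This is a standalone Python illustration, not Nginx configuration, and the service names are hypothetical.

```python
class WeightedRouter:
    """Smooth weighted round-robin: each backend gets its share, interleaved."""

    def __init__(self, weights: dict[str, int]):
        self.weights = dict(weights)
        self.current = {name: 0 for name in weights}

    def pick(self) -> str:
        total = sum(self.weights.values())
        # Every backend earns credit equal to its weight...
        for name, weight in self.weights.items():
            self.current[name] += weight
        # ...and the backend with the most credit wins and pays back the total.
        chosen = max(self.current, key=self.current.get)
        self.current[chosen] -= total
        return chosen

    def set_weight(self, name: str, weight: int) -> None:
        # Raising the new service's weight step by step shifts traffic to it.
        self.weights[name] = weight
        self.current.setdefault(name, 0)
```

Starting with weights of 9 for the old service and 1 for the new one, and raising the new weight over time, phases the new service in gradually.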

As we usually end up with many applications with the same communication needs, most of the ambassador containers can easily be reused.

Adapter Pattern

Containers are built in many different languages and use many different technologies. But when building a distributed system, we want some parts of the system to be homogeneous. One example is logging: if every container logs differently, it becomes increasingly difficult to monitor all the services.

With the adapter pattern, we can build an adapter that transforms the interface of a container into the one we need. The example from the chapter is a Redis server. Redis has a SLOWLOG command that lists recent queries that exceeded a configured execution time. But since the log only lives in Redis's memory, an operator has to query the server at the right moment to debug a problem. Hardly an option under normal conditions.

Instead, we can apply the adapter pattern using fluentd, which runs the SLOWLOG command periodically and outputs the reply to our logging system of choice. It requires no changes to the existing container.
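The book builds this adapter with fluentd. As a swapped-in alternative, here is the same idea in plain Python: poll a slow-log source and emit each new entry once as a JSON line. The `fetch` callable is an assumption standing in for a Redis client running `SLOWLOG GET`; entries are assumed to be dicts with `id`, `start_time`, `duration` (microseconds) and `command` (a string), mirroring the fields that command reports.

```python
import json
import time
from typing import Callable, Iterable

# Assumed entry shape, mirroring what SLOWLOG GET reports:
# {"id": int, "start_time": unix seconds, "duration": microseconds, "command": str}
SlowlogEntry = dict


class SlowlogAdapter:
    """Polls a slow-log source and emits each new entry exactly once as a JSON line."""

    def __init__(self, fetch: Callable[[], Iterable[SlowlogEntry]],
                 emit: Callable[[str], None] = print):
        self.fetch = fetch  # assumption: wraps a Redis client issuing SLOWLOG GET
        self.emit = emit    # e.g. stdout, picked up by the cluster's log collector
        self.last_seen_id = -1

    def poll_once(self) -> int:
        new = [e for e in self.fetch() if e["id"] > self.last_seen_id]
        # Sort by id so entries are logged oldest first, whatever order the source uses.
        for entry in sorted(new, key=lambda e: e["id"]):
            self.emit(json.dumps({
                "source": "redis-slowlog",
                "id": entry["id"],
                "start_time": entry["start_time"],
                "duration_us": entry["duration"],
                "command": entry["command"],
            }))
            self.last_seen_id = entry["id"]
        return len(new)

    def run(self, interval_seconds: float = 30.0) -> None:
        while True:  # runs for the lifetime of the adapter container
            self.poll_once()
            time.sleep(interval_seconds)
```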

Health monitoring is another use of the adapter pattern. If we have sophisticated health checks, we cannot always build them into the existing container. And even if we could, we would not always want to, because it gives a single container multiple responsibilities.

Instead, we can again use the adapter pattern to implement the health check. The example from the chapter is a rich health monitor for MySQL, built with Go. The adapter container runs alongside the application and periodically runs the health check. It also implements an HTTP interface that we can make uniform across all our services, making it easier to implement monitoring across them.
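A sketch of the uniform HTTP surface such an adapter exposes, using only the Python standard library. The `/healthz` path, the JSON body, the default port and the `check` callable (which would run, say, a MySQL query) are all assumptions; unlike the book's monitor, this version runs the check on demand when probed rather than on a timer.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable

HealthCheck = Callable[[], tuple[bool, str]]  # returns (healthy, detail)


def make_health_server(check: HealthCheck, host: str = "0.0.0.0",
                       port: int = 8080) -> ThreadingHTTPServer:
    """Expose an application-specific check behind a uniform GET /healthz endpoint."""

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/healthz":
                self.send_error(404)
                return
            try:
                healthy, detail = check()
            except Exception as exc:  # a crashing check counts as unhealthy
                healthy, detail = False, repr(exc)
            body = json.dumps({"healthy": healthy, "detail": detail}).encode("utf-8")
            # 200 vs 503 is what load balancers and orchestrators key on.
            self.send_response(200 if healthy else 503)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args) -> None:
            pass  # keep frequent probe requests out of the logs

    return ThreadingHTTPServer((host, port), HealthHandler)


def serve_in_background(server: ThreadingHTTPServer) -> threading.Thread:
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return thread
```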

Serving Patterns

When building a microservice architecture, we need individual services to communicate and be resilient. As such an architecture grows, it becomes more and more difficult to debug, and failures become more subtle.

Different patterns can be used to mitigate some of these problems. Applying them consistently means the architecture uses the same concepts everywhere, which makes the system easier to reason about.

Replicated Load-Balanced Services

Most container orchestrators support this type of service out of the box. If the service being load balanced is stateless, it is almost a no-brainer to implement. The only thing to be aware of is that the service should expose a health check, so the load balancer only routes traffic to instances that are ready to receive it. In Docker Swarm, this can be implemented with the HEALTHCHECK instruction in the Dockerfile. In Kubernetes, a Service load balances across the pods, and a readiness probe decides which of them receive traffic. The concept is the same, although the reaction to a failing check differs: a failing readiness probe in Kubernetes only takes the pod out of rotation (restarting the container is the job of a liveness probe), while Docker Swarm replaces an unhealthy container.
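On the load balancer side, the health check simply decides which replicas are eligible. A sketch of round-robin that skips replicas failing a readiness check; the replica names and the `is_ready` callable are hypothetical.

```python
import itertools
from typing import Callable


class HealthAwareBalancer:
    """Round-robin over replicas, skipping any whose readiness check fails."""

    def __init__(self, replicas: list[str], is_ready: Callable[[str], bool]):
        self.replicas = replicas
        self.is_ready = is_ready
        self._cycle = itertools.cycle(replicas)

    def next_replica(self) -> str:
        # Try each replica at most once per call before giving up.
        for _ in range(len(self.replicas)):
            candidate = next(self._cycle)
            if self.is_ready(candidate):
                return candidate
        raise RuntimeError("no ready replicas")
```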

If the service is not stateless, more care must be taken to make sure subsequent requests from the same user hit the same instance. This is often implemented using IP-based session tracking. But that does not always work well, because many users behind the same NAT share one IP address. So if possible, cookie-based tracking should be used instead. Session tracking becomes increasingly difficult to handle in an architecture with many layers.
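A sketch of cookie-based affinity: the balancer hands out a random session id on the first request and hashes it to pick a replica on every later one. Two users behind the same NAT get different cookies, so they can be spread across replicas, which IP-based tracking cannot do. The cookie name and replica names are hypothetical.

```python
import hashlib
import secrets


class CookieAffinityRouter:
    """Sticky sessions keyed on a session cookie instead of the client IP."""

    COOKIE = "lb_session"  # hypothetical cookie name

    def __init__(self, replicas: list[str]):
        self.replicas = replicas

    def route(self, cookies: dict[str, str]) -> tuple[str, dict[str, str]]:
        """Return (replica, cookies_to_set) for an incoming request."""
        to_set: dict[str, str] = {}
        session = cookies.get(self.COOKIE)
        if session is None:
            # First visit: mint a random session id and ask the client to keep it.
            session = secrets.token_hex(16)
            to_set[self.COOKIE] = session
        # The same session id always hashes to the same replica.
        digest = hashlib.sha256(session.encode("utf-8")).digest()
        replica = self.replicas[int.from_bytes(digest[:8], "big") % len(self.replicas)]
        return replica, to_set
```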

Caching Layer

If requests to the backend service are too expensive, a cache might be able to give a better user experience.

The simplest version is to place a caching proxy in front of our HTTP service. All requests are intercepted, and only the first request for a given page is served by the backend; subsequent requests are served from memory in the cache until the cached copy expires.
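A minimal sketch of the caching proxy's logic, with a time-to-live so stale pages eventually expire. The `backend` callable stands in for the HTTP service; a production cache such as Varnish would also bound its memory, which this sketch does not.

```python
import time
from typing import Callable


class CachingProxy:
    """Serves repeated GETs from memory; only misses reach the backend."""

    def __init__(self, backend: Callable[[str], bytes], ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.backend = backend
        self.ttl = ttl_seconds
        self.clock = clock
        self._store: dict[str, tuple[float, bytes]] = {}  # path -> (expires_at, body)

    def get(self, path: str) -> bytes:
        now = self.clock()
        hit = self._store.get(path)
        if hit is not None and hit[0] > now:
            return hit[1]  # cache hit: backend not contacted
        body = self.backend(path)  # miss or expired entry
        self._store[path] = (now + self.ttl, body)
        return body
```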

The cache can be deployed using the sidecar pattern, but deploying the cache alongside the service does have drawbacks. There will be as many cache instances as there are service instances, and since the caches are independent, each of them will end up holding the same pages. That makes the cache very resilient to failure, but it does nothing good for the cache hit rate.

Instead, we can pull the cache out of the sidecar and deploy it as an independent service. The cache then uses memory more efficiently and achieves a higher hit rate.

Extended Cache Layer

An added benefit of having a cache layer is that caching proxies often support additional features like rate limiting, denial-of-service (DoS) defense, SSL termination, and more.

Sharded Services

The purpose of a sharded service is to route requests in a specific way. Instead of allowing any request to hit any of the instances of a service, requests for the same key are routed to the same instance every time. Usually, sharding is used when a service cannot fit on a single machine because of the size of the state being handled.
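One common technique for mapping keys to shards is consistent hashing, sketched below with hypothetical shard names. When a shard is added, only the keys that now land on the new shard move, instead of most keys as with plain modulo hashing.

```python
import bisect
import hashlib


def _hash(value: str) -> int:
    return int.from_bytes(hashlib.sha256(value.encode("utf-8")).digest()[:8], "big")


class ConsistentHashRing:
    """Maps keys to shards so that adding a shard only moves a fraction of keys."""

    def __init__(self, shards: list[str], vnodes: int = 100):
        # Each shard is placed at many points ("virtual nodes") to even out load.
        self._ring = sorted((_hash(f"{s}#{i}"), s) for s in shards for i in range(vnodes))
        self._points = [point for point, _ in self._ring]

    def shard_for(self, key: str) -> str:
        # Walk clockwise to the first point at or after the key's hash, wrapping around.
        i = bisect.bisect_left(self._points, _hash(key)) % len(self._ring)
        return self._ring[i][1]
```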

The book contains a deep dive into why and how sharded caches are implemented and which considerations to make. So far, I have not worked with systems large enough to need sharding. But when you need it, there is good food for thought in the chapter.

Scatter/Gather

All the above patterns focus on improving scalability, allowing us to serve more requests per second and to handle huge amounts of state. They do not reduce the time it takes to process any single request. That is the focus of the scatter/gather pattern.

The pattern is useful if we can split a request into many smaller parts, distribute the parts to individual nodes (scatter), and aggregate the sub-results into the complete response (gather). It allows all the individual smaller requests to be processed in parallel.
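A sketch of scatter/gather for a toy document search. Threads stand in for separate leaf nodes, and each shard is a hypothetical in-memory dict mapping document ids to text.

```python
from concurrent.futures import ThreadPoolExecutor


def search_shard(shard: dict[str, str], term: str) -> list[str]:
    """Leaf work: return ids of documents in this shard containing the term."""
    return [doc_id for doc_id, text in shard.items() if term in text.split()]


def scatter_gather(shards: list[dict[str, str]], term: str) -> list[str]:
    with ThreadPoolExecutor(max_workers=max(1, len(shards))) as pool:
        # Scatter: one sub-request per shard, all running in parallel.
        futures = [pool.submit(search_shard, shard, term) for shard in shards]
        # Gather: wait for every partial result and merge them.
        results: list[str] = []
        for future in futures:
            results.extend(future.result())
    return sorted(results)
```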

The canonical use case for this pattern is distributed document search in its different forms. Think Google search. It often uses root sharding: the root node that receives the request splits it into smaller requests, distributes them, and later aggregates the responses.

Functions and Event-Driven Processing

Much of the request volume we work with consists of web requests initiated by users browsing a website or using an app. A different type is event-driven requests, where a system only needs to come alive for a short period of time to handle the request. It could be a Git push webhook, a “new user created” webhook, or similar.

To support this style of computing, function-as-a-service (FaaS), also called serverless, has been invented. It is usually used as part of a larger architecture rather than standalone.

The main benefit of FaaS is time to market. There is no need to deploy an artifact; the code is deployed directly into the FaaS platform. The function is autoscaled, restarted automatically on failure, and so on.

The challenges are that each function becomes independent, and the only communication between functions is across the network. There is no shared local memory, and the ecosystem is not mature, which makes debugging difficult. Since FaaS is an event-processing model, background processing is not really possible, and it is a bad fit for long-running computations.

There are a number of patterns for this processing model as well, such as Decorator and Event Pipelines. They are covered in the chapter.
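The decorator pattern wraps a function to transform its input or output, for example by filling in default values. On a FaaS platform the decorator would be a separate function that calls the next one; here a Python decorator stands in for that, and the handler and field names are hypothetical.

```python
import functools
from typing import Any, Callable

Handler = Callable[[dict], dict]


def with_defaults(defaults: dict[str, Any]) -> Callable[[Handler], Handler]:
    """Fill in missing request fields before the wrapped handler runs."""

    def wrap(handler: Handler) -> Handler:
        @functools.wraps(handler)
        def decorated(request: dict) -> dict:
            # Fields present in the request override the defaults.
            return handler({**defaults, **request})
        return decorated
    return wrap


@with_defaults({"replicas": 1, "region": "us-east"})
def create_deployment(request: dict) -> dict:
    # Hypothetical handler that can now assume every field is present.
    return {"name": request["name"], "replicas": request["replicas"],
            "region": request["region"]}
```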

Ownership Election

In a distributed system, we often need to know who owns a given task. Ownership is essential to avoid multiple actors processing the same task. On a single server, ownership is straightforward, and we can use standard locking mechanisms to make sure all is well.

With a distributed system, we need a way to establish ownership, called a Master Election Protocol.

First, avoid it if you can; it makes the system much more complex. One way is to run only a single master. It simplifies the architecture, and if the master crashes, a new one is spun up and takes over the responsibility. It does create a little downtime, but it makes the system much more straightforward.

To create a Master Election Protocol, we need a consensus algorithm. Implementations are available in existing distributed key-value stores, for example etcd, ZooKeeper, and Consul. They all provide what we need to create distributed locks.
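To show what these stores provide, here is an in-memory stand-in for one: an atomic "take or renew the lock if it is free, expired, or already mine" operation with a time-to-live. It is an illustration only, not the API of etcd, ZooKeeper or Consul. The current master keeps renewing before the TTL runs out; if it crashes, the lease expires and another candidate can take over.

```python
import threading
import time
from typing import Callable, Optional


class LeaseStore:
    """In-memory stand-in for a consistent key-value store holding one lease.

    In a real deployment this state would live in etcd, ZooKeeper or Consul,
    which provide the consensus that makes the operation safe across machines.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._lock = threading.Lock()
        self._owner: Optional[str] = None
        self._expires = 0.0
        self.clock = clock

    def try_acquire(self, candidate: str, ttl: float) -> bool:
        """Atomically take or renew the lease; True means `candidate` is master."""
        with self._lock:
            now = self.clock()
            if self._owner in (None, candidate) or self._expires <= now:
                self._owner, self._expires = candidate, now + ttl
                return True
            return False

    def owner(self) -> Optional[str]:
        with self._lock:
            return self._owner if self._expires > self.clock() else None
```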

Implementing the Master Election Protocol is part of the application you build, either inside the application container or as a sidecar.

Batch Computational Patterns

Batch processes are background processes that need to run for a short period and not continuously. They usually process a large amount of data. One well-known pattern is MapReduce, popularized by Google.

Work Queue System

A work queue is the simplest form of batch processing. All work items are independent of each other, so the work can be split into individual tasks and added to a queue. Any number of workers process tasks from the queue. If the queue grows, the number of workers can be scaled up to process the work on time.
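The essence of a work queue in a single process: a shared queue drained by a configurable number of workers. Threads stand in for worker containers, and `process` is whatever hypothetical function handles one task.

```python
import queue
import threading
from typing import Any, Callable

_STOP = object()  # sentinel telling a worker there is no more work


def run_work_queue(tasks: list[Any], process: Callable[[Any], Any],
                   workers: int = 4) -> list[Any]:
    """Put independent tasks on a shared queue and let `workers` threads drain it."""
    work: "queue.Queue[Any]" = queue.Queue()
    results: list[Any] = []
    results_lock = threading.Lock()

    def worker() -> None:
        while True:
            item = work.get()
            if item is _STOP:
                return
            result = process(item)
            with results_lock:
                results.append(result)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for task in tasks:
        work.put(task)
    for _ in threads:
        work.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return results  # order depends on which worker finished first
```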

To build such a system in a generic way, we can deploy a Work Queue Manager Container that talks to the actual queue software using an ambassador. The ambassador frees us from depending directly on the queue implementation.

The workers are independent containers that query the Work Queue Manager. The book proposes a file-based API, where each worker is spun up with a file describing its work mounted into the container. The purpose is to add an extra layer of security, so nothing from the outside can create work for the workers.

To know how many workers to create, we must track both the arrival rate and the completion rate of jobs. Using these two metrics, we can calculate how many workers are needed to keep up with incoming tasks.
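A sketch of that calculation, assuming per-worker throughput can be inferred from the completion rate while all workers are busy. The `headroom` factor is an assumption added so the backlog shrinks rather than just holding steady.

```python
import math


def workers_needed(arrival_per_min: float, completed_per_min: float,
                   current_workers: int, headroom: float = 1.2) -> int:
    """Estimate the worker count needed to keep up with incoming tasks.

    Per-worker throughput is inferred from the observed completion rate, which
    is only meaningful while every current worker has work to do.
    """
    if current_workers <= 0 or completed_per_min <= 0:
        raise ValueError("need at least one busy worker to measure throughput")
    per_worker = completed_per_min / current_workers
    return max(1, math.ceil(arrival_per_min * headroom / per_worker))
```

For example, with 120 tasks arriving per minute and 4 workers completing 60 per minute, each worker handles 15 tasks per minute, so 8 workers are the bare minimum and 10 with 20% headroom.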

Multi-Worker Pattern

If we need multiple jobs to be performed on the same task, we could build a special-purpose worker to handle all of them. But that reduces our options for composition.

Instead, we can build a unified container that delegates the work to specialized containers. It is an adaptation of the adapter pattern.
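A sketch of a multi-worker: one function with the ordinary worker interface that fans the task out to specialized workers and merges their results. The image-processing specialists are hypothetical placeholders.

```python
from typing import Callable

Worker = Callable[[str], dict]


def multi_worker(*workers: Worker) -> Worker:
    """Present several specialized workers as one worker with a single interface."""

    def unified(task: str) -> dict:
        combined: dict = {}
        for worker in workers:
            combined.update(worker(task))  # each specialist contributes its own keys
        return combined
    return unified


# Hypothetical specialists for an uploaded-image task.
def detect_faces(path: str) -> dict:
    return {"faces": 0}


def make_thumbnail(path: str) -> dict:
    return {"thumbnail": path + ".thumb.jpg"}


process_image = multi_worker(detect_faces, make_thumbnail)
```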

Event-Driven Batch Processing

If our tasks need several different steps performed on them, we end up with a pipeline or workflow system. In this type of system, we chain work queues into pipes, where the output of one worker is added to the next queue. It is called event-driven because the events are the completion of work from the previous queue.

To support building a workflow, different patterns are used: a copier, a filter, a splitter, a sharder, and a merger. The patterns allow us to build arbitrarily complex workflows, and each of them can be built generically to support reuse.

Copier

If we need to perform different independent jobs on the same task, we can use a copier to copy the task to different queues. For example, if a video file needs to be encoded to different formats, a copier can be used to copy the task to the various queues to support it.

Filter

A filter removes tasks from the queue based on given criteria. It can be implemented as an ambassador that wraps an existing queue.

Splitter

Instead of just removing events with a filter, a splitter can split events into multiple queues based on criteria.

Sharder

A sharder is a more generic splitter: it divides a stream of events into an evenly distributed collection of work items. One reason to do this is to make the flow more resilient. If a component in one shard of the pipeline breaks, the other shards can still process their work.

It can also be used to distribute the load evenly across multiple resources.

Merger

A merger combines multiple queues into a single queue.
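The five building blocks can be sketched as plain functions over lists standing in for queues; in a real pipeline each would read from and write to a queue service. The function names and task names are hypothetical (`filter_queue` avoids shadowing Python's built-in `filter`).

```python
import hashlib
from typing import Callable, Iterable


def copier(items: Iterable, n: int) -> list[list]:
    """Copy every task into n queues (e.g. one per video format)."""
    items = list(items)
    return [list(items) for _ in range(n)]


def filter_queue(items: Iterable, keep: Callable[[object], bool]) -> list:
    """Drop tasks that do not satisfy the predicate."""
    return [item for item in items if keep(item)]


def splitter(items: Iterable, route: Callable[[object], str]) -> dict[str, list]:
    """Send each task to the queue named by route(task)."""
    out: dict[str, list] = {}
    for item in items:
        out.setdefault(route(item), []).append(item)
    return out


def sharder(items: Iterable[str], shards: int) -> list[list[str]]:
    """Spread tasks over shards using a stable hash of each task."""
    out: list[list[str]] = [[] for _ in range(shards)]
    for item in items:
        h = int.from_bytes(hashlib.sha256(item.encode("utf-8")).digest()[:8], "big")
        out[h % shards].append(item)
    return out


def merger(*queues: Iterable) -> list:
    """Combine several queues into one."""
    return [item for q in queues for item in q]
```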

Publish/Subscribe Infrastructure

How do we implement the event-driven batch processing system? One option is to use one of the existing pub/sub implementations like EventGrid, Simple Queue Service, or Kafka.

Coordinated Batch Processing

In some cases, we have work to process that requires coordination. If we have multiple queues with tasks being processed, we might need to wait for different parts of the work to finish before we can aggregate them and create the result. So we need a way to synchronize.

Synchronization is done through a join, also called barrier synchronization. Since a join reduces the parallelism in the pipeline, it is worth avoiding if possible.
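A sketch of barrier synchronization: parts are processed in parallel, but the combining step waits until every part has finished. It also shows the cost: the slowest part gates the whole step. Threads stand in for parallel workers, and the function name is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable


def join_then_combine(parts: list[Any], work: Callable[[Any], Any],
                      combine: Callable[[list[Any]], Any]) -> Any:
    """Barrier synchronization: combining starts only after every part is done."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(work, part) for part in parts]
        wait(futures)  # the join: block until all parallel work has completed
        return combine([f.result() for f in futures])
```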