EMQX v5: Under the Magnifier
Whether you have been using EMQX for many years, are just hearing about it for the first time, or are required to use it by your managers, it is useful to learn about the new architecture of EMQX (also known as EMQX v5). Even if you do not currently use EMQX, the concepts in this new version can provide valuable insights for designing your own distributed applications. In this story, I will examine the new architecture of EMQX v5 and, by comparing it with EMQX v4, highlight what it improves.
I will not delve deeply into its underlying logic or into related topics such as the MQTT protocol, distributed systems, RPCs, etc. Instead, I will provide an overview of these topics along with some technical details about EMQX. Whether you should use it or not is beside the point; what matters is understanding its purpose. I am not focused on comparing it to other message brokers like NATS or Kafka; the main concern here is the logic behind EMQX. If you are looking for an endorsement of MQTT and a recommendation to use it, you can stop reading this article. However, if you are willing to set aside any biases, treat EMQX as the only message broker you know and observe how it has been implemented. I will not make any judgments or claim whether EMQX is good or bad. As the title indicates, I will simply review their documents, articles, and statements in detail. Think of this as an insightful exploration into the world of EMQX.
Introduction
During my time as a cloud engineer, I worked with EMQX as a central message queue. Specifically, my task was to investigate the split-brain issue associated with EMQX and ultimately find a way to prevent it. At that time, we had recently upgraded our EMQX cluster from v4 to v5, and none of us, including myself, fully understood how it operated. After a while, we began exploring its documentation and source code, gathering as much information as possible to understand the semantics of our cluster.
EMQX
First things first, let’s talk about EMQX so we have a basic picture of it. EMQX is a message broker that operates using the MQTT protocol. As they put it, “EMQX is a large-scale distributed MQTT messaging platform that offers unlimited connections, seamless integration, and anywhere deployment”.
EMQX is a message broker that can function both as a single-node component and a distributed cluster. Like other applications, you may want to scale it when faced with increased load. As with any distributed system, it offers a mechanism to replicate data across multiple nodes. Additionally, it aims to ensure fault tolerance, maintain high availability, reduce latency, and facilitate easy maintenance through automated methods for fault detection and self-healing. However, the semantics of these features are not very clear, and the documentation could be improved.
Their approach is certainly one of the best available. Before delving into the details of EMQX v5, let’s first examine v4. The reason we focus on the newer version is that the evolution from v4 to v5 represents a significant transformation; they have fundamentally changed many aspects of the system.
EMQX v4
At its core, EMQX functions like a typical message broker, without any particularly unique features. Developed in Erlang, it uses an integrated distributed database called Mnesia to replicate data between cluster nodes. Mnesia serves as an embedded storage and replication layer for OTP applications that require low read latency and high availability.
Each node’s data consists of two important components:
- Routing tables, the metadata used to receive data from a client and forward it between nodes so that it reaches the intended client
- Topic trees, which are used to partition the data so that it can be managed efficiently
// An example of an EMQX topic tree
chat/room/1
chat/#
sensor/10/temperature
sensor/+/temperature
sensor/#
// It also includes system topics
$SYS/brokers
$SYS/brokers/${node}/uptime
The publish and subscribe mechanism relies on this data. When a new subscriber joins, the node dynamically builds its topic tree and updates its routing table, and it must then replicate this data to the other nodes. Since a client’s request is independent and does not require complex logic or validation, nodes can simply share their routing tables and topic trees with each other.
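To make this concrete, here is a minimal, self-contained sketch (my own toy model, not EMQX code) of how a topic tree and routing table could work together: a subscription inserts a topic filter into a trie, and a publish walks the trie to find matching filters, honoring the + and # wildcards. All class and variable names are illustrative.

# Toy topic tree (trie) plus routing table, for illustration only.
class TopicTree:
    def __init__(self):
        self.root = {}      # nested dict: topic level -> child node
        self.routes = {}    # routing table: topic filter -> set of subscribers

    def subscribe(self, topic_filter, subscriber):
        node = self.root
        for level in topic_filter.split("/"):
            node = node.setdefault(level, {})
        node.setdefault("$subs", set()).add(topic_filter)
        self.routes.setdefault(topic_filter, set()).add(subscriber)

    def match(self, topic):
        """Return all subscribers whose filter matches the published topic."""
        matched = set()
        self._walk(self.root, topic.split("/"), matched)
        return {sub for flt in matched for sub in self.routes.get(flt, set())}

    def _walk(self, node, levels, matched):
        if "#" in node:                      # multi-level wildcard matches the rest
            matched |= node["#"].get("$subs", set())
        if not levels:                       # all levels consumed: exact match
            matched |= node.get("$subs", set())
            return
        head, rest = levels[0], levels[1:]
        if head in node:
            self._walk(node[head], rest, matched)
        if "+" in node:                      # single-level wildcard
            self._walk(node["+"], rest, matched)

tree = TopicTree()
tree.subscribe("sensor/+/temperature", "client-a")
tree.subscribe("sensor/#", "client-b")
print(tree.match("sensor/10/temperature"))   # {'client-a', 'client-b'}

In a real cluster, the interesting part is not this lookup but the fact that every node must hold an up-to-date copy of the routes, which is exactly what the replication traffic below is about.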
Overall, the system is straightforward. However, what could go wrong? One significant issue is that the cluster topology is fully meshed, meaning that all N nodes are interconnected. This design leads to numerous RPC calls when replicating data. Furthermore, if a split-brain scenario occurs, the situation can become quite problematic: since each node performs read and write operations independently, each partition can inadvertently continue as its own cluster without anyone noticing.
Let’s consider a cluster with N nodes. For each transaction (we’ll use a subscribe request as an example, since it is more complex), we can calculate the total number of RPC calls required to commit this transaction:
- The client sends a request to any node (RPC_Count = 0).
- The node stores the user request (RPC_Count = 0).
- The node calls the other nodes to update their routing tables (RPC_Count = N − 1).
- To commit, the nodes share their data with each other (RPC_Count = N × (N − 1)).
In total, the RPC calls for this transaction add up to 0 + 0 + (N − 1) + N × (N − 1) = (N − 1)(N + 1) = N² − 1.
Therefore, the overall number of RPC calls for a single subscribe request is on the order of O(N²). This is not ideal, as increasing the number of nodes results in quadratic growth in RPC calls.
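A quick back-of-the-envelope script (my own illustration, not from the EMQX docs) shows how fast N² − 1 grows with cluster size:

# How the per-transaction RPC count from the formula above grows with N.
for n in (3, 5, 7, 10, 20):
    rpc_calls = (n - 1) + n * (n - 1)   # (N - 1) + N*(N - 1) = N^2 - 1
    print(f"{n:>2} nodes -> {rpc_calls:>3} RPC calls for one subscribe transaction")

At 5 nodes that is already 24 RPC calls for a single subscribe; at 20 nodes it is 399, which helps explain why more than a handful of nodes quickly becomes impractical.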
Challenges
This architecture leads to high latency, increased network traffic, no fault tolerance, a higher likelihood of split-brain scenarios, and concerns regarding data safety. Additionally, we must consider the publish transactions that also need to be routed between nodes. Given these factors, it does not make sense to use more than five nodes in such a configuration.
EMQX v5
In v5, EMQX introduced an enhanced version of Mnesia called Mria. As they said, “Mria is an extension for Mnesia database that adds eventual consistency to the cluster”. Moreover, in their ACM article, they stated: “We analyze the limitations of Mnesia’s replication protocol scalability in large clusters under high load. To address these limitations, we developed Mria, an extension to the Mnesia database that provides eventual consistency within a cluster and achieves better horizontal scalability”.
This built-in database includes a significant new feature: RLOG. The RLOG feature allows nodes to operate in one of two modes: CORE or REPLICANT. In this setup, only CORE nodes are responsible for managing routing tables, topic trees, and write operations. All other operations, such as read and publish routing, are handled by REPLICANT nodes. As they mentioned, “data is read locally on all nodes, but only a few nodes actively participate in the transaction. This allows to improve write throughput of the cluster without sacrificing read latency, at the cost of strong consistency guarantees”.
For performance reasons, Mnesia tables are separated into disjoint subsets called RLOG shards. Transactions for each shard are replicated independently. Currently, a transaction can only modify tables in one shard, so it is usually a good idea to group all tables that belong to a particular OTP application into one shard.
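As a rough illustration of the shard idea (my own toy model, not Mria’s API; the table and shard names below are hypothetical), you can think of it as a static mapping from tables to shards, with a guard that rejects transactions spanning more than one shard:

# Toy model of RLOG shards: tables are grouped into disjoint shards, and a
# transaction may only touch tables that live in the same shard.
SHARDS = {
    "emqx_route": "routing_shard",     # hypothetical table -> shard mapping
    "emqx_trie": "routing_shard",
    "emqx_session": "session_shard",
}

def shard_of(tables):
    shards = {SHARDS[t] for t in tables}
    if len(shards) != 1:
        raise ValueError(f"transaction spans multiple shards: {shards}")
    return shards.pop()

print(shard_of(["emqx_route", "emqx_trie"]))    # ok: both live in routing_shard
# shard_of(["emqx_route", "emqx_session"])      # would raise: crosses shards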
Since CORE nodes are responsible for storing and managing data, they are stateful. In contrast, REPLICANT nodes are stateless, which means they do not require a persistent volume.
In the new architecture, each REPLICANT is connected to a CORE node, and the CORE shares its tables in batches with its REPLICANTs. To reduce the number of RPC calls between CORE and REPLICANT nodes, each REPLICANT maintains its own local database, which is eventually synchronized with the CORE nodes. As they said, “dirty reads and read-only transactions run locally on the replicant. The semantics of the read operations are the following: they operate on a consistent, but potentially outdated snapshot of the data”.
CORE nodes behave much like the normal nodes in v4, forming a fully meshed topology and sharing data through RPC calls (for which they use gen_rpc). However, they do not handle routing or read operations, which reduces the overall workload on CORE nodes and, as a result, decreases overall latency.
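Here is a small, simplified model (my own sketch, not EMQX internals; the Core and Replicant classes and the whole-snapshot batching are assumptions made for brevity) of the read/write split described above: writes are forwarded to a CORE node and later replicated to REPLICANTs in batches, while reads are served from the replicant’s local, possibly stale, copy.

# Simplified model of the CORE/REPLICANT split.
class Core:
    def __init__(self):
        self.data = {}
        self.replicants = []

    def write(self, key, value):
        self.data[key] = value           # CORE owns the authoritative state

    def replicate(self):
        snapshot = dict(self.data)       # ship updates to replicants in a batch
        for r in self.replicants:
            r.apply(snapshot)

class Replicant:
    def __init__(self, core):
        self.core = core
        self.local = {}                  # local copy; may lag behind the core
        core.replicants.append(self)

    def write(self, key, value):
        self.core.write(key, value)      # writes are forwarded to a CORE node

    def read(self, key):
        return self.local.get(key)       # reads are local and possibly stale

    def apply(self, snapshot):
        self.local = snapshot

core = Core()
rep = Replicant(core)
rep.write("route:chat/room/1", "node-1")
print(rep.read("route:chat/room/1"))     # None: batch has not arrived yet
core.replicate()
print(rep.read("route:chat/room/1"))     # node-1: eventually consistent

The two print statements show exactly the trade-off the Mria quote describes: reads are cheap and local, but they may briefly return an outdated snapshot.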
New Features
EMQX v5 incorporates a new module called Ekka, which provides features such as auto-healing, auto-recovery, heartbeat checking, centralized monitoring and management, and cluster rebalancing.
With this new architecture, you can increase the number of REPLICANTs as needed. Since the number of required CORE nodes is relatively small, the likelihood of split-brain scenarios is reduced. Furthermore, because the CORE nodes form a fully meshed topology, you can use a majority factor to detect network partitions and remove split nodes.
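A simple majority check (again, just a sketch of the idea; the has_majority helper is my own and EMQX’s actual auto-heal logic is more involved) might look like this:

# Toy majority check: a partition keeps running only if it can still reach
# more than half of the known CORE nodes; the minority side is a candidate
# for being stopped or rejoined by the auto-heal logic.
def has_majority(reachable_cores, all_cores):
    return len(reachable_cores) > len(all_cores) / 2

all_cores = {"core1", "core2", "core3"}
print(has_majority({"core1", "core2"}, all_cores))   # True: keep serving
print(has_majority({"core3"}, all_cores))            # False: minority side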
By keeping the critical logic simple and concentrated in a few nodes, EMQX v5 provides a fault-tolerant cluster. Operations can be performed in parallel, and monitoring can be easily managed by a central unit (which consists of all CORE nodes).
Additionally, REPLICANTs can operate on a separate network, keeping the CORE nodes in an isolated partition. You then only need a connection link between COREs and REPLICANTs, which effectively prevents faulty nodes from influencing your CORE nodes.
The main idea is to divide our operations into two categories: Idempotent and Non-idempotent. Read operations fall under the idempotent category, meaning they are safe and do not alter the system’s state. In contrast, write operations are non-idempotent, as they change the system’s state. The important point to consider is that determining whether an operation is truly idempotent is not always straightforward. For instance, even when handling a read operation, data consistency is crucial. You must ensure that the data is valid and up-to-date every time.
In the case of EMQX, they implement eventual consistency. Additionally, they reduce the time required for idempotent operations by moving them to REPLICANT nodes. This allows CORE nodes to focus more on maintaining consistency and performance.
Conclusion
The concept of separation within a distributed system architecture is highly advantageous for several reasons. By managing COREs and REPLICANTs independently, we not only enhance the overall maintenance of the system but also improve its semantics, leading to clearer definitions of responsibilities and behaviors within each component. This independent management allows for focused optimizations and adjustments without impacting the entire system, thereby fostering a more modular and manageable architecture.
Additionally, this separation significantly increases fault tolerance. Since the majority of nodes are stateless and can be easily replaced, the system can continue to function effectively even in the face of individual node failures. This design mitigates the risks associated with hardware malfunctions or temporary outages, ensuring that service availability remains high. Furthermore, the architecture allows for the integration of additional modules designed to automate various tasks related to system management. These modules can handle critical functions such as rebalancing workloads, detecting failures, managing nodes, and more, thereby streamlining operations and reducing the need for manual interventions.
However, while the separation of COREs and REPLICANTs offers numerous benefits, there are several critical points that require careful attention. Firstly, this approach introduces a potential central point of failure: although there are multiple CORE nodes intended to distribute the workload, they could still become a bottleneck. If a particular CORE node becomes overloaded or fails, it can adversely affect the entire system, leading to reduced performance or complete outages. As noted earlier, the architecture also remains vulnerable to split-brain scenarios, where multiple parts of the system may operate independently without a unified understanding of the overall state. This necessitates robust mechanisms to detect and remove faulty nodes promptly; failing to address this can lead to data inconsistency and degraded performance, undermining the reliability the architecture aims to provide.
Finally, it is important to note that REPLICANT nodes are not fully synchronized with the CORE nodes. Their data is updated in batches sent by the CORE nodes, resulting in eventual synchronization/consistency rather than immediate consistency. This inherent delay means that data consistency across REPLICANT nodes cannot always be guaranteed at any given moment, potentially leading to discrepancies that could impact the user experience or the accuracy of operations dependent on real-time data.
In summary, while the separation of COREs and REPLICANTs in our architecture offers significant advantages in terms of maintenance, fault tolerance, and modularity, it also presents challenges that must be proactively managed to ensure system reliability and data consistency.
I hope this was useful to you.
Thanks for your time.
Amir Zadeh.
References
For more information, you can use the following references.