
The whys and hows of Change Data Capture

Blog post by Dan Nastasă

One of the challenges we faced when building an asynchronous microservice architecture was committing one or more database rows and, within the same transaction, sending an event to our internal event bus. Failing to make these two steps atomic can leave the database and the event bus out of sync.

Some context first: here at Mambu, we have defined Aggregates using Domain-Driven Design (DDD) practices, and we use the Event Sourcing pattern to build and manage their internal state. When persisting each Domain Event, we also want to publish it to our internal message broker, so that all other microservices interested in that specific Domain Event can pick it up and process it accordingly. Our initial implementation looked a lot like this:

[Figure: initial implementation]

The problem

As you can see, there is an obvious issue with this solution: what happens if we persist and publish some events, but something goes wrong in the middle of the for loop and the transaction is rolled back? The database connection drops, our app is unexpectedly shut down; basically, anything can happen. Since the transaction fails, no event is stored in the database. However, some of the events have already been published, and there is not much we can do about it.
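
To make the failure mode concrete, here is a minimal in-memory simulation of the initial approach. `InMemoryBroker`, `NaiveEventStore` and the simulated failure are hypothetical stand-ins for the real database and message broker. The point is only the ordering of the writes: each event is published before the transaction has committed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical broker stand-in: once published, an event cannot be taken back.
class InMemoryBroker {
    final List<String> published = new ArrayList<>();

    void publish(String event) {
        published.add(event);
    }
}

// Hypothetical store reproducing the initial "persist and publish in one loop" approach.
class NaiveEventStore {
    final List<String> committedRows = new ArrayList<>();
    private final InMemoryBroker broker;
    private final int failAt; // loop index at which we simulate a crash or lost DB connection

    NaiveEventStore(InMemoryBroker broker, int failAt) {
        this.broker = broker;
        this.failAt = failAt;
    }

    // Returns true if the transaction committed, false if it was rolled back.
    boolean saveEvents(List<String> events) {
        List<String> pending = new ArrayList<>(); // rows written inside the open transaction
        try {
            for (int i = 0; i < events.size(); i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure mid-transaction");
                }
                pending.add(events.get(i));    // INSERT, not visible until commit
                broker.publish(events.get(i)); // published right away, outside the transaction
            }
            committedRows.addAll(pending);     // COMMIT
            return true;
        } catch (IllegalStateException e) {
            pending.clear();                   // ROLLBACK: the rows vanish, the published events do not
            return false;
        }
    }
}
```

With three events and a failure at index 2, the store ends up empty while the broker has already received the first two events.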

The challenge

Of course, the Saga pattern might help us trigger compensating actions to reconcile the current state. However, that pattern is mostly aimed at business transactions, while what we have here is a mostly technical error. And as Murphy's law states, anything that can go wrong will eventually go wrong, so we needed a way to improve our design.
Ideally, we would get confirmation that all database entries have been successfully made durable within the transaction before actually publishing them to our message broker. And when publishing them, we also need to make sure that they are actually published successfully.

The solution

Let’s take things one at a time. How can we start publishing internal messages only when the entire transaction has been successfully committed?
Most databases (including MySQL, which we currently use) already contain a component that proves very helpful in our case: the transaction log, an ordered list of all operations that have been committed (made durable) to the database. Databases rely on such a log internally to recover from failures: if the database is left in an inconsistent state, for example after a crash, the database management system uses the transaction log to restore the latest consistent state. As it turns out, the log is useful to us as well.

The transaction log is an append-only list. Each entry contains metadata such as the type of operation (INSERT, UPDATE, DELETE), along with the actual values being persisted, for example {“amount”: 100, “creditor”: “John Doe”, ...}, in a database-specific format.
Let's get back to our example. Our next idea was this: if we could read and interpret the database's transaction log, and then find a way to guarantee that the persisted events get published to the message broker, we would achieve our goal. The saveEvents method would then only be responsible for persisting all events (in a single transaction), while a separate process would wait for the transaction to be committed and only then publish the corresponding events.
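
As a toy model of this idea, the sketch below buffers changes inside a transaction and appends them to an append-only log only when the transaction commits. A reader of the log therefore never sees rolled-back rows. `TransactionLog`, `LogEntry` and `Transaction` are hypothetical names. A real database log (MySQL's binary log, in our case) is far more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One committed row change: its position, operation type, table and persisted values.
record LogEntry(long position, String type, String table, Map<String, Object> data) {}

// Append-only log: entries are only ever added at the end, never modified.
class TransactionLog {
    private final List<LogEntry> entries = new ArrayList<>();

    synchronized void appendAll(List<LogEntry> committed) {
        for (LogEntry e : committed) {
            // Positions are assigned in commit order.
            entries.add(new LogEntry(entries.size(), e.type(), e.table(), e.data()));
        }
    }

    // Returns a snapshot of every entry from the given position onwards.
    synchronized List<LogEntry> readFrom(long position) {
        return List.copyOf(entries.subList((int) position, entries.size()));
    }
}

// A transaction buffers its changes and hands them to the log only on commit.
class Transaction {
    private final TransactionLog log;
    private final List<LogEntry> buffered = new ArrayList<>();

    Transaction(TransactionLog log) {
        this.log = log;
    }

    void insert(String table, Map<String, Object> data) {
        buffered.add(new LogEntry(-1, "insert", table, data)); // real position assigned at commit
    }

    void commit() {
        log.appendAll(buffered);
        buffered.clear();
    }

    void rollback() {
        buffered.clear(); // nothing ever reaches the log
    }
}
```

In this model, saveEvents would only open a `Transaction`, insert every event and commit. Publishing is left to whoever reads the log.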

This pattern is called Change Data Capture (CDC), and its most straightforward definition is very close to what we described above: capture the changes made to a database and propagate them to a third-party system, be it another database, another application, a message broker, or anything else. The CDC pattern is also what we use to address the second challenge: making sure events are successfully published.

We needed an implementation of this pattern, and we picked an open-source project called Maxwell. Maxwell's Daemon is a lightweight utility that reads the MySQL binary log (binlog), which is MySQL's log of committed changes, and publishes the row changes it finds as JSON to a number of different streaming platforms. The tool has extensive documentation on how to configure and run it, so we will not go into much detail on that here. Instead, we will focus on how we actually use it.

Maxwell has pre-built integrations with Kafka, RabbitMQ and other popular streaming platforms, and it can easily be extended with custom producers for any kind of event bus. This is exactly what we did: we created a custom EventPublisher that picks up rows inserted into the events table and publishes them to our internal message broker. Of our two problems, one is now solved: events are published only after they have been made durable. But what about the second part, guaranteeing that each event is successfully published?
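
The filtering step of such a publisher can be sketched independently of Maxwell's producer API, which is not reproduced here. `ChangeEvent` is a hypothetical record that mirrors a few fields of the JSON Maxwell emits for each row (`database`, `table`, `type`, `data`). The `events` table name comes from the text. The `event_type` and `payload` column names are assumptions.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical mirror of a few fields Maxwell emits per row change.
record ChangeEvent(String database, String table, String type, Map<String, Object> data) {}

// Hypothetical outgoing message: destination (event type) plus serialized payload.
record BusMessage(String destination, String payload) {}

class EventsTableFilter {
    // Only inserts into the events table become bus messages; everything else is ignored.
    static Optional<BusMessage> toBusMessage(ChangeEvent change) {
        if (!"events".equals(change.table()) || !"insert".equals(change.type())) {
            return Optional.empty();
        }
        // Assumed columns: event_type (routing key) and payload (the serialized Domain Event).
        String destination = String.valueOf(change.data().get("event_type"));
        String payload = String.valueOf(change.data().get("payload"));
        return Optional.of(new BusMessage(destination, payload));
    }
}
```

Filtering on inserts alone fits an event-sourced events table, where rows are appended and never updated.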

Only a single Maxwell instance runs at any given time, and it processes the entries of the transaction log sequentially. If it cannot publish the current event to the message broker, it keeps retrying until it succeeds, without moving on to the next log entry. Maxwell also stores the position of the latest processed log entry in its own database schema, so that after a shutdown or restart it picks up from where it left off. This covers us from two angles. If Maxwell fails, it continues from exactly where it stopped once it is back up. If the message broker fails, Maxwell does not advance. Either way, the system remains consistent.
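
These guarantees (strictly sequential processing, retrying the current entry until it is published, and a stored position to resume from) can be sketched as a small relay loop. `Broker`, `PositionStore` and `LogRelay` are hypothetical. This is not Maxwell's code, and Maxwell keeps its position in its own schema rather than in memory.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical broker client: returns normally on success, throws on failure.
interface Broker {
    void publish(String event) throws Exception;
}

// Hypothetical checkpoint store holding the position of the next entry to process.
class PositionStore {
    private final AtomicLong next = new AtomicLong(0);

    long load() { return next.get(); }

    void save(long position) { next.set(position); }
}

class LogRelay {
    private final List<String> log; // committed entries, in commit order
    private final PositionStore positions;
    private final Broker broker;
    private final long retryDelayMillis;

    LogRelay(List<String> log, PositionStore positions, Broker broker, long retryDelayMillis) {
        this.log = log;
        this.positions = positions;
        this.broker = broker;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Publishes every entry after the stored position, strictly one at a time.
    void drain() {
        for (long pos = positions.load(); pos < log.size(); pos++) {
            String entry = log.get((int) pos);
            boolean published = false;
            while (!published) {
                try {
                    broker.publish(entry);
                    published = true;
                } catch (Exception e) {
                    // Never skip ahead: wait, then retry the same entry.
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return; // stop without checkpointing; the entry is retried on restart
                    }
                }
            }
            positions.save(pos + 1); // checkpoint only after a successful publish
        }
    }
}
```

Because this sketch saves the position after publishing, a crash between the two steps republishes that entry on restart. Delivery is therefore at-least-once, and consumers should tolerate duplicates.
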
Using this utility, our current implementation looks like this:

[Figure: current implementation of the send method]

Here, our send method receives a RowMap, which encapsulates the log entry. From it, the method computes the event type (toDestination(rowMap)) and sends the event payload (json(asBusinessEvent(rowMap))) to that destination on the message broker, which is exposed through a REST API. The httpClient is configured to retry on any kind of transient error: SocketException, NoHttpResponseException, SocketTimeoutException, etc. Note that in some cases, even though publishing the event failed, we want to mark the transaction log entry as processed and move on. The if block covers this case by checking whether the HTTP response represents a non-retryable failure.
With all of this in place, our overall solution looks like this:
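
Here is a rough sketch of that decision logic. Several details are assumptions for illustration, not the exact rules in our send method: the `HttpTransport` interface, the status-code policy (408, 429 and 5xx retryable, other non-2xx responses permanent), treating every `IOException` as transient, and the backoff values. `NoHttpResponseException` belongs to Apache HttpClient and is left out so the sketch has no dependencies.

```java
import java.io.IOException;

// Hypothetical transport: returns the HTTP status code, or throws on network-level failure.
interface HttpTransport {
    int post(String destination, String jsonPayload) throws IOException;
}

class EventSender {
    enum Outcome { PUBLISHED, SKIPPED_NON_RETRYABLE }

    private final HttpTransport transport;
    private final long initialBackoffMillis;

    EventSender(HttpTransport transport, long initialBackoffMillis) {
        this.transport = transport;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    // Assumed policy: request timeout, throttling and server errors are worth retrying.
    static boolean isRetryable(int status) {
        return status == 408 || status == 429 || status >= 500;
    }

    // Returns only once the event is accepted or permanently rejected;
    // in both cases the caller may mark the log entry as processed.
    Outcome send(String destination, String jsonPayload) {
        long backoff = initialBackoffMillis;
        while (true) {
            try {
                int status = transport.post(destination, jsonPayload);
                if (status >= 200 && status < 300) {
                    return Outcome.PUBLISHED;
                }
                if (!isRetryable(status)) {
                    return Outcome.SKIPPED_NON_RETRYABLE; // e.g. 400: retrying cannot help
                }
            } catch (IOException e) {
                // Assumed transient (SocketException, SocketTimeoutException, ...): retry.
            }
            pause(backoff);
            backoff = Math.min(backoff * 2, 30_000); // capped exponential backoff
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while retrying", e);
        }
    }
}
```
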

[Figure: overall solution]

Making it Continuous Deployment-ready

Once this was implemented and working as expected, the next question we asked ourselves was how to make it work in a Continuous Deployment environment. Maxwell has some prerequisites that must be completed before it starts processing the database transaction log. For example, some of the required operations are:

  1. Create a Maxwell-specific schema where Maxwell holds information regarding currently processed transaction log positions.
  2. If this is the first time Maxwell runs, bootstrap all historical entries, in order to bring the system to the current state, and only then start processing new entries.

While Maxwell covers the first operation by default, the second is only partially supported: a bootstrap can only target a single database and table. Given our multi-tenant architecture, this was not sufficient, so we extended the tool to support multiple schemas and/or tables.
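
Our extension's configuration format is not shown here. Purely as an illustration, assume a hypothetical comma-separated list of `schema.table` pairs, where `schema.*` selects every table of a schema. Expanding such a list into individual bootstrap targets could look like this (`BootstrapTarget` and `BootstrapConfig` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One table that must be bootstrapped.
record BootstrapTarget(String schema, String table) {}

class BootstrapConfig {
    // Parses e.g. "tenant_a.events, tenant_b.*" into concrete targets.
    // knownTables maps each schema to its tables (e.g. as read from information_schema).
    static List<BootstrapTarget> parse(String spec, Map<String, List<String>> knownTables) {
        List<BootstrapTarget> targets = new ArrayList<>();
        for (String item : spec.split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int dot = trimmed.indexOf('.');
            if (dot <= 0 || dot == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected schema.table, got: " + trimmed);
            }
            String schema = trimmed.substring(0, dot);
            String table = trimmed.substring(dot + 1);
            if (table.equals("*")) {
                // Wildcard: one target per known table of that schema.
                for (String t : knownTables.getOrDefault(schema, List.of())) {
                    targets.add(new BootstrapTarget(schema, t));
                }
            } else {
                targets.add(new BootstrapTarget(schema, table));
            }
        }
        return targets;
    }
}
```
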

The actual bootstrapping logic is fairly simple. Inside the internal Maxwell schema, we store a row for each table that needs to be bootstrapped, together with its bootstrap status (start timestamp, end timestamp if finished, success/failure, etc.). When the app starts, and before it processes any new event, it notices that there are tables to be bootstrapped and reads their historical entries first.
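
A minimal sketch of that start-up check follows. It assumes a hypothetical in-memory `BootstrapRegistry` in place of the rows kept in the internal Maxwell schema, whose real table layout differs. A failed bootstrap stays pending, so it is retried on the next start, and streaming never begins on top of an incomplete bootstrap.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Status of one bootstrap row: start time, end time if finished, and success/failure.
record BootstrapStatus(String table, Instant startedAt, Instant finishedAt, Boolean succeeded) {
    boolean pending() {
        return finishedAt == null || !Boolean.TRUE.equals(succeeded);
    }
}

class BootstrapRegistry {
    private final Map<String, BootstrapStatus> rows = new LinkedHashMap<>();

    void request(String table) {
        rows.putIfAbsent(table, new BootstrapStatus(table, null, null, null));
    }

    List<String> pendingTables() {
        List<String> pending = new ArrayList<>();
        for (BootstrapStatus s : rows.values()) {
            if (s.pending()) {
                pending.add(s.table());
            }
        }
        return pending;
    }

    void markFinished(String table, Instant startedAt, boolean succeeded) {
        rows.put(table, new BootstrapStatus(table, startedAt, Instant.now(), succeeded));
    }
}

class CdcStartup {
    // Replays historical rows for every pending table before any new log entry is processed.
    static void start(BootstrapRegistry registry, Consumer<String> bootstrapTable, Runnable streamLog) {
        for (String table : registry.pendingTables()) {
            Instant startedAt = Instant.now();
            try {
                bootstrapTable.accept(table);
                registry.markFinished(table, startedAt, true);
            } catch (RuntimeException e) {
                registry.markFinished(table, startedAt, false);
                throw e; // do not start streaming on top of an incomplete bootstrap
            }
        }
        streamLog.run();
    }
}
```
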

Since some of the captured databases and tables are quite large for existing customers, these prerequisite steps can take long enough that the app would appear unhealthy while they run. We run our infrastructure on Kubernetes, so we chose to split these steps into separate jobs. We defined two init containers that perform the following operations:

  1. Create the internal Maxwell schema if it does not already exist, and capture all necessary database and table information.
  2. Initiate a bootstrap for the configured databases and tables, unless one has already been executed.
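
Init containers run again on every deployment, so each step has to be idempotent: a second run must detect that its work is already done and finish successfully. The sketch below illustrates this. `InitState` and `InitJobs` are hypothetical stand-ins for what the real steps read from and write to the Maxwell schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for what the init steps persist in the Maxwell schema.
class InitState {
    boolean schemaCreated;
    int schemaCreations; // counts real work, to show that re-runs are no-ops
    final Set<String> bootstrapRequested = new LinkedHashSet<>();
}

class InitJobs {
    // Init container 1: create the internal schema only if it does not exist yet.
    static void createSchema(InitState state) {
        if (state.schemaCreated) {
            return; // already done by a previous deployment
        }
        state.schemaCreated = true;
        state.schemaCreations++;
    }

    // Init container 2: request a bootstrap only for tables that never had one.
    // Returns the tables newly requested by this run.
    static List<String> initiateBootstrap(InitState state, List<String> configuredTables) {
        if (!state.schemaCreated) {
            // Failing here makes the init container exit with an error;
            // Kubernetes then retries it according to the pod's restartPolicy.
            throw new IllegalStateException("internal schema missing; run createSchema first");
        }
        List<String> newlyRequested = new ArrayList<>();
        for (String table : configuredTables) {
            if (state.bootstrapRequested.add(table)) {
                newlyRequested.add(table);
            }
        }
        return newlyRequested;
    }
}
```

Re-running both steps is harmless. A table added to the configuration in a later deployment is the only thing that triggers a new bootstrap.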

Only after both init containers have completed successfully does the main process start processing the transaction log. Because this happens on every deployment of a new version, we can rely on Kubernetes' built-in lifecycle management to ensure our CDC application starts only once the environment is properly set up.
Our init container configuration looks like this:

[Figure: init container configuration]

Conclusion

Asynchronous communication between (micro)services has its benefits, but it undoubtedly comes with challenges as well. One of these is simulating transactional behaviour across multiple services while keeping the system reactive and the services decoupled.
This is how we tackle it: our current solution uses the CDC pattern to insert new database rows and publish the corresponding internal events in a transactional manner, together with a Continuous Deployment approach for this use case.

Dan Nastasă

At Mambu, Dan Nastasă is a Software Engineer in the Payments Team. Over the past two years, Dan has focused on microservices, Domain-Driven Design, asynchronous communication patterns, Kubernetes and more. For over a year, his team has been running Mambu's services in production with full vertical ownership, covering development, testing, delivery, maintenance and support.
