Engineering a Reliable Data Backfill Solution
Backfill is the process of updating or completing missing historical data.
Real-life production systems are complex and made of various moving parts that may cause issues. When a faulty deployment occurs or when your cloud provider has a serious disaster, your data processing pipeline could eventually process the wrong data, or even worse, you may lose data. Thus, we need a good solution to put things back in place when such scenarios happen. This makes data backfills an essential aspect of data engineering teams because they enable organizations to safely maintain the integrity of their data.
At Contentsquare, we currently process over 600MB/s of incoming data. At this scale, even though we strive to build reliable systems, incidents are bound to happen and looking back we realized that we were often responding with custom recovery solutions. Most of the time they required intensive efforts and were costly in terms of human resources and loss of focus. Therefore, we decided to build an industrialized backfill process.
In this article, we will explore our design decisions, the challenges that we faced, and some useful tips for implementing such a system.
What did we aim for?
The goal of our backfill process is to correct missing or malformed data in our ClickHouse clusters and in our backups.
We aimed for a mutualized, robust, single-point backfill procedure and tooling to address the various backfill use cases that we could encounter.
We wanted to ease the support processes, leave limited room for error, have trustworthy validation mechanisms, and allow monitoring.
To explain our design choices, it is important to have a comprehensive understanding of the data architecture and requirements of the data pipeline. So let’s dive in the details!
We have multiple data pipelines to support different products, each one with different requirements of scale and data reliability. In this article we will focus on the most critical one: our Core Pipeline.
In this pipeline, we have stateless (in-house microservices) and stateful components (Apache Flink). We process the incoming data in real time by performing operations such as transformation, reduce, and aggregation on the streaming data.
A backfill is needed mainly in two scenarios: when data is missing or malformed.
The use case also depends on the scope of the data involved. Indeed, our main processing pipeline ingests data from end-user browsers but also from what we call “external” sources managed by independent Contentsquare Product Units. These “external“ inputs join the main pipeline to enrich the sessions and page views, sometimes using data from third party integrations.
In this context, an “external” backfill use case means that we also need to reprocess the backfilled data received from these sources. However, if the problem only occurred in the main pipeline, we categorize it as an “internal” use case. For both of these categories, we have different action plans.
Thus, formulating a reliable backfill process raises many conceptual and technical challenges in terms of design and implementation.
Here are some of the conceptual challenges we encountered based on our design and business requirements:
Cover many possibilities of failure with a simple tool and a well-defined process.
Keep the solution (tooling & process) simple, powerful, and aligned with the evolving production pipeline.
Remain easy to understand, deploy, execute, and validate.
Give a quick response (including reducing manual operations) to minimize data downtime.
Provide transparent process and reporting.
On average, our core pipeline receives 35,000 payloads per second with a 3KB average payload size that sums to 100MB/s of data per region. This translates roughly to 4,000 sessions per second:
We need a robust design encompassing necessary tools and a readily deployable infrastructure capable of addressing all potential backfill scenarios.
To reach this objective, our design rests upon three foundational elements: backups, tooling and infrastructure.
We will dig into each of those separately.
In our backup strategy, data is stored at different stages of its lifecycle in the pipeline and with different retentions based on the format, importance, and legal regulations. It is strategic to choose where you want to place your backup services considering the costs, easy access and granularity.
For efficiency purposes, we decided to back up the data formats we call
Aggregated to have a relevant but not redundant granularity for our backups.
Backing up the data
Actively backing up large volumes of data in real time is a significant challenge.
To achieve that it is necessary to have a very reliable and robust backup tool.
We use our in-house tooling to backup data in real-time from Kafka topics to long-term object storage. The backups are partitioned by format, date, hour, and client id and they consist of files compressed with Zstandard.
Reading the backups
Let’s talk about two main challenges that we faced when reading the backups.
Challenge 1: Reading backups with a correct range without altering the already correctly processed production data during the backfill.
It is important to consider the risk of altering the production data because of the presence of stateful components and because it is streaming data. Our sessions could contain links with the data that is outside of the range of the backfill.
We address the solution to this problem in depth later in this article (“Reading stored streaming data”).
Challenge 2: Reading the backup data consistently and rapidly.
We designed a dedicated tool to read the data from backups in a reliable yet speedy way. This brings us to the next section, Tooling.
We enhanced our existing in-house tooling by enabling it to read backup data and export it in a chosen block.
Our read/export tool operates 5x faster than real-time, meaning that it is able to insert 10 hours of production data into the Kafka backfill in only 2 hours.
As we are dealing with large volumes of unpredictable data, we need a buffer to have control over the backfill process and store the in-flight data read from backups. It should be a high throughput, low latency, and scalable. Because of our extensive experience with Kafka in production, we decided to also use it for backfill so that our tool can publish backups messages to this new Kafka deployment.
Kafka for Backfill
Our main design decision here was to consolidate all topics from various Kafka production clusters into a single Kafka backfill cluster. This approach streamlines input and output points, leading to faster operations and better process monitoring.
We make sure the Kafka backfill topic retention is large enough to hold the data used to backfill in order not to repeat the reading process. The metrics of this Kafka such as topic lag, messages in vs. consumed, and offset commit rate are collected by KMinion and exported in Prometheus for monitoring.
Now, we have the tooling to read backups and a hot buffer to keep this data ready to be processed. The next question is: where to process this data in place of the big question mark?
To reconstruct the full data from backups, we had two options: we could either use our production pipeline for backfill or seek an alternative solution.
If we were to conduct backfill operations on the production pipeline, there would be significant ramifications such as overburdening the pipeline, creating latency, and modifying data due to potential errors.
On the other hand, using a sandbox environment provides us with many advantages such as having dry runs before the real production run, having separate observability and monitoring only for the backfill process, and controlling the speed of the process with custom scaling of the sandbox components.
So we decided to have an exact replica of the production pipeline in an isolated sandbox.
As seen in this schema, the backfill line is composed of four stages: backups, tooling, dedicated Kafka cluster, and the sandbox environment which is the replica of the production pipeline.
Technical Challenges and Solutions
Now, let’s talk about some major challenges associated with the reprocessing of streaming data.
We use Flink to reduce and aggregate our session data in real-time using stateful stream processing. We call the duration that contains all the user’s activity until 30 minutes of inactivity a session. Our session is composed of multiple separate payloads that may arrive in Flink unordered and late. Therefore, we keep each session in a state separately and we update these sessions if needed. We allow a maximum message lateness of 7 days, during that period we can enrich the state with new, updates or late data that belong to the session.
As mentioned above, the first proposition was only to use the isolated backfill sandbox but we had a challenge of having two states of Flink at the same time, one in the production pipeline and the other in the isolated environment that could contradict due to having more than one ground truth state.
Taking into account the tradeoffs and limitations, we arrived at a hybrid solution.
Our decision was to isolate backfills for the period in which the production Flink state expired and to utilize the production pipeline only for urgent backfills while we still have the state available.
As we are constantly improving our processes, we are exploring different options to resolve the potential issues of state collisions and ultimately use only the backfill line.
Flink Processing time vs. Event time
We run our Flink in Event time in production whereas we run our dedicated backfill Flink in Processing time. Since we are operating on a bounded stream with backfill, as soon as we stop receiving new messages, there are left no messages to move the Flink’s clock, close the windows and fire the sessions. See Timely Stream Processing for more details.
Therefore, somehow we need to advance the internal clock of Flink to make its watermarks move. That is why we are running Flink in Processing time when we are in backfill mode so that the clock is based on the system time of the machine and advances independently from the arrival of events.
We could still use Event time processing but then we would need to send some dummy payloads for every session to advance the clock artificially. This is a tedious process and keeping track of those messages would be even more complicated.
Reading stored streaming data (Half-cut Sessions Problem)
By design, we back up the data using hourly partitions.
Big amount of real-time streamed data is very difficult to read from backups as it has links with the past hours. Plus, with the hour granularity, we can never be surgically specific when we are reading data from the backups. We do not want to be too specific in reading but this creates problems for us.
If we read the disaster’s exact interval from backups, we are cutting some sessions in the middle because most of the sessions started either earlier than the disaster start time or terminated after the disaster end time. This is a real problem because most of the sessions would be impacted and not recovered correctly.
After our simulation comparisons with the production data (ground truth), we quantified that 90% of the half-cut sessions fall into
-3 hours and
+3 hours intervals from the disaster’s start and end timestamps respectively. We can adjust this parameter depending on the business requirements.
After quantifying the spread range, we found the solution to this problem. We read the backup data with a larger interval than the actual disaster interval then we apply a filter based on the
sessionStartAt timestamp to filter out the sessions that have no links with the disaster interval.
With this solution, we capture the complete content of the half-cut sessions.
Let’s get back to the sandbox environment and discuss more on the architecture and infrastructure.
As mentioned in the previous section, since we have strategic locations of the backups, when we need to deploy the sandbox environment we can select which parts of the pipeline to be deployed according to our needs.
According to the backfill need we can create this sandbox with only necessary components inside and effectively address all data recovery use cases.
Now, let’s talk about how are we deploying the chosen architecture in the sandbox environment alongside backfill Kafka in the most efficient way.
We have different options available:
Option A: deploy every component of the chosen architecture as Docker images in Docker hosted on a single EC2/VM machine.
Less flexibility in scaling
Difficult to maintain a complex docker-compose file
Option B: deploy a dedicated Kubernetes cluster for backfill on a single EC2/VM.
Potential access issues to the production environment
Less flexibility in scaling
Higher operational overhead
Option C: deploying the backfill sandbox as a namespace in the production Kubernetes cluster on EKS/AKS.
Good scalability (thanks to EKS/AKS)
Easy to operate (create/destroy isolated namespace)
Access issues resolved
Leveraging existing infrastructure metric collection and infra monitoring
Considering the advantages of Option C, we decided to deploy the backfill sandbox as a namespace in the existing production cluster.
Here is the combined view of the infrastructure and architecture:
It is important to mention that the components should have the same configurations to have the same behavior as the production environment. It is actually a major challenge and we have plans to automatically fetch and update the configs to stay always up-to-date.
The idea is to validate if our backfill process is correctly recovering the production data.
We do this validation as a first confidence check in the sandbox by running the process in a dry-run mode. Then we validate the data in ClickHouse Backfill. If the data is getting correctly restored, then we will have a go for the production run.
Here are some of the metrics we look closely at:
The number of messages inserted.
The number of sessions recovered in ClickHouse.
The cost of the backfill (from backups to production ClickHouse).
Dry runs allow us to avoid unexpected errors and to validate recovery accuracy in a sandbox ClickHouse and to make sure we will not pollute the out-of-backfill scope data in our production ClickHouse.
We leverage ClickHouse’s
ReplicatedCollapsingMergeTree engine for update insertions to replace malformed data with corrected backfill data (see the algorithm).
Backfill Process In a Nutshell
To wrap up, here is our full backfill procedure:
- Evaluate the disaster
- Reflect on the options and choose which architecture to deploy
- Read backups with the internal tool and insert messages in Kafka Backfill
- Dry run & validating the results
- Production run
Data backfills are a critical component of any successful data engineering pipeline, and an effective backfill process can make all the difference in ensuring the integrity and reliability of your data pipeline.
Here are seven tips to keep in mind to ensure a robust backfill process:
Have a comprehensive understanding of the data architecture and pipeline requirements.
Establish clear backfill goals and priorities.
Design and implement the process based on your backup strategy and infrastructure options.
Develop a robust testing and validation process.
Monitor the pipeline continuously and have a plan in place for handling any issues that may arise.
Document the process and write a compact and powerful runbook.
Have clear communication with all stakeholders during and after the process.
By following these tips, we successfully replaced our custom and costly recoveries with a robust industrialized solution. We saved time, money and we increased the quality of our recovery, our response speed, and our support standards.
We hope this article will inspire you if you need to implement a backfill process that is efficient, accurate, and reliable. Finally, remember that creating a successful backfill process is an ongoing effort and it requires continuous improvements and support to meet the evolving needs of any large organization.
Thanks to Jonathan Meyer, Leandro Ferrari Thomaz, Jean-Thierry Bonhomme, Emanuele Pirro, Pierre Lesesvre, François Violette, Robin Cassan and Sylvain Zimmer for reviewing drafts of this article.