Real-time event reconciliation with ClickHouse

In many projects, small delays in data acquisition don’t impact your insights or your reporting. But for special events that have a huge impact on your revenue, like Black Friday for ecommerce, you need real-time insights: waiting an hour for metrics is no longer an option.

At Contentsquare, some of our customers had already expressed interest in getting real-time insights about errors: they wanted additional information, like the context of that error, the web page it happened on, and the error type.

The hard truth is that real-time systems are often complex to operate, especially with databases that were not designed to support massive updates and analytical access patterns.

And with an SLA commitment of under 1 minute between event generation and event insertion, we knew that we would need to find a way to unify different events together.

Here come real-time data aggregation and event reconciliation with ClickHouse.

A real-time architecture

Real-time constraints

A real-time use case comes with various constraints. First, we have to ensure extremely low latency and high availability, because any delay in message delivery is immediately visible to end users.

It means that any modification to the components in the pipeline (such as a release or update) must be done as quickly as possible. The ideal scenario is a rolling update architecture, where your whole service is never down or interrupted when you release a new feature.

It also implies that external dependencies should be avoided as much as possible. We can’t afford any pipeline interruption because an external service suffers an outage.

Adding to an existing architecture

Our solution should also integrate smoothly within our existing ecosystem. We have to support existing features, like our segmentation engine. We don’t want to introduce another database (even if one might seem more appropriate for the need), because we don’t want to maintain several pieces of logic, such as building ClickHouse queries, to address the same needs. That would not be a maintainable solution.

Finally, every architecture we build at Contentsquare should have scalability at its core. Our real-time architecture should function seamlessly with 20k events/s, 100k events/s, or more!

Stateful streaming has its limits

Our current architecture is based on streaming pipelines. However, the component handling the reconciliation between events (for instance, associating a Click event with its context: the web page it happened on, the user, and the underlying errors) is stateful. This means we handle the reconciliation in a dedicated system before sending the result to our ClickHouse database. This is done for several reasons:

  • Precompute business objects
  • Reduce the number of small inserts
  • Guarantee exactly-once semantics

During a specific time window, we grab all the events coming from our streaming pipelines and reconcile them together to build a unified view of every event happening on a web page. If 30 minutes go by without us receiving any new update, we consider the session window over and send it to another component to be inserted.

This means that a release of the pipeline can take a long time. In most cases, rolling updates are not possible, as you need to keep a common state across your cluster. Also, “stateful” means you need to load your state before being able to process any data.

The issues with the stateful pipeline

Given our constraints, using this stateful pipeline to solve our problem was not feasible, and getting rid of the time window wasn’t possible either, as the whole system depends on it.

Meeting our 1-minute SLA was starting to look rather complex given the state of the system and our time constraints. We really had to figure out an alternative architecture.

Delivering real time through a stateless application

In addition to our main data pipeline, we built a stateless component whose responsibility was to filter relevant events, ensure data quality, and format them to suit our real-time data model.

It matched all of our constraints:

  • Rolling updates are fairly easy as it’s an independent system
  • There are no external dependencies
  • Operations are seamless

There are still a few issues with this solution though:

  • We only have partial data, at the event level
  • We can’t reconcile it
  • We are likely inserting duplicates, because the pipeline only guarantees at-least-once message delivery semantics
  • Queries become more complex to write for the use case (for instance, counting the occurrences of a specific error happening on a specific page with specific metadata during a specific time frame)

And this is where some ClickHouse magic comes in: the AggregatingMergeTree engine.

ClickHouse to the rescue

In a classic database schema, your fields are defined by a name and a data type (String, Integer, Float, Boolean, etc.). But what if your data type was an aggregation? This is what the AggregatingMergeTree engine is all about: all your fields (except your primary key) are aggregations. So, instead of having the following schema:

number_of_errors : Int

You would have:

number_of_errors : SimpleAggregateFunction(sum, Int)

How does it work? When you want to insert some data for number_of_errors, you still insert integers. But under the hood, ClickHouse will apply the aggregating function, in our case a sum, to all the rows belonging to the same primary key.
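
As an illustration, here is a minimal sketch of such a table in ClickHouse DDL (the table name errors_by_customer is our own; note that a SimpleAggregateFunction column must be declared with the result type of its function, hence UInt64 for a sum):

-- Rows sharing the same sorting key (here, customer_name) are collapsed during merges,
-- applying sum() to number_of_errors.
CREATE TABLE errors_by_customer
(
    customer_name    String,
    number_of_errors SimpleAggregateFunction(sum, UInt64)
)
ENGINE = AggregatingMergeTree
ORDER BY customer_name;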

Let’s take the following example:

Each time one of our customers encounters an error on our website, we send an “error event” with the customer name.

The inserted data looks as follows:

customer_name (String, PrimaryKey) | number_of_errors (Sum, Int)
Alice                              | 1
Bob                                | 1
Alice                              | 1
Alice                              | 1

When ClickHouse eventually merges the data, it applies each column’s defined function (here, a sum) for each primary key (here, our customers). Our dataset will look like this:

customer_name (String, PrimaryKey) | number_of_errors (Sum, Int)
Alice                              | 3
Bob                                | 1

As you can see, all our data is now aggregated, and we also reduced the number of rows. This is the main purpose of the AggregatingMergeTree engine: reducing the size of your data by applying some specific aggregations. This will play an important role in our real-time use case.
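
For instance, with the errors_by_customer sketch above, this example could play out as follows (a hedged sketch; FINAL asks ClickHouse to return the fully merged view even if background merges have not run yet):

INSERT INTO errors_by_customer VALUES ('Alice', 1), ('Bob', 1), ('Alice', 1), ('Alice', 1);

-- Returns Alice = 3 and Bob = 1, whether or not the background merge has already happened.
SELECT customer_name, number_of_errors FROM errors_by_customer FINAL;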

Extending it to event reconciliation

The AggregatingMergeTree engine is powerful. By building on its behavior, we can create an idempotent event reconciliation system. Immutable data can become a min or a max, array data can become a groupUniqArray, and so on.

Be careful: not all aggregations are idempotent. A sum, for instance, will double-count duplicated events, whereas min, max, and groupUniqArray will not.

Let’s increase the complexity of our data model. We will receive two kinds of events, which only share a customerId.

Pageview Event: Telling us a user saw a specific web page.

CustomerId: Int
Customer Name: String
Customer City: String
Path: String

Error Event: Telling us the user faced an error

CustomerId: Int
Error Code: Int
Error Name: String

What we want here is a table containing the name, city and errors faced by a specific user. In a classic data model, it would look like this:

CustomerId: Int (Primary Key)
Customer Name: String
Customer City: String
Errors: Array(Tuple(Code: Int, Name: String))

Here is how it could look in an AggregatingMergeTree schema:

CustomerId: Int (Primary Key)
Customer Name: SimpleAggregateFunction(max, String)
Customer City: SimpleAggregateFunction(max, String)
Errors: SimpleAggregateFunction(groupUniqArrayArray, Array(Tuple(Code: Int, Name: String)))

The groupUniqArrayArray function is the combination of groupUniqArray and the -Array combinator suffix. It merges the incoming arrays into a single array of unique elements.
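
In actual ClickHouse DDL, this could look like the following sketch (the table name customer_activity and the concrete integer types are our assumptions):

-- Rows sharing the same customer_id are collapsed during merges: max() keeps the
-- non-empty name and city, groupUniqArrayArray() unions the error arrays.
CREATE TABLE customer_activity
(
    customer_id   UInt64,
    customer_name SimpleAggregateFunction(max, String),
    customer_city SimpleAggregateFunction(max, String),
    errors        SimpleAggregateFunction(groupUniqArrayArray, Array(Tuple(code UInt16, name String)))
)
ENGINE = AggregatingMergeTree
ORDER BY customer_id;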

Wonderful! Now we receive Pageview and Error events. What would happen if we received a Pageview event and two Error events for our dear Alice?

customer_id | customer_name | customer_city | errors
42          | Alice         | Vivec         | []
42          |               |               | [(code: 1, name: voucher expired)]
42          |               |               | [(code: 2, name: checkout failed)]

After the merge process, all our events will be reconciled:

customer_id | customer_name | customer_city | errors
42          | Alice         | Vivec         | [(code: 1, name: voucher expired), (code: 2, name: checkout failed)]

And what happens if we receive a duplicate Error event? Thanks to our idempotent groupUniqArrayArray, it will be deduplicated:

customer_id | customer_name | customer_city | errors
42          | Alice         | Vivec         | [(code: 1, name: voucher expired), (code: 2, name: checkout failed)]
42          |               |               | [(code: 2, name: checkout failed)]

After the merge process, it stays the same:

customer_id | customer_name | customer_city | errors
42          | Alice         | Vivec         | [(code: 1, name: voucher expired), (code: 2, name: checkout failed)]

As we can see, all our events have been reconciled together and duplicates have been removed, thanks to the groupUniqArrayArray function.
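
To make the mapping from events to rows concrete, here is a hedged sketch of how the two event types could be written into the customer_activity table sketched above. Columns that are not provided get their default values (an empty string or an empty array), which the max and groupUniqArrayArray aggregations absorb at merge time:

-- A Pageview event carries the customer name and city, but no errors.
INSERT INTO customer_activity (customer_id, customer_name, customer_city)
VALUES (42, 'Alice', 'Vivec');

-- An Error event only carries the error tuple; name and city stay empty
-- and will lose to the non-empty values under max() during the merge.
INSERT INTO customer_activity (customer_id, errors)
VALUES (42, [(1, 'voucher expired')]), (42, [(2, 'checkout failed')]);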

Be careful about the drawbacks

This specific ClickHouse AggregatingMergeTree engine allowed us to build an idempotent data model with event reconciliation.

Because our errors field excludes duplicates, if Alice faced the same error twice we lose that information. It is all a matter of tradeoffs and, as always, you need to carefully build your system to suit your needs.

Also, the ClickHouse merge process is asynchronous: you never know when it happens, so you may query data that has not been merged yet. You have to handle this behavior in your queries for fresh data, for example by recreating the aggregations at query time. This comes at the cost of worse performance, especially when a lot of data is still unmerged.
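
Assuming the customer_activity table sketched earlier, re-applying the aggregations at query time could look like this; it guarantees a reconciled result even when some parts have not been merged yet:

-- Recreate the merge semantics at query time: one reconciled row per customer.
SELECT
    customer_id,
    max(customer_name)          AS customer_name,
    max(customer_city)          AS customer_city,
    groupUniqArrayArray(errors) AS errors
FROM customer_activity
GROUP BY customer_id;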

Our opinion after six months

We are very happy with this real-time reconciliation project. The delay between event generation and insertion is now only a few seconds, the cost is low and, most importantly, our customers are happy.

Thank you Pawel Gontarz, Karim Kidiss, Alexander Beloglazov and Ryad Zenine for this wonderful project. May you stay yolo engineers forever.