6 tips to push and discover your pipeline's limits

At Contentsquare we have a product called Session Replay, which allows our customers to see user events (clicks, scrolls, etc.) in a video-like fashion. The company we acquired in 2019 had a similar product. As we didn't want to maintain two similar pipelines, we decided that one of them would handle the traffic for the customers of both companies. This is when we started designing a load test to validate that we were ready for this merge.

This article sums up the 6 most important tips we learned.

Tip 1 - Define the context

You need strong reasons for wanting to know your pipeline's limits, because finding them is a time-consuming process, both in person-days and in money.

For example, when planning a migration, you need to validate that the target pipeline can handle at least as much traffic as the original one. When anticipating a future spike, like Black Friday, you need to ensure your pipeline will handle it correctly. And when optimising performance, you need to identify the bottlenecks before you can tune them.

In our case, we knew that a single pipeline would need to ingest the regular traffic of both companies.

Tip 2 - Identify your load metric

The goal of a stress test is to introduce more load than usual on your pipeline. You have to identify which limit to push in order to simulate this behavior. Below is a list of examples:

  • To ensure that an HTTP service is able to answer under high traffic, you would increase the number of input requests / sec.
  • To measure the impact of a SQL query on a cluster, you would increase the number of rows in a table.
  • To pick the most reliable disk type, you would increase the number of writes per second on different disks.

The main responsibility of our Session Replay pipeline is to store the pageviews visited by users. By increasing the number of pageviews to record, we are able to stress every component in the pipeline.

Tip 3 - Try to use production data

There are two possible categories of data to use:

  • Fake Data: automatically generated by a program
  • Production Data: copy of what is really hitting your infra in production (backups, snapshot, Kafka MirrorMaker)

Production data is preferable because it simulates real-life scenarios better. Consider using fake data only if technical reasons prevent you from using production data. In our case, we created a component that duplicates incoming production traffic into our load test cluster. Before showing the details of this component, let's zoom in on our Session Replay pipeline as it is today.

Session Replay Pipeline
Session Replay Pipeline Overview

This diagram shows the big picture of how data moves through our Session Replay pipeline:

  • An HTTP service works as the entrypoint: it receives the user events (HTML, click, scroll, etc.) and forwards them to a Kafka cluster.
  • The Session Replay pipeline transforms a stream of user events into a stream of user pageviews.
  • It reads the data from Kafka, normalizes it and inserts it into Aerospike, our distributed cache.
  • We use the change data capture feature of Aerospike to know when a pageview is about to expire from the cache, and we save it to a distributed blob storage before it is deleted.
  • Audits at the critical points of the pipeline allow us to track each pageview as it moves through it.

Data duplication flow

Based on the previous diagram, to duplicate the traffic we had to:

  • Read the user payloads from the Kafka production cluster
  • Duplicate them n times
  • Send them to a Kafka cluster deployed in a development environment

We implemented this new component using Akka Stream Kafka. With a few lines of code, we are able to use our production data to run the stress test.

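import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer, Producer}
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.ExecutionContextExecutor

// Pageview is the message class that provides parseFrom (its definition is not shown here)
object Main extends App {
    implicit val system: ActorSystem = ActorSystem()
    implicit val executionContext: ExecutionContextExecutor = system.dispatcher
    val config = ConfigFactory.load()
    // How many times we want to duplicate the traffic
    val N = 3

    val committerSettings = CommitterSettings(system)

    // Consume data from a Kafka production cluster
    val consumerSettings: ConsumerSettings[String, Array[Byte]] = ???

    // Produce data into a Kafka dev cluster
    val producerSettings: ProducerSettings[String, Array[Byte]] = ???

    // Supervision strategy applied to the stream (definition elided)
    val decider: Supervision.Decider = ???

    // Incoming streaming source
    val incomingDataSource =
      Consumer.committableSource(consumerSettings, Subscriptions.topics("input"))

    // The function that duplicates an input pageview n times
    def duplicatePageviews(pageview: Pageview, n: Int): Seq[Pageview] = ???

    // Builds the Kafka record sent to the dev cluster (definition elided)
    def mkProducerRecord(pageview: Pageview): ProducerRecord[String, Array[Byte]] = ???

    incomingDataSource
      .map { msg =>
        // Keep the offset next to the payload so it can be committed once the copies are produced
        val pageview = Pageview.parseFrom(msg.record.value())
        (pageview, msg.committableOffset)
      }
      .map { case (pageview, offset) => (duplicatePageviews(pageview, N), offset) }
      .map { case (pageviews, offset) =>
        // List is an immutable.Seq, which is what ProducerMessage.multi expects
        val kafkaRecords = pageviews.map(mkProducerRecord).toList
        ProducerMessage.multi(kafkaRecords, offset)
      }
      .via(Producer.flexiFlow[String, Array[Byte], CommittableOffset](producerSettings))
      .map {
        // Whatever the result type, pass the offset downstream to be committed
        case ProducerMessage.Result(_, message) => message.passThrough
        case ProducerMessage.MultiResult(_, passThrough) => passThrough
        case ProducerMessage.PassThroughResult(passThrough) => passThrough
      }
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply[Done])
      .withAttributes(ActorAttributes.supervisionStrategy(Main.decider))
      .run()
      .streamCompletion
}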

By duplicating the production traffic and sending it to another Kafka cluster, we now have a way to stress test all the components in the pipeline.
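The body of duplicatePageviews is elided above. As a minimal sketch only, assuming a simplified and hypothetical Pageview case class (the real message type and its fields are not shown in this article), one way to produce n pageviews from one input is to keep the original and add copies with a suffixed identifier. Whether copies need distinct identifiers depends on how your pipeline keys its data; the sessionId field and the "-dup" suffix are assumptions made for illustration.

```scala
// Hypothetical, simplified stand-in for the real Pageview message.
final case class Pageview(sessionId: String, pageNumber: Int, payload: Vector[Byte])

object Duplication {
  // Returns n pageviews in total: the original plus (n - 1) copies.
  // Each copy gets a suffixed sessionId (an assumption) so that it is
  // stored under its own key instead of overwriting the original.
  def duplicatePageviews(pageview: Pageview, n: Int): Seq[Pageview] = {
    require(n >= 1, "n must be at least 1")
    pageview +: (1 until n).map { i =>
      pageview.copy(sessionId = s"${pageview.sessionId}-dup$i")
    }
  }
}
```

With n = 3, a pageview of session "s1" becomes three pageviews with the session identifiers "s1", "s1-dup1" and "s1-dup2".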

Tip 4 - Control your load test cost

Running a stress test might require deploying components like a database, a Kubernetes cluster or a Kafka cluster. These add to your bill, so plan carefully to avoid any bad surprises at the end.

Here is what we suggest:

  • Identify the most expensive operations in your pipeline and question whether they should take part in the test. In our Session Replay pipeline, the last operation saves the data to S3, but we decided to mock it because we have no doubts about the scalability of S3. We cannot skip the part that uploads the audits (see Tip 5), but mocking the final write still reduces costs since we make fewer PUT operations to S3.
  • There is no need to run a stress test for longer than a day. Running it for a few hours should already give you good insights. We decided to duplicate the Session Replay traffic only during the peak 4-hour window of each day.
  • Estimating the input traffic will help you size the pipeline components (such as Kafka or a database) correctly.
  • Always consider destroying the components that you have created when your tests are done.
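The traffic estimate from the list above can be a back-of-the-envelope calculation. The sketch below is only an illustration: the function names, parameters and all numbers in the usage note are hypothetical and are not figures from our pipeline.

```scala
object LoadEstimate {
  // Expected input rate of the test cluster, given the production rate
  // and the duplication factor.
  def testEventsPerSec(prodEventsPerSec: Long, duplicationFactor: Int): Long =
    prodEventsPerSec * duplicationFactor

  // Total bytes ingested over the whole test window.
  def totalBytes(eventsPerSec: Long, avgEventBytes: Long, windowHours: Int): Long =
    eventsPerSec * avgEventBytes * windowHours * 3600L

  // Minimum number of units (partitions, consumers, nodes...) needed if each
  // one can sustain perUnitCapacity events per second, rounded up.
  def unitsNeeded(eventsPerSec: Long, perUnitCapacity: Long): Long = {
    require(perUnitCapacity > 0, "capacity must be positive")
    (eventsPerSec + perUnitCapacity - 1) / perUnitCapacity
  }
}
```

For example, with a made-up production rate of 10,000 events / sec and a duplication factor of 3, the test cluster receives 30,000 events / sec. At 2,000 bytes per event over a 4-hour window, that is 864,000,000,000 bytes (864 GB) to ingest, and if one consumer sustains 4,000 events / sec you need at least 8 of them.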

Tip 5 - Define your validation check

You always need a check that validates the hypothesis you made before running the stress test. For example:

  • You have tried a new SQL query: measuring its execution time tells you whether you really improved anything.
  • You have sent a lot of HTTP requests to a service: you verify that you always get a response from it.

This check gives you a goal to reach; it is the final output of your stress test. For the Session Replay pipeline, we did this validation by comparing the number of pageviews recorded during our test against the number recorded in production. Since we know the duplication factor of the input traffic (x3 or x6), we have an approximation of how many pageviews to expect at the end.

At the critical points of our pipeline, we send audits in JSON format to our Kafka cluster, where they are consumed by a component that uploads them to an S3 bucket. To explore these audits, we use AWS Athena to query the data with SQL and compare the number of pageviews recorded in production with the number generated by our test.
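Once both counts are available, the comparison itself is simple arithmetic. Here is a minimal sketch of such a check, assuming the counts have already been extracted from the audits. The names and the tolerance parameter are hypothetical; the right tolerance depends on how approximate your expected count is.

```scala
object Validation {
  // Pageviews we expect in the test cluster over the same time window.
  def expectedPageviews(prodPageviews: Long, duplicationFactor: Int): Long =
    prodPageviews * duplicationFactor

  // Fraction of the expected pageviews that is missing (0.0 means no loss).
  def lossRatio(expected: Long, recorded: Long): Double =
    if (expected == 0) 0.0
    else math.max(0L, expected - recorded).toDouble / expected

  // The run passes if the loss stays within the tolerated ratio.
  def passes(prodPageviews: Long, duplicationFactor: Int, recorded: Long, tolerance: Double): Boolean =
    lossRatio(expectedPageviews(prodPageviews, duplicationFactor), recorded) <= tolerance
}
```

For instance, if production recorded 1,000,000 pageviews and the traffic was duplicated x3, a test run that recorded 2,997,000 pageviews is missing 0.1% of the expected 3,000,000 and passes with a 1% tolerance.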

Tip 6 - Monitor everything

This kind of test gives you the opportunity to identify the main bottlenecks that may cause incidents in the future. You will probably need multiple iterations, and each iteration is a feedback loop that provides insights about your pipeline's state.

Let's take some concrete situations:

  • You have tested a new SQL feature that takes too long to answer: one possible action is to scale the cluster (up or out) to give it more power.
  • You have performed a lot of writes on a disk and it seems to slow down your whole system: one possible action is to switch to a disk type with better throughput.

To be able to take the right actions, monitoring should be enabled everywhere in the pipeline and your decisions should be based on your monitoring metrics. In our use case, we were able to identify two bottlenecks:

Aerospike

Aerospike is a distributed cache that stores keys in RAM and values either in RAM or on SSD. We decided to store the values on SSD, which has a cost. Before this stress test, we also did not know the maximum write rate we could reach per node. The first iteration gave us a valuable insight: when the number of writes / sec / node was too high for the cluster size, our Aerospike client started to get "Device Overload" errors, which indicate that a node cannot handle any more writes. That's critical because it means potential data loss. To mitigate this issue, we implemented a back-off strategy that retries the failed writes a bit later, but even with that, the cluster was not sized correctly to handle the input traffic.
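A back-off strategy of this kind can be sketched as follows. This is not our production code: DeviceOverloadException is a hypothetical stand-in for the error reported by the database client, and the exponential delay and the injectable sleep function are assumptions chosen to keep the example small and testable.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object Backoff {
  // Hypothetical stand-in for the overload error surfaced by the client.
  final class DeviceOverloadException(msg: String) extends RuntimeException(msg)

  // Runs `write` up to maxAttempts times. Only overload errors are retried,
  // and the delay doubles after each failed attempt. Any other failure, or
  // the last overload error, is returned to the caller as a Failure.
  @tailrec
  def retryWithBackoff[T](
      maxAttempts: Int,
      delay: FiniteDuration,
      sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis)
  )(write: () => T): Try[T] =
    Try(write()) match {
      case Failure(_: DeviceOverloadException) if maxAttempts > 1 =>
        sleep(delay)
        retryWithBackoff(maxAttempts - 1, delay * 2L, sleep)(write)
      case result => result
    }
}
```

A call such as Backoff.retryWithBackoff(5, 100.millis)(() => doWrite()), where doWrite is your own write function, would wait 100 ms, 200 ms, 400 ms and 800 ms between attempts before giving up. As we saw, retries only smooth out short overloads; they do not replace a correctly sized cluster.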

Device overload
Number of Aerospike device overload reached during a load test

So after this iteration, we decided to increase the number of nodes to spread the writes across more disks. This time, the load test ran without any data loss. The key lesson is that we now have a metric that tells us when we should scale out our Aerospike cluster.
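Such a metric can be turned into a simple decision rule. The sketch below assumes you can collect, for each node, its write rate and the number of device overload errors. NodeStats, the headroom parameter and the thresholds in the usage note are hypothetical and are not values from our cluster.

```scala
object ScaleOut {
  // Hypothetical per-node metrics collected from monitoring.
  final case class NodeStats(writesPerSec: Double, deviceOverloads: Long)

  // Scale out if any node reported an overload, or if the busiest node is
  // above `headroom` (e.g. 0.8 for 80%) of the per-node write rate that the
  // load test showed to be sustainable.
  def shouldScaleOut(nodes: Seq[NodeStats], maxWritesPerSecPerNode: Double, headroom: Double): Boolean =
    nodes.exists(_.deviceOverloads > 0) ||
      nodes.exists(_.writesPerSec > maxWritesPerSecPerNode * headroom)
}
```

With a made-up sustainable rate of 10,000 writes / sec / node and a headroom of 0.8, a node at 9,000 writes / sec triggers a scale-out, while a cluster whose nodes stay below 8,000 writes / sec without overload errors does not.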

Kubernetes

The majority of our components run in Kubernetes, and adding new pods with k8s is a pretty easy task. But have you ever tried to deploy more than 200 pods into a k8s cluster? For the highest traffic scenario, we had to deploy exactly 256 pods to avoid too much Kafka lag on the consumer group. Our k8s cluster is backed by Amazon EKS.

To schedule new pods, the k8s cluster needs enough machines; otherwise it has to add new ones first, and spinning up new EC2 instances does not happen immediately. In the meantime, if your Kafka consumers accumulate too much lag, they will start reading data from disk instead of from the page cache. This creates more I/O on the Kafka cluster and keeps the consumers under pressure until they catch up.
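How long the consumers stay under pressure can be estimated from the lag and from the difference between the consumption and production rates. The sketch below is a rough model under the assumption that both rates are constant; the names and the numbers in the usage note are hypothetical.

```scala
object LagEstimate {
  // Seconds needed to drain `lag` messages while new messages keep arriving.
  // Returns None when the consumers are not faster than the producers,
  // because the lag then never shrinks.
  def catchUpSeconds(lag: Long, consumeRatePerSec: Double, produceRatePerSec: Double): Option[Double] = {
    val drainRate = consumeRatePerSec - produceRatePerSec
    if (lag <= 0) Some(0.0)
    else if (drainRate <= 0) None
    else Some(lag / drainRate)
  }
}
```

For example, with a lag of 600,000 messages, consumers reading 30,000 messages / sec and producers writing 20,000 messages / sec, the lag drains at 10,000 messages / sec and the consumers need about 60 seconds to catch up.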

The key lesson we learned is that the k8s cluster should be sized correctly up front for this use case, instead of relying on resizing it dynamically.

Conclusion

The most important lessons we learned with this exercise are:

  • a stress test is a feedback loop split into multiple iterations
  • you need to weigh the trade-offs between what you test and how you test it
  • monitoring is the key to getting the right insights and taking the right actions

We hope that this checklist will help you as much as it helped us.

Thanks to Tim Carry, Joël Trigalo, Emre Karagozoglu, Yohann Jardin and Valerio Figliuolo for reviewing drafts of this.