How we scaled a stateful microservice using Redis

At LogicMonitor, ingesting and processing time series metric data is arguably the most critical portion of our AI-powered hybrid observability platform. To prepare for growth, scale, and fault tolerance, we have evolved what we refer to as our Metrics Processing Pipeline from a monolith to a microservice-driven architecture, a journey we have detailed in a series of previous articles.
However, as we ventured further into our evolution from a monolithic system to a distributed, message-driven microservice architecture, new problems arose. In this article, we detail one such problem and how we architected the solution.
First, let’s consider a high-level, partial architectural overview. After ingestion, time series metric data eventually makes its way onto a Kafka topic, where it is consumed and processed by a microservice that runs on Kubernetes and is written with the Quarkus framework. We run multiple instances of this microservice, and they all join the same Kafka consumer group. The partitions of the Kafka topic are assigned to the consumers in the group; when the service scales out, new instances join the consumer group, the partition assignments are rebalanced among the consumers, and each instance ends up with one or more partitions to work on.
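As a rough sketch of that consumer-group setup, joining the group with the plain Kafka client looks something like the following. The topic name, group id, and broker address are placeholders, and our actual service uses Quarkus wiring rather than a hand-rolled poll loop:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MetricsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder endpoint and names -- in the real service these come from configuration.
        props.put("bootstrap.servers", "kafka:9092");
        // Every instance uses the same group id, so Kafka spreads the topic's
        // partitions across however many instances are currently running.
        props.put("group.id", "metrics-processing");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("raw-metrics"));
            while (true) {
                // Each poll returns records only from the partitions currently assigned
                // to this instance; assignments change when the group rebalances.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record -> process(record.value()));
            }
        }
    }

    private static void process(String value) {
        // Metric processing happens here in the real service.
    }
}
```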
This microservice is a highly compute-intensive application, and we leverage the Kubernetes Horizontal Pod Autoscaler (HPA) to automatically scale the number of instances based on CPU utilization metrics.
At LogicMonitor, we support multiple datapoint metric types for the time series data we ingest. After collecting the data from the source, the raw data needs additional processing to produce the actual value of a datapoint, as dictated by its metric type. This processing requires us to cache some prior data for each datapoint as we process incoming messages from Kafka. For more details about how we use Kafka in this way, check out this article.
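For example, a counter-style metric type is typically reported as an ever-increasing raw total, so producing a per-second rate requires the previously seen raw value and timestamp. The sketch below is purely illustrative; the class, field names, and exact math are assumptions, not LogicMonitor’s actual metric-type logic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative only: computes a per-second rate for a counter-style datapoint
 * by diffing the incoming raw value against the previously cached sample.
 */
public class CounterRateCalculator {

    record Sample(double rawValue, long epochSeconds) {}

    // In-memory context: the last raw sample seen for each datapoint.
    private final Map<String, Sample> lastSample = new ConcurrentHashMap<>();

    /** Returns the rate, or null for the first sample when no prior context exists. */
    public Double rate(String datapointId, double rawValue, long epochSeconds) {
        Sample previous = lastSample.put(datapointId, new Sample(rawValue, epochSeconds));
        if (previous == null || epochSeconds <= previous.epochSeconds()) {
            return null; // no context yet, or an out-of-order sample
        }
        return (rawValue - previous.rawValue()) / (epochSeconds - previous.epochSeconds());
    }
}
```

If the cached previous sample disappears, the next incoming value cannot be turned into a rate, which is exactly the loss of context described below.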
Now we arrive at the crux of the problem. To maximize scalability and throughput, we’ve built a multi-instance message-consuming application that scales in and out based on computational load. Additionally, the membership of a Kafka consumer group is highly dynamic: partitions can move from one consumer to another in the same group at any time.
However, as we mentioned above, each datapoint we process has some state associated with it – the cached prior data. Thus, a Kubernetes pod getting killed during a scale-down event is not a lossless incident: we lose the context associated with the datapoints that pod was processing. Similarly, a Kafka partition reassignment is not lossless either. The consumer that takes over a partition either has no context for the datapoints in that partition or has older, outdated context.
Whenever this loss of context occurs, we experience ephemeral inconsistencies with our metrics processing. We need to address this loss of context that occurs due to a Kubernetes pod shutdown or Kafka partition reassignment.
At first glance, there would seem to be an obvious solution: replace the in-memory cache we have been using to hold the context with a distributed cache of some sort. However, other factors make that solution more complicated.
The natural solution is a middle ground between an in-memory cache and an external distributed cache: we continue to store the contextual data in memory, accepting that two scenarios cause the loss of this data – a Kubernetes pod shutting down (for example, during a scale-down event) and a Kafka partition being reassigned to another consumer.
If we can detect when these two events occur and trigger persistence of the contextual data to an external distributed cache, we can save the “state.” Subsequently, when we look up contextual data and do not find it in the in-memory cache, we can look it up in the external cache and, if found, insert it back into the in-memory cache, thus restoring the “state.”
By saving the contextual data to an external distributed persistent cache during container shutdown and partition rebalancing, we avoid losing that data. And by consulting the external cache only when the data is not found in the in-memory cache, we avoid adding significant overhead to the normal processing path.
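A minimal sketch of such a cache wrapper, assuming a Lettuce-style Redis client, string-serialized context, and hypothetical class and method names, might look like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

/**
 * Hypothetical context cache: serves lookups from memory, falls back to Redis
 * on a miss, and can flush its contents to Redis before context would be lost.
 */
public class DatapointContextCache {

    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final RedisAdvancedClusterCommands<String, String> redis;

    public DatapointContextCache(RedisAdvancedClusterCommands<String, String> redis) {
        this.redis = redis;
    }

    /** Look up context for a datapoint, restoring it from Redis on an in-memory miss. */
    public String get(String datapointId) {
        return local.computeIfAbsent(datapointId, redis::get);
    }

    public void put(String datapointId, String context) {
        local.put(datapointId, context);
    }

    /** Persist the in-memory context, e.g. on pod shutdown or partition revocation. */
    public void persistAll() {
        local.forEach(redis::set);
    }
}
```

In the happy path, Redis is never consulted on reads; the external cache is touched only when context is being persisted or restored.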
We chose cluster-mode AWS ElastiCache Redis as our distributed cache for a few main reasons.
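For reference, talking to a cluster-mode Redis endpoint from Java with the Lettuce client looks roughly like the following; the endpoint is a placeholder, authentication and TLS details are omitted, and our production client configuration may differ:

```java
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

public class RedisClusterExample {
    public static void main(String[] args) {
        // Placeholder configuration endpoint for a cluster-mode ElastiCache Redis.
        RedisClusterClient client =
                RedisClusterClient.create("redis://my-cluster.example.cache.amazonaws.com:6379");
        try (StatefulRedisClusterConnection<String, String> connection = client.connect()) {
            RedisAdvancedClusterCommands<String, String> commands = connection.sync();
            // Lettuce routes each key to the correct shard based on its hash slot.
            commands.set("datapoint:123", "{\"lastValue\":42.0,\"lastTimestamp\":1700000000}");
            System.out.println(commands.get("datapoint:123"));
        } finally {
            client.shutdown();
        }
    }
}
```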
Here is how we implemented our solution, in outline: we detect the two loss-of-context events (pod shutdown and partition revocation), persist the in-memory contextual data to Redis when either occurs, and lazily restore entries from Redis on in-memory cache misses.
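One way to hook into those two events is a Kafka ConsumerRebalanceListener for partition revocation and a Quarkus ShutdownEvent observer for pod shutdown. The sketch below is illustrative rather than our exact production code; the DatapointContextCache bean is the hypothetical cache from the earlier sketch, and whether @Observes comes from the javax or jakarta package depends on the Quarkus version:

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import io.quarkus.runtime.ShutdownEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

@ApplicationScoped
public class ContextPersistenceHooks implements ConsumerRebalanceListener {

    @Inject
    DatapointContextCache contextCache; // hypothetical cache from the earlier sketch

    /** Trigger 1: partitions are about to move to another consumer in the group. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Persist context before another instance starts consuming these partitions.
        // A real implementation would scope this to the datapoints of the revoked partitions.
        contextCache.persistAll();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do eagerly; context is restored lazily on cache misses.
    }

    /** Trigger 2: the pod is shutting down, e.g. during an HPA scale-down. */
    void onShutdown(@Observes ShutdownEvent event) {
        contextCache.persistAll();
    }
}
```

With a hand-rolled consumer, the listener is registered via consumer.subscribe(topics, listener); with Quarkus’s Kafka messaging layer, the equivalent hook depends on how the connector is configured.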
LogicMonitor continues to migrate its monolithic services to microservices to improve how we develop, deploy, and maintain them. Check back for more articles about our experience along the journey.