How to optimize AWS Lambda & Kinesis to process 5 million records per minute

Lessons learned from optimizing a Kinesis Data Stream pipeline to support high throughput.

Ravi Sharma
Towards AWS


Amazon Kinesis Data Streams integrates easily with AWS Lambda to process records. Add a trigger to Lambda with Kinesis as the source, and it will start consuming records from the stream. However, handling a high volume of records requires more than just adding a trigger: both Lambda and the Kinesis Data Stream need tuning to support the increased load. This article talks about 5 million records per minute (our system can actually handle up to 15 million, but 5 million is the peak we have processed to date), and you can tune things further to support a much higher volume as needed. The idea is to share the measures we took to improve performance.
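For reference, the trigger is an event source mapping between the stream and the function. It can be created from the console, or with a couple of lines of boto3; the stream ARN and function name below are placeholders, not our actual resources.

```python
import boto3

lambda_client = boto3.client("lambda")

# Create the Kinesis trigger (event source mapping) for the function.
response = lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-east-1:123456789012:stream/analytics-ingest",  # placeholder stream
    FunctionName="analytics-ingest-processor",  # placeholder function
    StartingPosition="LATEST",  # TRIM_HORIZON would replay the retained backlog instead
)
print(response["UUID"])  # mapping ID, handy for the tuning calls later in this article
```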

The Challenge

Last year, our analytics team migrated their infrastructure to Snowflake. Their primary goal was to support the growing number of clients — while ensuring a quicker processing time.

Our service feeds that analytics platform: we have an ingestion pipeline that eventually publishes all the data to the analytics service. A Lambda function processes the records received from the Kinesis pipeline, performs some actions and transformations on the data, and pushes it on to the analytics service managed by the analytics team. Users then run their analysis against that service.

Analytics Ingestion Flow

As Kinesis and Lambda serve as intermediaries before the data reaches the centralized analytics service, it was critical to improve our system before onboarding new large clients on the analytics pipeline. We already had alerts in place to detect any anomalies in the pipeline. The next step was to perform a load test.

During the load test, we encountered two alerts related to Kinesis and one related to Lambda.

  • The first was a high Iterator Age in Kinesis, and the second was a ProvisionedThroughputExceededException. Our Kinesis pipeline was overwhelmed.
  • So was the Lambda function processing records from it. The third alert was a timeout, which depends on the latency of the external system and the Lambda's processing capability.

Analysis

Metrics and CloudWatch are paramount when analyzing issues like these. I recommend enabling “Enhanced (shard-level) metrics” on the Kinesis stream for more in-depth analysis. Keep in mind that this will incur additional costs.

  1. High Iterator Age: During analysis, I discovered that the Lambda's processing speed was slower than the ingestion rate into the pipeline, which caused the Iterator Age of the Kinesis stream to increase. If not addressed, this could lead to events being dropped once the data retention period passed. The default retention period for data streams is 24 hours, and additional charges apply for retention beyond that. Because the data is critical for analysis, we extended the retention period to 7 days, incurring extra cost (see the sketch after this list).
  2. ProvisionedThroughputExceededException: The write capacity of Kinesis is 1,000 records per second per shard. The ingestion rate into Kinesis was breaching this limit, leading to this exception. The producing service retries when ingestion into Kinesis fails, but the retry threshold was also breached, leading to events being dropped.
  3. Timeout: The Lambda was timing out before processing the event. The function timeout was set to 3 seconds, and some events were breaching that limit, leading to errors. The alert was triggered once the success rate dropped below 99.9%, i.e. more than 0.1% errors were observed in the last 10 minutes.
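Both of these stream-level settings can be changed through the Kinesis API. Here is a minimal boto3 sketch, assuming a hypothetical stream name:

```python
import boto3

kinesis = boto3.client("kinesis")
STREAM = "analytics-ingest"  # placeholder stream name

# Enable shard-level metrics (this incurs additional CloudWatch cost).
kinesis.enable_enhanced_monitoring(
    StreamName=STREAM,
    ShardLevelMetrics=[
        "IncomingRecords",
        "WriteProvisionedThroughputExceeded",
        "IteratorAgeMilliseconds",
    ],
)

# Extend retention from the default 24 hours to 7 days (168 hours).
kinesis.increase_stream_retention_period(StreamName=STREAM, RetentionPeriodHours=168)
```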

There was another error that was unrelated to performance and instead caused by incorrect record data. I will discuss strategies for handling records with corrupt data in another article, as it requires careful observation.

Measures

With the analysis done, we needed to put measures in place to resolve these issues and support a load of 5 million records per minute in the pipeline.

Increase the shard count of Kinesis

As previously discussed, Kinesis has a write limit of 1,000 records per second per shard. To handle 5 million records per minute, approximately 34 shards are required. However, we only had 8 shards, which was insufficient for this volume.

Though it may appear that increasing the number of shards to 34 would solve the problem, it's important to understand that messages are not equally distributed among shards. The distribution of messages depends on the partitioning key, and even with a randomized partition key, achieving an even distribution of messages across shards can be challenging.

Therefore, it is always better to have more shards; in this case, at least 50. While increasing the shard count does increase the cost of retention and capacity, there is not much that can be done about that. Even an enhanced fan-out consumer would not have addressed the issue, since it improves read throughput for consumers, whereas our bottleneck was the per-shard write limit.
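Resharding itself is a single API call (shown here with boto3 and a placeholder stream name); Kinesis splits the shards in the background while the stream stays usable.

```python
import boto3

kinesis = boto3.client("kinesis")

# Scale the stream out to 50 shards; UNIFORM_SCALING is the only supported scaling type.
kinesis.update_shard_count(
    StreamName="analytics-ingest",  # placeholder stream name
    TargetShardCount=50,
    ScalingType="UNIFORM_SCALING",
)
```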

Improve Partitioning Logic in Service

After increasing the Kinesis shard count, we still encountered ProvisionedThroughputExceededException. However, the ‘Enhanced (shard-level) metrics’ helped to find the root cause: by analyzing the shard-level metrics in CloudWatch, I discovered that a few shards still exceeded the threshold of 1,000 writes per second.

As we discussed earlier, message distribution depends on the partition key. Our service was generating partition keys that were not random enough, which caused some shards to receive more traffic than others. We improved the partition-key logic, resulting in better distribution and resolving the error.
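Our actual key-generation logic is specific to our data model, but the general idea looks roughly like the sketch below (placeholder stream name). Note the trade-off: a fully random key gives the best spread, while a stable key such as an entity ID preserves per-entity ordering.

```python
import json
import uuid
import boto3

kinesis = boto3.client("kinesis")

def publish(records):
    """Publish a batch of records with well-randomized partition keys."""
    entries = [
        {
            "Data": json.dumps(record).encode("utf-8"),
            # A random UUID spreads records evenly across shards; swap in a
            # stable field (e.g. an entity id) if per-entity ordering matters.
            "PartitionKey": str(uuid.uuid4()),
        }
        for record in records
    ]
    response = kinesis.put_records(StreamName="analytics-ingest", Records=entries)
    # PutRecords is not all-or-nothing: the caller should retry only failed entries.
    return response["FailedRecordCount"]
```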

Increase the batch size

Batch Size

Batch size refers to the maximum number of records that can be sent to a function in each batch, with a limit of 10,000 records. Increasing the batch size can improve efficiency by reducing the lambda invocation count. It allows you to process more records in a single execution. However, keep in mind that this can impact the processing time of the lambda. We will discuss how to configure the function timeout in the next section.

We conducted experiments with different batch sizes and discovered that using 50 as the batch size worked best. Increasing the batch size further caused problems in the analytics service because there is a limit to the number of records it can process in a single request.

If you want to use a batch size greater than 10, you must specify a batch window. The batch window determines the maximum amount of time, in seconds, that records can be collected before the function is invoked. Under high load, the batch window will not significantly delay processing because batches fill up before the window expires. However, if you have low throughput and a high batch size, it is best to use a batch window so that Lambda waits to gather the records.
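Both settings live on the event source mapping created earlier. A boto3 sketch; the mapping UUID is a placeholder and the 5-second window is illustrative rather than the exact value we run with:

```python
import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_event_source_mapping(
    UUID="<event-source-mapping-uuid>",  # from create_event_source_mapping / list_event_source_mappings
    BatchSize=50,                        # the size that worked best for our downstream limit
    MaximumBatchingWindowInSeconds=5,    # illustrative; tune to your latency budget
)
```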

Increase Concurrent batches per shard

In the previous measure, we increased the batch size per shard to 50. With 50 shards in place, a maximum of 2500 records can be processed concurrently. By default, one Lambda invocation is done per shard. To increase concurrency further, you can process multiple batches from each shard in parallel. Lambda can process up to 10 batches in each shard simultaneously. If you increase the number of concurrent batches per shard, Lambda still ensures in-order processing at the partition-key level.
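This is the ParallelizationFactor setting on the same event source mapping, shown below as a minimal boto3 sketch:

```python
import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_event_source_mapping(
    UUID="<event-source-mapping-uuid>",  # placeholder mapping ID
    ParallelizationFactor=10,            # process up to 10 batches per shard concurrently
)
```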

Concurrent batches

With a parallelization factor of 10, the total number of concurrent Lambda invocations can be increased to 500 (10 per shard across 50 shards). With 50 records per batch, that is a processing capacity of 25,000 records at a time. Even if each Lambda invocation takes a full second to process its batch, you could still process around 1.5 million events per minute. That’s nice!
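A quick back-of-the-envelope check of those numbers:

```python
shards = 50
batches_per_shard = 10      # parallelization factor
batch_size = 50             # records per batch
batch_duration_s = 1.0      # pessimistic: one full second per batch

in_flight = shards * batches_per_shard * batch_size     # 25,000 records at once
per_minute = in_flight / batch_duration_s * 60          # 1,500,000 records per minute
print(in_flight, per_minute)
```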

This helped scale up the processing throughput, and after some time spent clearing the backlog, the IteratorAge decreased significantly.

Increase lambda memory

We were using the default memory of 128 MB for our Lambda function. With this memory size, the Lambda took an average of around 500 ms to process each batch, with occasional timeouts for various reasons. Based on this timing, the processing capacity of the system was about 3 million records per minute, which is quite impressive. However, to be on the safe side and allow for any future increase in throughput, we decided to keep some buffer.

We tested the Lambda function with different memory settings. Increasing the memory to 384 MB reduced the processing time from 500 ms to under 100 ms. Increasing the memory further had no impact on performance, as the function was bound by the latency of external calls. Still, by cutting the average processing time by a factor of five, the processing capacity increased to roughly 15 million records per minute.
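The memory setting itself is a one-line configuration change (placeholder function name):

```python
import boto3

lambda_client = boto3.client("lambda")

# Raise the memory allocation; CPU share scales with memory, which is what cut our batch time.
lambda_client.update_function_configuration(
    FunctionName="analytics-ingest-processor",  # placeholder function name
    MemorySize=384,  # MB
)
```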

You pay for the amount of time your function code runs, multiplied by the amount of memory allocated. Test your function with different memory settings to see how it affects performance, and find the right balance between cost and speed for your use case.

Increase timeout (Optional)

When using Lambda, your code can only run for a specific length of time before it times out. This time limit is called the timeout limit and is measured in seconds. By default, the timeout limit is set to 3 seconds, but you can adjust it to a maximum of 15 minutes in increments of 1 second.

An appropriate timeout value is required for a function to avoid unexpected timeouts. If the timeout value is set too close to the average duration of a function, it can increase the risk of unexpected timeouts. The duration of a function can vary depending on various factors such as data processing, transfer, and the latency of any services it interacts with.

Our Lambda function depends on external services, and high latency in those API calls can increase the error count due to timeouts. We had set an error threshold of 0.1%, i.e. only 1 error is allowed for every 1,000 invocations. This threshold was occasionally crossed whenever the external service was slow to respond, resulting in alerts. To give the function more processing time, we increased the timeout to 10 seconds. Note that most invocations still completed in under 100 ms.
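As with memory, this is a one-line configuration change (placeholder function name):

```python
import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_function_configuration(
    FunctionName="analytics-ingest-processor",  # placeholder function name
    Timeout=10,  # seconds; the default is 3 and the maximum is 900
)
```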

While analyzing the function timing, I noticed that we called Secrets Manager to fetch a token on every Lambda invocation, even though the token was valid for a full day. Fetching it every time was unnecessary, so we adopted the AWS Parameters and Secrets Lambda Extension in the code to address this. I have also written an article that explains how this issue was resolved.
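With the extension attached as a Lambda layer, the handler reads the secret from a local HTTP endpoint that caches it, instead of calling Secrets Manager on every invocation. A rough sketch, assuming the extension's default port and a hypothetical secret name:

```python
import json
import os
import urllib.request
from urllib.parse import quote

# The extension listens on localhost (port 2773 by default) and caches responses.
_PORT = os.environ.get("PARAMETERS_SECRETS_EXTENSION_HTTP_PORT", "2773")
_SECRET_ID = "analytics/api-token"  # hypothetical secret name


def get_cached_token():
    url = f"http://localhost:{_PORT}/secretsmanager/get?secretId={quote(_SECRET_ID, safe='')}"
    request = urllib.request.Request(
        url,
        headers={"X-Aws-Parameters-Secrets-Token": os.environ["AWS_SESSION_TOKEN"]},
    )
    with urllib.request.urlopen(request) as response:
        payload = json.loads(response.read())
    return payload["SecretString"]
```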

Earlier, I mentioned an additional error that was not related to performance. This error caused certain shards to become stuck in retries due to corrupt records, resulting in a high IteratorAge alert. The solution requires configuration changes on the Lambda side; an article detailing these changes is currently being written, and I will add the link here once it is complete.

Summary

In this article, I showed how to improve the capabilities of Lambda and Kinesis with the help of a practical use case. The purpose is to help you understand the challenges we faced and the lessons we learned. Although the primary focus was achieving optimal performance for processing 5 million records per minute, the configuration changes can be fine-tuned further to support even higher workloads.
