At Warren Rogers we are actively working to modernize our data processing pipelines by leveraging AWS managed services. Although many processes that were once executed in batch are being done in our new real time complex event processing pipeline using Amazon Kinesis, it is still practical (and more affordable) for some processes to run in batch at regular intervals (once a day or month for example). In the legacy system, jobs against file-based data can run for hours, basically using up the valuable resources of a single large server. We needed an efficient and practical way to borrow compute power for a period of time. We soon realized that serverless functions with AWS Lambda are the perfect fit for many of these jobs for a few reasons.
Firstly, our data lake, like in many other companies is stored in Amazon S3. This turned out to be a great decision. Not only is S3 scalable and affordable, but Amazon recently released an incredibly useful optimization tool called S3 Select. Using simple SQL queries, we can pull the exact fields from the exact rows we need for our batch processes. Running S3 Select over large amounts of data (effectively using server-side filtering) can take a relatively short amount of time, which is important when running AWS Lambda functions.
Secondly, running a serverless function against S3 is perfect as Lambdas can run using hundreds of concurrent executions while S3 was built to handle massive scale. Running Lambda unfettered against something like a traditional database would be sure to take it down. In general, one needs to be very careful to take note of the resources a highly scalable and concurrent service like Lambda calls upon. In the best case your Lambdas are running at full speed, with all the allowed concurrency, against a highly scalable backend like S3 or DynamoDB. Otherwise there are ways to reduce the concurrency and hamstring Lambda if you have to (see reserved concurrency).
Thirdly, our data and its processing are naturally segmented by site. This means we can cut up processing into thousands of jobs in a very natural way. Running concurrent Lambdas against S3 has some of the same constraints you might think of when it comes to thread safety as S3 doesn’t make strong consistency guarantees. You don’t want to share data (in particular generated artifacts) across different instances of the job. We have naturally “single-threaded” data that fits nicely with this pattern.
As to the mechanism itself that runs these jobs, we discovered that we could use one Lambda function, triggered by CloudWatch at regular intervals, to grab all the sites (representing discrete jobs) from S3 and place each site as an event in an SNS topic. For each event, we could trigger a Lambda to run that job. I have noticed up to 200 invocations running concurrently using this model.
To use this pattern, there are a few constraints. First, you must be able to break up your job into discrete chunks. Secondly, the job must use no more than 3 GB of memory and run no longer than five minutes. These limits are imposed by AWS Lambda. At Warren Rogers, this fits many of our use cases.
We have found that this pattern uses less resources, costs less and is faster than doing this with say a Hadoop cluster, not to mention all of the headaches of setting up and maintaining virtual machines. Our jobs are less expensive, faster and easier to develop and update using AWS Lambda.
Monday, August 6, 2018
Wednesday, August 1, 2018
IoT Durable Queues: What we learned from database transaction locking
Data is everything at Warren Rogers. The reliable transfer and durability of our data is paramount to us and to our clients. When building the next generation of IoT sensors, it was clear that we would need a guaranteed delivery pipeline that stored data in case of failure. We would need several durable queues backing all events until they were successfully transmitted. Since the core application would be written in Java, Apache Camel with backing ActiveMQ durable queues seemed like the obvious choice.
As with most IoT sensors the volume of sensor data was quite large and so these queues were writing to disk constantly. Ironically, it turns out that in attempting to ensure guaranteed delivery, data was being written to disk so often that it would cause premature hard drive failure. It became clear that we would need to balance disk writes against the possibility of losing data in, for example, some uncontrolled shutdown like sudden power loss.
For inspiration, we looked to optimistic versus pessimistic transaction locking in databases. Turns out that if in the usual case a database transaction is likely to succeed before some other process had “dirtied” the underlying records, it is probably more efficient that you be optimistic and just try to run the transaction rather than locking beforehand. By analogy, if in the normal case we were simply going to send the data to the cloud successfully and then delete the event from disk, why not assume that the operation would succeed unless there was reason to believe otherwise? If we did fail, it was clearly the case that we should no longer assume that we would succeed and would need to persist to disk (become more pessimistic).
This analogy from database transaction locking became the inspiration for the new queue implementation we call the Optimistic Queue. It has two modes, Optimistic (where data is not stored to disk) and Pessimistic (where data is stored to disk). The key is to tune the Queue so that, unless there was a real issue communicating with the cloud service (i.e. networking issues, etc.), the “normal” mode was Optimistic. Any failure to transmit an event would immediately place the queue in Pessimistic mode and save all events in the queue to disk.
In addition, we came up with a couple of useful dials or configuration parameters: maximumQueueSize and queueSizeToReturnToOptimistic.
MaximumQueueSize relates to the size at which, even if we happen to have a healthy connection, we have so much data that a catastrophic event (an uncontrolled shutdown, etc.) would be too costly. This could also detect a degraded connection where we found the cloud service to be too slow a consumer for some reason. When entering the Pessimistic Mode, the queue should persist all events and begin persisting any incoming events.
Assuming we are now Pessimistic Mode but the events are being successfully transmitted to the server, we will want to wait till the queue has drained to some point. It is of course possible that the issue that caused the initial failure still exists but allowed one successful transmission. QueueSizeToReturnToOptimistic is the lower threshold at which we can return to Optimistic Mode. It can be 0 if the normal case when data is flowing is that you never have another event in line behind the one being sent. If instead the normal case is that you have 1, 2 or 5 events queued up at any given time, it may be better to set this variable a little higher so that it can return to Optimistic mode more quickly rather than wait for the rare occurrence when it does reach 0.
The Optimistic Queue dramatically reduced the number of writes to disk without increasing exposure too much. In some cases it was writing to disk with less than a quarter of the frequency. In the very unlikely event of an uncontrolled shutdown where the queue was backed up we could lose up to MaximumQueueSize events. Loosening the constraints on guaranteed delivery allowed for a much longer IoT life meaning more uptime and less data loss overall.
As with most IoT sensors the volume of sensor data was quite large and so these queues were writing to disk constantly. Ironically, it turns out that in attempting to ensure guaranteed delivery, data was being written to disk so often that it would cause premature hard drive failure. It became clear that we would need to balance disk writes against the possibility of losing data in, for example, some uncontrolled shutdown like sudden power loss.
For inspiration, we looked to optimistic versus pessimistic transaction locking in databases. Turns out that if in the usual case a database transaction is likely to succeed before some other process had “dirtied” the underlying records, it is probably more efficient that you be optimistic and just try to run the transaction rather than locking beforehand. By analogy, if in the normal case we were simply going to send the data to the cloud successfully and then delete the event from disk, why not assume that the operation would succeed unless there was reason to believe otherwise? If we did fail, it was clearly the case that we should no longer assume that we would succeed and would need to persist to disk (become more pessimistic).
This analogy from database transaction locking became the inspiration for the new queue implementation we call the Optimistic Queue. It has two modes, Optimistic (where data is not stored to disk) and Pessimistic (where data is stored to disk). The key is to tune the Queue so that, unless there was a real issue communicating with the cloud service (i.e. networking issues, etc.), the “normal” mode was Optimistic. Any failure to transmit an event would immediately place the queue in Pessimistic mode and save all events in the queue to disk.
In addition, we came up with a couple of useful dials or configuration parameters: maximumQueueSize and queueSizeToReturnToOptimistic.
MaximumQueueSize relates to the size at which, even if we happen to have a healthy connection, we have so much data that a catastrophic event (an uncontrolled shutdown, etc.) would be too costly. This could also detect a degraded connection where we found the cloud service to be too slow a consumer for some reason. When entering the Pessimistic Mode, the queue should persist all events and begin persisting any incoming events.
Assuming we are now Pessimistic Mode but the events are being successfully transmitted to the server, we will want to wait till the queue has drained to some point. It is of course possible that the issue that caused the initial failure still exists but allowed one successful transmission. QueueSizeToReturnToOptimistic is the lower threshold at which we can return to Optimistic Mode. It can be 0 if the normal case when data is flowing is that you never have another event in line behind the one being sent. If instead the normal case is that you have 1, 2 or 5 events queued up at any given time, it may be better to set this variable a little higher so that it can return to Optimistic mode more quickly rather than wait for the rare occurrence when it does reach 0.
The Optimistic Queue dramatically reduced the number of writes to disk without increasing exposure too much. In some cases it was writing to disk with less than a quarter of the frequency. In the very unlikely event of an uncontrolled shutdown where the queue was backed up we could lose up to MaximumQueueSize events. Loosening the constraints on guaranteed delivery allowed for a much longer IoT life meaning more uptime and less data loss overall.
Subscribe to:
Posts (Atom)