Friday, July 27, 2018

Data Streaming and Storage: Guaranteeing Delivery while Aggregating Data in AWS

At Warren Rogers, we ingest, store and process tens of millions of events from thousands of sites per day. My team was tasked with replacing the legacy file-based storage and processing system with a more distributed, highly available and durable streaming solution in AWS.

As we looked at the requirements, it became clear that our data pipeline would need to be a strictly time-ordered stream with guaranteed delivery and very durable persistence. Our data ingestion mechanism would need to be horizontally scalable (and so stateless), write to durable storage (so that events could be replayed when needed), and publish to a real-time event processing pipeline. An additional requirement was that the data storage solution would need to be efficient at retrieving a day or several days of events for a single IoT sensor (each day holding thousands of events). Also, wherever possible, we would use AWS managed services so that we could cut down on the cost and complexity of maintenance.

It took three attempts to get a solution that satisfied these requirements.

For our first attempt, we targeted various AWS managed services. Amazon S3 became our event storage backend. S3 makes very high durability and availability guarantees and is relatively inexpensive. It also makes long-term data storage simpler with lifecycle management. Amazon Kinesis was a natural fit for our real-time processing pipeline, and the data ingestion mechanism translated fairly easily to an Amazon Elastic Beanstalk multi-node service.

Time-ordering would be guaranteed on the IoT sensor side, as each sensor would queue up data and send one event at a time to the data ingestion service. To achieve guaranteed delivery, the data ingestion service would place the event in S3 and then in the real-time stream. Only once the event had been successfully written to both S3 and the real-time stream did the service return an acknowledgement. If the IoT sensor received an acknowledgement, it would remove the event from its queue and send the next one; otherwise it would continue to retry the event.
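To make that contract concrete, here is a minimal sketch of what such an ingestion handler could look like, written in Python with boto3; the bucket name, stream name and key layout are illustrative assumptions, not our production code.

    import json
    import boto3

    s3 = boto3.client("s3")
    kinesis = boto3.client("kinesis")

    BUCKET = "example-event-archive"      # hypothetical bucket name
    STREAM = "example-realtime-events"    # hypothetical Kinesis stream name

    def ingest(event):
        """Persist one sensor event durably, then acknowledge it."""
        key = f"{event['sensor_id']}/{event['timestamp']}.json"  # one object per event

        # 1. Durable storage first, so events can be replayed later.
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(event).encode())

        # 2. Real-time pipeline second; partitioning by sensor keeps per-sensor order.
        kinesis.put_record(
            StreamName=STREAM,
            Data=json.dumps(event).encode(),
            PartitionKey=event["sensor_id"],
        )

        # 3. Only now acknowledge. Any exception above means no ack, so the
        #    sensor keeps the event queued and retries it.
        return {"status": "ack"}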

Once we had implemented this system, we came across a problem. It would take tens of seconds (sometimes minutes) to reconstitute one day’s worth of data for a site. Storing each event as its own object in S3 essentially made the data store unusable for these reads. Even running a process that aggregated events after the fact would be expensive. We saw that we needed to group events by IoT sensor (the Aggregator enterprise integration pattern) and store the grouped events in S3 to make retrieval efficient.
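To illustrate why the reads were so slow, the sketch below (same assumed bucket and key layout as above) shows what reconstituting a single sensor-day amounts to: list the prefix, then issue thousands of sequential GETs.

    import boto3

    s3 = boto3.client("s3")
    BUCKET = "example-event-archive"   # hypothetical bucket name

    def read_day(sensor_id, day):
        """Fetch every event object for one sensor and one day, one GET at a time."""
        events = []
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=BUCKET, Prefix=f"{sensor_id}/{day}"):
            for obj in page.get("Contents", []):
                # Thousands of sequential GETs per sensor-day: tens of seconds or worse.
                body = s3.get_object(Bucket=BUCKET, Key=obj["Key"])["Body"].read()
                events.append(body)
        return events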

This highlighted the key problem: how do you aggregate data, maintain ordering and still guarantee delivery on a multi-node service? The data must be persisted durably before an acknowledgement is returned, as part of the guaranteed-delivery contract. This means it must be stored while it is being aggregated. Obviously, writing to local disk in a multi-node environment is risky (a step in the wrong direction concerning durability) and would also break ordering, since an IoT sensor could deliver its next event to a different node.

Our second rewrite attempted to use EFS (Elastic File System) as a distributed cache: we would group events over some period of time (an hour of site time, for example) and then have one node save the completed file to S3. This allowed for better durability and would maintain ordering.
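Roughly, that approach looked like the following sketch; the mount point, file layout and hourly bucketing are assumptions for illustration. Each node appends incoming events to a shared per-sensor hourly file on EFS, and a periodic job uploads completed hours to S3 as single objects.

    import json
    import pathlib
    import boto3

    EFS_ROOT = pathlib.Path("/mnt/efs/events")   # hypothetical EFS mount point
    s3 = boto3.client("s3")
    BUCKET = "example-event-archive"             # hypothetical bucket name

    def append_event(event):
        """Append one event to the shared hourly file for its sensor."""
        hour = event["timestamp"][:13]           # e.g. "2018-07-27T14"
        path = EFS_ROOT / event["sensor_id"] / (hour + ".jsonl")
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a") as f:                # every small append is a network round trip
            f.write(json.dumps(event) + "\n")

    def flush_hour(sensor_id, hour):
        """Upload a completed hourly file to S3 as a single aggregated object."""
        path = EFS_ROOT / sensor_id / (hour + ".jsonl")
        s3.upload_file(str(path), BUCKET, f"{sensor_id}/{hour}.jsonl")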

The problem with EFS was that it was far too slow. There was no way we could keep up with the volume of events that we needed to ingest.

For our third rewrite, we discovered that Amazon Data Firehose would take a number of events and aggregate them for us before dumping them to S3. Of course, these batched objects would still need to be grouped and organized by IoT sensor by a continuous background process.
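The write path became a single Firehose call per event, something like the sketch below (the delivery stream name is an illustrative assumption).

    import json
    import boto3

    firehose = boto3.client("firehose")
    DELIVERY_STREAM = "example-event-firehose"   # hypothetical delivery stream name

    def ingest(event):
        """Send one event to Firehose, then acknowledge the sensor."""
        firehose.put_record(
            DeliveryStreamName=DELIVERY_STREAM,
            Record={"Data": (json.dumps(event) + "\n").encode()},
        )
        # Firehose buffers the record and delivers batches to S3; a separate
        # background process later re-groups those objects by IoT sensor.
        return {"status": "ack"}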

This worked, but it seemed highly inefficient to write every record to Firehose. For one, it created an undue resource drain on the data ingestion service. For another, our events were often smaller than 1 KB, and Firehose rounds each record up to the next 5 KB for billing. In this system, we would very likely be paying around five times more on average than we needed to. Aggregating before posting to Firehose would be ideal, but we still couldn't aggregate by IoT device and keep our guaranteed-delivery contract.

Once the solution dawned on us, it seemed obvious. A service node could aggregate across IoT sensors: all the events that happened to be received at a particular node within some configurable period of time would be grouped and then posted to Firehose as one batch. On successful completion, each IoT sensor would receive its confirmation. This had the added benefit of producing a natural throttling mechanism in case of (hopefully very infrequent) downtimes leading to backed-up queues.
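A minimal sketch of that batching logic is below; the flush window, batch limit and callback shape are assumptions chosen to show the idea, not our actual service code.

    import json
    import time
    import boto3

    firehose = boto3.client("firehose")
    DELIVERY_STREAM = "example-event-firehose"   # hypothetical delivery stream name
    FLUSH_INTERVAL_SECONDS = 2                   # the configurable aggregation window
    MAX_BATCH_RECORDS = 500                      # Firehose put_record_batch limit

    class Batcher:
        """Buffers events from many sensors, flushes them as one Firehose batch,
        and only acknowledges the sensors once the batch has been accepted."""

        def __init__(self):
            self.pending = []                    # (event, ack_callback) pairs
            self.last_flush = time.monotonic()

        def add(self, event, ack_callback):
            self.pending.append((event, ack_callback))
            # A real service would also flush from a timer; this sketch only
            # flushes when a new event arrives.
            if (len(self.pending) >= MAX_BATCH_RECORDS
                    or time.monotonic() - self.last_flush >= FLUSH_INTERVAL_SECONDS):
                self.flush()

        def flush(self):
            if not self.pending:
                return
            batch, self.pending = self.pending, []
            records = [{"Data": (json.dumps(e) + "\n").encode()} for e, _ in batch]
            response = firehose.put_record_batch(
                DeliveryStreamName=DELIVERY_STREAM, Records=records
            )
            if response["FailedPutCount"] == 0:
                for _, ack in batch:
                    ack()                        # every sensor gets its confirmation
            # On any failure the acks are withheld; the sensors keep their events
            # queued and retry, which doubles as natural back-pressure.
            self.last_flush = time.monotonic()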

After the third rewrite, the data ingestion service has been chugging along for a few years now with very few issues.
