Monday, August 6, 2018

Serverless and (Micro)Batch Processing: AWS Lambda and the S3 data lake

At Warren Rogers we are actively working to modernize our data processing pipelines by leveraging AWS managed services. Although many processes that were once executed in batch now run in our new real-time complex event processing pipeline built on Amazon Kinesis, it is still practical (and more affordable) for some processes to run in batch at regular intervals (once a day or once a month, for example). In the legacy system, jobs against file-based data can run for hours, monopolizing the resources of a single large server. We needed an efficient and practical way to borrow compute power for a period of time, and we soon realized that serverless functions with AWS Lambda are a perfect fit for many of these jobs, for a few reasons.

Firstly, our data lake, like those at many other companies, is stored in Amazon S3. This turned out to be a great decision. Not only is S3 scalable and affordable, but Amazon recently released an incredibly useful optimization feature called S3 Select. Using simple SQL queries, we can pull exactly the fields from exactly the rows we need for our batch processes. Because S3 Select filters on the server side, running it over large amounts of data takes a relatively short amount of time, which matters when running inside AWS Lambda's execution limits.
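
For illustration, a query like this can be issued from a Lambda with the AWS SDK for Java (v1). This is only a sketch: the bucket, key and column names are placeholders rather than our actual schema.

// Kotlin (a sketch only; bucket, key and column names are illustrative)
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.*

fun selectSiteRows(bucket: String, key: String, siteId: String): String {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val request = SelectObjectContentRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withExpressionType(ExpressionType.SQL)
        // Server-side filtering: only the fields and rows the batch job needs are returned
        .withExpression("SELECT s.recorded_at, s.volume FROM S3Object s WHERE s.site_id = '$siteId'")
        .withInputSerialization(InputSerialization().withCsv(CSVInput().withFileHeaderInfo(FileHeaderInfo.USE)))
        .withOutputSerialization(OutputSerialization().withCsv(CSVOutput()))
    s3.selectObjectContent(request).payload.recordsInputStream.use { stream ->
        return stream.readBytes().toString(Charsets.UTF_8)
    }
}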

Secondly, running a serverless function against S3 is a natural fit: Lambda can fan out to hundreds of concurrent executions, and S3 was built to handle massive scale. Running Lambda unfettered against something like a traditional database would be sure to take it down. In general, one needs to be very careful to take note of the resources that a highly scalable and concurrent service like Lambda calls upon. In the best case your Lambdas are running at full speed, with all the allowed concurrency, against a highly scalable backend like S3 or DynamoDB. Otherwise there are ways to reduce the concurrency and hamstring Lambda if you have to (see reserved concurrency).

Thirdly, our data and its processing are naturally segmented by site. This means we can cut up processing into thousands of jobs in a very natural way. Running concurrent Lambdas against S3 has some of the same constraints you might think of when it comes to thread safety, since S3 does not make strong consistency guarantees: you don't want to share data (in particular, generated artifacts) across different instances of the job. Our naturally "single-threaded" per-site data fits nicely with this pattern.

As to the mechanism itself that runs these jobs, we discovered that we could use one Lambda function, triggered by CloudWatch at regular intervals, to grab all the sites (each representing a discrete job) from S3 and publish each site as a message to an SNS topic. Each message then triggers a Lambda that runs that site's job. I have seen up to 200 invocations running concurrently using this model.
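
A simplified sketch of that dispatching function is below. The topic ARN, handler wiring and the site-listing step are placeholders; in reality the site list comes from our S3 data lake.

// Kotlin (a simplified sketch; the topic ARN and listSites() are placeholders)
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestHandler
import com.amazonaws.services.sns.AmazonSNSClientBuilder

class BatchDispatcher : RequestHandler<Map<String, Any>, String> {
    private val sns = AmazonSNSClientBuilder.defaultClient()
    private val topicArn = System.getenv("JOB_TOPIC_ARN")

    override fun handleRequest(event: Map<String, Any>, context: Context): String {
        // One SNS message per site; each message triggers a worker Lambda for that site's job
        val sites = listSites()
        sites.forEach { siteId -> sns.publish(topicArn, siteId) }
        return "dispatched ${sites.size} site jobs"
    }

    // Placeholder: the real implementation lists site prefixes from the S3 data lake
    private fun listSites(): List<String> = emptyList()
}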

To use this pattern, there are a few constraints. First, you must be able to break your job up into discrete chunks. Second, each job must use no more than 3 GB of memory and run no longer than five minutes, limits imposed by AWS Lambda. At Warren Rogers, this fits many of our use cases.

We have found that this pattern uses fewer resources, costs less and runs faster than doing the same work with, say, a Hadoop cluster, not to mention avoiding all of the headaches of setting up and maintaining virtual machines. Our jobs are less expensive, faster, and easier to develop and update using AWS Lambda.

Wednesday, August 1, 2018

IoT Durable Queues: What we learned from database transaction locking

Data is everything at Warren Rogers. The reliable transfer and durability of our data is paramount to us and to our clients. When building the next generation of IoT sensors, it was clear that we would need a guaranteed delivery pipeline that stored data in case of failure. We would need several durable queues backing all events until they were successfully transmitted. Since the core application would be written in Java, Apache Camel with backing ActiveMQ durable queues seemed like the obvious choice.

As with most IoT sensors, the volume of sensor data was quite large, so these queues were writing to disk constantly. Ironically, it turned out that in attempting to ensure guaranteed delivery, data was being written to disk so often that it caused premature hard drive failure. It became clear that we would need to balance disk writes against the possibility of losing data in an uncontrolled shutdown such as a sudden power loss.

For inspiration, we looked to optimistic versus pessimistic transaction locking in databases. It turns out that if, in the usual case, a database transaction is likely to complete before some other process has "dirtied" the underlying records, it is probably more efficient to be optimistic and simply attempt the transaction rather than lock beforehand. By analogy, if in the normal case we were simply going to send the data to the cloud successfully and then delete the event from disk, why not assume that the operation would succeed unless there was reason to believe otherwise? If we did fail, we should no longer assume success and would need to persist to disk (become more pessimistic).

This analogy from database transaction locking became the inspiration for the new queue implementation we call the Optimistic Queue. It has two modes, Optimistic (where data is not stored to disk) and Pessimistic (where data is stored to disk). The key is to tune the queue so that, unless there is a real issue communicating with the cloud service (networking problems, etc.), the "normal" mode is Optimistic. Any failure to transmit an event immediately places the queue in Pessimistic mode and saves all events in the queue to disk.

In addition, we came up with a couple of useful dials or configuration parameters: maximumQueueSize and queueSizeToReturnToOptimistic.

MaximumQueueSize is the size at which, even with a healthy connection, we are holding so much unpersisted data that a catastrophic event (an uncontrolled shutdown, etc.) would be too costly. It also catches a degraded connection, where the cloud service has become too slow a consumer for some reason. On entering Pessimistic mode, the queue persists all queued events and begins persisting any incoming events.

Assuming we are now in Pessimistic mode but events are being successfully transmitted to the server, we will want to wait until the queue has drained to some point. It is of course possible that the issue that caused the initial failure still exists but allowed one successful transmission. QueueSizeToReturnToOptimistic is the lower threshold at which we can return to Optimistic mode. It can be 0 if, in the normal case when data is flowing, you never have another event in line behind the one being sent. If instead the normal case is that you have 1, 2 or 5 events queued up at any given time, it may be better to set this value a little higher so that the queue can return to Optimistic mode quickly rather than wait for the rare occurrence when it actually reaches 0.
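
The sketch below shows roughly how the mode switch fits together. It is illustrative only: persistence and transport are stubbed out as function parameters, and the real implementation sits behind Apache Camel and ActiveMQ.

// Kotlin (an illustrative sketch; persist/send are stand-ins for the real disk store and uplink)
import java.util.ArrayDeque

class OptimisticQueue<T>(
    private val maximumQueueSize: Int,
    private val queueSizeToReturnToOptimistic: Int,
    private val persist: (T) -> Unit,    // write one event to disk
    private val send: (T) -> Boolean     // true only if the cloud acknowledged the event
) {
    private enum class Mode { OPTIMISTIC, PESSIMISTIC }
    private var mode = Mode.OPTIMISTIC
    private val pending = ArrayDeque<T>()

    fun offer(event: T) {
        pending.addLast(event)
        when {
            mode == Mode.PESSIMISTIC -> persist(event)
            pending.size >= maximumQueueSize -> becomePessimistic() // too much unpersisted data
        }
        drain()
    }

    private fun drain() {
        while (pending.isNotEmpty()) {
            if (!send(pending.first())) {
                becomePessimistic()   // any transmission failure flips us to disk-backed mode
                return
            }
            pending.removeFirst()     // (removing the persisted copy from disk is elided here)
            if (mode == Mode.PESSIMISTIC && pending.size <= queueSizeToReturnToOptimistic) {
                mode = Mode.OPTIMISTIC  // drained far enough to trust the connection again
            }
        }
    }

    private fun becomePessimistic() {
        if (mode == Mode.PESSIMISTIC) return
        mode = Mode.PESSIMISTIC
        pending.forEach { persist(it) }   // save everything already queued in memory
    }
}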

The Optimistic Queue dramatically reduced the number of writes to disk without increasing exposure too much. In some cases it wrote to disk at less than a quarter of the previous frequency. In the very unlikely event of an uncontrolled shutdown while the queue was backed up, we could lose up to MaximumQueueSize events. Loosening the constraints on guaranteed delivery allowed for a much longer IoT device life, meaning more uptime and less data loss overall.

Friday, July 27, 2018

Data Streaming and Storage: Guaranteeing Delivery while Aggregating Data in AWS

At Warren Rogers, we ingest, store and process tens of millions of events from thousands of sites per day. My team was tasked with updating the legacy file-based storage and processing system to a more distributed, highly available and durable streaming solution in AWS.

As we looked at the requirements, it became clear that our data pipeline would need to be a strictly time-ordered stream with guaranteed delivery and very durable persistence. Our data ingestion mechanism would need to be horizontally scalable (and so stateless), write to durable storage (so that events could be replayed when needed), and publish to a real-time event processing pipeline. An additional requirement was that the data storage solution would need to be efficient at retrieving events for a day or several days for a single IoT sensor (each day holding thousands of events). Also, wherever possible, we would use AWS managed services so that we could cut down on the cost and complexity of maintenance.

It took three attempts to get a solution that satisfied these requirements.

For our first attempt, we targeted various AWS managed services. Amazon S3 became our event storage backend: S3 makes very high durability and availability guarantees, is relatively inexpensive, and makes long-term data storage simpler with lifecycle management. Amazon Kinesis was a natural fit for our real-time processing pipeline, and the data ingestion mechanism translated fairly easily to a multi-node Amazon Elastic Beanstalk service.

Time-ordering would be guaranteed on the IoT sensor, as it would queue up data and send one event at a time to the data ingestion service. To achieve guaranteed delivery, the data ingestion service would place the event in S3 and then in the real-time stream. Only once the event had been posted successfully to both S3 and the real-time stream did the service return an acknowledgement. If the IoT sensor received an acknowledgement, it would remove the event from the queue and send the next event. Otherwise it would continue to retry the event.
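
In code, the contract of this first attempt looked roughly like the following sketch (the bucket, stream name and key scheme are placeholders, and error handling is reduced to the acknowledgement decision).

// Kotlin (a rough sketch of the first attempt; bucket, stream and key scheme are placeholders)
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import java.nio.ByteBuffer

val s3 = AmazonS3ClientBuilder.defaultClient()
val kinesis = AmazonKinesisClientBuilder.defaultClient()

fun ingest(siteId: String, eventId: String, payload: String): Boolean =
    try {
        // 1. Durable storage first: one S3 object per event (the part that later proved too slow to read back)
        s3.putObject("sensor-event-store", "$siteId/$eventId.json", payload)
        // 2. Then the real-time pipeline, partitioned by site to preserve per-site ordering
        kinesis.putRecord("sensor-events", ByteBuffer.wrap(payload.toByteArray()), siteId)
        true   // acknowledge: the sensor drops the event and sends the next one
    } catch (e: Exception) {
        false  // no acknowledgement: the sensor keeps the event and retries
    }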

Once we had implemented this system, we ran into a problem. It would take tens of seconds (sometimes minutes) to reconstitute one day's worth of data for a site. Storing each event as its own object in S3 essentially made the data store unusable, and even running a process that aggregated events after the fact would be expensive. We saw that we needed to group events by IoT sensor (the Aggregator Enterprise Integration Pattern) before storing them in S3 to get efficient storage.

This highlighted the key problem: how do you aggregate data, maintain ordering, and still guarantee delivery on a multi-node service? The data must be persisted in a durable way before returning an acknowledgement as part of the guaranteed delivery contract, which means it must be stored while it is being aggregated. Writing to local disk in a multi-node environment is obviously risky (a step in the wrong direction concerning durability) and would also break ordering, as an IoT sensor could deliver its next event to a different node.

Our second rewrite attempted to use EFS (Elastic File System) as a distributed cache where we would group events over some period of time (an hour in site time for example) and then a node would save the file to S3. This allowed for better durability and would maintain ordering.

The problem with EFS is that it was far too slow. There was no way we could keep up with the volume of events that we needed to ingest.

For our third rewrite, we discovered that Amazon Kinesis Data Firehose would take a number of events and aggregate them for us before dumping them to S3. Of course, these aggregated objects would still need to be grouped and organized by IoT sensor by a continuous background process.

This worked, but it seemed highly inefficient to write every record to Firehose individually. For one, it created an undue resource drain on the data ingestion service. For another, our events were often smaller than 1 KB, and Firehose bills each record rounded up to the next 5 KB, so we would very likely be paying around five times more on average than we needed to. Aggregating before posting to Firehose would be ideal, but we still couldn't aggregate by IoT device and keep our guaranteed delivery contract.

Once the solution dawned on us, it seemed obvious: the service could aggregate across IoT sensors. All the events that happened to be received at a particular service node in some configurable period of time would be grouped and then posted to Firehose as one batch. On successful completion, each IoT sensor would receive its confirmation. This had the added benefit of producing a natural throttling mechanism in case of (hopefully very infrequent) downtimes leading to backed-up queues.
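
A sketch of that node-level aggregation is below. The delivery stream name, the flush trigger and the PendingEvent type are illustrative; the real service also splits batches to stay under Firehose's per-call limits.

// Kotlin (an illustrative sketch; stream name, flush trigger and PendingEvent are placeholders)
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest
import com.amazonaws.services.kinesisfirehose.model.Record
import java.nio.ByteBuffer

// An event held in memory on this node, plus the callback that acknowledges its sensor
data class PendingEvent(val payload: String, val acknowledge: () -> Unit)

val firehose = AmazonKinesisFirehoseClientBuilder.defaultClient()

// Called on a timer (or when the buffer fills) with everything this node has
// received since the last flush, regardless of which sensor sent it.
fun flush(buffered: List<PendingEvent>) {
    if (buffered.isEmpty()) return
    val records = buffered.map { Record().withData(ByteBuffer.wrap((it.payload + "\n").toByteArray())) }
    val result = firehose.putRecordBatch(
        PutRecordBatchRequest()
            .withDeliveryStreamName("sensor-events")
            .withRecords(records)
    )
    // Acknowledge the sensors only after Firehose accepts the whole batch;
    // otherwise no acks go out and the sensors' durable queues retry.
    if (result.failedPutCount == 0) buffered.forEach { it.acknowledge() }
}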

After the third rewrite, the data ingestion service has been chugging along for a few years now with very few issues.

Tuesday, July 24, 2018

Why we switched to Kotlin (Pt. 3) - Value Objects


In my series of posts on why our team switched from Java to Kotlin, I have touched on how Kotlin more clearly communicates architectural and business decisions concerning nullability and immutability (specifically of collections). In the last post I described how the following signature is much clearer than the Java alternative because it guarantees that the method will not modify the list.

// Kotlin
fun removePreferredUsers(list: List<User>): List<User>
There is still one thing that we cannot tell from this method signature: will the method alter the User objects themselves, even though it cannot change the list I passed in? Does the returned list contain the same User objects or different ones?

Since Java and Kotlin always store objects in collections by reference, even if the method can't change the list, it can still change the objects in the list. Based on the single responsibility principle, we would probably assume that the method above does not modify the User objects. But now take the same kind of signature and assume that the User object has a "status" property that this method sets on the User after looking at other metadata about the user:

// Java
public List<User> calculateStatus(List<User> users);

// Kotlin
fun calculateStatus(users: List<User>): List<User>
To drive the point even further, consider this signature:

// Java
public User calculateStatus(User user);

// Kotlin
fun calculateStatus(user: User): User 
There is really only one way to know whether the User object returned by the method is a copy (with perhaps one field overwritten) or the exact same object with a mutated field (without looking at the implementation, that is): we must know whether the User object allows mutation of that field.

Of course there are many benefits to making objects as immutable as possible, including clarity in cases like these. Item 15 in Effective Java describes some of them as well as how to make objects immutable in Java. The problem is that immutable objects are very unwieldy in Java, making the right road a hard road. To use immutable objects effectively, one would probably need to implement the Value Object Design Pattern. A Java User Value Object might look like this:

// Java
public class User {
    private final String firstName;
    private final String lastName;
    private final UserStatus status;
    private final String birthday;
    private final String joinDate;

    public User(String firstName, String lastName, UserStatus status, String birthday, String joinDate) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.status = status;
        this.birthday = birthday;
        this.joinDate = joinDate;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public UserStatus getStatus() {
        return status;
    }

    public String getBirthday() {
        return birthday;
    }

    public String getJoinDate() {
        return joinDate;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof User)) return false;
        User user = (User) o;
        return Objects.equals(firstName, user.firstName) &&
                Objects.equals(lastName, user.lastName) &&
                status == user.status &&
                Objects.equals(birthday, user.birthday) &&
                Objects.equals(joinDate, user.joinDate);
    }

    @Override
    public int hashCode() {
        return Objects.hash(firstName, lastName, status, birthday, joinDate);
    }
}
To change a single field on this object, you have to copy all of the fields to a new object, overriding the one field you want to change, like so:

// Java
User newUser = new User(user.getFirstName(), 
  user.getLastName(), 
  UserStatus.PREFERRED, 
  user.getBirthday(), 
  user.getJoinDate());
If you really can't switch to Kotlin, at least consider using Lombok. The following class is the equivalent of the one above, except that it adds "with" methods that return copies of the object with one field changed. @Wither is an experimental feature though, so use it with caution. For more information on these annotations, check out the Lombok documentation for Value Objects.

// Java
@Value
@Wither
public class User {
    String firstName;
    String lastName;
    UserStatus status;
    String birthday;
    String joinDate;
}
With this class, you can set a user with a preferred status like so:

// Java
User newUser = user.withStatus(UserStatus.PREFERRED);
In contrast, a Kotlin Value Object looks like this:

// Kotlin
data class User(val firstName: String,
                val lastName: String,
                val status: UserStatus,
                val birthday: String,
                val joinDate: String)
In Kotlin, the keywords "val" and "var" signal that a property is immutable or mutable respectively. To create a new object with preferred status, you would use the following code:

// Kotlin
val newUser = user.copy(status = UserStatus.PREFERRED)
Note that you can pass as many named arguments to the "copy" method as there are fields. To modify more than one property in the Lombok Java implementation, you would have to chain calls to "with" for as many fields as you wanted to overwrite, creating intermediate objects each time, which is of course slightly less efficient.
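
For example, an update that touches two fields (the values here are just illustrative) is still a single call producing a single new object:

// Kotlin
val updated = user.copy(status = UserStatus.PREFERRED, joinDate = "2018-07-24")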

Although Lombok makes immutability much easier, Kotlin offers full language support for Value Objects rather than having your code instrumented by another library (as with Lombok). The fact that Kotlin makes immutable Value Objects much easier to implement and work with, leading to simpler code that is easier to reason about, is another reason our team moved to Kotlin.

Wednesday, July 18, 2018

Why we switched to Kotlin (Pt. 2) - Immutable/Mutable Collection Interfaces


In the previous post I explained that the reason our team made the jump to Kotlin was primarily Kotlin's ability to express business and architectural decisions much more clearly than Java can. That post focused on nullability as a key business and architectural decision which, regrettably, has no top-level expression in Java but which is a first-class concept in Kotlin. In this post I will discuss how immutability, in particular with regard to collections, is also an important design decision that is buried in the implementation in Java but surfaced in signatures in Kotlin. In the next post I will compare immutability with regard to objects in Java and Kotlin.

Although decisions regarding immutability are often less business decisions than architectural ones, stating them explicitly in the language allows one to reason about code and understand the intention of the software much better. Take, for example, the following ambiguous signature in Java.

    public List<User> removePreferredUsers (List<User> list)
   

Does the method modify the list I give it? Is the returned list a new list or the same list, now with preferred users removed? Assuming that my intention is to have a list of all users and a list with preferred users excluded, there really is no way to know whether it is safe to pass in the list of all users or whether I have to create a copy first. Methods like these give me a new appreciation for the functional programming constraint that one not modify the inputs. To know that the method respects that functional paradigm, I would have to read through the implementation, read Javadocs that may or may not exist (and may or may not be accurate), or write exploratory unit tests. The implementer of this method almost certainly did not mean to communicate that the List should be modifiable. It is merely the default collection interface, the least number of keystrokes.

Not only that, but since the constraint is not enforced at compile time, it is possible that at one point the method did not mutate the list whereas, after a later update, it now does. This can lead to bugs that are very difficult to track down. It is true that we could require an unmodifiable list (one wrapped with Collections.unmodifiableList, say) and force the client code to wrap the list, but I have never seen this done in practice. It is just too unwieldy and, honestly, mutation of arguments is not something people really stop and think about (me included). They just generally assume one way or the other (probably the same reason I have never seen a method take an Optional as an argument, though it is arguably Java's answer to the problem of nullability).

Josh Bloch, in his seminal book Effective Java, states two principles in Item 15 regarding mutability: "Classes should be immutable unless there's a very good reason to make them mutable" and "If a class cannot be made immutable, limit its mutability as much as possible" (Kindle location 1876). He presents the case for these principles more effectively than I could, and I recommend you read it. A good language is one that makes best practices easier than the alternative, and Java, in this case, does not.

In Kotlin, this method would look something like this:

    fun removePreferredUsers(list: List<User>): List<User>
   
or this:

    fun removePreferredUsers(list: MutableList<User>): List<User>
   
The List interface, like the Set and Map interfaces, is immutable by default in Kotlin, hiding the mutating methods found in the corresponding Java interfaces. The intention is much clearer in each of the Kotlin signatures. The first indicates that, at least when there are one or more preferred users, it will return a new list; it guarantees that it will not change the original. Of course, for the reasons stated above, I would prefer the first to the second, but at least with the second I know what I am exposing the list to, and I can pass in a copy if I want to keep the original list unmodified. The implementer of the method chose to type the extra keystrokes to arrive at MutableList, and that communicates a clear intention to modify.
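
To make the compile-time enforcement concrete, here is a small illustrative example: the read-only interface simply does not expose mutating methods.

// Kotlin
fun example(users: List<User>, editable: MutableList<User>, admin: User) {
    // users.add(admin)   // does not compile: Kotlin's List has no add()
    editable.add(admin)   // compiles: the signature explicitly asked for a MutableList
}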

Using the List interface in the Java example gives us no real indication of whether the implementer intends to modify the original argument, whereas the Kotlin signatures communicate this decision, enforce it at compile time and, as a bonus, make the safer option the default.

Sunday, July 15, 2018

Why we switched to Kotlin (Pt. 1) - Nullability


As an architect/team lead, I don’t take changing our primary backend language lightly. Moving from Java to Kotlin would mean retraining the entire scrum team. Coming up with new coding standards and tools can be a painful process. I certainly wouldn’t undergo this change for mere syntactic sugar. Removing boilerplate may make us faster developers up front, but clever unreadable code wastes the team’s time in the end.

Change would have to come for some other reason. In this case, the reason was primarily the precision and effectiveness of the language in conveying important architectural and business rules. One of the principles I gleaned from Domain Driven Design is that these important rules should not be buried in implementations but should be first-class citizens in the model (see the section on Policy at location 754 of the Kindle version). It turns out that Kotlin supports easy and direct representation of two important kinds of decisions that have no first-class expression in Java, each with compile-time support (violations are discovered at compile time rather than at runtime): immutability and nullability. I will deal with nullability here and immutability in my next post.

Kotlin has nullable types where the default is non-nullable (making the better practice the default). Any type can be made a nullable type by adding “?”. This is incredibly useful when trying to express a domain's rules. Take for example the following signature in Java.
public void createUser(String firstName,
                       String lastName,
                       String email,
                       String username)
I can decipher nothing about the rules regarding the identity of a User from this signature. I would naturally assume that either username or email (or both) serves as a unique natural identifier to distinguish this user from all others (certainly first and last name could not be guaranteed to be unique). I would expect the identifying property to be required, but the signature tells me nothing about which fields are required. Meanwhile the engineer who implements this signature, in the best case, buries the business rule as an early null check while checking other preconditions. In the worst case, he or she forgets to implement the check, leading to more complex bugs in the software or inconsistencies in datastores (I have seen and caused far too many NullPointerExceptions in my career).

In Kotlin, this same signature might be represented as follows:
fun createUser(firstName: String,
               lastName: String,
               email: String,
               username: String?)
        
Here I can see that email is non-nullable but username is nullable. Rather than calling the method with different parameters to discover the requirements for constructing a user, or reading through the implementation (or trusting Javadocs that may or may not be there and, even if they are, may not be accurate), I merely look at the types. Not only that, but the compiler will also enforce these rules, adding another layer of safety and correctness to the code.
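
A short illustrative example of that enforcement (the argument values are made up):

// Kotlin
createUser("Ada", "Lovelace", "ada@example.com", null)   // compiles: username is nullable
// createUser("Ada", "Lovelace", null, "adal")           // does not compile: email is non-nullable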

Exposing this sort of clarity about the domain rather than burying it in a null check in the method makes it easier to reason about and share these rules with others, including business experts. I think this is worth the cost of transitioning from Java to Kotlin.