In this article, I will show you the most common issues that might occur in a very simple Big Data application and, based on these problems, I will explain the core concepts from the world of Big Data.
Let’s imagine that we are hired to implement an application to count the number of visits to a very popular social media portal. We have to track user logins and how many times a user enters a certain page. The results should be available in real-time.
It doesn’t sound complicated, does it? A simple listener attached to our web controllers and a database with a single table should be just enough. Whenever a user enters a certain page, the event is logged and stored in the database. If that user visits the page again, the count value is updated accordingly.
Sample data would look like this:
All is going well so far, but we know that it’s only the lull before the storm. As the traffic of the website increases, we will have to solve new problems that will arise.
The first major problem that we will probably have to face will be latency. With the increased number of users and number of reads/writes, our table size will also increase. Each user request generates a database update and with a huge number of visits, it can create a bottleneck in our database. Of course there are many ways to solve this problem but I particularly want to focus on batch processing.
Instead of updating the database after each visit, we can wait for a specified number of events (or an amount of time) and then perform a single, large update. We could introduce a queue that holds objects before writing them to the database. That way, we can decrease the number of writes to our storage and eventually make it more responsive.
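To make the idea concrete, here is a minimal sketch of such a queue in Python. The class name `VisitBuffer`, the `flush` callback and the thresholds are assumptions made for illustration; a real application would plug in its own bulk database write.

```python
from collections import Counter
import time


class VisitBuffer:
    """Collects visit events in memory and flushes them as one bulk update."""

    def __init__(self, flush, max_events=1000, max_age_seconds=60.0,
                 clock=time.monotonic):
        self._flush = flush            # hypothetical callback doing the bulk write
        self._max_events = max_events
        self._max_age = max_age_seconds
        self._clock = clock
        self._pending = Counter()      # (user, path) -> visits not yet written
        self._size = 0
        self._opened_at = clock()

    def record(self, user, path):
        if self._size == 0:
            self._opened_at = self._clock()   # the batch starts with its first event
        self._pending[(user, path)] += 1
        self._size += 1
        too_big = self._size >= self._max_events
        too_old = self._clock() - self._opened_at >= self._max_age
        if too_big or too_old:
            self.flush()

    def flush(self):
        if self._size == 0:
            return
        batch, self._pending, self._size = dict(self._pending), Counter(), 0
        self._flush(batch)             # one large update instead of many small ones


written = []                           # stands in for the database
buffer = VisitBuffer(flush=written.append, max_events=3)
for user, path in [("alice", "/home"), ("alice", "/home"),
                   ("bob", "/news"), ("bob", "/home")]:
    buffer.record(user, path)
buffer.flush()                         # write whatever is still pending, e.g. on shutdown
```

In this sketch the age of a batch is only checked when a new event arrives, so a quiet period can delay a flush; a production version would probably also flush from a timer.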
Naturally, there are downsides to this approach:
- loss of real-time data; our data is only as accurate as the gaps between batch updates
- events waiting in the queue are not yet persisted, so they can be lost if the application fails
These two downsides can be mitigated by using, for example, Kafka as temporary storage. However, that is outside the scope of this article.
Not enough space
Sooner or later, we will have to deal with another problem: lack of disk space. A single disk won’t be enough, so we will have to add another. But how can we span our database across more than one disk? This is where sharding comes in. Sharding is a pattern of splitting a table into partitions, each containing a subset of rows. Because sharding is horizontal partitioning, we don’t need to change the table schema to increase database capacity. However, we do need an algorithm for finding the shard that contains the row we are looking for. This is where the partition key comes in: it is a value that determines which shard should store the row. In our case the most natural partition key would be the ‘User’ column, but of course, the decision depends on the aggregations that the client wants.
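One simple way to find the shard for a row is to hash the partition key and take the result modulo the number of shards. The sketch below assumes four shards and models each shard as an in-memory dictionary; `shard_for`, `record_visit` and `visits` are names invented for this example.

```python
import hashlib

SHARD_COUNT = 4  # assumed number of shards


def shard_for(partition_key: str, shard_count: int = SHARD_COUNT) -> int:
    """Map a partition key (here: the user login) to a shard index."""
    # A stable hash is used on purpose: Python's built-in hash() of a string
    # changes between processes, which would send the same user to
    # different shards after a restart.
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


# Each shard is modelled as a dict; in reality it would be a database instance.
shards = [dict() for _ in range(SHARD_COUNT)]


def record_visit(user: str, path: str) -> None:
    shard = shards[shard_for(user)]
    shard[(user, path)] = shard.get((user, path), 0) + 1


def visits(user: str, path: str) -> int:
    # Only the shard that owns the user has to be queried.
    return shards[shard_for(user)].get((user, path), 0)


record_visit("alice", "/home")
record_visit("alice", "/home")
record_visit("bob", "/news")
```

Note that with plain modulo hashing, changing the number of shards moves most keys to a different shard, so adding capacity later means migrating data.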
We also have the option for vertical partitioning. In this case, we split columns into different tables and put these tables on separate servers.
No access to a shard
Our application is now up and running. We have finally solved problems with latency and free space by adding batch processing and by introducing shards. Now, instead of only one database server, we have more instances. Some of these instances are on separate machines, which means better load balancing but, on the other hand, more machines to maintain and additional reasons for our application to lose access to the database. Failure of a single shard makes our data invalid, thus unusable. A remedy for such a situation lies in replication. Instead of having only one instance holding a certain subset of data, we should have more of them, ideally on a different machine. That way, in case of failure, our dispatcher can fetch data from a different machine.
Machine 1 is a primary node for Shard A data and a backup node for Shard B data. When Machine 2 goes down, Machine 1 will serve Shard B data. There is no single point of failure, which is very important for distributed systems. Of course, replication is not a feature that you get free of charge. With replication come issues connected with data synchronisation between primary and backup nodes.
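The failover described above can be sketched as follows. `Machine` and `Dispatcher` are hypothetical classes written for this article, and the write path is deliberately naive: it copies every write to all replicas that are currently up.

```python
class Machine:
    """A database server holding one or more shards (in-memory stand-in)."""

    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}  # shard name -> {row key: value}

    def read(self, shard, key):
        if not self.up:
            raise ConnectionError(f"{self.name} is down")
        return self.data[shard][key]


class Dispatcher:
    """Routes reads to the first reachable replica of a shard."""

    def __init__(self, replicas):
        self._replicas = replicas  # shard name -> [primary, backup, ...]

    def write(self, shard, key, value):
        # Naive synchronous replication to every replica that is up.
        for machine in self._replicas[shard]:
            if machine.up:
                machine.data.setdefault(shard, {})[key] = value

    def read(self, shard, key):
        for machine in self._replicas[shard]:
            try:
                return machine.read(shard, key)
            except ConnectionError:
                continue  # try the next replica
        raise ConnectionError(f"no replica of shard {shard} is reachable")


machine_1, machine_2 = Machine("Machine 1"), Machine("Machine 2")
dispatcher = Dispatcher({
    "A": [machine_1, machine_2],  # Machine 1 is primary for Shard A
    "B": [machine_2, machine_1],  # Machine 2 is primary for Shard B
})
dispatcher.write("B", ("bob", "/news"), 7)
machine_2.up = False              # the primary for Shard B fails
count = dispatcher.read("B", ("bob", "/news"))  # served by Machine 1
```

A machine that was down during a write comes back with stale data in this sketch, which is exactly the synchronisation problem replication brings with it.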
System state recovery
So far so good. We have identified and provided solutions for common issues. With some coding we were able to handle latency problems and we increased fault tolerance. But there are issues that are impossible to predict and, what is worse, impossible to fix: namely, problems caused by human mistakes. In the majority of situations, a simple fix in the code is enough. However, as I said, sometimes you can’t fix everything. Imagine an error that causes our visit count to be updated twice, increasing it by 2 instead of 1. Once deployed to production, such a bug is very hard to find, because nothing in the system looks suspicious. Even when you finally locate the issue, you won’t know how much of the data was broken.
In situations similar to this one, a very useful approach is to avoid mutability and use immutable objects as much as you can. For us, this means reworking the database. Instead of having a table that holds user visits, we should store events. Each user visit creates an event with user login, date and requested path. No need to update anything. Simple and fast insert. This approach allows us to restore the application state to a given date — it is called event sourcing.
Nevertheless, the events table won’t give us the number of visits. Events are raw data. Therefore, we have to create a view that will aggregate it for us. Data (events) is used to obtain Information (number of visits). As a developer, you can’t predict what kind of information will be needed in the future but having these events stored in a safe place, you can be sure that sooner or later a data scientist will make use of this data.
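A minimal sketch of this approach might look like the following. The `VisitEvent` fields follow the description above (user login, date and requested path); the function names and the sample events are assumptions for illustration.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)  # frozen: an event can never be modified after creation
class VisitEvent:
    login: str
    path: str
    occurred_at: datetime


events = []  # append-only log; rows are inserted, never updated or deleted


def record_visit(login, path, occurred_at):
    events.append(VisitEvent(login, path, occurred_at))


def visit_counts(as_of=None):
    """The 'view': derive visit counts from raw events, optionally as of a date."""
    return Counter(
        (event.login, event.path)
        for event in events
        if as_of is None or event.occurred_at <= as_of
    )


record_visit("alice", "/home", datetime(2024, 1, 1, 9, 0))
record_visit("alice", "/home", datetime(2024, 1, 2, 9, 0))
record_visit("bob", "/news", datetime(2024, 1, 2, 10, 0))

now = visit_counts()
on_first_day = visit_counts(as_of=datetime(2024, 1, 1, 23, 59))
```

Because the counts are always derived from the events, a bug in the aggregation can be fixed and the view simply recomputed, without any loss of the underlying data.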
Online wholesale example
As an example I would like to show you a database design for an online wholesale shop. Our goal is to store basic information about companies and their orders. Everything is stored in an immutable manner.
The first table contains companies and the second one contains actions that companies perform regarding their order lists. You can see that company Quantum Cosmetics has made a few orders (and cancelled two of them). To count the number of active orders that Quantum Cosmetics has, you perform a simple query that filters out orders that they have cancelled and counts what is left.
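Such a query could look roughly like the one below, shown with Python's built-in `sqlite3` module. The table name `order_actions`, its columns, the action values and all sample rows are made up for this sketch and are not taken from the tables above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE order_actions (
        company_id INTEGER NOT NULL,
        order_id   INTEGER NOT NULL,
        action     TEXT    NOT NULL,  -- 'ORDERED' or 'CANCELLED' (assumed values)
        created_at TEXT    NOT NULL
    );
    -- Illustrative rows only: company 2 places four orders and cancels two.
    INSERT INTO order_actions VALUES
        (2, 101, 'ORDERED',   '2024-01-02'),
        (2, 102, 'ORDERED',   '2024-01-03'),
        (2, 102, 'CANCELLED', '2024-01-04'),
        (2, 103, 'ORDERED',   '2024-01-05'),
        (2, 104, 'ORDERED',   '2024-01-06'),
        (2, 104, 'CANCELLED', '2024-01-07'),
        (1, 201, 'ORDERED',   '2024-01-03');
""")

ACTIVE_ORDERS_SQL = """
    SELECT COUNT(*)
    FROM order_actions AS placed
    WHERE placed.company_id = ?
      AND placed.action = 'ORDERED'
      AND NOT EXISTS (              -- filter out orders that were cancelled later
          SELECT 1
          FROM order_actions AS cancelled
          WHERE cancelled.order_id = placed.order_id
            AND cancelled.action = 'CANCELLED'
      )
"""


def active_orders(connection, company_id):
    return connection.execute(ACTIVE_ORDERS_SQL, (company_id,)).fetchone()[0]


quantum_active = active_orders(conn, 2)
```

Nothing is ever updated or deleted here: a cancellation is just another row, and the number of active orders is derived at query time.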
Of course, one can say that it might be time consuming to aggregate all the actions Quantum Cosmetics performed. This is true and a good reason to use lambda architecture. Basically speaking, at given dates you run batches that aggregate the data from the events collected so far, so that you won’t have to calculate it from the beginning. In our case, one aggregation that could be stored in a separate table would be the “active” orders. We call it a batch view. Now, to obtain the current list of active orders, we have to merge the batch view with the events that came after the batch run and are in the speed layer.
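The merge step can be sketched in a few lines. The batch view is assumed to be a set of active order ids, and the speed layer a list of `(action, order_id)` pairs in arrival order; both shapes, like the action names, are assumptions for this example.

```python
def current_active_orders(batch_view, recent_events):
    """Merge the precomputed batch view with events that arrived after the batch run."""
    active = set(batch_view)          # copy, so the stored batch view stays untouched
    for action, order_id in recent_events:
        if action == "ORDERED":
            active.add(order_id)
        elif action == "CANCELLED":
            active.discard(order_id)  # may cancel an order from the batch view
    return active


# Result of the last batch run (hypothetical order ids).
batch_view = {101, 103}

# Events received since the batch run, i.e. the speed layer.
speed_layer = [
    ("ORDERED", 105),
    ("CANCELLED", 101),
    ("ORDERED", 106),
    ("CANCELLED", 106),
]

active_now = current_active_orders(batch_view, speed_layer)
```

Only the events newer than the last batch run are replayed, so the cost of answering the question no longer grows with the full history.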
What happens when Quantum Cosmetics moves the headquarters to a different town?
A new entry is added to the table. How do we get the current version of “Quantum Cosmetics”? Simply take the row with Company Id equal to 2 and the latest timestamp.
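In code, picking the current version is a filter followed by a maximum over the timestamp. The row layout, the town names and the dates below are placeholders invented for this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CompanyVersion:
    company_id: int
    name: str
    town: str
    recorded_at: str  # ISO 8601 date, so string order matches time order


# Append-only table: a move adds a row instead of updating the old one.
companies = [
    CompanyVersion(2, "Quantum Cosmetics", "Town A", "2023-05-10"),
    CompanyVersion(2, "Quantum Cosmetics", "Town B", "2024-02-01"),
]


def current_version(rows, company_id):
    """Return the most recent row for a company (ValueError if it is unknown)."""
    versions = [row for row in rows if row.company_id == company_id]
    return max(versions, key=lambda row: row.recorded_at)


quantum = current_version(companies, 2)
```

The older row stays in the table, so we can still answer where the company was located when a past order was placed.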
Data accuracy versus data latency
Let’s go back to our website. Over time, we are getting more and more data. Now is a good moment to think about mining valuable information from our database. We track user interaction with our website and store their activity as events. These events will allow us to build the path that a website visitor has followed. Such a path is called a clickstream or, interchangeably, a clickpath. With a huge number of active users across the world visiting our portal, we can build very interesting reports. At the end of each day, we can run a batch process to aggregate this data. For example, if you notice that users from a certain region are regularly asking for a vet, it is a clear tip to set up a veterinary clinic there.
But what if we want to obtain real-time insight into user interactions? We can’t just query the database over and over again, because that would overload it. Instead, we should consume incoming data as it arrives in our system. We need to use streaming. The queue that we introduced to our application earlier can be treated as an observable entity: a topic. Having a similar queue for each type of event allows us to set up more data consumers. We can have more than one consumer for a certain topic, each doing a different thing. These pipelines can be very simple, like the one from our first case, which buffers the events and persists them in the database every 60 seconds. We say that data is processed in a window. However, we don’t have to limit ourselves to waiting 60 seconds; we can use different time spans.
What if we have numerous consumers attached to each topic, each processing hundreds of thousands of events per second? Network bandwidth will become our bottleneck and we will suffer from data latency. In smaller systems, we make the assumption that the data must be accurate. But in the world of Big Data, perfect data accuracy isn’t actually the holy grail. There are situations when data latency is more important. When you aggregate millions of records to calculate an average, do you care if a few records are missing? Would you rather suffer from delays or have results delivered more quickly?
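Windowed processing can be illustrated with a small sketch that groups events into fixed, non-overlapping 60-second windows (often called tumbling windows). Events are assumed to be `(timestamp in seconds, path)` pairs; the function names are made up for this example.

```python
from collections import Counter, defaultdict

WINDOW_SECONDS = 60  # the window length; other time spans work the same way


def window_start(timestamp, size=WINDOW_SECONDS):
    """Return the start of the fixed-size window that contains the timestamp."""
    return timestamp - (timestamp % size)


def count_per_window(events, size=WINDOW_SECONDS):
    """Count page visits separately for each window."""
    windows = defaultdict(Counter)
    for timestamp, path in events:
        windows[window_start(timestamp, size)][path] += 1
    return {start: dict(counts) for start, counts in windows.items()}


stream = [(0, "/home"), (59, "/home"), (60, "/news"), (125, "/home")]
per_minute = count_per_window(stream)
per_two_minutes = count_per_window(stream, size=120)
```

A real consumer would emit each window's result as soon as the window closes instead of collecting the whole stream first, but the grouping logic is the same.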
Messaging systems semantics
There is always a trade-off between data accuracy and data latency. Event-driven stream processing engines like Kafka allow us to set up a contract between a producer and a topic.
If we want our system to be as precise as possible and we don’t care about latency, we must ensure that every message is processed exactly once. In this case, the producer sends the same event to the topic until it gets confirmation that the event was written. The topic, on the other hand, must check if the same event hasn’t been written more than once. This adds overhead on both sides, slowing down the system.
Disabling the checks on the topic side leads to a situation in which an event is processed at least once. If we don’t care about duplicates in our application (or our application is idempotent), this solution will improve the throughput and latency of the system. One of the biggest online shops in the world uses this approach. If you are lucky, you might receive two packages instead of one. If you are unlucky, you might be charged twice; of course, there is a special reimbursement service to deal with such a case. The shop decided to improve user experience and make the system more responsive, sacrificing accuracy. In their case it’s better to pay compensation to, let’s say, every millionth customer, and have more satisfied clients in total.
The aforementioned shop could also make a different choice. They could process messages at most once. In this case, the event producer sends the message and doesn’t check whether that message was written. Imagine yourself ordering a Christmas Eve gift for your child. Even if you have paid for it, your order might not be processed further, because the corresponding event was lost. You will not get the promised package and will probably give up shopping on that website in the future. For this shop, the previous option is much better.
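The difference between these contracts can be shown with a toy simulation. This is not the Kafka API: `Topic`, `send_with_retries` and `send_once` are hypothetical names, and lost acknowledgements or lost messages are simulated with plain parameters.

```python
class Topic:
    """A toy topic; with deduplicate=True it ignores event ids it has already stored."""

    def __init__(self, deduplicate):
        self.deduplicate = deduplicate
        self.log = []
        self._seen = set()

    def append(self, event_id, payload):
        if self.deduplicate and event_id in self._seen:
            return                      # duplicate of a retried event, not stored again
        self._seen.add(event_id)
        self.log.append((event_id, payload))


def send_with_retries(topic, event_id, payload, lost_acks):
    """Producer that resends until it receives a confirmation.

    lost_acks simulates how many confirmations get lost on the way back,
    i.e. how many times the event is written without the producer knowing.
    """
    for _ in range(lost_acks + 1):
        topic.append(event_id, payload)


def send_once(topic, event_id, payload, delivered):
    """Fire-and-forget producer: no confirmation, no retry."""
    if delivered:                       # simulated network outcome
        topic.append(event_id, payload)


at_least_once = Topic(deduplicate=False)
send_with_retries(at_least_once, "order-1", "gift", lost_acks=1)

exactly_once = Topic(deduplicate=True)
send_with_retries(exactly_once, "order-1", "gift", lost_acks=1)

at_most_once = Topic(deduplicate=False)
send_once(at_most_once, "order-1", "gift", delivered=False)
```

With one lost confirmation, the first topic ends up with the order twice, the second with exactly one copy, and the third with none at all, which mirrors the two packages, the single package and the missing gift.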
In this article I have briefly described common issues applications have. In Big Data, it’s not a question of whether or not these problems will arise, but rather when. Because these problems are common, they were already addressed in numerous solutions like Kafka or within the Hadoop stack — for example with HDFS and HBase.
- Big Data — Nathan Marz
- Making sense of NoSQL — Dan McCreary, Ann Kelly
- Hadoop in Practice — Alex Holmes