Handling Bulk Data in the opd Project
When we first started http://www.openpatentdata.com we already had a lot of experience handling large amounts of data inside and outside of the cloud. Nevertheless, the requirements of each project differ, and so we learned a lot this time as well.
Let me first sketch the situation.
We committed to using bibliographic data as provided by the EPO in DocDB format. We made this single-source decision because we were already using a multitude of sources in the Deparom Profil project and knew from experience that this always meant a lot of ongoing work due to changes in the format of the sources, which happened more often than we would have liked.
We committed to using image data for the document drawings as provided by the EPO via their OPS interfaces. The advantage of this was that the data is free and available online; the disadvantage was that it arrives slowly, and access is restricted by a fair-use policy. Basically this means we cannot harvest drawing images for all patent documents we provide, so we harvest the images only for the documents which at least one user seems to be interested in. (*This is likely to change soon.)
We committed not to provide any full text of the documents (yet), so all we handle are bibliographic data and drawing images.
Based on these commitments we set up our basic task structure:
Import the DocDB information as soon as it arrives (online via a paid download site).
Harvest the drawing image information on demand when the document becomes interesting due to user interaction.
This blog post will be about importing the DocDB information.
Each week the EPO provides between several dozen and several hundred XML files with new (or amended) bibliographic data of patent documents. These XML files are compressed using zip, then the .zip files are packaged (also using zip) together with some other information like a table of contents, DTD files, etc. The individual XML files are around 100 MB at most, while the ZIP packages typically have a size of up to 2.2 GB. For one week, this sometimes adds up to 6 or 8 GB; the whole year 2012 added up to around 50 GB. And the numbers are increasing from year to year.
Our goal was to store each document's bibliographic data in various AWS DBs (S3, DynamoDB, and CloudSearch) as a single JSON value.
So the task was clear: Extract the information from the DocDB packages and store it in the AWS cloud. Since source (the packages) and target (AWS) were available online, it was feasible to do this in the cloud, using EC2 instances.
In our first approach the EC2 instances working as importers
got their specific task (the URL of the package) from an SQS queue,
unpacked the outer ZIP files,
unpacked the inner ZIP files,
converted the document information properly to JSON, and
stored the values in AWS (S3, DynamoDB).
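The steps above can be sketched roughly as follows. This is not the project's code: a queue.Queue stands in for SQS, a dict stands in for the AWS stores, and the XML structure (a document element with an id attribute and a title child) is a hypothetical simplification, not the DocDB schema.

```python
import json
import queue
import xml.etree.ElementTree as ET


def document_to_json(elem):
    """Flatten one (hypothetical, simplified) document element into a JSON string."""
    record = dict(elem.attrib)
    record["title"] = elem.findtext("title", default="")
    return json.dumps(record, sort_keys=True)


def run_importer(tasks, fetch_xml_files, store):
    """Drain the task queue; each task names one whole package."""
    while True:
        try:
            package_url = tasks.get_nowait()
        except queue.Empty:
            return
        # fetch_xml_files hides the download and both unzip steps.
        for xml_bytes in fetch_xml_files(package_url):
            for elem in ET.fromstring(xml_bytes).iter("document"):
                store[elem.get("id")] = document_to_json(elem)


# Stand-ins for the real queue, download site and database.
tasks = queue.Queue()
tasks.put("package-1")
fake_packages = {
    "package-1": [b'<docs><document id="EP1A1"><title>Widget</title></document></docs>']
}
store = {}
run_importer(tasks, fake_packages.__getitem__, store)
```

Note that one task covers a whole package here, which is exactly the property that caused trouble later.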
Using this setup, it was easy to have more than one importer instance work on the SQS queue, thus distributing the load.
It turned out to be a problem that a single package could be large. SQS queues are designed for relatively small tasks which can be handled in at most eight hours. They are also designed for failure, which means they work like this: When a worker fetches a task from the queue, it states how long it will need for the task at most. This maximum duration cannot exceed eight hours. Once handed to a worker, a task becomes invisible in the SQS queue. When the worker finishes processing its task, it informs the SQS queue about this success, and the (invisible) task is permanently removed from the queue. But if the worker (for whatever reason) fails to signal success in time, the invisible task becomes visible again, and any other worker might pick it up as well.
With this concept, a worker that does not finish its task successfully (i.e. that fails) will be backed up by a colleague after the maximum duration. The whole system needs to be designed accordingly; it must tolerate that more than one worker might process the same task, even concurrently.
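The visibility mechanism can be illustrated with a toy in-memory queue. This is a simulation of the behaviour described above, not the SQS API; the class and method names are made up for this sketch, and time is passed in explicitly to keep the example deterministic.

```python
class VisibilityQueue:
    """Toy in-memory queue that mimics the visibility-timeout behaviour."""

    def __init__(self):
        self._visible_at = {}  # task -> time at which it is visible (again)

    def put(self, task):
        self._visible_at[task] = 0.0

    def receive(self, now, timeout):
        """Hand out one visible task and hide it for `timeout` time units."""
        for task, visible_at in self._visible_at.items():
            if visible_at <= now:
                self._visible_at[task] = now + timeout
                return task
        return None

    def delete(self, task):
        """Called by a worker on success; removes the task permanently."""
        self._visible_at.pop(task, None)


q = VisibilityQueue()
q.put("pkg")
first = q.receive(now=0, timeout=10)        # worker A takes the task
while_hidden = q.receive(now=5, timeout=10)  # worker B sees nothing
after_timeout = q.receive(now=10, timeout=10)  # A never confirmed, B gets it
q.delete("pkg")                              # B signals success
after_delete = q.receive(now=100, timeout=10)
```

Because the same task can be handed out twice, processing it must be safe to repeat; storing a document under a fixed key has that property, since a second run simply writes the same value again.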
In our situation, handling one package could take longer than eight hours, at least in some cases. With SQS queues in the standard fashion this was an unsolvable problem because no worker would ever finish the task in time, thus the message on the queue would never be removed from the queue permanently. To solve the problem we understood that we needed to split up the packages.
Since the paid EPO download site would not provide permanent access to the packages anyway, we decided to split the process into two stages: Storing and Importing.
The first process in the second approach is called storing. It unpacks an existing package, extracts all XML files therein, compresses them using gzip (because gzip can be streamed), and stores them in S3 (as .xml.gz files). Even with the largest packages provided by the EPO, this can easily be achieved in less than eight hours. So we were within the parameter range of the SQS queues again.
The second process is called importing. It now only works on single XML files which are downloaded (and uncompressed) from S3. Since we now only have to handle a single XML file and not a package of hundreds of them in one task, the eight hour limit doesn't pose a problem at this step either.
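The recompression step of the storing stage could look roughly like this. It is a sketch under assumptions: an in-memory buffer stands in for the S3 upload, and the function and file names are hypothetical.

```python
import gzip
import io
import shutil
import zipfile


def recompress_to_gzip(archive, member, target):
    """Stream one zip member into a gzip file object, chunk by chunk."""
    with archive.open(member) as source:
        with gzip.GzipFile(fileobj=target, mode="wb") as gz:
            shutil.copyfileobj(source, gz)


# Build a tiny zip archive in memory for the demonstration.
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w") as zf:
    zf.writestr("docs-001.xml", "<docs/>")

out = io.BytesIO()  # stand-in for the upload target (docs-001.xml.gz)
with zipfile.ZipFile(zip_buf) as zf:
    recompress_to_gzip(zf, "docs-001.xml", out)

restored = gzip.decompress(out.getvalue())
```

Copying between the two file objects in chunks means the uncompressed XML never has to fit into memory as a whole.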
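On the importing side, the streaming property of gzip pays off: the XML can be parsed while it is being decompressed. The following sketch assumes a hypothetical element name (document) and id attribute, and uses an in-memory buffer in place of the S3 download.

```python
import gzip
import io
import xml.etree.ElementTree as ET


def iter_documents(gz_stream, tag="document"):
    """Yield document elements one at a time from a gzip-compressed XML stream.

    The caller must take what it needs from each element before asking for
    the next one, because the element is cleared afterwards.
    """
    with gzip.GzipFile(fileobj=gz_stream) as xml_stream:
        for _event, elem in ET.iterparse(xml_stream, events=("end",)):
            if elem.tag == tag:
                yield elem
                elem.clear()  # drop attributes and children to save memory


data = gzip.compress(b'<docs><document id="a"/><document id="b"/></docs>')
ids = [elem.get("id") for elem in iter_documents(io.BytesIO(data))]
```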
When importing more than one week, we needed to ensure that all XMLs from one week were completed before the next week was started. This is necessary because the EPO might issue corrections (i.e. basically corrected repetitions) of documents. In order to always have the newest version in the DBs, the weeks must be processed in order.
To ensure this, we can use the massively parallel workers only for one week at a time. The XMLs which take longest determine the time used for one week. Only when all are done can the next week be scheduled.
For this, we have a special scheduler facility which creates the tasks for the XMLs of one week for all targets (S3, DynamoDB, CloudSearch, thus three tasks for each XML). Then it waits until these tasks are completed, then it schedules the next week.
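The week-by-week barrier can be sketched with a thread pool standing in for the worker farm. The function names and the week labels are assumptions; the labels only sort correctly as strings if they are zero-padded, as in 2012/05.

```python
from concurrent.futures import ThreadPoolExecutor, wait

TARGETS = ("S3", "DynamoDB", "CloudSearch")


def schedule_weeks(weeks, run_task, max_workers=8):
    """Process weeks strictly in order; tasks inside one week run in parallel.

    `weeks` maps a week label to its list of XML file names.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for week in sorted(weeks):
            futures = [
                pool.submit(run_task, week, xml_name, target)
                for xml_name in weeks[week]
                for target in TARGETS
            ]
            wait(futures)  # barrier: the next week starts only when all are done
            for future in futures:
                future.result()  # re-raise any task failure


weeks = {"2012/46": ["b.xml"], "2012/45": ["a.xml"]}
order = []
schedule_weeks(weeks, lambda week, xml_name, target: order.append(week))
```

Within one week the order of tasks is arbitrary, but no task of a later week can start before the earlier week is complete.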
We found that S3 is much slower than DynamoDB and CloudSearch. It proved to be the bottleneck in the system, with each week taking about two to three hours to import (depending on size). Without S3, the imports went much faster (about one hour per week). And of course, the need to import everything in order only applies to each target on its own. We can start importing week n+1 on DynamoDB as soon as week n has been completed on DynamoDB; it doesn't matter whether week n has been completed on S3 yet.
There are lots of ways to enhance the system taking this new thought into account. The way we chose was to schedule the slow target by itself, so there is a scheduler for S3 alone, and one for the other two DBs.
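A small back-of-the-envelope model shows the effect of splitting the schedulers. The hours per week are illustrative numbers loosely based on the figures above (three hours with S3, one hour without), not measurements, and the model ignores that workers are shared between the schedulers.

```python
def joint_finish_times(weekly_hours):
    """One scheduler for all targets: every week waits for the slowest target."""
    total = sum(max(week.values()) for week in weekly_hours)
    return {target: total for target in weekly_hours[0]}


def split_finish_times(weekly_hours, groups):
    """One scheduler per group: each group only waits for its own slowest target."""
    finish = {}
    for group in groups:
        total = sum(max(week[target] for target in group) for week in weekly_hours)
        for target in group:
            finish[target] = total
    return finish


# Four weeks with illustrative per-target durations in hours.
weekly_hours = [{"S3": 3, "DynamoDB": 1, "CloudSearch": 1}] * 4

joint = joint_finish_times(weekly_hours)
split = split_finish_times(weekly_hours, [("S3",), ("DynamoDB", "CloudSearch")])
```

In this model S3 still needs twelve hours either way, but DynamoDB and CloudSearch are up to date after four hours instead of twelve.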
So far we have imported just the last three months (from week 2012/45 to the present) into our system; we also have all the data dating back centuries. Importing the older data can be achieved later by walking backwards in time instead of forwards. Instead of blindly overwriting existing data, we will then only store the data if there is not yet anything for the specific document (and otherwise ignore the outdated document).
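The store-only-if-absent rule for the backward walk can be sketched with a dict standing in for the database. The document ids and payloads are made up; in a real store the check and the write would have to happen as one atomic, conditional operation when several workers run concurrently.

```python
def import_backwards(weeks, store):
    """Walk weeks from newest to oldest; keep the first version seen per document."""
    for week in sorted(weeks, reverse=True):
        for doc_id, payload in weeks[week]:
            # setdefault writes only if the key is missing, so a newer
            # version that is already stored is never overwritten.
            store.setdefault(doc_id, payload)


# EP1 is already present from the forward import of week 2012/45.
store = {"EP1": "v-2012/45"}
older_weeks = {
    "2012/43": [("EP1", "v-2012/43"), ("EP2", "v-2012/43")],
    "2012/44": [("EP2", "v-2012/44")],
}
import_backwards(older_weeks, store)
```

EP2 ends up with the version from 2012/44 because that week is visited first; the older copy from 2012/43 is ignored.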
A large amount of work went into properly handling all the unpredicted situations like errors occurring during parsing of the XML sources, converting them to our JSON version, etc. The normal way of handling errors (raising an exception which leads to an abort of the program with a proper stack trace to ease finding the error) is not feasible when handling millions of documents on a farm of workers. Simple aborts would lead to worker instances idling around (producing cost but nothing else) and not marking a task as done on the corresponding SQS queue. After its timeout, such a task would be handed to another worker; if the error was reproducible, it would eventually abort all workers, thus effectively stopping the whole show.
Exceptions can be handled and traces dumped to avoid aborts. A simple try/except in the body of the specific loop does this. It is normally not a tricky task; you just dump the current stack trace into a log file and add (hopefully) enough context information to give the developer a clue what might have raised the problem or at least to allow a reproduction of the problem in a debug environment. But we handle bulk data in gigabyte amounts and expect to look at the collected problems only after millions of documents have been processed.
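The try/except in the loop body can be sketched like this. The tuple layout of the input, the logger name and the use of int as a stand-in converter are assumptions for the example.

```python
import logging

log = logging.getLogger("importer")


def import_all(documents, convert, store):
    """Convert and store every document; log failures instead of aborting."""
    failed = 0
    for source_file, doc_id, raw in documents:
        try:
            store[doc_id] = convert(raw)
        except Exception:
            failed += 1
            # log.exception appends the current stack trace to the message.
            log.exception("import failed: file=%s doc=%s", source_file, doc_id)
    return failed


documents = [("a.xml", "EP1", "1"), ("a.xml", "EP2", "x"), ("a.xml", "EP3", "3")]
store = {}
failed = import_all(documents, int, store)  # int("x") raises ValueError
```

The broken document is skipped and logged with its file and id, while the documents before and after it are stored normally.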
It took us a while to set up our error handling mechanisms in such a way that even after days of produced log files on dozens of instances we could still figure out which input file at which task was the trigger for the exception. The main task in this was to reduce the size of the log files to the necessary minimum (to avoid having gigabytes of log files to analyze) without losing vital information.
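One possible way to keep log entries small but traceable is to write a single line per error that carries the context and only the innermost stack frame. This is an illustration of the idea, not the format we actually used; the field names and the 200 character limit are arbitrary choices.

```python
import json
import traceback


def compact_error(exc, **context):
    """Return a one-line JSON record: context, exception type, message, innermost frame."""
    frames = traceback.extract_tb(exc.__traceback__)
    where = f"{frames[-1].name}:{frames[-1].lineno}" if frames else "?"
    record = dict(
        context,
        error=type(exc).__name__,
        message=str(exc)[:200],  # cap the message length
        where=where,
    )
    return json.dumps(record, sort_keys=True)


try:
    int("x")
except ValueError as exc:
    line = compact_error(exc, task="2012/45", file="a.xml", doc="EP2")

record = json.loads(line)
```

Each record names the task, the input file and the document, so an error can be reproduced later without keeping full stack traces for every failure.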
In the end, importing ≈8 million documents produced less than 50 MB of logs with only around 165 logged errors, most of them network connection problems.