Kindle Notes & Highlights
Another common use for grouping is collating all the activity events for a particular user session, in order to find out the sequence of actions that the user took, a process called sessionization.
If you have multiple web servers handling user requests, the activity events for a particular user are most likely scattered across various different servers’ log files. You can implement sessionization by using a session cookie, user ID, or similar identifier as the grouping key.
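As a concrete illustration, here is a minimal Python sketch of that grouping step, written against MapReduce’s map/reduce contract; the field names (session_id, timestamp, action) are assumptions for illustration, not taken from the book:

    import json

    # Sketch: sessionization as a MapReduce-style group-by.
    # The grouping key is the session cookie, so the shuffle phase
    # brings all events for one session to the same reducer call.

    def map_event(log_line):
        event = json.loads(log_line)           # one activity event per line
        yield event["session_id"], event       # key by session cookie

    def reduce_session(session_id, events):
        # Sort the collected events by timestamp to recover the
        # sequence of actions the user took during the session.
        ordered = sorted(events, key=lambda e: e["timestamp"])
        yield session_id, [e["action"] for e in ordered]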
In a social network, most users are connected to a few hundred people, but a small number of celebrities may have many millions of followers. Such disproportionately active database records are known as linchpin objects [38] or hot keys.
Collecting all activity related to a celebrity (e.g., replies to something they posted) in a single reducer can lead to significant skew (also known as hot spots): one reducer that must process significantly more records than the others.
For example, the skewed join method in Pig first runs a sampling job to determine which keys are hot.
When the join is performed, the mappers send any record relating to a hot key to one of several reducers, chosen at random rather than deterministically by hashing the key. For the other input to the join, records relating to the hot key need to be replicated to all reducers handling that key [40].
This technique spreads the work of handling the hot key over several reducers, which allows it to be parallelized better, at the cost of having to replicate the other join input to multiple reducers.
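A minimal Python sketch of that technique (this shows the general idea, not Pig’s actual implementation; HOT_KEYS stands in for the output of the sampling job, and N_SUBKEYS is an arbitrary fan-out):

    import random

    HOT_KEYS = {"celebrity_42"}    # assumed result of the sampling job
    N_SUBKEYS = 10                 # number of reducers sharing each hot key

    def map_skewed_input(record):
        # Input with the skew (e.g., events mentioning a celebrity):
        # spread records for a hot key over several reducers at random.
        key = record["user_id"]
        subkey = random.randrange(N_SUBKEYS) if key in HOT_KEYS else 0
        yield (key, subkey), record

    def map_other_input(record):
        # The other join input: replicate hot-key records to every
        # reducer handling that key, so each can complete the join.
        key = record["user_id"]
        if key in HOT_KEYS:
            for subkey in range(N_SUBKEYS):
                yield (key, subkey), record
        else:
            yield (key, 0), record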
The sharded join method in Crunch is similar, but requires the hot keys to be specified explicitly rather than determined by a sampling job.
Hive’s skewed join optimization takes an alternative approach. It requires hot keys to be specified explicitly in the table metadata, and it stores records related to those keys in separate files from the rest.
When grouping records by a hot key and aggregating them, you can perform the grouping in two stages: a first job sends records for a key to one of several reducers, each of which outputs a compact partial aggregate for its subset of the records, and a second job combines those partial aggregates into a single value per key.
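For instance, counting events per user could be sketched as two jobs like this (hypothetical field names; a real implementation would apply the random subkey only to the hot keys, but applying it to every key keeps the sketch short):

    import random

    N_SUBKEYS = 10

    # Stage 1: partial aggregation, with the records for each key
    # spread over several reducers via a random subkey.
    def map_stage1(event):
        yield (event["user_id"], random.randrange(N_SUBKEYS)), 1

    def reduce_stage1(key_and_subkey, ones):
        user_id, _subkey = key_and_subkey
        yield user_id, sum(ones)            # compact partial count

    # Stage 2: combine the partial aggregates into one value per key.
    def reduce_stage2(user_id, partial_counts):
        yield user_id, sum(partial_counts)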
The join algorithms described in the last section perform the actual join logic in the reducers, and are hence known as reduce-side joins.
The reduce-side approach has the advantage that you do not need to make any assumptions about the input data: whatever its properties and structure, the mappers can prepare the data to be ready for joining.
However, the downside is that all that sorting, copying to reducers, and merging of reducer inputs can be quite expensive.
Depending on the available memory buffers, data may be written to disk several times as it passes through the stages of MapReduce.
On the other hand, if you can make certain assumptions about your input data, it is possible to make joins faster by using a so-called map-side join.
The simplest way of performing a map-side join applies in the case where a large dataset is joined with a small dataset. In particular, the small dataset needs to be small enough that it can be loaded entirely into memory in each of the mappers.
This simple but effective algorithm is called a broadcast hash join: the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table.
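A minimal sketch of a broadcast hash join, using the Figure 10-2 scenario of joining activity events against a user database (the JSON file format and field names are assumptions):

    import json

    def load_users(path):
        # The small input: load the whole user database into a hash
        # table; it must fit in memory in every mapper.
        users = {}
        with open(path) as f:
            for line in f:
                user = json.loads(line)
                users[user["user_id"]] = user
        return users

    def map_join(event, users):
        # The large input is streamed one record at a time; each event
        # is joined by a hash-table lookup, with no shuffle or sort.
        user = users.get(event["user_id"])
        if user is not None:
            yield {**event, "date_of_birth": user["date_of_birth"]}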
Instead of loading the small join input into an in-memory hash table, an alternative is to store it in a read-only index on the local disk [42]. The frequently used parts of this index will remain in the operating system’s page cache, so this approach can provide random-access lookups almost as fast as an in-memory hash table, but without actually requiring the dataset to fit in memory.
If the inputs to the map-side join are partitioned in the same way, the hash join approach can be applied to each partition independently. In the case of Figure 10-2, you might arrange for the activity events and the user database to each be partitioned based on the last decimal digit of the user ID (so there are ten partitions on each side).
This has the advantage that each mapper can load a smaller amount of data into its hash table.
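Continuing the broadcast hash join sketch above, a partitioned variant might look like this, assuming both datasets were written out using the same partitioning function:

    def partition_of(user_id, num_partitions=10):
        # Both inputs must be partitioned by this same function, here
        # the last decimal digit of the user ID.
        return user_id % num_partitions

    def run_mapper(p, users_partition_path, events):
        # The mapper for partition p loads only the users whose ID ends
        # in p (a tenth of the user database), then joins as before.
        users = load_users(users_partition_path)
        for event in events:
            yield from map_join(event, users)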
Another variant of a map-side join applies if the input datasets are not only partitioned in the same way, but also sorted based on the same key. In this case, it does not matter whether the inputs are small enough to fit in memory: a mapper can perform the same merging operation that a reducer would normally do, reading both input files incrementally in order of ascending key and matching records with the same key. This is known as a map-side merge join.
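The merging logic might be sketched like this, assuming both inputs are iterators of records in ascending user_id order and that user IDs are unique in the user file:

    def merge_join(sorted_users, sorted_events):
        # Read both sorted inputs in lockstep, like the merge step of a
        # mergesort; no hash table is needed, so memory use is tiny.
        users = iter(sorted_users)
        user = next(users, None)
        for event in sorted_events:
            # Advance the user side until it catches up with the event.
            while user is not None and user["user_id"] < event["user_id"]:
                user = next(users, None)
            if user is not None and user["user_id"] == event["user_id"]:
                yield event, user    # matched pair for this join key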
If a map-side merge join is possible, it probably means that prior MapReduce jobs brought the input datasets into this partitioned and sorted form in the first place. In principle, this join could have been performed in the reduce stage of the prior job.
When the output of a MapReduce join is consumed by downstream jobs, the choice of map-side or reduce-side join affects the structure of the output. The output of a reduce-side join is partitioned and sorted by the join key, whereas the output of a map-side join is partitioned and sorted in the same way as the large input.
Where does batch processing fit in? It is not transaction processing, nor is it analytics.
Building search indexes
Google’s original use of MapReduce was to build indexes for its search engine, which was implemented as a workflow of 5 to 10 MapReduce jobs [1]. Although Google later moved away from using MapReduce for this purpose [43], it helps to understand MapReduce if you look at it through the lens of building a search index.
If you need to perform a full-text search over a fixed set of documents, then a batch process is a very effective way of building the indexes: the mappers partition the set of documents as needed, each reducer builds the index for its partition, and the index files are written to the distributed filesystem.
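At its core, such an indexing job builds an inverted index: the mapper turns each document into (term, document ID) pairs, and the reducer for each term collects the list of documents containing it. A minimal sketch with deliberately crude tokenization:

    import re

    def map_document(doc_id, text):
        # Emit each distinct term in the document, keyed by term, so
        # the shuffle groups all documents containing that term.
        for term in set(re.findall(r"\w+", text.lower())):
            yield term, doc_id

    def reduce_term(term, doc_ids):
        # The sorted list of document IDs is the postings list that is
        # written into the index file for this reducer’s partition.
        yield term, sorted(doc_ids)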
If the indexed set of documents changes, one option is to periodically rerun the entire indexing workflow.
Alternatively, it is possible to build indexes incrementally.
Another common use for batch processing is to build machine learning systems such as classifiers (e.g., spam filters, anomaly detection, image recognition) and recommendation systems (e.g., people you may know, products you may be interested in, or related searches).
The output of those batch jobs is often some kind of database: for example, a database that can be queried by user ID to obtain suggested friends for that user, or a database that can be queried by product ID to get a list of related products.
If the many reducers of a batch job all write to the output database concurrently, the database can easily be overwhelmed, and performance for queries is likely to suffer. This can in turn cause operational problems in other parts of the system.
Normally, MapReduce provides a clean all-or-nothing guarantee for job output: if a job succeeds, the result is the output of running every task exactly once, even if some tasks failed.
A much better solution is to build a brand-new database inside the batch job and write it as files to the job’s output directory in the distributed filesystem, just like the search indexes in the last section. Those data files are then immutable once written, and can be loaded in bulk into servers that handle read-only queries.
If you introduce a bug into the code and the output is wrong or corrupted, you can simply roll back to a previous version of the code and rerun the job, and the output will be correct again. Or, even simpler, you can keep the old output in a different directory and simply switch back to it.
This principle of minimizing irreversibility is beneficial for Agile software development.
Databases require you to structure data according to a particular model (e.g., relational or documents), whereas files in a distributed filesystem are just byte sequences, which can be written using any data model and encoding.
To put it bluntly, Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further.
MPP databases typically require careful up-front modeling of the data and query patterns before importing the data into the database’s proprietary storage format.
Thus, Hadoop has often been used for implementing ETL processes (see “Data Warehousing”): data from transaction processing systems is dumped into the distributed filesystem in some raw form, and then MapReduce jobs are written to clean up that data, transform it into a relational form, and import it into an MPP data warehouse for analytic purposes.
Data modeling still happens, but it is in a separate step, decoupled from the data collection.
On the other hand, not all kinds of processing can be sensibly expressed as SQL queries. For example, if you are building machine learning and recommendation systems, or full-text search indexes with relevance ranking models, or performing image analysis, you most likely need a more general model of data processing.
Having two processing models, SQL and MapReduce, was not enough: even more different models were needed! And due to the openness of the Hadoop platform, it was feasible to implement a whole range of approaches, which would not have been possible within the confines of a monolithic MPP database.
The Hadoop ecosystem includes both random-access OLTP databases such as HBase and MPP-style analytic databases such as Impala.
Neither HBase nor Impala uses MapReduce, but both use HDFS for storage.
On the other hand, MapReduce can tolerate the failure of a map or reduce task without it affecting the job as a whole by retrying work at the granularity of an individual task.
The MapReduce approach is more appropriate for larger jobs: jobs that process so much data and run for such a long time that they are likely to experience at least one task failure along the way.
Every task has a resource allocation (CPU cores, RAM, disk space, etc.) that is enforced using containers. Every task also has a priority, and if a higher-priority task needs more resources, lower-priority tasks on the same machine can be terminated (preempted) in order to free up resources.
Batch jobs effectively “pick up the scraps under the table,” using any computing resources that remain after the high-priority processes have taken what they need.
This freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster.