TODO: write Intro
TODO: Move the part about tiny files to here.
TODO: See if other parts of the Hadoop tuning or internals chapters should move here or vice versa.

3a: Hadoop Internals and Performance: Just Enough for Now

- the HDFS at moderate detail
- ??? job orchestration/lifecycle at light detail - how a job is born
The harsh realities of the laws of physics and economics prevent traditional data analysis solutions such as relational databases, supercomputing and so forth from economically scaling to arbitrary-sized data. By comparison, Hadoop's Map/Reduce paradigm does not provide complex operations, modification of existing records, fine-grained control over how the data is distributed, or anything else beyond the ability to write programs that adhere to a single, tightly constrained template. If Hadoop were a publishing medium, it would be one that refused essays, novels, sonnets and all other literary forms beyond the haiku:
data flutters by
elephants make sturdy piles
context yields insight
Our Map/Reduce haiku illustrates Hadoop’s template:
- The Mapper portion of your script processes records, attaching a label to each.
- Hadoop assembles those records into context groups according to their label.
- The Reducer portion of your script processes those context groups and writes them to a data store or external system.
What is remarkable is that from this single primitive, we can construct the familiar relational operations (such as `GROUP`s and `ROLLUP`s) of traditional databases, many machine-learning algorithms, matrix and graph transformations and the rest of the advanced data analytics toolkit. In the next two chapters, we will demonstrate high-level relational operations and illustrate the Map/Reduce patterns they express. In order to understand the performance and reasoning behind those patterns, let’s first understand the motion of data within a Map/Reduce job.
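To make that concrete, here is a minimal Pig sketch of how one of those relational operations, a GROUP, maps onto the haiku's template. The relation and field names are invented for illustration only.

[source,pig]
----
-- the Mapper labels each record with its url; Hadoop groups records by that label
logs    = LOAD 'weblogs' AS (visitor_id:chararray, url:chararray);
by_url  = GROUP logs BY url;
-- the Reducer processes each context group: here, counting the visits to each url
visits  = FOREACH by_url GENERATE group AS url, COUNT(logs) AS visit_count;
STORE visits INTO 'visits_by_url';
----

Behind that tidy script is exactly the motion of data this chapter describes: mapper output labeled by URL, a group-sort to assemble each URL's records, and a reducer pass over each group.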
There are enough knobs and twiddles on a Hadoop installation to fully stock the cockpit of a 747. Many of them interact surprisingly, and many settings improve some types of jobs while impeding others. This chapter will help you determine:
- Baseline constraints of system components: CPU, disk, memory, network
- Baseline constraints of atomic operations: stream, join, sort, filter
- Baseline constraints of each stage: setup, read, mapper, spill/combine, midflight, shuffle, reducer, write, replicate, commit
Hadoop is designed to put its limiting resource at full utilization.
The best-case scenario:

- all-local mapper tasks
- mapper throughput at baseline rate
- low mapper setup overhead
- a single spill per mapper
- low variance in mapper finish time
- shuffle is largely complete when the last merge segments come in
- reducer throughput at baseline rate
- low replication overhead
- A happy Mapper spends most of its time processing data.
- Happy Mappers all finish at the same time.
- A happy Mapper uses local data.
- A happy Mapper does not have unnecessary spills.
- A happy Mapper has enough Java memory heap to avoid costly garbage collection.
- A happy Mapper has a justifiable data rate.
- A happy Reducer finishes its merge/sort in reasonable time.
- A happy Reducer has enough Java memory heap to avoid costly garbage collection.
- Happy Hadoop processes of all kinds do not keep customers waiting in line.
- Happy Hadoop programmers have all the logs they need.
- A happy Hadoop cluster never overtaxes its namenode, jobtracker, datanodes or tasktrackers.
Throughout the book, we have emphasized using Hadoop from the outside. In almost all cases, the benefits to performance of fine-grained configuration changes do not exceed the opportunity cost of your time and the impact on maintainability and production confidence. It is now time to arm you with the full tool set necessary to optimize Hadoop clusters and your jobs.
The overall strategy we recommend is as follows:
- First, most importantly and, hopefully, last: optimize the programmer, not the program. Write simple jobs that are easy to read and easy to reason about. If your job's throughput per task slot compares reasonably to the baseline throughput of your cluster, there's nothing fruitful to optimize. The previous chapter (REF) on Tuning for the Wise and Lazy prescribed a process for determining that baseline and identifying bottlenecks.
- The best way to address a costly bottleneck is to minimize the amount of data Hadoop moves around. Almost nothing that follows will have as large an impact on performance as, say, filtering nulls before a JOIN. We have spent most of the book equipping you with techniques and best practices for making your job efficient from the outside.
- When it is time to get hard-core, you will next want to start all the way from the inside and work your way out; this chapter will take you from the fine-grained details of how Hadoop manages memory, disk and network resources to its outward effects on machine and cluster. While we can break the system into largely decoupled parts, there are a significant number of complex interactions, so only engage this phase if you have time for careful trials.
- Finally, there are cases where nothing but a comprehensive understanding of the system will do. We have adapted an approach called the "USCE" method, popularized for low-level systems analysis by Joyent's Brendan Gregg. (FOOTNOTE: Brendan's original method does not include connectivity, so is called the "USE" method; putting in the "C" for connectivity as "USCE" lets us still pronounce it like the word "use.") Start by enumerating every significant resource within the system — internal memory buffers, disks, thread pools and a few dozen others. Then, for each resource, characterize its utilization (how busy it is), saturation (demand versus capacity), connectivity (influence of interacting resources) and errors. It is hard work but straightforward, and it leaves nowhere for problems to hide.
As you learned in the previous chapter, and have surely experienced many times, the critical bottleneck of most costly jobs is the group-sort phase that must complete before your Reducer can run. We will assume you have done everything you can to minimize the amount of data coming into your Reducers and to minimize skew that would unfairly burden some Reducers. We also assume you cannot solve your problem by simply commissioning more machines. Your poor little Reducers will have to handle the amount of data you are sending them, so it is now our job to help them do so as efficiently as possible.
Although the group-sort phase cannot complete until all Map tasks have succeeded, you will notice the Reduce tasks start partway through the Map phase, as set by the slow start parameter (`mapred.reduce.slowstart.completed.maps`); a value of 0.3 causes them to launch once 30 percent of the Map tasks have completed. This lets the Reducers perform the group-sort in parallel with the Mappers for a significant improvement in pipeline efficiency. Setting the slow start parameter too high leaves you doing the same amount of work in the Reducers but unnecessarily delays it past the Map phase's completion. Setting it too low is unfair to your friends and colleagues: the Reduce slots sit occupied but mostly idle, blocking other jobs. Cases where the progress estimate is particularly unreliable may justify reducing this parameter to 0.
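For example, a Pig script can override it per job; the value here is illustrative, not a recommendation:

[source,pig]
----
-- launch reducers only once half of the map tasks have completed (illustrative value)
set mapred.reduce.slowstart.completed.maps '0.50';
----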
If you look at the Reducer log files (TODO: find path), you will start seeing lines that read (TODO: find log message for merge during the shuffle). These should appear throughout the time the Map phase is executing, and should reference roughly the amount of data you would expect given the parameters you have just set.
A Reducer receives its portion of fully-sorted midstream data, in parallel, as each Mapper successfully completes. We have been using the phrase "group-sort" to describe the full process by which the Reducer receives, sorts and groups its midstream data. For internal reasons, Hadoop describes the group-sort as being two distinct phases: the shuffle phase, where it receives Mapper output data and opportunistically group-sorts it and the sort phase, where it completes all remaining group-sort passes. (TODO: make sure you’re calling it a group-sort and not a merge/sort throughout this phase).
As data streams in from the Map tasks, the Reducer sorts the incoming records into the shuffle input buffer, very much as you saw in the Mapper. The size of this buffer is not only one of the most important Hadoop settings; it is also, in the 1.0 branch, widely and dangerously misunderstood.
As soon as the shuffle input buffer exceeds its threshold, the Reducer flushes its data to disk, resulting in a number of spilled merge chunks. After some number of spills have occurred (TODO: what is that number?), a parallel thread within the Reducer will begin merge/sorting the chunks. (TODO: keep this as merge/sorting, don’t change to group-sorting in this paragraph) For good reason, each merge/sort pass can combine only a limited number of chunks, so fully sorting large amounts of midstream data will require multiple passes.
Let's sketch a picture of what a Reducer configured to spill 1.5 GB chunks and merge them 10 at a time might do during its shuffle and sort phases. For a job with 20 GB of midstream data per Reducer, the first merge pass would initiate soon after 10 chunks, totaling 15 GB, had been spilled. (TODO: what triggers the merge?) As this proceeds, however, the final 5 GB of midstream data continue to roll in — so when the first merge pass concludes, there are five segments on disk. Since the number of chunks is less than the threshold of 10, the final merge/sort can begin. Hadoop cleverly avoids writing the merged data back to disk as a single spill by feeding it directly to the reduce task. That means that in the shuffle and sort phases, Hadoop has written 35 GB to disk (20 GB as initial spills and 15 GB again as merge/sort output), and read 35 GB from disk (15 GB as merge/sort input, and 20 GB as reducer input).
If this job, instead, sent 200 GB of data to each Reducer, we will simplistically assume the first pass created 13 merged chunks of 15 GB each and four small chunks holding the remaining 5 GB, for a total of 17 chunks. (TODO: does it try to only merge files with similar sizes?) This means it has to do one more merge pass on 10 of the files (TODO: or 8?), producing a second-pass merge chunk of 95 GB and leaving seven first-pass merge chunks of 15 GB. In this case, the total amount of data written and read was 490 GB.
With 20 GB of data, the first merge pass has a 5 GB head start on Map phase completion, so it should not continue for terribly long after the final Mapper output is received. The merge/sort overhead is 75 percent: an excess 15 GB above the 20 GB it received. In the 200 GB case, that overhead is almost 150 percent; most of the initial merge passes and all of the second-pass merge will extend past the completion of the Map phase. If this cluster's baseline throughput is (TODO: find an actual baseline number), processing the extra 15 GB of data took about ?15 minutes? while processing 290 GB of data took ?5 hours?. The additional merge activity is enormously costly.
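To keep the arithmetic straight, here is the back-of-the-envelope form of the figures above, where D is the midstream data per Reducer: each byte is written once as an initial spill, plus once more for every intermediate merge pass it participates in.

[latexmath]
++++
\[
\text{disk writes} \;=\; \underbrace{D}_{\text{initial spills}} \;+\; \underbrace{\sum_{\text{intermediate passes}} \bigl(\text{bytes merged in that pass}\bigr)}_{\text{merge output written back to disk}}
\]
++++

For the 20 GB case that is 20 + 15 = 35 GB; for the 200 GB case it is 200 + (195 + 95) = 490 GB. Reads mirror the writes, because the final merge pass feeds the Reducer directly instead of writing to disk.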
As you can see, the number of merge passes is governed by the size of the shuffle spills and the number of parallel merges. The larger their product, the fewer merge passes, so all other things being equal, we would like to set them as high as possible.
The I/O sort factor setting (`io.sort.factor`) sets the maximum number of merge segments that can be combined in one pass. For production clusters, it should be set higher than the default of 10, but you cannot increase it indefinitely. The number of parallel streams the OS can provide is limited to some modest multiple of the number of disks that can be read in parallel (TODO: find out what the limits are on this in better detail). Setting this too low will choke parallelism and lead to excess merge/sort passes. Increasing it above the parallelism the OS can reasonably supply has no benefit, and setting it far too high will incur increasing bookkeeping costs and eventually exhaust system limits. (TODO: ? recommended value ?)
The size of the buffer is set by `mapred.job.shuffle.input.buffer.percent`, as a fraction of the total Reducer heap size. For example, with a 1.5 GB Reducer heap, the default value of 0.66 will yield a 1 GB shuffle input buffer. In the 1.0 branch of Hadoop, this setting, unfortunately, does not work as advertised; instead, that fraction is applied to the lesser of the Java heap size and 2 GB. This means a Reducer with a 2.0 GB heap gets a 1.3 GB shuffle input buffer — and so does a Reducer with 4, 6, 8 GB or anything larger! The final size of the spill is limited to the heap size times the shuffle input percent times a further parameter (TODO: look up name) that initiates the spill to disk before the buffer is full. If that buffer fills, the threads receiving data from the Mappers will have to wait, slowing things down globally. On the other hand, flushing too early leads to smaller initial spills, and so more merge/sort passes. We would leave this one where it is.
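For instance, these reducer-side knobs can be pinned per job from a Pig script. The names are the Hadoop 1.x ones discussed above; the values are illustrative, not recommendations.

[source,pig]
----
-- widen the merge and size the shuffle buffer (illustrative values only)
set io.sort.factor '25';                             -- combine up to 25 segments per merge pass
set mapred.job.shuffle.input.buffer.percent '0.66';  -- fraction of reducer heap used for the shuffle buffer
----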
The default settings are those that satisfy, in some mixture, the constituencies of a) Yahoo, Facebook, Twitter, and so forth; and b) Hadoop developers, i.e., people who write Hadoop but rarely use Hadoop. This means that many low-stakes settings (like keeping job stats around for more than a few hours) are at the values that make sense when you have a petabyte-scale cluster and a hundred data engineers.
- If you're going to run two master nodes, you're a bit better off running one master as (namenode only) and the other master as (jobtracker, 2NN, balancer) — the 2NN should be distinctly less utilized than the namenode. This isn't a big deal, as I assume your master nodes never really break a sweat even during heavy usage.
Raw ingredients:

- scripts:
  - faker -- generates apache weblog records deterministically. Ensure the timestamp will require no complicated parsing to use in Pig.
  - swallow-m -- mapper emits 0.01% (one out of every 10,000) records, no reduce
  - most-m -- mapper emits 99.99% of records, no reduce
  - swallow-r -- mapper emits all records, reducer emits 0.01% of records
  - most-r -- mapper emits all records, reducer emits 99.99% of records
  - url-group -- project only URL and timestamp fields; group on URL; emit URL and a bag of timestamps that URL was visited (sketched after this list)
- files:
  - oneline -- 512 files, each with only its index
  - fakered -- faker.rb-generated 64-GB dataset as 1 64-GB file, 8 8-GB files, 64 1-GB files, or 512 128-MB files. Re-running the faker script will recreate the fakered dataset.
- setups:
  - max-shuffle -- workers with only reduce slots, having max-sized shuffle buffers, no shuffle flush (i.e., as close as we can get to zero shuffle)
  - baseline -- large output block size, replication factor 1
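A minimal Pig sketch of the url-group script described above. The schema (an IP, timestamp and URL field) is our assumption about the faker output, not a spec.

[source,pig]
----
-- url-group: project URL and timestamp, group on URL, emit URL plus a bag of visit times
logs      = LOAD 'fakered' AS (ip:chararray, visit_time:chararray, url:chararray);
projected = FOREACH logs GENERATE url, visit_time;
by_url    = GROUP projected BY url;
visits    = FOREACH by_url GENERATE group AS url, projected.visit_time AS visit_times;
STORE visits INTO 'url_group_out';
----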
Benchmark jobs, by stage:

- setup
  - zeros -- mapper-only -- swallow
  - oneline -- mapred -- identity
- read
  - fakered-128 -- mapper-only -- emit nothing
- mapper
  - fakered-128 -- mapper-only -- split fields, regexp, but don't emit
  - fakered-128 -- mapper-only -- split fields, regexp, emit
  - oneline -- mapper-only -- faker
- spill/combine
  - fakered-128 -- mapred -- identity
  - oneline -- mapred -- faker
- midflight
  - xx -- free-shuffle -- swallow
- shuffle, with various sizes of data per reducer
  - fakered -- lo-skew -- swallow
  - fakered -- hi-skew -- swallow
- reducer
  - fakered -- mapred -- identity -- identity -- replication factor 1
  - oneline -- mapred -- identity -- faker -- replication factor 1
  - fakered -- mapred -- identity -- split fields, regexp, but don't emit -- replication factor 1
  - fakered -- mapred -- identity -- split fields, regexp, emit -- replication factor 1
- write
  - oneline -- mapred -- identity -- faker
- replicate
  - oneline -- mapred -- identity -- faker -- replication factor 1
  - oneline -- mapred -- identity -- faker -- replication factor 2
  - oneline -- mapred -- identity -- faker -- replication factor 3
  - oneline -- mapred -- identity -- faker -- replication factor 5
- commit
  - oneline -- mapred -- identity -- identity
  - oneline -- mapred -- identity -- swallow
mapper-only performance (disk-cpu-disk only; sketched below):

- FOREACH only
- FILTER on a numeric column only
- MATCH only
- decompose region into tiles

midflight
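Hedged sketches of the FOREACH/FILTER/MATCH mapper-only micro-benchmarks listed above; the field names are assumptions about the faker schema, for illustration only.

[source,pig]
----
logs      = LOAD 'fakered' AS (ip:chararray, visit_time:chararray, url:chararray, bytes_sent:long);
projected = FOREACH logs GENERATE url, visit_time;          -- FOREACH only
big_hits  = FILTER logs BY bytes_sent > 1024;               -- FILTER on a numeric column only
products  = FILTER logs BY url MATCHES '.*/product/.*';     -- MATCH only
-- STORE whichever branch you are benchmarking, one at a time
STORE projected INTO 'bench_foreach';
----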
When tuning, you should engage in active benchmarking. Passive benchmarking would be to start a large job run, time it on the wall clock (plus some other global measures) and call that a number. Active benchmarking means that while that job is running you watch the fine-grained metrics (following the [use_method]) — validate that the limiting resource is what you believe it to be, and understand how the parameters you are varying drive tradeoffs among other resources.
- What are the maximum practical capabilities of my system, and are they reasonable?
- How do I determine a job's primary constraint, and whether it's worthwhile to optimize it?
- If I must optimize a job, what setting adjustments are relevant, and what are the tradeoffs of those adjustments?
Coarsely speaking, jobs are constrained by one of these four capabilities:
- RAM: available memory per node
- Disk IO: disk throughput
- Network IO: network throughput
- CPU: computational throughput
Your job is to:

- Recognize when your job significantly underperforms the practical expected throughput, and if so, whether you should worry about it. If your job's throughput on a small cluster is within a factor of two of a job that does nothing, it's not worth tuning. If that job runs nightly and costs $1000 per run, it is.
- Identify the limiting capability.
- Ensure there's enough RAM. If there isn't, you can adjust the memory per machine, the number of machines, or your algorithm design.
- Not get in Hadoop's way. There are a few easily remedied things to watch for that will significantly hamper throughput by causing unnecessary disk writes or network traffic.
- When reasonable, adjust the RAM/IO/CPU tradeoffs. For example, with plenty of RAM and not too much data, increasing the size of certain buffers can greatly reduce the number of disk writes: you've traded RAM for disk IO (sketched after this list).
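For instance, trading mapper RAM for disk IO might look like the following. This is a sketch only: the sizes are illustrative, and the sort buffer must fit comfortably inside the task heap you give it.

[source,pig]
----
-- give each map task a bigger sort buffer so map output spills once instead of several times
set mapred.child.java.opts '-Xmx1024m';   -- task heap; the sort buffer lives inside it
set io.sort.mb '400';                     -- map-side sort buffer, in MB (illustrative)
----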
If you are running Hadoop in an elastic environment, life gets easy: you can tune your cluster to the job, not the other way around.
- Choose the number of mappers and reducers (a worked example follows this list).
  - To make best use of your CPUs, you want the number of running tasks to be at least `cores - 1`; as long as there's enough RAM, go as high as mappers = `cores * 3/4` and reducers = `cores * 1/2`. For a cluster purpose-built to run jobs with minimal reduce tasks, run as many mappers as cores.
  - The total heap allocated to the datanode, tasktracker, mappers and reducers should be less than but close to the size of RAM on the machine.
  - The mappers should get at least twice as much total RAM as your typical mapper output size (which is to say, at least twice as much RAM as your HDFS block size).
  - The more memory on your reducers the better. If at all possible, size your cluster to have at least half as much RAM as your reduce input data size.
- Get the job working locally on a reduced dataset.
  - For a wukong job, you don't even need Hadoop; use `cat` and pipes.
- Profile its run time on a small cluster.
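As a worked example of the slot arithmetic above (the 8-core, 32 GB worker and the per-task heap sizes are hypothetical, chosen only to show the bookkeeping):

[latexmath]
++++
\[
\begin{aligned}
\text{mappers} &= \tfrac{3}{4} \times 8 = 6, \qquad \text{reducers} = \tfrac{1}{2} \times 8 = 4, \\
\text{total heap} &\approx 6 \times 1\,\mathrm{GB} + 4 \times 2\,\mathrm{GB} + 1\,\mathrm{GB}\ (\text{datanode}) + 1\,\mathrm{GB}\ (\text{tasktracker}) = 16\,\mathrm{GB} < 32\,\mathrm{GB}.
\end{aligned}
\]
++++

The leftover RAM is not wasted; the operating system uses it for the caches and buffers discussed later in this chapter.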
For data that will be read much more often than it's written:

- Produce output files of 1-4 GB with a block size of 128 MB.
- If there's an obvious join key, do a total sort. This lets you do a merge join later.
- Map tasks should take longer to run than to start. If mappers finish in less than a minute or two, and you have control over how the input data is allocated, try to feed each one more data. In general, 128 MB is sufficient; we set our HDFS block size to that value.
- Assuming well-fed mappers, you would like every mapper to finish at roughly the same time; the reduce cannot start until all mappers have finished. Why would different mappers take different amounts of time?
  - large variation in file size
  - large variation in load — for example, if the distribution of reducers is uneven, the machines with multiple reducers will run more slowly in general
  - on a large cluster, long-running map tasks will expose which machines are slowest
- Assuming mappers are well fed and prompt, you would like to have nearly every mapper slot running a task.
- Assuming every mapper is well fed and every mapper slot is running a task, Pig can use the combine-splits setting to make this intelligently faster (see the sketch after this list). Watch out for weirdness with newer versions of Pig and older versions of HBase. If you're reading from S3, dial up the min split size as large as 1-2 GB (but not
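In Pig, that might look like the following; the property names are real Pig and Hadoop 1.x settings, but the sizes are illustrative rather than recommendations.

[source,pig]
----
-- combine many small input files into fewer, larger map tasks
set pig.splitCombination 'true';
set pig.maxCombinedSplitSize '134217728';  -- 128 MB per combined split
-- reading from S3: ask for much larger splits
set mapred.min.split.size '1073741824';    -- 1 GB minimum split size
----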
Any time you can turn a job with a reduce phase into a job without one (for instance, by using a map-side replicated join), do so. Any reduce phase involves shipping the full midstream data size over the network.
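For example, if one side of a join fits comfortably in a mapper's memory, Pig's replicated join does the whole thing map-side; the relation and field names here are invented for illustration.

[source,pig]
----
-- map-side (replicated) join: no reduce phase, so no midstream data crosses the network
logs    = LOAD 'weblogs'   AS (visitor_id:chararray, url:chararray);
geo     = LOAD 'geo_small' AS (visitor_id:chararray, country:chararray);
-- the small relation must be listed last and must fit in each mapper's RAM
located = JOIN logs BY visitor_id, geo BY visitor_id USING 'replicated';
STORE located INTO 'located_visits';
----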
A happy Reducer is well balanced, has few merge passes, has a good RAM/data ratio, and has a justifiable data rate.

- well-balanced:
All of the below assumes our data-science-friendly configuration parameters, and it only concerns jobs worth thinking about — more than a few dozen gigabytes.
- What's my map input size?
  - The min split size, file size and block size set the size of the map input.
  - A 128 MB block size is a nice compromise between wasted space and map efficiency, and is the typical map input size.
  - You'd like your map tasks to take at least one minute, but not be the dominant time of the job. If all your map slots are full, it's OK if they take longer.
- It's usually straightforward to estimate the pessimistic-case output size. For cluster defaults, let's use a 25% overhead — 160 MB output size.
- 15% (`io.sort.record.percent`) of the buffer is taken by record-keeping, so the 160 MB should fit in a 190 MB buffer (at 15%) or a 170 MB buffer (at 5%).
- The maximum number of records collected before the collection thread will spill is `r * x * q * 2^16`, where r is `io.sort.mb`, x is `io.sort.record.percent` and q is `io.sort.spill.percent`: each record costs 16 bytes of accounting space, so 2^20 / 16 = 2^16 record slots fit in each MB of accounting buffer.
- If your reduce task itself doesn't need RAM (e.g., for wukong jobs), set this to more like 0.7.
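For example, plugging illustrative (not default) values into the record-count formula above:

[latexmath]
++++
\[
\underbrace{200}_{\texttt{io.sort.mb}} \times \underbrace{0.15}_{\texttt{io.sort.record.percent}} \times \underbrace{0.80}_{\texttt{io.sort.spill.percent}} \times 2^{16} \approx 1.57 \text{ million records}
\]
++++

Whichever trips first, the record-count trigger or the byte-count trigger, causes the spill.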
You’d like the "File bytes read" / "File bytes written" to be nil, and the spilled records close to zero. You don’t want to see spilled records >> reduce input records — this means the reducers had to do multiple layers of merge sort.
- an m1.large:
  - 3 map tasks: 300 MB raw input, 340 MB raw output (150 MB compressed), in 2 min
  - 1 GB in, 1 GB out (450 MB compressed)
  - 2 reduce tasks: 700 MB in, 1.7 GB out, 50% spill
  - 1.5 GB in, 3.5 GB out, 4 mins
- an m2.2xlarge:
  - 5 map tasks, each 460 MB raw input, 566 MB raw output (260 MB compressed), 1.5 min
  - 2.3 GB in, 2.8 GB out (1.3 GB compressed) → 2 GB / m2.2xl*min
  - overall 50 GB in, 53 GB out, 12.5 min * 6 m2.2xl = $1.12
  - for 1 TB, ~30 m2.2xl, 50 min
[[reducer_size]]
==== Merge Sort Input Buffers
In pre-2.0 Hadoop (the version most commonly found at the time of writing in 2012), there's a hard limit of 2 GB on the buffers used for merge sorting of mapper outputs.footnote:[It's even worse than that, actually; see the discussion of `mapred.job.shuffle.input.buffer.percent` in the tuning-for-the-foolish chapter.] You want to make good use of those buffers, but
What we need here is a ready-reckoner for calculating the real costs of processing. We’ll measure two primary metrics:
- throughput, in `GB/min`.
- machine cost in `$/TB` — equal to `(number of nodes) * (cost per node hour) / (60 * throughput)`. This figure accounts for tradeoffs such as spinning up twice as many nodes versus using nodes with twice as much RAM. To be concrete, we'll use the 2012 Amazon AWS node pricing; later in this chapter we'll show how to make a comparable estimate for physical hardware.
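For instance, plugging in the m2.2xlarge trial figures above, and assuming roughly $0.90 per m2.2xlarge-hour (the approximate 2012 on-demand rate; check current pricing):

[latexmath]
++++
\[
\text{throughput} = \frac{50\,\mathrm{GB}}{12.5\,\mathrm{min}} = 4\,\mathrm{GB/min}, \qquad
\text{cost} = \frac{6 \times \$0.90/\mathrm{hr}}{60\,\mathrm{min/hr} \times 0.004\,\mathrm{TB/min}} \approx \$22.50/\mathrm{TB}.
\]
++++

That squares with the estimate above of about 30 m2.2xlarge nodes running for 50 minutes to chew through 1 TB.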
If your cluster has a fixed capacity, throughput is in fixed proportion to cost and to engineer time. For an on-demand cluster, you should
note: I may go with min/TB, to have them be directly comparable. Throughput is typically rendered as quantity/time, so min/TB will seem weird to some. However, min/TB varies directly with $/TB, and is slightly easier to use for a rough calculation in your head.
- Measure disk throughput by using the `cp` (copy) command to copy a large file from one disk to another on the same machine, compressed and uncompressed.
- Measure network throughput by using `nc` (netcat) and `scp` (ssh copy) to copy a large file across the network, compressed and uncompressed.
- Do some increasingly expensive computations to see where CPU begins to dominate IO.
- Get a rough understanding of how much RAM you should reserve for the operating system's caches and buffers, and other overhead — it's more than you think.
- Bonnie for disk (advice, more advice)
- Bonnie++ for disk
- Phoronix for a broad-based test

Test these with a file size equal to your HDFS block size.
- Understand the practical maximum throughput: baseline performance against the fundamental limits of the system.
- If your runtime departs significantly from the practical maximum throughput
- Tuning your cluster to your job makes life simple
- If you are hitting a hard constraint (typically, not enough RAM)
There are some things that should grow square-root-ishly as the size of the cluster — handler counts, some buffer sizes, and others.
Let's think about the datanode handler count. Suppose you double the size of your cluster — double the datanodes and double the tasktrackers. Now the cluster has twice as many customers for datanodes (2x the peer traffic from datanodes and 2x the tasktrackers requesting data), but the cluster also has twice as many datanodes to service those customers. So the average number of customers per datanode has not changed. However, the number of workers that might gang up on one datanode simultaneously has increased; roughly speaking, this kind of variance increases as the square root, so it would be reasonable to increase the handler count by a factor of 1.4 (the square root of 2). Any time you have a setting that a) is sized to accommodate the peak amount of inbound activity, and b) whose count of producers and consumers grows in tandem, you're thinking about a square root.
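As a rule of thumb (not a law), the scaling looks like:

[latexmath]
++++
\[
\text{handler count}_{\text{new}} \;\approx\; \text{handler count}_{\text{old}} \times \sqrt{\frac{N_{\text{new}}}{N_{\text{old}}}}
\]
++++

so doubling the number of nodes N argues for roughly 1.4 times the datanode handler count, as in the example above.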
That reasoning, however, applies to intra-cluster traffic. By contrast, Flume connections are long-lived, and so you should account for them as a separate portion of the datanode handler count — each agent will be connected to one datanode at a time (as directed by the namenode for that particular block). Doubling the number of Flume writers should double that portion; doubling the number of datanodes should halve that portion.
See `-Dpig.exec.nocombiner=true` if Pig is using combiners badly. (You'll want to use this for a rollup job.)
- Lots of files:
  - Namenode and 2NN heap size
- Lots of data:
  - Datanode heap size
- Lots of map tasks per job:
  - Jobtracker heap size
  - tasktracker.http.threads
  - mapred.reduce.parallel.copies
- Tuning and coupling constants: as with the garbage-collection example, look at what the constraint is and at the natural time scale of the system. For instance, you can turn data into time using throughput; thinking that way about the worst case for the reducer, there is a trade-off between memory, file IO and network.