MapReduce jobs hang in the reduce phase.
Blocker.
No.
No.
Since it is an MR job.
We didn't reproduce this failure; we only simulated the in-memory merge.
2.0.2-alpha
In mapred-site.xml:
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>
<property>
  <name>mapreduce.map.log.level</name>
  <value>DEBUG</value>
</property>
<property>
  <name>mapreduce.reduce.log.level</name>
  <value>DEBUG</value>
</property>
<property>
  <name>mapreduce.reduce.merge.inmem.threshold</name>
  <value>1</value>
</property>
<property>
  <name>mapreduce.reduce.shuffle.merge.percent</name>
  <value>0.005</value>
</property>
<property>
  <name>mapreduce.reduce.shuffle.memory.limit.percent</name>
  <value>0.004</value>
</property>
Start a wordcount job on a large input.
There is a particular thread-interleaving requirement (see the graph above): the in-memory merger thread must finish after all the other fetcher threads.
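For reference, here is a rough sketch of what the settings above translate to inside MergeManager. The formulas follow my reading of the 2.0.2-alpha MergeManager constructor and should be treated as an approximation; the 200 MB reduce-task heap and the 0.70 default for mapreduce.reduce.shuffle.input.buffer.percent are assumed example values, not numbers taken from the report.
// Rough model of MergeManager's threshold derivation (approximation; not copied
// from the Hadoop source). Heap size and input.buffer.percent are assumed examples.
public class ThresholdSketch {
  public static void main(String[] args) {
    long reduceTaskHeap = 200L * 1024 * 1024;  // assumed example: 200 MB reduce heap
    double inputBufferPercent = 0.70;          // mapreduce.reduce.shuffle.input.buffer.percent (default)
    double mergePercent = 0.005;               // mapreduce.reduce.shuffle.merge.percent (repro config)
    double memoryLimitPercent = 0.004;         // mapreduce.reduce.shuffle.memory.limit.percent (repro config)

    // Total in-memory shuffle budget; reserve() stalls fetchers with Status.WAIT
    // once usedMemory exceeds this.
    long memoryLimit = (long) (reduceTaskHeap * inputBufferPercent);
    // commitMemory level at which closeInMemoryFile() starts an in-memory merge.
    long mergeThreshold = (long) (memoryLimit * mergePercent);
    // Largest single map output that is still shuffled into memory.
    long maxSingleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);

    System.out.printf("memoryLimit=%d mergeThreshold=%d maxSingleShuffleLimit=%d%n",
        memoryLimit, mergeThreshold, maxSingleShuffleLimit);
    // With mergeThreshold this small, nearly every fetched map output triggers an
    // in-memory merge, which makes the WAIT/merge interleaving described above easy to hit.
  }
}
With these example values, mergeThreshold comes out around 700 KB, which matches the order of magnitude of the mergeThreshold=703496 seen in the log further down.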
No.
Yes.
1 machine. 1 node (ApplicationMaster).
We couldn’t reproduce this failure, so we can only analyze the logs provided by the reporter.
2013-07-18 04:32:57,900 INFO fetcher#4 org.apache.hadoop.mapreduce.task.reduce.Fetcher: fetcher#4 - MergerManager returned Status.WAIT ...
.. .. ..
2013-07-18 04:32:57,904 INFO fetcher#3 org.apache.hadoop.mapreduce.task.reduce.Fetcher: fetcher#3 - MergerManager returned Status.WAIT …
By looking at the code that prints this log, we can infer:
MergeManager.java:MergeManager.reserve() {
  if (usedMemory > memoryLimit) {
    LOG.info(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
        + ") is greater than memoryLimit (" + memoryLimit + ")." +
        " CommitMemory is (" + commitMemory + ")");
    return stallShuffle;
  }
}
Fetcher.java:Fetcher.copyMapOutput(.. ..) {
  if (mapOutput.getType() == Type.WAIT) {
    LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
    // Not an error, but wait to process data.
    return EMPTY_ATTEMPT_ID_ARRAY;
  }
}
So we know that the cause of this message is that usedMemory is too large. We also have the following in the log:
2013-07-28 21:10:14,875 INFO [InMemoryMerger - Thread to merge in-memory shuffled map-outputs] org.apache.hadoop.mapreduce.task.reduce.MergeManager: Initiating in-memory merge with 2 segments...
2013-07-28 21:10:14,879 INFO [InMemoryMerger - Thread to merge in-memory shuffled map-outputs] org.apache.hadoop.mapred.Merger: Merging 2 sorted segments
MergeManager.java:MergeManager.closeInMemoryFile(.. ..) {
  if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
    LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
        commitMemory + " > mergeThreshold=" + mergeThreshold +
        ". Current usedMemory=" + usedMemory);
    inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    inMemoryMerger.startMerge(inMemoryMapOutputs); // This starts the in-memory merger.
  }
}
The matching log line confirms this path was taken by fetcher#5:
2013-07-28 21:10:14,797 INFO [fetcher#5] org.apache.hadoop.mapreduce.task.reduce.MergeManager: Starting inMemoryMerger's merge since commitMemory=733845 > mergeThreshold=703496. Current usedMemory=733845
We can also infer that in the failure case, the in-memory merge dragged on for a long time (the following message appeared very late):
2013-07-28 21:10:14,880 INFO [InMemoryMerger - Thread to merge in-memory shuffled map-outputs] org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 733828 bytes
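To make the resulting hang concrete, here is a deliberately simplified, single-threaded replay of the interleaving described above. This is a hypothetical model, not Hadoop code, and it reflects my reading of the report: merges are started from the fetcher path, fetchers stall with Status.WAIT while usedMemory is over the limit, and a merge that finishes after the fetchers are done leaves any remaining in-memory segments with nothing to merge them.
// Hypothetical single-threaded replay of the reported interleaving (not Hadoop code).
import java.util.ArrayList;
import java.util.List;

public class HangReplay {
  public static void main(String[] args) {
    List<String> inMemorySegments = new ArrayList<>();
    boolean mergeInProgress = false;

    // A fetcher commits map output, crosses mergeThreshold, and starts the
    // in-memory merge (the closeInMemoryFile() path shown earlier):
    inMemorySegments.add("segment-1");
    inMemorySegments.add("segment-2");
    mergeInProgress = true;
    System.out.println("fetcher#5: startMerge(" + inMemorySegments + ")");

    // Other fetchers see usedMemory > memoryLimit in reserve() and get Status.WAIT:
    System.out.println("fetcher#3, fetcher#4: MergerManager returned Status.WAIT");

    // Suppose one more map output still lands in memory; closeInMemoryFile() will
    // not start another merge for it while a merge is already in progress:
    inMemorySegments.add("segment-3");

    // The in-memory merger finishes late, after the fetchers are done:
    inMemorySegments.remove("segment-1");
    inMemorySegments.remove("segment-2");
    mergeInProgress = false;
    System.out.println("InMemoryMerger: merge finished (late)");

    // Nothing re-checks inMemorySegments here; the only trigger for the next merge
    // is a fetcher calling closeInMemoryFile(), and no fetcher will call it again.
    if (!mergeInProgress && !inMemorySegments.isEmpty()) {
      System.out.println("hang: " + inMemorySegments + " left unmerged; the reduce waits forever");
    }
  }
}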
At the end of the in-memory merge, the merger should check for inputs again and finish all the remaining merge work itself, instead of relying on the fetchers to start the merging again.
https://issues.apache.org/jira/secure/attachment/12555933/MAPREDUCE-4842.patch
Concurrency (semantic).
At the end of the in-memory merge, the merger should check for inputs again and finish all the remaining merge work itself, instead of relying on the fetchers to start the merging again.
https://issues.apache.org/jira/secure/attachment/12555933/MAPREDUCE-4842.patch
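A minimal sketch of where that re-check could go, written in the same abbreviated style as the snippets above. This is hypothetical and is not the content of the linked patch; the idea is simply to re-run the closeInMemoryFile() condition from the merger thread once a merge completes, using the same fields (inMemoryMapOutputs, inMemoryMergedMapOutputs, commitMemory, mergeThreshold) shown earlier.
MergeManager.java:InMemoryMerger.merge(.. ..) {
  // ... existing in-memory merge work ...
  // Hypothetical addition (not the linked patch): once the merge completes,
  // re-check the pending in-memory outputs from the merger thread itself, so the
  // remaining segments get merged even if no fetcher ever triggers it again.
  if (commitMemory >= mergeThreshold && !inMemoryMapOutputs.isEmpty()) {
    inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    inMemoryMerger.startMerge(inMemoryMapOutputs);
  }
}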