An RM restart can cause thousands of MapReduce jobs to hang.
Blocker
Yes.
1. RM shut-down & restart;
2. AM log: Multiple java.lang.reflect.UndeclaredThrowableException
Caused by: com.google.protobuf.ServiceException: java.io.IOException:
Caused by: java.io.IOException: Failed on local exception: java.io.EOFException;
Caused by: java.io.EOFException
3. RM log: ERROR org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: AppAttemptId doesnt exist in cache appattempt_1374764974894_0001_000001
Yes.
One user job.
0.23.X --- I used an internal revision “1190121”. See reproduction steps.
Standard configurations.
yarn-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
mapred-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
First, check out the source code from the Hadoop version control system and build it; by default the local directory is trunk.
Next, set $HADOOP_PREFIX to trunk/hadoop-dist/target/hadoop-0.24.0-SNAPSHOT.
Also set $HADOOP_CONF_DIR to $HADOOP_PREFIX/etc/hadoop.
$ start-dfs.sh
$ yarn-daemon.sh start resourcemanager
$ yarn-daemon.sh start nodemanager
Put some files into “/input” in HDFS.
$ hadoop jar ~/research/hadoop/hadoop-2.0.2-alpha-src/hadoop-dist/target/hadoop-2.0.2-alpha/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.0.2-alpha.jar wordcount /input /output
--- run word count
ding@baker221:~$ hadoop jar ~/research/hadoop/hadoop-2.0.2-alpha-src/hadoop-dist/target/hadoop-2.0.2-alpha/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.0.2-alpha.jar wordcount /input /output1
14/07/04 23:39:26 INFO ipc.YarnRPC: Creating YarnRPC for org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
14/07/04 23:39:26 INFO mapred.ResourceMgrDelegate: Connecting to ResourceManager at /0.0.0.0:8181
14/07/04 23:39:26 INFO ipc.HadoopYarnRPC: Creating a HadoopYarnProtoRpc proxy for protocol interface org.apache.hadoop.yarn.api.ClientRMProtocol
14/07/04 23:39:26 INFO mapred.ResourceMgrDelegate: Connected to ResourceManager at /0.0.0.0:8181
14/07/04 23:39:26 WARN conf.Configuration: fs.default.name is deprecated. Instead, use fs.defaultFS
14/07/04 23:39:26 WARN conf.Configuration: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
14/07/04 23:39:27 INFO input.FileInputFormat: Total input paths to process : 3
14/07/04 23:39:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/04 23:39:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/04 23:39:27 INFO mapreduce.JobSubmitter: number of splits:3
14/07/04 23:39:27 INFO mapred.YARNRunner: AppMaster capability = memory: 2048,
14/07/04 23:39:27 INFO mapred.YARNRunner: Command to launch container for ApplicationMaster is : $JAVA_HOME/bin/java -Dlog4j.configuration=container-log4j.properties -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR> -Dyarn.app.mapreduce.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Xmx1536m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
14/07/04 23:39:27 INFO mapred.ResourceMgrDelegate: Submitted application application_1404531530566_0001 to ResourceManager
14/07/04 23:39:27 INFO mapreduce.Job: Running job: job_1404531530566_0001
14/07/04 23:39:28 INFO mapreduce.Job: map 0% reduce 0%
14/07/04 23:39:37 INFO mapred.ClientServiceDelegate: Tracking Url of JOB is 0.0.0.0:8088/proxy/application_1404531530566_0001/
14/07/04 23:39:37 INFO mapred.ClientServiceDelegate: Connecting to localhost:51351
14/07/04 23:39:37 INFO ipc.YarnRPC: Creating YarnRPC for org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
Then, right at this moment, stop the ResourceManager:
$ yarn-daemon.sh stop resourcemanager
This timing is critical: the RM must shut down after the AM has successfully registered but before the AM sends its heartbeat.
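To hit this window more reliably than by hand, one option is to watch the AM's log for the registration event and stop the RM immediately afterwards. The following is only a rough sketch (it is not part of the original reproduction; the log path and the marker string are assumptions that must be adapted to whatever your AM actually prints on registration):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Rough sketch: poll a log file until a marker line shows up, then stop the RM
// so that the shutdown lands between the AM's registration and its next
// allocate heartbeat.
public class StopRmAfterRegistration {
  static final String LOG_FILE = "/tmp/am-syslog";   // hypothetical path to the AM syslog
  static final String MARKER = "registered";         // hypothetical registration marker

  public static void main(String[] args) throws IOException, InterruptedException {
    try (BufferedReader in = new BufferedReader(new FileReader(LOG_FILE))) {
      while (true) {
        String line = in.readLine();
        if (line == null) { Thread.sleep(100); continue; }  // wait for more output
        if (line.contains(MARKER)) break;                   // the AM has registered
      }
    }
    // Stop the RM right away, before the next heartbeat goes out.
    new ProcessBuilder("yarn-daemon.sh", "stop", "resourcemanager")
        .inheritIO().start().waitFor();
  }
}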
Yes.
Yes. The key event, “RM shutting down”, must occur in a particular order relative to the AM's registration and heartbeat. How could this ordering be detected automatically?
1 (AM+RM; of course, NN+DN are also required).
Hang. The client log just hangs there.
If we look at RM’s log, we have:
2013-07-25 11:11:41,506 ERROR org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: AppAttemptId doesnt exist in cache appattempt_1374764974894_0001_000001
--- This just repeats itself.
If we look at AM’s syslog, earlier we have:
2013-07-25 11:10:29,903 ERROR [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: ERROR IN CONTACTING RM.
java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.yarn.api.impl.pb.client.AMRMProtocolPBClientImpl.allocate(AMRMProtocolPBClientImpl.java:72)
at org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.makeRemoteRequest(RMContainerRequestor.java:126)
at org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator.getResources(RMContainerAllocator.java:436)
at org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator.heartbeat(RMContainerAllocator.java:154)
at org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator$1.run(RMCommunicator.java:236)
at java.lang.Thread.run(Thread.java:662)
Caused by: com.google.protobuf.ServiceException: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: "baker221/127.0.1.1"; destination host is: ""0.0.0.0":8030;
.. ..
2013-07-25 11:10:32,908 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s).
2013-07-25 11:10:33,909 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 1 time(s).
2013-07-25 11:10:34,910 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 2 time(s).
2013-07-25 11:10:35,911 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 3 time(s).
2013-07-25 11:10:36,912 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 4 time(s).
2013-07-25 11:10:37,913 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 5 time(s).
2013-07-25 11:10:38,914 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 6 time(s).
2013-07-25 11:10:39,915 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 7 time(s).
2013-07-25 11:10:40,916 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 8 time(s).
2013-07-25 11:10:41,917 INFO [RMCommunicator Allocator] org.apache.hadoop.ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 9 time(s).
2013-07-25 11:10:41,919 ERROR [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: ERROR IN CONTACTING RM.
java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.yarn.api.impl.pb.client.AMRMProtocolPBClientImpl.allocate(AMRMProtocolPBClientImpl.java:72)
at org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.makeRemoteRequest(RMContainerRequestor.java:126)
.. .. ..
Repeats
--- This clearly indicates that there are problems connecting to the RM (from the RM's log, we know the RM is down). So we can infer that, before the hang, there were connection exceptions between the RM and the AM.
Now, let's look at the code. First, locate where the RM produces the error message:
2013-07-25 11:11:41,506 ERROR org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: AppAttemptId doesnt exist in cache appattempt_1374764974894_0001_000001
public RegisterApplicationMasterResponse registerApplicationMaster(
    RegisterApplicationMasterRequest request) throws YarnRemoteException {
  .. ..
  AMResponse lastResponse = responseMap.get(applicationAttemptId);
  if (lastResponse == null) {
    String message = "Application doesn't exist in cache "
        + applicationAttemptId;
    LOG.error(message);
    RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
        AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
        "Error in registering application master", appID,
        applicationAttemptId);
    throw RPCUtil.getRemoteException(message);
  }
--- Clearly, the RM reports the problem back to the AM whenever an application attempt is missing from its responseMap cache: registerApplicationMaster throws an exception, and the analogous check in allocate() (the one that logs the “AppAttemptId doesnt exist in cache” message above) returns an AMResponse with the reboot flag set.
This is expected: since the RM restarted in the middle of the MR job, the application attempt was lost from the RM's cache. The RM is, in effect, asking the AM to restart!
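For intuition, here is a minimal, self-contained model of this RM-side behavior (simplified stand-in classes, not the actual Hadoop source): after a restart the response cache is empty, so a heartbeat from a pre-restart attempt is answered with the reboot flag set.

import java.util.HashMap;
import java.util.Map;

// Minimal model: a restarted RM has an empty response cache, so any heartbeat
// from a pre-restart attempt gets an answer whose reboot flag is set.
public class RebootSignalModel {
  static class AMResponse {
    private boolean reboot;
    boolean getReboot() { return reboot; }
    void setReboot(boolean b) { reboot = b; }
  }

  private final Map<String, AMResponse> responseMap = new HashMap<>();

  AMResponse allocate(String appAttemptId) {
    AMResponse lastResponse = responseMap.get(appAttemptId);
    if (lastResponse == null) {
      System.err.println("AppAttemptId doesnt exist in cache " + appAttemptId);
      AMResponse reboot = new AMResponse();
      reboot.setReboot(true);   // ask the AM to shut itself down / restart
      return reboot;
    }
    return lastResponse;        // normal path (elided in this model)
  }

  public static void main(String[] args) {
    // The RM has just restarted: responseMap is empty, so the old attempt is unknown.
    RebootSignalModel rm = new RebootSignalModel();
    AMResponse answer = rm.allocate("appattempt_1374764974894_0001_000001");
    System.out.println("reboot flag = " + answer.getReboot());   // true: the AM must react to this
  }
}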
Let's next look at how the AM handles this:
First, while it is hanging, the following is shown in AM’s syslog:
2013-07-25 11:11:41,504 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Before Scheduling: PendingReduces:1 ScheduledMaps:3...
2013-07-25 11:11:41,508 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor: getResources() for application_1374764974894_0001:...
2013-07-25 11:11:41,508 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: After Scheduling: PendingReduces:1 ScheduledMaps:3
2013-07-25 11:11:43,508 INFO [RMCommunicator Allocator]
--- These three messages just repeat themselves.
If we look at the code, getResources() is the method that handles the response from the RM:
private List<Container> getResources() throws Exception {
  .. ..
  AMResponse response = makeRemoteRequest();
  int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
  .. ..
  return newContainers;
}
--- It never bothers to check “response” for the reboot flag (or any other error indication)!
Therefore the error is clear: when the AM received a response from the RM asking it to reboot, it simply ignored it --- the developers forgot to handle it.
The AM did not handle the reboot indication (“response.getReboot()”) from the RM!
Incorrect exception handling.
Add proper error handling:
if (response.getReboot()) {
-  // TODO
  LOG.info("Event from RM: shutting down Application Master");
+  // This can happen if the RM has been restarted. If it is in that state,
+  // this application must clean itself up.
+  eventHandler.handle(new JobEvent(this.getJob().getID(),
+      JobEventType.INTERNAL_ERROR));
+  throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+      this.getContext().getApplicationID());
}
private List<Container> getResources() throws Exception {
  int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; // first time it would be null
-  AMResponse response = makeRemoteRequest();
+  AMResponse response;
+  /*
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  try {
+    response = makeRemoteRequest();
+    // Reset retry count if no exception occurred.
+    retrystartTime = System.currentTimeMillis();
+  } catch (Exception e) {
+    // This can happen when the connection to the RM has gone down. Keep
+    // re-trying until the retryInterval has expired.
+    if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+          JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Could not contact RM after " +
+          retryInterval + " milliseconds.");
+    }
+    // Throw this up to the caller, which may decide to ignore it and
+    // continue to attempt to contact the RM.
+    throw e;
+  }
+  if (response.getReboot()) {
+    // This can happen if the RM has been restarted. If it is in that state,
+    // this application must clean itself up.
+    eventHandler.handle(new JobEvent(this.getJob().getID(),
+        JobEventType.INTERNAL_ERROR));
+    throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+        this.getContext().getApplicationID());
+  }
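A note on the design: with this patch, a transient RM outage is tolerated for up to MR_AM_TO_RM_WAIT_INTERVAL_MS before the job is aborted, while an explicit reboot answer aborts immediately. The following self-contained sketch (stand-in names only, not the real Hadoop classes; the window is shortened for the demo) isolates that retry-window pattern:

import java.io.IOException;

// Self-contained sketch of the retry-window pattern used by the fix: keep
// retrying the RM call until a configured window has elapsed, then abort
// instead of hanging forever. All names here are illustrative stand-ins.
public class RetryWindowSketch {
  static final long RETRY_WINDOW_MS = 5_000;          // stands in for MR_AM_TO_RM_WAIT_INTERVAL_MS
  private long retryStartTime = System.currentTimeMillis();

  Object heartbeatOnce() throws Exception {
    try {
      Object response = contactRM();                  // stands in for makeRemoteRequest()
      retryStartTime = System.currentTimeMillis();    // success: reset the window
      return response;
    } catch (Exception e) {
      if (System.currentTimeMillis() - retryStartTime >= RETRY_WINDOW_MS) {
        // Window exhausted: fail loudly rather than retry forever.
        throw new IllegalStateException(
            "Could not contact RM after " + RETRY_WINDOW_MS + " ms", e);
      }
      throw e;                                        // caller retries on the next heartbeat tick
    }
  }

  Object contactRM() throws IOException {
    throw new IOException("RM unreachable");          // simulate the RM being down
  }

  public static void main(String[] args) throws InterruptedException {
    RetryWindowSketch am = new RetryWindowSketch();
    while (true) {
      try {
        am.heartbeatOnce();
      } catch (IllegalStateException giveUp) {
        System.out.println(giveUp.getMessage());      // abort the (mock) job
        break;
      } catch (Exception transientFailure) {
        Thread.sleep(1_000);                          // wait for the next heartbeat
      }
    }
  }
}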
Acknowledgement:
This bug was initially reproduced and studied by Dongcai Shen. Our group has since reproduced it independently and provided additional analysis and findings.