[hadoop]MAPREDUCE-4843 Report

1. Symptom

Sometimes MapReduce jobs fail with the following exception:

13/07/26 16:31:25 INFO mapred.JobClient: Task Id : attempt_201307261629_0001_m_000004_0, Status : FAILED

Error initializing attempt_201307261629_0001_m_000004_0:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find taskTracker/ding/jobcache/job_201307261629_0001/job.xml in any of the configured local directories

        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:429)

        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:160)

        at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1354)

        at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1226)

        at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2603)

        at java.lang.Thread.run(Thread.java:662)

In our cluster, jobs would sometimes fail due to the exception below:

… …

1.1 Severity

Critical

1.2 Was there exception thrown?

Yes.

1.2.1 Were there multiple exceptions?

Yes, but they occur close together. In the jobtracker’s log:

2013-07-26 16:31:25,167 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201307261629_0001_m_000004_0: Error initializing attempt_201307261629_0001_m_000004_0:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find taskTracker/ding/jobcache/job_201307261629_0001/job.xml in any of the configured local directories

        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:429)

        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:160)

        at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1354)

        at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1226)

        at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2603)

        at java.lang.Thread.run(Thread.java:662)

2013-07-26 16:31:25,168 ERROR org.apache.hadoop.mapred.TaskStatus: Trying to set finish time for task attempt_201307261629_0001_m_000004_0 when no start time is set, stackTrace is : java.lang.Exception

        at org.apache.hadoop.mapred.TaskStatus.setFinishTime(TaskStatus.java:145)

        at org.apache.hadoop.mapred.TaskInProgress.incompleteSubTask(TaskInProgress.java:700)

        at org.apache.hadoop.mapred.JobInProgress.failedTask(JobInProgress.java:3004)

        at org.apache.hadoop.mapred.JobInProgress.updateTaskStatus(JobInProgress.java:1185)

        at org.apache.hadoop.mapred.JobTracker.updateTaskStatuses(JobTracker.java:4461)

        at org.apache.hadoop.mapred.JobTracker.processHeartbeat(JobTracker.java:3296)

        at org.apache.hadoop.mapred.JobTracker.heartbeat(JobTracker.java:2991)

These exceptions were propagated across jobtracker, tasktracker, and client logs...

2. How to reproduce this failure

This is a concurrency bug: it requires a specific timing order between the two job-localization threads in the TaskTracker. We reproduced it by inserting a sleep into the code.

2.0 Version

1.1.1

2.1 Configuration

Standard config.

2.2 Reproduction procedure

1. Modify the source by introducing a sleep in the JobLocalizer constructor:

src/mapred/org/apache/hadoop/mapred/JobLocalizer.java:

public class JobLocalizer {

  .. ..

  public JobLocalizer(JobConf ttConf, String user, String jobid)

      throws IOException {

    .. .. ..

    this.ttConf = ttConf;

    lfs = FileSystem.getLocal(ttConf).getRaw();

    this.localDirs = createPaths(user, localDirs);

    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);

+    if (user.equals("ding")) {

+      LOG.info("Ding: user is ding, about to sleep for 50 seconds. the other job should finish by then...");

+      try{

+        Thread.sleep(50000);

+      } catch (InterruptedException e) {}

+    }

    Collections.shuffle(this.localDirs);

.. ..

2. Submit a MapReduce job (e.g., wordcount) as user “ding”.

3. Submit another MapReduce job (e.g., wordcount) as user “gux” immediately after step 2.

2.2.1 Timing order

See the graph (not reproduced here): the order between the two threads must be ensured — user “ding”’s JobLocalizer first writes its directories into the shared ttConf, user “gux”’s JobLocalizer overwrites JOB_LOCAL_CTXT during the injected sleep, and only then does the “ding” thread’s initializeJob read the shared conf back.

2.2.2 Events order externally controllable?

No. This is a concurrency bug; the thread interleaving depends on scheduling.

2.3 Can the logs tell how to reproduce the failure?

No --- the required thread interleaving cannot be inferred from the logs.

2.4 How many machines needed?

1.

3. Diagnosis procedure

3.1 Detailed Symptom (where you start)

Exception:

2013-07-26 16:31:25,167 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201307261629_0001_m_000004_0: Error initializing attempt_201307261629_0001_m_000004_0:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find taskTracker/ding/jobcache/job_201307261629_0001/job.xml in any of the configured local directories

        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:429)

        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:160)

        at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1354)

        at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1226)

        at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2603)

        at java.lang.Thread.run(Thread.java:662)

3.2 Backward inference

TaskTracker.java:

  Path initializeJob(final Task t, final RunningJob rjob,

    .. ..

    //search for the conf that the initializeJob created

    //need to look up certain configs from this conf, like

    //the distributed cache, profiling, etc. ones

    Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(

           userName, jobId.toString()), getJobConf());

    return initializedConf;

  }

 --- NOTE: In the failure case, it is the TaskTracker thread serving user “ding”’s MR job that throws the exception. The reason is that another thread (serving user “gux”’s job) has set JOB_LOCAL_CTXT in the shared ttConf to “taskTracker/gux/jobcache...”, while this getLocalPathToRead call is trying to read “taskTracker/ding/jobcache...” from those directories.

  static String getLocalJobConfFile(String user, String jobid) {

    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;

  }
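
For reference, the path that getLocalJobConfFile builds, and that getLocalPathToRead then fails to find, embeds the user name. A minimal sketch of its shape, inferred from the path in the log message rather than from TaskTracker’s actual constants:

  // Shape of the per-user job-conf path (simplified; the literal segments are
  // inferred from the log line above, not taken from TaskTracker's constants).
  static String localJobConfFile(String user, String jobid) {
    // e.g. "taskTracker/ding/jobcache/job_201307261629_0001/job.xml"
    return "taskTracker/" + user + "/jobcache/" + jobid + "/job.xml";
  }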

 

The backward trace through TaskTracker ends here: the file simply is not where getLocalPathToRead looks for it. It is not hard to recognize that the missing file,

taskTracker/ding/jobcache/job_201307261629_0001/job.xml

is created under the per-user job directory that is set up here:

JobLocalizer.java:

public class JobLocalizer {

  .. ..

  protected static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";

  public JobLocalizer(JobConf ttConf, String user, String jobid) {

    this(ttConf, user, jobid,

        ttConf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));

  }

  public JobLocalizer(JobConf ttConf, String user, String jobid,

      String... localDirs) throws IOException {

    .. ..

    this.ttConf = ttConf;   // keeps a reference to the TaskTracker's *shared* conf (the bug)

    // Creates per-user paths such as .../taskTracker/ding/jobcache
    this.localDirs = createPaths(user, localDirs);

    // ... and records them in the shared conf, racing with other jobs' localizers
    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);

     .. ..

  }

}

4. Root cause

A race between the two job-localization threads in the TaskTracker. The main problem is that they share the same “JobConf” object, “ttConf”: each JobLocalizer writes its own user’s local directories into it, so whichever thread writes last wins. The fix is simply to give each thread its own copy of ttConf. A minimal model of the race is sketched below.
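
To make the interleaving concrete, here is a minimal, self-contained model of the race. It is not Hadoop code: a ConcurrentHashMap stands in for the shared ttConf, and the short sleep plays the role of the 50-second delay injected for the reproduction.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedConfRace {
  // Stand-in for JobLocalizer.JOB_LOCAL_CTXT ("mapred.job.local.dir").
  static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
  // One shared, mutable "conf" for the whole TaskTracker, as in the buggy code.
  static final Map<String, String> ttConf = new ConcurrentHashMap<>();

  // Models JobLocalizer's constructor plus the later read in TaskTracker.initializeJob.
  static void localizeJob(String user) throws InterruptedException {
    // Write this user's per-user job dirs into the *shared* conf ...
    ttConf.put(JOB_LOCAL_CTXT, "taskTracker/" + user + "/jobcache");
    // ... and if the other job's localizer runs in this window, the value is overwritten.
    Thread.sleep(50);
    String dirs = ttConf.get(JOB_LOCAL_CTXT);
    if (!dirs.contains("/" + user + "/")) {
      // Corresponds to the DiskErrorException: ding's initializeJob searches for
      // taskTracker/ding/jobcache/.../job.xml under gux's directories.
      System.out.println(user + "'s thread sees the wrong local dirs: " + dirs);
    }
  }

  public static void main(String[] args) throws Exception {
    Thread ding = new Thread(() -> { try { localizeJob("ding"); } catch (InterruptedException e) { } });
    Thread gux  = new Thread(() -> { try { localizeJob("gux");  } catch (InterruptedException e) { } });
    ding.start(); gux.start();
    ding.join();  gux.join();
  }
}

Note that the map itself is thread-safe; the race is in the write-then-read-later protocol on shared state, which is exactly what the shared ttConf suffers from.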

4.1 Category:

Data race.

5. Fix

5.1 How?

Make “ttConf” per thread: each JobLocalizer works on its own copy, so one thread setting JOB_LOCAL_CTXT no longer affects the other thread.

   public JobLocalizer(JobConf ttConf, String user, String jobid)

       throws IOException {

@@ -108,10 +108,10 @@ public class JobLocalizer {

       throw new IOException("Cannot initialize for null jobid");

     }

     this.jobid = jobid;

-    this.ttConf = ttConf;

-    lfs = FileSystem.getLocal(ttConf).getRaw();

+    this.ttConf = new JobConf(ttConf);

+    lfs = FileSystem.getLocal(this.ttConf).getRaw();

     this.localDirs = createPaths(user, localDirs);

-    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);

+    this.ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);

     Collections.shuffle(this.localDirs);

     lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);

     JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
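
The design choice here is a defensive copy: the JobLocalizer mutates its own JobConf (via the copy constructor) instead of the TaskTracker’s shared one, so the write to JOB_LOCAL_CTXT is no longer visible to other jobs’ localizers. Applied to the simplified SharedConfRace model in section 4 (same assumed names; add this method to that class), the equivalent change would be:

  // Fixed variant of the model: take a private copy of the shared conf, as
  // `this.ttConf = new JobConf(ttConf)` does in the patch above.
  static void localizeJobFixed(String user, Map<String, String> sharedConf)
      throws InterruptedException {
    Map<String, String> ttConf = new ConcurrentHashMap<>(sharedConf);
    ttConf.put(JOB_LOCAL_CTXT, "taskTracker/" + user + "/jobcache");
    Thread.sleep(50);
    // Each thread now reads back its own value, regardless of interleaving.
    System.out.println(user + " sees " + ttConf.get(JOB_LOCAL_CTXT));
  }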