HDFS-3875 Report

1. Symptom

A faulty DN can mistakenly kick out a correct DN from the replication pipeline.

When a datanode in a write pipeline (replication factor > 1) detects a checksum error, it is immediately kicked out of the pipeline, since both the NN and the client assume it is the corrupted node. This behavior is problematic when the node that detected the checksum error is not the faulty node: the data and checksum it received from the client or an upstream DN were already corrupted.

In the user’s scenario:

“We saw this issue with one block in a large test cluster. The client is storing the data with replication level 2, and we saw the following:

Note: another production failure has the same root cause:

https://issues.apache.org/jira/browse/HDFS-1595

Suppose a source datanode S is writing to a destination datanode D in a write pipeline. We have an implicit assumption that if S catches an exception when it is writing to D, then D is faulty and S is fine. As a result, DFSClient will take out D from the pipeline, reconstruct the write pipeline with the remaining datanodes and then continue writing.

However, we find a case that the faulty machine F is indeed S but not D. In the case we found, F has a faulty network interface (or a faulty switch port) in such a way that the faulty network interface works fine when transferring a small amount of data, say 1MB, but it often fails when transferring a large amount of data, say 100MB.

It is even worse if F is the first datanode in the pipeline. Consider the following:

  1. DFSClient creates a pipeline with three datanodes. The first datanode is F.
  2. F catches an IOException when writing to the second datanode. Then, F reports the second datanode has error.
  3. DFSClient removes the second datanode from the pipeline and continues writing with the remaining datanode(s).
  4. The pipeline now has two datanodes but (2) and (3) repeat.
  5. Now, only F remains in the pipeline. DFSClient continues writing with one replica in F.
  6. The write succeeds and DFSClient is able to close the file successfully.
  7. The block is under replicated. The NameNode schedules replication from F to some other datanode D.
  8. The replication fails for the same reason. D reports to the NameNode that the replica in F is corrupted.
  9. The NameNode marks the replica in F as corrupted.
  10. The block is corrupted since no replica is available.

We were able to manually divide the replicas into small files and copy them out from F without fixing the hardware. The replicas seem uncorrupted. This is a data availability problem.
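
To make the implicit assumption concrete, here is a minimal, hypothetical sketch (not the actual DFSClient code) of the recovery logic described above: the client always blames the node it failed to write to and rebuilds the pipeline without it.

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the assumption: if S fails while writing to D,
// D is treated as the faulty node and dropped from the pipeline, even when S
// (or its network interface) is actually the broken part.
class PipelineRecoverySketch {
  static List<String> rebuildAfterWriteFailure(List<String> pipeline, int failedDownstreamIndex) {
    List<String> rebuilt = new ArrayList<String>(pipeline);
    rebuilt.remove(failedDownstreamIndex); // D is removed, S stays
    return rebuilt;                        // the client continues writing with the rest
  }
}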

1.1 Severity

Critical

1.2 Was there exception thrown?

Yes.

2013-07-20 10:12:52,750 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:32950 - First Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

2013-07-20 10:12:52,759 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:32950 - Second Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

Besides exceptions, there are also error messages:

2013-07-20 10:12:48,492 INFO  datanode.DataNode (BlockReceiver.java:verifyChunks(342)) - 127.0.0.1:44384 - report corrupt block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from datanode  to namenode

2013-07-20 10:12:56,662 WARN  hdfs.DFSClient (DFSInputStream.java:readBuffer(495)) - Found Checksum error for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from 127.0.0.1:44384 at 391680

1.2.1 Multiple exceptions?

Yes. There were two checksum exceptions during the block scan. In fact, immediately after the data was copied, there was an INFO message:

2013-07-20 10:12:48,492 INFO  datanode.DataNode (BlockReceiver.java:verifyChunks(342)) - 127.0.0.1:44384 - report corrupt block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from datanode  to namenode

2. How to reproduce this failure

This failure is very hard to reproduce; we could not fully reproduce it. The reason is that it is hard to corrupt a block on just one DN (but not the others).

The way we reproduce it is to run the testcase:

mvn test -Dtest=TestCrcCorruption#testCorruptionDuringWrt

In DFSOutputStream.java (the client-side code that sends data to the DNs), we modify this code to simulate the data corruption:

+          int cIdx = 0;
+          if (DFSClientFaultInjector.get().corruptPacket()) {
+            // flip a byte
+            cIdx = buf.position()+buf.remaining()-1;
+            buf.array()[cIdx] ^= 0xff;
+          }
+
          // write out data to remote datanode
          blockStream.write(buf.array(), buf.position(), buf.remaining());
          blockStream.flush();
          lastPacket = System.currentTimeMillis();
+
+          if (DFSClientFaultInjector.get().uncorruptPacket()) {
+            // flip back before retransmission
+            buf.array()[cIdx] ^= 0xff;
+          }
         

However, this piece of code corrupts the packets sent to all the DNs in the pipeline, not just one of them.
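
The injected hooks are controlled by DFSClientFaultInjector, the new class added by this patch (shown near the end of this report), whose corruptPacket()/uncorruptPacket() methods return false in production. Below is a minimal sketch of how a test could enable the corruption for a single packet, assuming it simply swaps in a subclass via the public static instance field (the actual testcase may wire this up differently, e.g. with a mocking framework):

import org.apache.hadoop.hdfs.DFSClientFaultInjector;

// Hypothetical test helper: corrupt exactly one packet on its first
// transmission, then restore the buffer so any retransmission is clean.
class CorruptOnePacketInjector extends DFSClientFaultInjector {
  private boolean armed = true;      // corrupt only the first packet seen
  private boolean corrupted = false; // was the current packet corrupted?

  @Override
  public synchronized boolean corruptPacket() {
    corrupted = armed;
    armed = false;
    return corrupted;                // DFSOutputStream flips a byte if true
  }

  @Override
  public synchronized boolean uncorruptPacket() {
    boolean flipBack = corrupted;    // undo the flip after the packet is sent
    corrupted = false;
    return flipBack;
  }
}

// In the test setup (hypothetical):
//   DFSClientFaultInjector.instance = new CorruptOnePacketInjector();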

2.0 Version

0.23.8, with the patch reverted.

2.1 Configuration

No special configuration needed. Just run the testcase.

In a real production setting, only a replication factor of 2 is needed. The real challenge is how to corrupt the data.
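
For illustration only, a minimal sketch of writing a file with replication factor 2 through the Java API (the path and data below are illustrative, not part of the testcase):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteWithTwoReplicas {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Create the file with replication factor 2.
    FSDataOutputStream out = fs.create(new Path("/test_corruption_file"), (short) 2);
    out.write(new byte[1 << 20]); // write 1 MB of data
    out.close();
  }
}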

2.2 Reproduction procedure

mvn test -Dtest=TestCrcCorruption#testCorruptionDuringWrt

2.3 Can the logs tell how to reproduce the failure?

Yes. The logs capture the key events.

2013-07-20 10:12:48,342 INFO  hdfs.StateChange (FSNamesystem.java:allocateBlock(1764)) - BLOCK* NameSystem.allocateBlock: /test_corruption_file. BP-2087127045-127.0.1.1-1374329565029 blk_7809964727882653017_1001{blockUCState=UNDER_CONSTRUCTION, primaryNodeIndex=-1, replicas=[ReplicaUnderConstruction[127.0.0.1:32950|RBW], ReplicaUnderConstruction[127.0.0.1:59335|RBW], ReplicaUnderConstruction[127.0.0.1:44384|RBW]]}

  --- We can infer the pipeline.

2013-07-20 10:12:48,492 INFO  datanode.DataNode (BlockReceiver.java:verifyChunks(342)) - 127.0.0.1:44384 - report corrupt block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from datanode to namenode

  --- This shows that the DN detected the checksum mismatch (44384 is the last DN in the pipeline).

2013-07-20 10:12:52,991 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:44384 - First Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

2013-07-20 10:12:53,000 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:44384 - Second Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

2013-07-20 10:12:53,000 INFO  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:handleScanFailure(334)) - 127.0.0.1:44384 - Reporting bad block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

 --- The block scanner detects the checksum mismatch and reports it again.

2013-07-20 10:12:53,003 INFO  hdfs.StateChange (NameNodeRpcServer.java:reportBadBlocks(481)) - *DIR* NameNode.reportBadBlocks

2013-07-20 10:12:53,003 INFO  BlockStateChange (CorruptReplicasMap.java:addToCorruptReplicasMap(66)) - BLOCK NameSystem.addToCorruptReplicasMap: blk_7809964727882653017 added as corrupt on 127.0.0.1:44384 by localhost/127.0.0.1 because client machine reported it

2013-07-20 10:12:56,662 WARN  hdfs.DFSClient (DFSInputStream.java:readBuffer(495)) - Found Checksum error for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from 127.0.0.1:44384 at 391680

  --- The client and the NN then take that DN out of the pipeline.

2.4 How many machines needed?

1 NN + 3 DNs (the testcase runs them all on a single machine).

3. Diagnosis procedure

It is not very hard to notice the symptom. The complexity lies in the fix though.

3.1 Detailed Symptom (where you start)

1. The client will notice that a block is under-replicated from the fsck output (an example fsck invocation is shown after this list).

  --- From this we can infer the block ID.

2. In the NN log:

2013-07-20 10:12:53,003 INFO  hdfs.StateChange (NameNodeRpcServer.java:reportBadBlocks(481)) - *DIR* NameNode.reportBadBlocks

2013-07-20 10:12:53,003 INFO  BlockStateChange (CorruptReplicasMap.java:addToCorruptReplicasMap(66)) - BLOCK NameSystem.addToCorruptReplicasMap: blk_7809964727882653017 added as corrupt on 127.0.0.1:44384 by localhost/127.0.0.1 because client machine reported

  --- From this we know the block is marked corrupt, and which DN holds the corrupt replica.

3. In the corrupted DN’s log:

2013-07-20 10:12:48,492 INFO  datanode.DataNode (BlockReceiver.java:verifyChunks(342)) - 127.0.0.1:44384 - report corrupt block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from datanode to namenode

  --- This shows that the DN detected the checksum mismatch (44384 is the last DN in the pipeline).

2013-07-20 10:12:52,991 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:44384 - First Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

2013-07-20 10:12:53,000 WARN  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:verifyBlock(444)) - 127.0.0.1:44384 - Second Verification failed for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

org.apache.hadoop.fs.ChecksumException: Checksum failed at 391680

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.verifyChecksum(BlockSender.java:555)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:460)

        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:622)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyBlock(BlockPoolSliceScanner.java:408)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.verifyFirstBlock(BlockPoolSliceScanner.java:489)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scan(BlockPoolSliceScanner.java:644)

        at org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner.scanBlockPoolSlice(BlockPoolSliceScanner.java:624)

        at org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.run(DataBlockScanner.java:94)

        at java.lang.Thread.run(Thread.java:662)

2013-07-20 10:12:53,000 INFO  datanode.BlockPoolSliceScanner (BlockPoolSliceScanner.java:handleScanFailure(334)) - 127.0.0.1:44384 - Reporting bad block BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001

4. In the client log:

2013-07-20 10:12:56,662 WARN  hdfs.DFSClient (DFSInputStream.java:readBuffer(495)) - Found Checksum error for BP-2087127045-127.0.1.1-1374329565029:blk_7809964727882653017_1001 from 127.0.0.1:44384 at 391680

So we can infer: the last DN detected the corruption, which caused it to be removed from the pipeline. From the logs alone we have no idea why it detected the corruption (we cannot tell that it is because the upstream DN sent an invalid checksum).
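
For step 1, the under-replicated (and later corrupt) block can be observed with fsck; for example, something along the lines of

hdfs fsck /test_corruption_file -files -blocks -locations

(the exact command form and flags depend on the Hadoop version) prints the block ID and the replica locations, which is where this diagnosis starts.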

3.2 Backward inference

It’s not hard to locate this piece of code:

verifyChunks() {
  /* Compare the client-provided checksum with the local one. */
  if (!clientChecksum.compare(checksumBuf, checksumOff)) {
    if (srcDataNode != null) {
      /* Report this to the Namenode!! */
      LOG.info("report corrupt block " + block + " from datanode " + ..);
      nn.reportBadBlocks(new LocatedBlock[] {lb});
    }
    throw new IOException("Unexpected checksum mismatch " +
        "while writing " + block + " from " + inAddr);
  }
}

So all of the subsequent behavior that caused this DN to be removed is, in fact, by design.

4. Root cause

The last DN should not be so quick to report to the NN and the client that its own replica is corrupted.

The correct behavior: in a pipeline DN1 -> DN2, when DN2 detects a checksum error, then instead of removing DN2, DN1 should be kicked out of the pipeline. In this case, since DN1 is the faulty one, kicking it out restores normal operation.

Client (not corrupt) -> d1 (not corrupt) -> d2 (not corrupt) -> d3 (corrupt), where d3 for some reason sees only corrupt data.

The patch is explained in detail below.

4.1 Category:

Incorrect exception handling: DN2’s checksum exception is not handled correctly. This is hard to test: it requires the developers to understand the error scenario, which is not as simple as achieving statement coverage.


@@ -605,15 +619,24 @@

      buf.position(buf.limit()); // move to the end of the data.

-      /* skip verifying checksum iff this is not the last one in the
-       * pipeline and clientName is non-null. i.e. Checksum is verified
-       * on all the datanodes when the data is being written by a
-       * datanode rather than a client. Whe client is writing the data,
-       * protocol includes acks and only the last datanode needs to verify
-       * checksum.
-       */
-      if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+      if (shouldVerifyChecksum()) {
+        try {
+          verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        } catch (IOException ioe) {
+          // checksum error detected locally. there is no reason to continue.
+          if (responder != null) {
+            try {
+              ((PacketResponder) responder.getRunnable()).enqueue(seqno,
+                  lastPacketInBlock, offsetInBlock,
+                  Status.ERROR_CHECKSUM);
+              // Wait until the responder sends back the response
+              // and interrupt this thread.
+              Thread.sleep(3000);
+            } catch (InterruptedException ie) { }
+          }
+          throw new IOException("Terminating due to a checksum error." + ioe);
+        }
+
        if (needsChecksumTranslation) {
          // overwrite the checksums in the packet buffer with the
          // appropriate polynomial for the disk storage.

 --- If verifyChunks throws an exception, the previous code let it propagate unhandled, eventually causing this DN to be kicked out of the pipeline. Now the exception is handled by:

1. still enqueuing an ack to the upstream, but with ERROR_CHECKSUM status;

2. rethrowing the exception.

The fix below is the fall-through path: when verifyChunks does not throw an exception, execution reaches the code below and a SUCCESS ack is enqueued.


@@ -695,6 +718,11 @@
      }
    }

+    if (responder != null && shouldVerifyChecksum()) {
+        ((PacketResponder) responder.getRunnable()).enqueue(seqno,
+          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
+    }
+
    if (throttler != null) { // throttle I/O
      throttler.throttle(len);
    }
@@ -890,7 +918,7 @@
  }

=========================================================  
  /**
-   * Processed responses from downstream datanodes in the pipeline
+   * Processes responses from downstream datanodes in the pipeline
   * and sends back replies to the originator.
   */
  class PacketResponder implements Runnable, Closeable {  
@@ -943,9 +971,11 @@
     * @param offsetInBlock
     */
    synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
+        final boolean lastPacketInBlock,
+        final long offsetInBlock, final Status ackStatus) {
      if (running) {
-        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
+            ackStatus);
        if(LOG.isDebugEnabled()) {
          LOG.debug(myString + ": enqueue " + p);
        }
@@ -1071,20 +1101,31 @@
              }
            }

+            Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
            // construct my ack message
            Status[] replies = null;
            if (mirrorError) { // ack read error
              replies = new Status[2];
-              replies[0] = Status.SUCCESS;
+              replies[0] = myStatus;
              replies[1] = Status.ERROR;
            } else {
              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                  : ack.getNumOfReplies();
              replies = new Status[1+ackLen];
-              replies[0] = Status.SUCCESS;
+              replies[0] = myStatus;
              for (int i=0; i<ackLen; i++) {
                replies[i+1] = ack.getReply(i);
              }
+              // If the mirror has reported that it received a corrupt packet,
+              // do self-destruct to mark myself bad, instead of the mirror node.
+              // The mirror is guaranteed to be good without corrupt data.
+              if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+                running = false;
+                removeAckHead();
+                LOG.warn("Shutting down writer and responder due to a checksum error.");
+                receiverThread.interrupt();
+                continue;
+              }
            }
            PipelineAck replyAck = new PipelineAck(expected, replies);
 -- This code is in PacketResponder.run: the fix is to terminate the packet responder if the downstream DN reported ERROR_CHECKSUM (replies[1] == Status.ERROR_CHECKSUM). This implements the logic that the upstream DN is kicked out instead of the DN which actually detected the error.

             
@@ -1103,6 +1144,14 @@
              removeAckHead();
              // update bytes acked
            }
+            // terminate after sending response if this node detected
+            // a checksum error
+            if (myStatus == Status.ERROR_CHECKSUM) {
+              running = false;
+              LOG.warn("Shutting down writer and responder due to a checksum error.");
+              receiverThread.interrupt();
+              continue;
+            }
        } catch (IOException e) {
          LOG.warn("IOException in BlockReceiver.run(): ", e);
          if (running) {
@@ -1146,11 +1195,14 @@
    final long seqno;
    final boolean lastPacketInBlock;
    final long offsetInBlock;
+    final Status ackStatus;

-    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock,
+        long offsetInBlock, Status ackStatus) {
      this.seqno = seqno;
      this.lastPacketInBlock = lastPacketInBlock;
      this.offsetInBlock = offsetInBlock;
+      this.ackStatus = ackStatus;
    }

    @Override
@@ -1158,6 +1210,7 @@
      return getClass().getSimpleName() + "(seqno=" + seqno
        + ", lastPacketInBlock=" + lastPacketInBlock
        + ", offsetInBlock=" + offsetInBlock
+        + ", ackStatus=" + ackStatus
        + ")";
    }
  }
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java        (revision 1484482)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java        (working copy)
@@ -1655,13 +1655,25 @@
    }
   
    // check replica length
-    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
-      throw new ReplicaNotFoundException("Unmatched length replica " +
-          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
-          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
+    long bytesAcked = rbw.getBytesAcked();
+    long numBytes = rbw.getNumBytes();
+    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
+      throw new ReplicaNotFoundException("Unmatched length replica " +
+          replicaInfo + ": BytesAcked = " + bytesAcked +
+          " BytesRcvd = " + numBytes + " are not in the range of [" +
          minBytesRcvd + ", " + maxBytesRcvd + "].");
    }

+    // Truncate the potentially corrupt portion.
+    // If the source was client and the last node in the pipeline was lost,
+    // any corrupt data written after the acked length can go unnoticed.
+    if (numBytes > bytesAcked) {
+      final File replicafile = rbw.getBlockFile();
+      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+      rbw.setNumBytes(bytesAcked);
+      rbw.setLastChecksumAndDataLen(bytesAcked, null);
+    }
+
    // bump the replica's generation stamp to newGS
    bumpReplicaGS(rbw, newGS);

  --- This code forces datanodes to truncate the block being recovered to the acked length. Since the nodes in the middle of the write pipeline do not perform checksum verification, and write data to disk before getting the ack back from downstream, the unacked portion can contain corrupt data. If the last node simply disappears before reporting a checksum error up, the current pipeline recovery mechanism can overlook the corruption in the written data.

Since this truncation discards the potentially corrupt portion of the block, no explicit checksum re-verification is needed on a checksum error.
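
As a rough illustration only (hypothetical code, not the actual FSDataset.truncateBlock, which also rewrites the checksum/meta file), cutting the replica's data file back to the acked length amounts to:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch: discard the unacked (possibly corrupt) tail of a
// replica by truncating its data file to the acked length.
class TruncateSketch {
  static void truncateToAckedLength(File blockFile, long bytesAcked) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(blockFile, "rw");
    try {
      if (raf.length() > bytesAcked) {
        raf.setLength(bytesAcked); // drop everything after the last acked byte
      }
    } finally {
      raf.close();
    }
  }
}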


   
Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java        (revision 1484482)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java        (working copy)
@@ -292,6 +292,9 @@
    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
    volatile boolean hasError = false;
    volatile int errorIndex = -1;
+    /** The last ack sequence number before pipeline failure. */
+    private long lastAckedSeqnoBeforeFailure = -1;
+    private int pipelineRecoveryCount = 0;
    private BlockConstructionStage stage;  // block construction stage
    private long bytesSent = 0; // number of bytes that've been sent

@@ -506,10 +509,22 @@
                " sending packet " + one);
          }

+          int cIdx = 0;
+          if (DFSClientFaultInjector.get().corruptPacket()) {
+            // flip a byte
+            cIdx = buf.position()+buf.remaining()-1;
+            buf.array()[cIdx] ^= 0xff;
+          }
+
          // write out data to remote datanode
          blockStream.write(buf.array(), buf.position(), buf.remaining());
          blockStream.flush();
          lastPacket = System.currentTimeMillis();
+
+          if (DFSClientFaultInjector.get().uncorruptPacket()) {
+            // flip back before retransmission
+            buf.array()[cIdx] ^= 0xff;
+          }
         
          if (one.isHeartbeatPacket()) {  //heartbeat packet
          }
@@ -738,6 +753,24 @@
        ackQueue.clear();
      }

  --- This code is in the client output path:

+            cIdx = buf.position()+buf.remaining()-1;
+            buf.array()[cIdx] ^= 0xff;

will corrupt a packet (and thus the block being written). It is only used in testing, where corruptPacket() returns true.
 
+      // Record the new pipeline failure recovery.
+      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
+         lastAckedSeqnoBeforeFailure = lastAckedSeqno;
+         pipelineRecoveryCount = 1;
+      } else {
+        // If we had to recover the pipeline five times in a row for the
+        // same packet, this client likely has corrupt data or corrupting
+        // during transmission.
+        if (++pipelineRecoveryCount > 5) {
+          DFSClient.LOG.warn("Error recovering pipeline for writing " +
+              block + ". Already retried 5 times for the same packet.");
+          lastException = new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success.");
+          streamerClosed = true;
+          return false;
+        }
+      }
+
      boolean doSleep = setupPipelineForAppendOrRecovery();
     
      if (!streamerClosed && dfsClient.clientRunning) {

 --- “Another new feature added to the latest patch is to terminate hdfs client when pipeline recovery is attempted for more than 5 times while writing the same data packet. This likely indicates the source data is corrupt. In a very small cluster, clients may run out of datanodes and fail before retrying 5 times.”


Index: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
===================================================================
--- hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java        (revision 0)
+++ hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java        (working copy)
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting faults in DFSClient and DFSOutputStream tests.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class DFSClientFaultInjector {
+  public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+
+  public static DFSClientFaultInjector get() {
+    return instance;
+  }
+
+  public boolean corruptPacket() {
+    return false;
+  }
+
+  public boolean uncorruptPacket() {
+    return false;
+  }
+}