Browse Source

Revert the change made by HDFS-101 from branch 0.20

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@899506 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 15 years ago
parent
commit
2f06a0d82b
2 changed files with 47 additions and 53 deletions
  1. 0 3
      CHANGES.txt
  2. 47 50
      src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

+ 0 - 3
CHANGES.txt

@@ -81,9 +81,6 @@ Release 0.20.2 - Unreleased
 
     HADOOP-6428. HttpServer sleeps with negative values (cos)
 
-    HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
-    datanode failure. (hairong)
-
     HADOOP-5623. Fixes a problem to do with status messages getting overwritten
     in streaming jobs. (Rick Cox and Jothi Padmanabhan via tomwhite)
 

+ 47 - 50
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -75,7 +75,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private DataNode datanode = null;
-  volatile private boolean mirrorError;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
@@ -174,19 +173,21 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
 
   /**
    * While writing to mirrorOut, failure to write to mirror should not
-   * affect this datanode.
+   * affect this datanode unless a client is writing the block.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
-    LOG.info(datanode.dnRegistration + ": Exception writing block " +
+    LOG.info(datanode.dnRegistration + ":Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
-    if (Thread.interrupted()) { // shut down if the thread is interrupted
+    mirrorOut = null;
+    //
+    // If stream-copy fails, continue
+    // writing to disk for replication requests. For client
+    // writes, return error so that the client can do error
+    // recovery.
+    //
+    if (clientName.length() > 0) {
       throw ioe;
-    } else { // encounter an error while writing to mirror
-      // continue to run even if can not write to mirror
-      // notify client of the error
-      // and wait for the client to shut down the pipeline
-      mirrorError = true;
     }
   }
   
@@ -395,8 +396,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     
     setBlockPosition(offsetInBlock);
     
-    // First write the packet to the mirror:
-    if (mirrorOut != null && !mirrorError) {
+    //First write the packet to the mirror:
+    if (mirrorOut != null) {
       try {
         mirrorOut.write(buf.array(), buf.position(), buf.remaining());
         mirrorOut.flush();
@@ -514,8 +515,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   Thread.currentThread()));
+                                                   replyOut, numTargets));
         responder.start(); // start thread to processes reponses
       }
 
@@ -700,21 +700,18 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
     private BlockReceiver receiver; // The owner of this responder.
-    private Thread receiverThread; // the thread that spawns this responder
 
     public String toString() {
       return "PacketResponder " + numTargets + " for Block " + this.block;
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets,
-                    Thread receiverThread) {
+                    DataOutputStream out, int numTargets) {
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
       replyOut = out;
       this.numTargets = numTargets;
-      this.receiverThread = receiverThread;
     }
 
     /**
@@ -851,26 +848,24 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
 
       boolean lastPacketInBlock = false;
-      boolean isInterrupted = false;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
+            boolean didRead = false;
             long expected = -2;
             PipelineAck ack = new PipelineAck();
-            long seqno = -2;
             try { 
-              if (!mirrorError) {
-                // read an ack from downstream datanode
-                ack.readFields(mirrorIn);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
-                }
-                seqno = ack.getSeqno();
+              // read an ack from downstream datanode
+              ack.readFields(mirrorIn);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("PacketResponder " + numTargets + " got " + ack);
               }
+              long seqno = ack.getSeqno();
+              didRead = true;
               if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
                 ack.write(replyOut); // send keepalive
                 replyOut.flush();
-              } else if (seqno >= 0 || mirrorError) {
+              } else if (seqno >= 0) {
                 Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -882,13 +877,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                     }
                     wait();
                   }
-                  if (!running || !datanode.shouldRun) {
-                    break;
-                  }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
-                  if (seqno != expected && !mirrorError) {
+                  if (seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
                                           " expected seqno:" + expected +
@@ -897,32 +889,27 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
               }
-            } catch (InterruptedException ine) {
-              isInterrupted = true;
-            } catch (IOException ioe) {
-              if (Thread.interrupted()) {
-                isInterrupted = true;
-              } else {
-                // continue to run even if can not read from mirror
-                // notify client of the error
-                // and wait for the client to shut down the pipeline
-                mirrorError = true;
-                LOG.info("PacketResponder " + block + " " + numTargets +
-                    " Exception " + StringUtils.stringifyException(ioe));
+            } catch (Throwable e) {
+              if (running) {
+                LOG.info("PacketResponder " + block + " " + numTargets + 
+                         " Exception " + StringUtils.stringifyException(e));
+                running = false;
               }
             }
 
-            if (Thread.interrupted() || isInterrupted) {
+            if (Thread.interrupted()) {
               /* The receiver thread cancelled this thread. 
                * We could also check any other status updates from the 
                * receiver thread (e.g. if it is ok to write to replyOut). 
                * It is prudent to not send any more status back to the client
                * because this datanode has a problem. The upstream datanode
-               * will detect that this datanode is bad, and rightly so.
+               * will detect a timout on heartbeats and will declare that
+               * this datanode is bad, and rightly so.
                */
               LOG.info("PacketResponder " + block +  " " + numTargets +
                        " : Thread is interrupted.");
-              break;
+              running = false;
+              continue;
             }
             
             // If this is the last packet in block, then close block
@@ -949,7 +936,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
 
             // construct my ack message
             short[] replies = null;
-            if (mirrorError) { // no ack is read
+            if (!didRead) { // no ack is read
               replies = new short[2];
               replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
               replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
@@ -970,14 +957,24 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                         " for block " + block +
                         " responded an ack: " + replyAck);
             }
-        } catch (Throwable e) {
+
+            // If we forwarded an error response from a downstream datanode
+            // and we are acting on behalf of a client, then we quit. The 
+            // client will drive the recovery mechanism.
+            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
+              running = false;
+            }
+        } catch (IOException e) {
           if (running) {
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
           }
-          if (!Thread.interrupted()) { // error not caused by interruption
-            receiverThread.interrupt();
+        } catch (RuntimeException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
           }
         }
       }