
HDFS-5583. Make DN send an OOB Ack on shutdown before restarting. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571491 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee · 11 years ago · parent commit 1c6b5d2b58

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt

@@ -82,3 +82,4 @@ HDFS-5535 subtasks:
     HDFS-6004. Change DFSAdmin for rolling upgrade commands. (szetszwo via
     Arpit Agarwal)
 
+    HDFS-5583. Make DN send an OOB Ack on shutdown before restarting. (kihwal)

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -225,6 +225,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 4000;
+  public static final String  DFS_DATANODE_OOB_TIMEOUT_KEY = "dfs.datanode.oob.timeout-ms";
+  public static final String  DFS_DATANODE_OOB_TIMEOUT_DEFAULT = "1500,0,0,0"; // OOB_TYPE1, OOB_TYPE2, OOB_TYPE3, OOB_TYPE4
 
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
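
The value of dfs.datanode.oob.timeout-ms is a comma-separated list of per-type timeouts in milliseconds, one entry per OOB type in enum order (OOB_RESTART first). An operator would override it in hdfs-site.xml; a hedged sketch, with 3000 ms as a purely illustrative value:

    <property>
      <name>dfs.datanode.oob.timeout-ms</name>
      <value>3000,0,0,0</value>
    </property>

Note that PipelineAck (below) reads this key once in a static initializer, so the value is fixed for the lifetime of the JVM.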

+ 64 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java

@@ -26,10 +26,12 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-
 import com.google.protobuf.TextFormat;
 
 /** Pipeline Acknowledgment **/
@@ -38,6 +40,21 @@ import com.google.protobuf.TextFormat;
 public class PipelineAck {
   PipelineAckProto proto;
   public final static long UNKOWN_SEQNO = -2;
+  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
+  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
+  final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1;
+  // placeholder for the timeout value of each OOB type
+  final static long[] OOB_TIMEOUT;
+
+  static {
+    OOB_TIMEOUT = new long[NUM_OOB_TYPES];
+    HdfsConfiguration conf = new HdfsConfiguration();
+    String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+        DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
+    for (int i = 0; i < NUM_OOB_TYPES; i++) {
+      OOB_TIMEOUT[i] = (i < ele.length) ? Long.valueOf(ele[i]) : 0;
+    }
+  }
 
   /** default constructor **/
   public PipelineAck() {
@@ -103,14 +120,57 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (DataTransferProtos.Status reply : proto.getStatusList()) {
-      if (reply != DataTransferProtos.Status.SUCCESS) {
+    for (Status reply : proto.getStatusList()) {
+      if (reply != Status.SUCCESS) {
         return false;
       }
     }
     return true;
   }
-  
+
+  /**
+   * Returns the OOB status if this ack contains one. 
+   * @return null if it is not an OOB ack.
+   */
+  public Status getOOBStatus() {
+    // Normal data transfer acks will have a valid sequence number, so
+    // this will return right away in most cases.
+    if (getSeqno() != UNKOWN_SEQNO) {
+      return null;
+    }
+    for (Status reply : proto.getStatusList()) {
+      // The following check is valid because protobuf guarantees to
+      // preserve the ordering of enum elements.
+      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
+        return reply;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the timeout to be used for transmitting the OOB type
+   * @return the timeout in milliseconds
+   */
+  public static long getOOBTimeout(Status status) throws IOException {
+    int index = status.getNumber() - OOB_START;
+    if (index >= 0 && index < NUM_OOB_TYPES) {
+      return OOB_TIMEOUT[index];
+    } 
+    // Not an OOB.
+    throw new IOException("Not an OOB status: " + status);
+  }
+
+  /** Get the Restart OOB ack status */
+  public static Status getRestartOOBStatus() {
+    return Status.OOB_RESTART;
+  }
+
+  /** Return true if it is the restart OOB status code */
+  public static boolean isRestartOOBStatus(Status st) {
+    return st.equals(Status.OOB_RESTART);
+  }
+
   /**** Writable interface ****/
   public void readFields(InputStream in) throws IOException {
     proto = PipelineAckProto.parseFrom(vintPrefixed(in));

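A minimal sketch of how an ack reader might use the new accessors. PipelineAck, readFields and Status come from this file's API; in (the upstream InputStream) and onRestart() are assumed stand-ins for the caller's stream and recovery hook:

    PipelineAck ack = new PipelineAck();
    ack.readFields(in);                 // parse the ack off the socket
    Status oob = ack.getOOBStatus();    // null for normal data-transfer acks
    if (oob != null && PipelineAck.isRestartOOBStatus(oob)) {
      onRestart();  // e.g. keep the node in the pipeline across the restart
    }
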
+ 174 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -723,14 +723,40 @@ class BlockReceiver implements Closeable {
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception for " + block, ioe);
-      throw ioe;
+      if (datanode.isRestarting()) {
+        // Do not throw if shutting down for restart. Otherwise, it will cause
+        // premature termination of responder.
+        LOG.info("Shutting down for restart (" + block + ").");
+      } else {
+        LOG.info("Exception for " + block, ioe);
+        throw ioe;
+      }
     } finally {
-      if (!responderClosed) { // Abnormal termination of the flow above
-        IOUtils.closeStream(this);
+      // Clear the previous interrupt state of this thread.
+      Thread.interrupted();
+
+      // If a shutdown for restart was initiated, upstream needs to be notified.
+      // There is no need to do anything special if the responder was closed
+      // normally.
+      if (!responderClosed) { // Data transfer was not complete.
         if (responder != null) {
+          // In case this datanode is shutting down for quick restart,
+          // send a special ack upstream.
+          if (datanode.isRestarting()) {
+            try {
+              ((PacketResponder) responder.getRunnable()).
+                  sendOOBResponse(PipelineAck.getRestartOOBStatus());
+            } catch (InterruptedException ie) {
+              // It is already going down. Ignore this.
+            } catch (IOException ioe) {
+              LOG.info("Error sending OOB Ack.", ioe);
+              // The OOB ack could not be sent. Since the datanode is going
+              // down, this is ignored.
+            }
+          }
           responder.interrupt();
         }
+        IOUtils.closeStream(this);
         cleanupBlock();
       }
       if (responder != null) {
@@ -744,7 +770,10 @@ class BlockReceiver implements Closeable {
           }
         } catch (InterruptedException e) {
           responder.interrupt();
-          throw new IOException("Interrupted receiveBlock");
+          // do not throw if shutting down for restart.
+          if (!datanode.isRestarting()) {
+            throw new IOException("Interrupted receiveBlock");
+          }
         }
         responder = null;
       }
@@ -862,6 +891,7 @@ class BlockReceiver implements Closeable {
     private final PacketResponderType type;
     /** for log and error messages */
     private final String myString; 
+    private boolean sending = false;
 
     @Override
     public String toString() {
@@ -887,7 +917,9 @@ class BlockReceiver implements Closeable {
     }
 
     private boolean isRunning() {
-      return running && datanode.shouldRun;
+      // When preparing for a restart, it should continue to run until
+      // interrupted by the receiver thread.
+      return running && (datanode.shouldRun || datanode.isRestarting());
     }
     
     /**
@@ -903,44 +935,96 @@ class BlockReceiver implements Closeable {
       if(LOG.isDebugEnabled()) {
         LOG.debug(myString + ": enqueue " + p);
       }
-      synchronized(this) {
+      synchronized(ackQueue) {
         if (running) {
           ackQueue.addLast(p);
-          notifyAll();
+          ackQueue.notifyAll();
+        }
+      }
+    }
+
+    /**
+     * Send an OOB response. If all acks have been sent already for the block
+     * and the responder is about to close, the delivery is not guaranteed.
+     * This is because the other end can close the connection independently.
+     * An OOB coming from downstream will be automatically relayed upstream
+     * by the responder. This method is used only by the originating datanode.
+     *
+     * @param ackStatus the type of ack to be sent
+     */
+    void sendOOBResponse(final Status ackStatus) throws IOException,
+        InterruptedException {
+      if (!running) {
+        LOG.info("Cannot send OOB response " + ackStatus + 
+            ". Responder not running.");
+        return;
+      }
+
+      synchronized(this) {
+        if (sending) {
+          wait(PipelineAck.getOOBTimeout(ackStatus));
+          // Didn't get my turn in time. Give up.
+          if (sending) {
+            throw new IOException("Could not send OOB reponse in time: "
+                + ackStatus);
+          }
+        }
+        sending = true;
+      }
+
+      LOG.info("Sending an out of band ack of type " + ackStatus);
+      try {
+        sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
+            ackStatus);
+      } finally {
+        // Let others send acks. Unless there are multiple OOB send
+        // calls, there can be only one waiter, the responder thread.
+        // In any case, only one needs to be notified.
+        synchronized(this) {
+          sending = false;
+          notify();
         }
       }
     }
     
     /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
-    synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
-      while (isRunning() && ackQueue.size() == 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(myString + ": seqno=" + seqno +
-                    " waiting for local datanode to finish write.");
+    Packet waitForAckHead(long seqno) throws InterruptedException {
+      synchronized(ackQueue) {
+        while (isRunning() && ackQueue.size() == 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(myString + ": seqno=" + seqno +
+                      " waiting for local datanode to finish write.");
+          }
+          ackQueue.wait();
         }
-        wait();
+        return isRunning() ? ackQueue.getFirst() : null;
       }
-      return isRunning() ? ackQueue.getFirst() : null;
     }
 
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
     @Override
-    public synchronized void close() {
-      while (isRunning() && ackQueue.size() != 0) {
-        try {
-          wait();
-        } catch (InterruptedException e) {
-          running = false;
-          Thread.currentThread().interrupt();
+    public void close() {
+      synchronized(ackQueue) {
+        while (isRunning() && ackQueue.size() != 0) {
+          try {
+            ackQueue.wait();
+          } catch (InterruptedException e) {
+            running = false;
+            Thread.currentThread().interrupt();
+          }
         }
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(myString + ": closing");
+        }
+        running = false;
+        ackQueue.notifyAll();
       }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug(myString + ": closing");
+
+      synchronized(this) {
+        notifyAll();
       }
-      running = false;
-      notifyAll();
     }
 
     /**
@@ -968,6 +1052,14 @@ class BlockReceiver implements Closeable {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(myString + " got " + ack);
               }
+              // Process an OOB ACK.
+              Status oobStatus = ack.getOOBStatus();
+              if (oobStatus != null) {
+                LOG.info("Relaying an out of band ack of type " + oobStatus);
+                sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
+                    Status.SUCCESS);
+                continue;
+              }
               seqno = ack.getSeqno();
             }
             if (seqno != PipelineAck.UNKOWN_SEQNO
@@ -1025,6 +1117,9 @@ class BlockReceiver implements Closeable {
              * status back to the client because this datanode has a problem.
              * The upstream datanode will detect that this datanode is bad, and
              * rightly so.
+             *
+             * The receiver thread can also interrupt this thread for sending
+             * an out-of-band response upstream.
              */
             LOG.info(myString + ": Thread is interrupted.");
             running = false;
@@ -1094,17 +1189,64 @@ class BlockReceiver implements Closeable {
     }
     
     /**
+     * The wrapper for the unprotected version. This is only called by
+     * the responder's run() method.
+     *
      * @param ack Ack received from downstream
      * @param seqno sequence number of ack to be sent upstream
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
+     * @param myStatus the local ack status
      */
     private void sendAckUpstream(PipelineAck ack, long seqno,
         long totalAckTimeNanos, long offsetInBlock,
         Status myStatus) throws IOException {
+      try {
+        // Wait for other sender to finish. Unless there is an OOB being sent,
+        // the responder won't have to wait.
+        synchronized(this) {
+          while(sending) {
+            wait();
+          }
+          sending = true;
+        }
+
+        try {
+          if (!running) return;
+          sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
+              offsetInBlock, myStatus);
+        } finally {
+          synchronized(this) {
+            sending = false;
+            notify();
+          }
+        }
+      } catch (InterruptedException ie) {
+        // The responder was interrupted. Make it go down without
+        // interrupting the receiver(writer) thread.  
+        running = false;
+      }
+    }
+
+    /**
+     * @param ack Ack received from downstream
+     * @param seqno sequence number of ack to be sent upstream
+     * @param totalAckTimeNanos total ack time including all the downstream
+     *          nodes
+     * @param offsetInBlock offset in block for the data in packet
+     * @param myStatus the local ack status
+     */
+    private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
+        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
+        throws IOException {
       Status[] replies = null;
-      if (mirrorError) { // ack read error
+      if (ack == null) {
+        // A new OOB response is being sent from this node. Regardless of
+        // downstream nodes, reply should contain one reply.
+        replies = new Status[1];
+        replies[0] = myStatus;
+      } else if (mirrorError) { // ack read error
         replies = MIRROR_ERROR_STATUS;
       } else {
         short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
@@ -1152,9 +1294,11 @@ class BlockReceiver implements Closeable {
      * 
      * This should be called only when the ack queue is not empty
      */
-    private synchronized void removeAckHead() {
-      ackQueue.removeFirst();
-      notifyAll();
+    private void removeAckHead() {
+      synchronized(ackQueue) {
+        ackQueue.removeFirst();
+        ackQueue.notifyAll();
+      }
     }
   }
   

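The sending flag added above implements a small single-sender handshake: the regular responder waits indefinitely for its turn to write upstream, while an OOB sender waits only the configured timeout and then gives up. Distilled into a standalone sketch (method names are illustrative, not from the patch; assumes an enclosing class with java.io.IOException imported):

    private boolean sending = false;

    // OOB path: bounded wait, then fail rather than stall the restart.
    synchronized void beginSend(long timeoutMs)
        throws IOException, InterruptedException {
      if (sending) {
        wait(timeoutMs);
        if (sending) {
          throw new IOException("could not start send in time");
        }
      }
      sending = true;
    }

    synchronized void endSend() {
      sending = false;
      notify(); // at most one other sender can be waiting
    }
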
+ 72 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -175,6 +175,8 @@ public class DataNode extends Configured
   }
   
   volatile boolean shouldRun = true;
+  volatile boolean shutdownForUpgrade = false;
+  private boolean shutdownInProgress = false;
   private BlockPoolManager blockPoolManager;
   volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
   private String clusterId = null;
@@ -1190,9 +1192,31 @@ public class DataNode extends Configured
     // offerServices may be modified.
     BPOfferService[] bposArray = this.blockPoolManager == null ? null
         : this.blockPoolManager.getAllNamenodeThreads();
-    this.shouldRun = false;
+    // If shutdown is not for restart, set shouldRun to false early. 
+    if (!shutdownForUpgrade) {
+      shouldRun = false;
+    }
+
+    // When shutting down for restart, DataXceiverServer is interrupted
+    // in order to avoid any further acceptance of requests, but the peers
+    // for block writes are not closed until the clients are notified.
+    if (dataXceiverServer != null) {
+      ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
+      this.dataXceiverServer.interrupt();
+    }
+
+    // Record the time of initial notification
+    long timeNotified = Time.now();
+
+    if (localDataXceiverServer != null) {
+      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+      this.localDataXceiverServer.interrupt();
+    }
+
+    // Terminate directory scanner and block scanner
     shutdownPeriodicScanners();
-    
+
+    // Stop the web server
     if (infoServer != null) {
       try {
         infoServer.stop();
@@ -1200,26 +1224,24 @@ public class DataNode extends Configured
         LOG.warn("Exception shutting down DataNode", e);
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
     if (pauseMonitor != null) {
       pauseMonitor.stop();
     }
+
+    // shouldRun is set to false here to prevent certain threads from exiting
+    // before the restart prep is done.
+    this.shouldRun = false;
     
-    if (dataXceiverServer != null) {
-      ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
-      this.dataXceiverServer.interrupt();
-    }
-    if (localDataXceiverServer != null) {
-      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
-      this.localDataXceiverServer.interrupt();
-    }
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
       while (true) {
-        this.threadGroup.interrupt();
+        // When shutting down for restart, wait 2.5 seconds before forcing
+        // termination of receiver threads.
+        if (!this.shutdownForUpgrade || 
+            (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) {
+          this.threadGroup.interrupt();
+        }
         LOG.info("Waiting for threadgroup to exit, active threads is " +
                  this.threadGroup.activeCount());
         if (this.threadGroup.activeCount() == 0) {
@@ -1249,7 +1271,13 @@ public class DataNode extends Configured
       } catch (InterruptedException ie) {
       }
     }
-    
+
+    // The IPC server needs to be shut down late in the process; otherwise
+    // the shutdown command response won't get sent.
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
+
     if(blockPoolManager != null) {
       try {
         this.blockPoolManager.shutDownAll(bposArray);
@@ -1275,6 +1303,11 @@ public class DataNode extends Configured
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;
     }
+    LOG.info("Shutdown complete.");
+    synchronized(this) {
+      // Notify the main thread.
+      notifyAll();
+    }
   }
   
   
@@ -1775,7 +1808,11 @@ public class DataNode extends Configured
             && blockPoolManager.getAllNamenodeThreads().length == 0) {
           shouldRun = false;
         }
-        Thread.sleep(2000);
+        // Terminate if shutdown is complete or 2 seconds after all BPs
+        // are shut down.
+        synchronized(this) {
+          wait(2000);
+        }
       } catch (InterruptedException ex) {
         LOG.warn("Received exception in Datanode#join: " + ex);
       }
@@ -2411,17 +2448,27 @@ public class DataNode extends Configured
   }
 
   @Override // ClientDatanodeProtocol
-  public void shutdownDatanode(boolean forUpgrade) throws IOException {
+  public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
     LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
         "). Shutting down Datanode...");
 
-    // Delay start the shutdown process so that the rpc response can be
+    // Shutdown can be called only once.
+    if (shutdownInProgress) {
+      throw new IOException("Shutdown already in progress.");
+    }
+    shutdownInProgress = true;
+    shutdownForUpgrade = forUpgrade;
+
+    // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
     Thread shutdownThread = new Thread() {
       @Override public void run() {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) { }
+        if (!shutdownForUpgrade) {
+          // Delay the shutdown a bit if not doing for restart.
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) { }
+        }
         shutdown();
       }
     };
@@ -2462,6 +2509,10 @@ public class DataNode extends Configured
     return bp != null ? bp.isAlive() : false;
   }
 
+  boolean isRestarting() {
+    return shutdownForUpgrade;
+  }
+
   /**
    * A datanode is considered to be fully started if all the BP threads are
    * alive and all the block pools are initialized.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -158,8 +158,8 @@ class DataXceiver extends Receiver implements Runnable {
     int opsProcessed = 0;
     Op op = null;
 
-    dataXceiverServer.addPeer(peer);
     try {
+      dataXceiverServer.addPeer(peer, Thread.currentThread());
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer) {

+ 61 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -20,8 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,8 @@ class DataXceiverServer implements Runnable {
   
   private final PeerServer peerServer;
   private final DataNode datanode;
-  private final Set<Peer> peers = new HashSet<Peer>();
+  private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+  private boolean closed = false;
   
   /**
    * Maximal number of concurrent xceivers per node.
@@ -127,7 +127,7 @@ class DataXceiverServer implements Runnable {
   @Override
   public void run() {
     Peer peer = null;
-    while (datanode.shouldRun) {
+    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
       try {
         peer = peerServer.accept();
 
@@ -147,7 +147,7 @@ class DataXceiverServer implements Runnable {
       } catch (AsynchronousCloseException ace) {
         // another thread closed our listener socket - that's expected during shutdown,
         // but not in other circumstances
-        if (datanode.shouldRun) {
+        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
           LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
@@ -170,35 +170,82 @@ class DataXceiverServer implements Runnable {
         datanode.shouldRun = false;
       }
     }
-    synchronized (this) {
-      for (Peer p : peers) {
-        IOUtils.cleanup(LOG, p);
-      }
-    }
+
+    // Close the server to stop reception of more requests.
     try {
       peerServer.close();
+      closed = true;
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName()
           + " :DataXceiverServer: close exception", ie);
     }
+
+    // if in restart prep stage, notify peers before closing them.
+    if (datanode.shutdownForUpgrade) {
+      restartNotifyPeers();
+      // Each thread needs some time to process it. If a thread needs
+      // to send an OOB message to the client but is blocked on the network
+      // for a long time, we need to force its termination.
+      LOG.info("Shutting down DataXceiverServer before restart");
+      // Allow roughly up to 2 seconds.
+      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
+    }
+    // Close all peers.
+    closeAllPeers();
   }
 
   void kill() {
-    assert datanode.shouldRun == false :
-      "shoudRun should be set to false before killing";
+    assert (datanode.shouldRun == false || datanode.shutdownForUpgrade) :
+      "shoudRun should be set to false or restarting should be true"
+      + " before killing";
     try {
       this.peerServer.close();
+      this.closed = true;
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
     }
   }
   
-  synchronized void addPeer(Peer peer) {
-    peers.add(peer);
+  synchronized void addPeer(Peer peer, Thread t) throws IOException {
+    if (closed) {
+      throw new IOException("Server closed.");
+    }
+    peers.put(peer, t);
   }
 
   synchronized void closePeer(Peer peer) {
     peers.remove(peer);
     IOUtils.cleanup(null, peer);
   }
+
+  // Notify all peers of the shutdown and restart.
+  // datanode.shouldRun should still be true and datanode.shutdownForUpgrade
+  // should be set true before calling this method.
+  synchronized void restartNotifyPeers() {
+    assert (datanode.shouldRun == true && datanode.shutdownForUpgrade);
+    for (Peer p : peers.keySet()) {
+      // interrupt each and every DataXceiver thread.
+      peers.get(p).interrupt();
+    }
+  }
+
+  // Close all peers and clear the map.
+  synchronized void closeAllPeers() {
+    LOG.info("Closing all peers.");
+    for (Peer p : peers.keySet()) {
+      IOUtils.cleanup(LOG, p);
+    }
+    peers.clear();
+  }
+
+  // Return the number of peers.
+  synchronized int getNumPeers() {
+    return peers.size();
+  }
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -157,6 +157,10 @@ enum Status {
   ERROR_ACCESS_TOKEN = 5;
   CHECKSUM_OK = 6;
   ERROR_UNSUPPORTED = 7;
+  OOB_RESTART = 8;            // Quick restart
+  OOB_RESERVED1 = 9;          // Reserved
+  OOB_RESERVED2 = 10;         // Reserved
+  OOB_RESERVED3 = 11;         // Reserved
 }
 
 message PipelineAckProto {

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
@@ -159,4 +161,39 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /** Test recovery on restart OOB message */
+  @Test
+  public void testPipelineRecoveryOnOOB() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      int numDataNodes = 3;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol2.dat");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
+      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      DataNode dn = cluster.getDataNodes().get(0);
+      final String dnAddr = dn.getDatanodeId().getIpcAddr(false);
+      // issue shutdown to the datanode.
+      final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      out.close();
+      Thread.sleep(3000);
+      final String[] args2 = {"-getDatanodeInfo", dnAddr };
+      Assert.assertEquals(-1, dfsadmin.run(args2));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
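
For reference, the sequence the test drives through DFSAdmin corresponds to these command-line invocations (host:port is illustrative):

    hdfs dfsadmin -shutdownDatanode <host:ipc_port> upgrade
    hdfs dfsadmin -getDatanodeInfo <host:ipc_port>

The second command is expected to fail (exit code -1) while the datanode is down, which is what the test asserts.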