
HDFS-724. Use a bidirectional heartbeat to detect stuck pipeline. Contributed by Hairong Kuang.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-append@1029559 13f79535-47bb-0310-9956-ffa450edef68

Hairong Kuang committed 14 years ago
commit 539a539a48
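
At a high level, this patch replaces the datanode-driven keepalive (the old lastDataNodeRun loop) with a client-driven one: when the DataStreamer has had nothing to send for half the pipeline timeout, it writes a zero-payload heartbeat packet with the reserved seqno -1, and every datanode acks it back upstream. A healthy-but-idle pipeline thus keeps traffic flowing in both directions, while a genuinely stuck node still trips the socket timeout. Below is a minimal sketch of the send-side decision; Packet, dataQueue, and timeoutValue are simplified stand-ins for the DFSClient internals, not the committed code.

```java
// Sketch of the DataStreamer heartbeat decision added in this patch.
// Simplified stand-ins for DFSClient internals; not the real implementation.
import java.util.ArrayDeque;
import java.util.Deque;

class HeartbeatSketch {
  static final long HEART_BEAT_SEQNO = -1L;   // reserved seqno, as in the patch

  static final class Packet {
    final long seqno;
    Packet(long seqno) { this.seqno = seqno; }
    boolean isHeartbeatPacket() { return seqno == HEART_BEAT_SEQNO; }
  }

  private final Deque<Packet> dataQueue = new ArrayDeque<Packet>();
  private long lastPacket;            // millis when the last packet was sent
  private int timeoutValue = 69000;   // 3000 * pipelineLength + socketTimeout

  /** Data if queued; a heartbeat once idle for timeoutValue/2; else null. */
  Packet nextPacket() {
    long now = System.currentTimeMillis();
    if (!dataQueue.isEmpty()) {
      return dataQueue.pollFirst();               // regular data packet
    }
    if (now - lastPacket >= timeoutValue / 2) {
      return new Packet(HEART_BEAT_SEQNO);        // keepalive for idle pipeline
    }
    return null;                                  // nothing to send yet
  }

  void markSent() { lastPacket = System.currentTimeMillis(); }
}
```

Heartbeat packets are written to the pipeline but never moved onto ackQueue, which is why the ResponseProcessor in the diff below simply skips acks whose seqno is -1.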

+ 3 - 0
CHANGES.txt

@@ -82,6 +82,9 @@ Release 0.20-append - Unreleased
 
     HDFS-1346. DFSClient receives out of order packet ack. (hairong)
 
+    HDFS-724.  Use a bidirectional heartbeat to detect stuck
+    pipeline. (hairong)
+
 Release 0.20.3 - Unreleased
 
   NEW FEATURES

+ 89 - 40
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -81,6 +81,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   private SocketFactory socketFactory;
   private int socketTimeout;
   private int datanodeWriteTimeout;
+  private int timeoutValue;  // read timeout for the socket
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
@@ -189,6 +190,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                                      HdfsConstants.READ_TIMEOUT);
     this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                             HdfsConstants.WRITE_TIMEOUT);
+    this.timeoutValue = this.socketTimeout;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
@@ -2205,7 +2207,28 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       int     dataPos;
       int     checksumStart;
       int     checksumPos;      
-  
+
+      private static final long HEART_BEAT_SEQNO = -1L;
+
+      /**
+       *  create a heartbeat packet
+       */
+      Packet() {
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = 0;
+        this.seqno = HEART_BEAT_SEQNO;
+
+        buffer = null;
+        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        buf = new byte[packetSize];
+
+        checksumStart = dataStart = packetSize;
+        checksumPos = checksumStart;
+        dataPos = dataStart;
+        maxChunks = 0;
+      }
+
       // create a new packet
       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
@@ -2286,6 +2309,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         buffer.reset();
         return buffer;
       }
+      
+      /**
+       * Check if this packet is a heart beat packet
+       * @return true if the sequence number is HEART_BEAT_SEQNO
+       */
+      private boolean isHeartbeatPacket() {
+        return seqno == HEART_BEAT_SEQNO;
+      }
     }
   
     //
@@ -2301,6 +2332,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       private volatile boolean closed = false;
   
       public void run() {
+        long lastPacket = 0;
+
         while (!closed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
@@ -2320,23 +2353,36 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             boolean doSleep = processDatanodeError(hasError, false);
 
             // wait for a packet to be sent.
+            long now = System.currentTimeMillis();
             while ((!closed && !hasError && clientRunning 
-                   && dataQueue.size() == 0) || doSleep) {
+                   && dataQueue.size() == 0  &&
+                   (blockStream == null || (
+                    blockStream != null && now - lastPacket < timeoutValue/2)))
+                   || doSleep) {
+              long timeout = timeoutValue/2 - (now-lastPacket);
+              timeout = timeout <= 0 ? 1000 : timeout;
+
               try {
-                dataQueue.wait(1000);
+                dataQueue.wait(timeout);
+                now = System.currentTimeMillis();
               } catch (InterruptedException  e) {
               }
               doSleep = false;
             }
-            if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
+            if (closed || hasError || !clientRunning) {
               continue;
             }
 
             try {
               // get packet to be sent.
-              one = dataQueue.getFirst();
+              if (dataQueue.isEmpty()) {
+                one = new Packet();  // heartbeat packet
+              } else {
+                one = dataQueue.getFirst(); // regular data packet
+              }
+
               long offsetInBlock = one.offsetInBlock;
-  
+
               // get new block from namenode.
               if (blockStream == null) {
                 LOG.debug("Allocating new block");
@@ -2358,12 +2404,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               ByteBuffer buf = one.getBuffer();
               
               // move packet from dataQueue to ackQueue
-              dataQueue.removeFirst();
-              dataQueue.notifyAll();
-              synchronized (ackQueue) {
-                ackQueue.addLast(one);
-                ackQueue.notifyAll();
-              } 
+              if (!one.isHeartbeatPacket()) {
+                dataQueue.removeFirst();
+                dataQueue.notifyAll();
+                synchronized (ackQueue) {
+                  ackQueue.addLast(one);
+                  ackQueue.notifyAll();
+                }
+              }
               
               // write out data to remote datanode
               blockStream.write(buf.array(), buf.position(), buf.remaining());
@@ -2372,6 +2420,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                 blockStream.writeInt(0); // indicate end-of-block 
               }
               blockStream.flush();
+              lastPacket = System.currentTimeMillis();
+
               if (LOG.isDebugEnabled()) {
                 LOG.debug("DataStreamer block " + block +
                           " wrote packet seqno:" + one.seqno +
@@ -2480,38 +2530,37 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             if (LOG.isDebugEnabled()) {
               LOG.debug("DFSClient " + ack);
             }
+
+            // processes response status from all datanodes.
+            for (int i = ack.getNumOfReplies()-1; i >= 0 && clientRunning; i--) {
+              short reply = ack.getReply(i);
+              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                errorIndex = i; // first bad datanode
+                throw new IOException("Bad response " + reply +
+                                      " for block " + block +
+                                      " from datanode " +
+                                      targets[i].getName());
+              }
+            }
+
             long seqno = ack.getSeqno();
-            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+            assert seqno != PipelineAck.UNKOWN_SEQNO :
+              "Ack for unkown seqno should be a failed ack: " + ack;
+            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
               continue;
-            } else if (seqno == -2) {
-              // This signifies that some pipeline node failed to read downstream
-              // and therefore has no idea what sequence number the message corresponds
-              // to. So, we don't try to match it up with an ack.
-              assert ! ack.isSuccess();
-            } else {
-              Packet one = null;
-              synchronized (ackQueue) {
-                one = ackQueue.getFirst();
-              }
-              if (one.seqno != seqno) {
-                throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block +
-                                      one.seqno + " but received " + seqno);
-              }
-              lastPacketInBlock = one.lastPacketInBlock;
             }
 
-            // processes response status from all datanodes.
-            for (int i = 0; i < targets.length && clientRunning; i++) {
-              short reply = ack.getReply(i);
-              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-                errorIndex = i; // first bad datanode
-                throw new IOException("Bad response " + reply +
-                                      " for block " + block +
-                                      " from datanode " + 
-                                      targets[i].getName());
-              }
+            Packet one = null;
+            synchronized (ackQueue) {
+              one = ackQueue.getFirst();
+            }
+            
+            if (one.seqno != seqno) {
+              throw new IOException("Responseprocessor: Expecting seqno " + 
+                                    " for block " + block + " " +
+                                    one.seqno + " but received " + seqno);
             }
+            lastPacketInBlock = one.lastPacketInBlock;
 
             synchronized (ackQueue) {
               ackQueue.removeFirst();
@@ -2931,7 +2980,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         LOG.debug("Connecting to " + nodes[0].getName());
         InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
         s = socketFactory.createSocket();
-        int timeoutValue = 3000 * nodes.length + socketTimeout;
+        timeoutValue = 3000 * nodes.length + socketTimeout;
         NetUtils.connect(s, target, timeoutValue);
         s.setSoTimeout(timeoutValue);
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
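
One subtle change above: timeoutValue is promoted from a local variable in createBlockOutputStream to an instance field, so the streamer's heartbeat interval tracks the per-pipeline timeout (3000 ms per datanode on top of the socket timeout). As a rough worked example, assuming the stock 60-second read timeout (HdfsConstants.READ_TIMEOUT; an assumption to verify against your build):

```java
// Worked example of the heartbeat interval; the 60 s read timeout is an
// assumed default, the 3000 ms/node and /2 factors come from the patch.
public class HeartbeatIntervalExample {
  public static void main(String[] args) {
    int socketTimeout = 60 * 1000;   // assumed dfs.socket.timeout default
    int pipelineLength = 3;          // typical replication factor
    int timeoutValue = 3000 * pipelineLength + socketTimeout;
    System.out.println("timeoutValue    = " + timeoutValue + " ms");      // 69000
    System.out.println("heartbeat every = " + timeoutValue / 2 + " ms");  // 34500
  }
}
```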

+ 4 - 10
src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -34,15 +34,10 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 14:
-   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
-   *    including the block id, source, and proxy.
-   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
-   *    only the block id.
-   *    A reply to OP_COPY_BLOCK sends the block content.
-   *    A reply to OP_REPLACE_BLOCK includes an operation status.
+   * Version 15:
+   * A heartbeat is sent from the client to pipeline and then acked back
    */
-  public static final int DATA_TRANSFER_VERSION = 14;
+  public static final int DATA_TRANSFER_VERSION = 15;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -66,8 +61,7 @@ public interface DataTransferProtocol {
   public static class PipelineAck {
     private long seqno;
     private short replies[];
-    final public static PipelineAck HEART_BEAT =
-      new PipelineAck(HEARTBEAT_SEQNO, new short[0]);
+    final public static long UNKOWN_SEQNO = -2; 
 
     /** default constructor **/
     public PipelineAck() {
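
With this version bump, two sentinel sequence numbers are in play: -1 (Packet.HEART_BEAT_SEQNO on the client) marks a heartbeat, and -2 (PipelineAck.UNKOWN_SEQNO) marks an ack from a datanode that failed to read its downstream ack and has no seqno to report. A hedged illustration of how a receiver might classify them; the constants mirror the patch, but this helper itself is hypothetical and does not exist in the codebase:

```java
// Hypothetical classifier for the two sentinel seqnos used after this patch.
public class SeqnoSentinels {
  public static final long HEART_BEAT_SEQNO = -1L;  // client heartbeat packet
  public static final long UNKOWN_SEQNO = -2L;      // downstream read failure

  enum Kind { HEARTBEAT, UNKNOWN, DATA }

  static Kind classify(long seqno) {
    if (seqno == HEART_BEAT_SEQNO) return Kind.HEARTBEAT; // ack and skip
    if (seqno == UNKOWN_SEQNO) return Kind.UNKNOWN;       // must carry a failure
    return Kind.DATA;                                     // match against ackQueue
  }
}
```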

+ 61 - 167
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -471,6 +471,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
           datanode.myMetrics.bytesWritten.inc(len);
+
+          /// flush entire packet before sending ack
+          flush();
+          
+          // update length only after flush to disk
+          datanode.data.setVisibleLength(block, offsetInBlock);
+          
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -478,12 +485,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
     }
 
-    /// flush entire packet before sending ack
-    flush();
-    
-    // update length only after flush to disk
-    datanode.data.setVisibleLength(block, offsetInBlock);
-    
     // put in queue for pending acks
     if (responder != null) {
       ((PacketResponder)responder.getRunnable()).enqueue(seqno,
@@ -757,134 +758,50 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       notifyAll();
     }
 
-    private synchronized void lastDataNodeRun() {
-      long lastHeartbeat = System.currentTimeMillis();
-      boolean lastPacket = false;
-
-      while (running && datanode.shouldRun && !lastPacket) {
-        long now = System.currentTimeMillis();
-        try {
-
-            // wait for a packet to be sent to downstream datanode
-            while (running && datanode.shouldRun && ackQueue.size() == 0) {
-              long idle = now - lastHeartbeat;
-              long timeout = (datanode.socketTimeout/2) - idle;
-              if (timeout <= 0) {
-                timeout = 1000;
-              }
-              try {
-                wait(timeout);
-              } catch (InterruptedException e) {
-                if (running) {
-                  LOG.info("PacketResponder " + numTargets +
-                           " for block " + block + " Interrupted.");
-                  running = false;
-                }
-                break;
-              }
-          
-              // send a heartbeat if it is time.
-              now = System.currentTimeMillis();
-              if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                PipelineAck.HEART_BEAT.write(replyOut);  // send heart beat
-                replyOut.flush();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets +
-                            " for block " + block + 
-                            " sent a heartbeat");
-                }
-                lastHeartbeat = now;
-              }
-            }
-
-            if (!running || !datanode.shouldRun) {
-              break;
-            }
-            Packet pkt = ackQueue.removeFirst();
-            long expected = pkt.seqno;
-            notifyAll();
-            LOG.debug("PacketResponder " + numTargets +
-                      " for block " + block + 
-                      " acking for packet " + expected);
-
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (pkt.lastPacketInBlock) {
-              if (!receiver.finalized) {
-                receiver.close();
-                block.setNumBytes(receiver.offsetInBlock);
-                datanode.data.finalizeBlock(block);
-                datanode.myMetrics.blocksWritten.inc();
-                datanode.notifyNamenodeReceivedBlock(block, 
-                    DataNode.EMPTY_DEL_HINT);
-                if (ClientTraceLog.isInfoEnabled() &&
-                    receiver.clientName.length() > 0) {
-                  ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                        receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                        "HDFS_WRITE", receiver.clientName,
-                        datanode.dnRegistration.getStorageID(), block));
-                } else {
-                  LOG.info("Received block " + block + 
-                           " of size " + block.getNumBytes() + 
-                           " from " + receiver.inAddr);
-                }
-              }
-              lastPacket = true;
-            }
-
-            new PipelineAck(expected, new short[]{
-                DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
-            replyOut.flush();
-        } catch (Exception e) {
-          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
-          if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
-                  ioe);
-            }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
-          }
-        }
-      }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
     public void run() {
-
-      // If this is the last datanode in pipeline, then handle differently
-      if (numTargets == 0) {
-        lastDataNodeRun();
-        return;
-      }
-
       boolean lastPacketInBlock = false;
       boolean isInterrupted = false;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            /**
-             * Sequence number -2 is a special value that is used when
-             * a DN fails to read an ack from a downstream. In this case,
-             * it needs to tell the client that there's been an error downstream
-             * but has no valid sequence number to use. Thus, -2 is used
-             * as an UNKNOWN value.
-             */
-            long expected = -2;
-            long seqno = -2;
-
-            PipelineAck ack = new PipelineAck();
-            boolean localMirrorError = mirrorError;
-            try { 
-              if (!localMirrorError) {
+          /**
+           * Sequence number -2 is a special value that is used when
+           * a DN fails to read an ack from a downstream. In this case,
+           * it needs to tell the client that there's been an error downstream
+           * but has no valid sequence number to use. Thus, -2 is used
+           * as an UNKNOWN value.
+           */
+          long expected = PipelineAck.UNKOWN_SEQNO;
+          long seqno = PipelineAck.UNKOWN_SEQNO;
+
+          PipelineAck ack = new PipelineAck();
+          boolean localMirrorError = mirrorError;
+          try { 
+            Packet pkt = null;
+            synchronized (this) {
+              // wait for a packet to arrive
+              while (running && datanode.shouldRun && ackQueue.size() == 0) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + 
+                            " seqno = " + seqno +
+                            " for block " + block +
+                            " waiting for local datanode to finish write.");
+                }
+                wait();
+              }
+              if (!running || !datanode.shouldRun) {
+                break;
+              }
+              pkt = ackQueue.removeFirst();
+              expected = pkt.seqno;
+              notifyAll();
+            }
+              // receive an ack if DN is not the last one in the pipeline
+              if (numTargets > 0 && !localMirrorError) {
                 // read an ack from downstream datanode
                 ack.readFields(mirrorIn, numTargets);
                 if (LOG.isDebugEnabled()) {
@@ -892,34 +809,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                       " for block " + block + " got " + ack);
                 }
                 seqno = ack.getSeqno();
-              }
-              if (seqno >= 0 || localMirrorError) {
-                Packet pkt = null;
-                synchronized (this) {
-                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets + 
-                                " seqno = " + seqno +
-                                " for block " + block +
-                                " waiting for local datanode to finish write.");
-                    }
-                    wait();
-                  }
-                  if (!running || !datanode.shouldRun) {
-                    break;
-                  }
-                  pkt = ackQueue.removeFirst();
-                  expected = pkt.seqno;
-                  notifyAll();
-                  if (seqno != expected && !localMirrorError) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
-                  }
-                  lastPacketInBlock = pkt.lastPacketInBlock;
+                // verify seqno
+                if (seqno != expected) {
+                  throw new IOException("PacketResponder " + numTargets +
+                      " for block " + block +
+                      " expected seqno:" + expected +
+                      " received:" + seqno);
                 }
               }
+              lastPacketInBlock = pkt.lastPacketInBlock;
             } catch (InterruptedException ine) {
               isInterrupted = true;
             } catch (IOException ioe) {
@@ -970,25 +868,21 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               }
             }
 
-            PipelineAck replyAck;
-            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-              replyAck = ack;  // continue to send keep alive
+            // construct my ack message
+            short[] replies = null;
+            if (mirrorError) { // no ack is read
+              replies = new short[2];
+              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
             } else {
-              // construct my ack message
-              short[] replies = null;
-              if (mirrorError) { // no ack is read
-                replies = new short[2];
-                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
-              } else {
-                replies = new short[1+ack.getNumOfReplies()];
-                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-                for (int i=0; i<ack.getNumOfReplies(); i++) {
-                  replies[i+1] = ack.getReply(i);
-                }
-              }
-              replyAck = new PipelineAck(expected, replies);
+              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              replies = new short[1+ackLen];
+              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+              for (int i=0; i<ackLen; i++) {
+                replies[i+1] = ack.getReply(i);
+              }
             }
+            PipelineAck replyAck = new PipelineAck(expected, replies);
  
             // send my ack back to upstream datanode
             replyAck.write(replyOut);
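
The unified responder builds its upstream ack by prepending its own status to whatever the downstream datanode reported, or by pairing a local SUCCESS with a single ERROR when the mirror connection failed. A standalone sketch of that aggregation; the status codes are inlined stand-in values, the real ones live in DataTransferProtocol:

```java
// Sketch of the PacketResponder reply aggregation after this patch:
// each datanode prepends its own SUCCESS and forwards downstream replies.
public class AckAggregationSketch {
  static final short OP_STATUS_SUCCESS = 0;  // stand-in value
  static final short OP_STATUS_ERROR = 1;    // stand-in value

  /** downstreamReplies is null for the last datanode in the pipeline. */
  static short[] buildReplies(boolean mirrorError, short[] downstreamReplies) {
    if (mirrorError) {
      // no ack could be read: report local success, downstream error
      return new short[] { OP_STATUS_SUCCESS, OP_STATUS_ERROR };
    }
    int ackLen = downstreamReplies == null ? 0 : downstreamReplies.length;
    short[] replies = new short[1 + ackLen];
    replies[0] = OP_STATUS_SUCCESS;            // this datanode's status
    for (int i = 0; i < ackLen; i++) {
      replies[i + 1] = downstreamReplies[i];   // forward downstream status
    }
    return replies;
  }
}
```

With lastDataNodeRun gone, the last datanode is just the numTargets == 0 case of the same loop, where ackLen is zero and the ack carries only the local status.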

+ 64 - 1
src/test/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -22,6 +22,9 @@ import java.io.*;
 import java.net.*;
 import java.util.List;
 
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +44,11 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  * support HDFS appends.
  */
 public class TestFileAppend extends TestCase {
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final int blockSize = 1024;
   static final int numBlocks = 10;
   static final int fileSize = numBlocks * blockSize + 1;
@@ -123,7 +131,7 @@ public class TestFileAppend extends TestCase {
 
   private void checkFullFile(FileSystem fs, Path name) throws IOException {
     FSDataInputStream stm = fs.open(name);
-    byte[] actual = new byte[fileSize];
+    byte[] actual = new byte[fileContents.length];
     stm.readFully(0, actual);
     checkData(actual, 0, fileContents, "Read 2");
     stm.close();
@@ -307,4 +315,59 @@ public class TestFileAppend extends TestCase {
       cluster.shutdown();
     }
   }
+  
+
+  /** This creates a slow writer and checks to see
+   * if pipeline heartbeats work fine
+   */
+  public void testPipelineHeartbeat() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
+    Configuration conf = new Configuration();
+    final int timeout = 2000;
+    conf.setInt("dfs.socket.timeout",timeout);
+
+    final Path p = new Path("/pipelineHeartbeat/foo");
+    System.out.println("p=" + p);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+    initBuffer(fileLen);
+
+    try {
+      // create a new file.
+      FSDataOutputStream stm = createFile(fs, p, DATANODE_NUM);
+
+      stm.write(fileContents, 0, 1);
+      Thread.sleep(timeout);
+      stm.sync();
+      System.out.println("Wrote 1 byte and hflush " + p);
+
+      // write another byte
+      Thread.sleep(timeout);
+      stm.write(fileContents, 1, 1);
+      stm.sync();
+
+      stm.write(fileContents, 2, 1);
+      Thread.sleep(timeout);
+      stm.sync();
+
+      stm.write(fileContents, 3, 1);
+      Thread.sleep(timeout);
+      stm.write(fileContents, 4, 1);
+      stm.sync();
+
+      stm.write(fileContents, 5, 1);
+      Thread.sleep(timeout);
+      stm.close();
+
+      // verify that entire file is good
+      checkFullFile(fs, p);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
 }

+ 1 - 1
src/test/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -965,7 +965,7 @@ public class TestFileAppend4 extends TestCase {
    * Mockito answer helper that triggers one latch as soon as the
    * method is called, then waits on another before continuing.
    */
-  private static class DelayAnswer implements Answer {
+  public static class DelayAnswer implements Answer {
     private final CountDownLatch fireLatch = new CountDownLatch(1);
     private final CountDownLatch waitLatch = new CountDownLatch(1);
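
DelayAnswer is widened to public here so the new TestStuckDataNode below can reuse it. Its doc comment describes the contract; a self-contained sketch of that idea follows. The field names mirror the snippet above, but the answer body and helper methods are reconstructed from the description, not copied from the patch:

```java
// Minimal Mockito Answer that signals a latch when the stubbed method is
// entered and blocks until the test releases it; reconstructed from the
// DelayAnswer doc comment above, not copied from the patch.
import java.util.concurrent.CountDownLatch;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class DelayAnswerSketch implements Answer<Object> {
  private final CountDownLatch fireLatch = new CountDownLatch(1);
  private final CountDownLatch waitLatch = new CountDownLatch(1);

  /** Test side: block until the spied method has been entered. */
  public void waitForCall() throws InterruptedException { fireLatch.await(); }

  /** Test side: allow the stalled method to continue. */
  public void proceed() { waitLatch.countDown(); }

  @Override
  public Object answer(InvocationOnMock invocation) throws Throwable {
    fireLatch.countDown();           // tell the test we've been called
    waitLatch.await();               // stall until the test lets us continue
    return invocation.callRealMethod();
  }
}
```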
 

+ 91 - 0
src/test/org/apache/hadoop/hdfs/server/datanode/TestStuckDataNode.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestFileAppend4.DelayAnswer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This class tests that a write pipeline survives a datanode
+ * that is stuck flushing a packet to disk.
+ */
+public class TestStuckDataNode extends TestCase {
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /** This stalls a datanode's disk write and checks that
+   * pipeline heartbeats keep the client write alive
+   */
+  public void testStuckDataNode() throws Exception {
+    final int DATANODE_NUM = 3;
+    Configuration conf = new Configuration();
+    final int timeout = 8000;
+    conf.setInt("dfs.socket.timeout",timeout);
+
+    final Path p = new Path("/pipelineHeartbeat/foo");
+    System.out.println("p=" + p);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+    DataNodeMetrics metrics = cluster.getDataNodes().get(0).myMetrics;
+    MetricsTimeVaryingLong spyBytesWritten = spy(metrics.bytesWritten);
+    DelayAnswer delayAnswer = new DelayAnswer();
+    doAnswer(delayAnswer).when(spyBytesWritten).inc(anyInt());
+    metrics.bytesWritten = spyBytesWritten;
+
+    try {
+      // create a new file.
+      FSDataOutputStream stm = fs.create(p);
+      stm.write(1);
+      stm.sync();
+      stm.write(2);
+      stm.close();
+
+      // verify that entire file is good
+      FSDataInputStream in = fs.open(p);
+      assertEquals(1, in.read());
+      assertEquals(2, in.read());
+      in.close();
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+}