
Revert the change that HDFS-793 made to branch 0.20.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@899508 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 15 years ago
parent
commit
0169d19cc5
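
For context: the reverted HDFS-793 change (protocol version 18) had wrapped the pipeline ack in a PipelineAck Writable carrying a seqno, a reply count, and the replies. This revert restores the version-14 wire format: a bare long seqno (-1 for a heartbeat) followed by one status short per datanode in the pipeline. A minimal, standalone sketch of encoding and decoding the restored format — the class and helper names here are hypothetical, and the status value is taken from DataTransferProtocol in this branch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone illustration of the restored (pre-HDFS-793) ack wire format:
// a long sequence number followed by one status short per datanode.
public class AckFormatDemo {
  static final short OP_STATUS_SUCCESS = 0; // value from DataTransferProtocol

  public static void main(String[] args) throws IOException {
    // Encode an ack for seqno 100 from a 2-datanode pipeline.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeLong(100);                 // sequence number
    out.writeShort(OP_STATUS_SUCCESS);  // status of datanode 0
    out.writeShort(OP_STATUS_SUCCESS);  // status of datanode 1

    // Decode it the way the client-side ResponseProcessor does.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    long seqno = in.readLong();         // -1 would mean a heartbeat
    System.out.println("seqno = " + seqno);
    for (int i = 0; i < 2; i++) {
      short reply = in.readShort();
      System.out.println("datanode " + i + " -> " + reply);
    }
  }
}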

+ 0 - 3
CHANGES.txt

@@ -70,9 +70,6 @@ Release 0.20.2 - Unreleased
     MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
     configured threshold. (cdouglas)
 
-    HDFS-793. Data node should recceive the whole packet ack message before
-    it constructs and sends its own ack message for the packet. (hairong)
-
     HADOOP-6386. NameNode's HttpServer can't instantiate InetSocketAddress:
     IllegalArgumentException is thrown. (cos)
 

+ 6 - 11
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -2397,18 +2396,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       public void run() {
 
         this.setName("ResponseProcessor for block " + block);
-        PipelineAck ack = new PipelineAck();
   
         while (!closed && clientRunning && !lastPacketInBlock) {
           // process responses from datanodes.
           try {
-            // read an ack from the pipeline
-            ack.readFields(blockReplyStream);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DFSClient " + ack);
-            }
-            long seqno = ack.getSeqno();
-            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+            // verify seqno from datanode
+            long seqno = blockReplyStream.readLong();
+            LOG.debug("DFSClient received ack for seqno " + seqno);
+            if (seqno == -1) {
              continue;
            } else if (seqno == -2) {
              // no nothing
@@ -2426,8 +2421,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
            }
 
            // processes response status from all datanodes.
-            for (int i = ack.getNumOfReplies()-1; i >=0  && clientRunning; i--) {
-              short reply = ack.getReply(i);
+            for (int i = 0; i < targets.length && clientRunning; i++) {
+              short reply = blockReplyStream.readShort();
              if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
                errorIndex = i; // first bad datanode
                throw new IOException("Bad response " + reply +
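
The restored ResponseProcessor reads a raw long seqno (-1 is a heartbeat, -2 an "unknown seqno" sentinel) and then one status short per pipeline target. A simplified, self-contained sketch of that control flow — the class and method names are hypothetical, and the status constant follows DataTransferProtocol:

import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical standalone rendering of the restored ResponseProcessor logic.
public class ResponseSketch {
  static final short OP_STATUS_SUCCESS = 0; // from DataTransferProtocol

  /** Returns the index of the first bad datanode, or -1 if all succeeded. */
  static int processOneAck(DataInputStream blockReplyStream, int numTargets)
      throws IOException {
    long seqno = blockReplyStream.readLong();
    if (seqno == -1) {
      return -1; // heartbeat; no statuses follow on the wire
    }
    // seqno == -2 means the responder lost track of the seqno, but the
    // per-datanode statuses still follow and must be drained.
    for (int i = 0; i < numTargets; i++) {
      short reply = blockReplyStream.readShort();
      if (reply != OP_STATUS_SUCCESS) {
        return i; // first bad datanode; the caller drives recovery
      }
    }
    return -1;
  }
}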

+ 8 - 100
src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
 
 /**
  * 
@@ -36,11 +31,15 @@ public interface DataTransferProtocol {
   * when protocol changes. It is not very obvious. 
   */
  /*
-   * Version 18:
-   *    Change the block packet ack protocol to include seqno,
-   *    numberOfReplies, reply0, reply1, ...
+   * Version 14:
+   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
+   *    including the block id, source, and proxy.
+   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
+   *    only the block id.
+   *    A reply to OP_COPY_BLOCK sends the block content.
+   *    A reply to OP_REPLACE_BLOCK includes an operation status.
    */
    */
-  public static final int DATA_TRANSFER_VERSION = 18;
+  public static final int DATA_TRANSFER_VERSION = 14;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -57,97 +56,6 @@ public interface DataTransferProtocol {
   public static final int OP_STATUS_ERROR_EXISTS = 4;  
   public static final int OP_STATUS_CHECKSUM_OK = 5;  
 
-  /** reply **/
-  public static class PipelineAck implements Writable {
-    private long seqno;
-    private short replies[];
-    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new short[0]);
-
-    /** default constructor **/
-    public PipelineAck() {
-    }
-
-    /**
-     * Constructor
-     * @param seqno sequence number
-     * @param replies an array of replies
-     */
-    public PipelineAck(long seqno, short[] replies) {
-      this.seqno = seqno;
-      this.replies = replies;
-    }
-
-    /**
-     * Get the sequence number
-     * @return the sequence number
-     */
-    public long getSeqno() {
-      return seqno;
-    }
-
-    /**
-     * Get the number of replies
-     * @return the number of replies
-     */
-    public short getNumOfReplies() {
-      return (short)replies.length;
-    }
-
-    /**
-     * get the ith reply
-     * @return the the ith reply
-     */
-    public short getReply(int i) {
-      return replies[i];
-    }
-
-    /**
-     * Check if this ack contains error status
-     * @return true if all statuses are SUCCESS
-     */
-    public boolean isSuccess() {
-      for (short reply : replies) {
-        if (reply != OP_STATUS_SUCCESS) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    /**** Writable interface ****/
-    @Override // Writable
-    public void readFields(DataInput in) throws IOException {
-      seqno = in.readLong();
-      short numOfReplies = in.readShort();
-      replies = new short[numOfReplies];
-      for (int i=0; i<numOfReplies; i++) {
-        replies[i] = in.readShort();
-      }
-    }
 
-    @Override // Writable
-    public void write(DataOutput out) throws IOException {
-      //WritableUtils.writeVLong(out, seqno);
-      out.writeLong(seqno);
-      out.writeShort((short)replies.length);
-      for(short reply : replies) {
-        out.writeShort(reply);
-      }
-    }
 
-    @Override //Object
-    public String toString() {
-      StringBuilder ack = new StringBuilder("Replies for seqno ");
-      ack.append( seqno ).append( " are" );
-      for(short reply : replies) {
-        ack.append(" ");
-        if (reply == OP_STATUS_SUCCESS) {
-          ack.append("SUCCESS");
-        } else {
-          ack.append("FAILED");
-        }
-      }
-      return ack.toString();
-    }
-  }
 }
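
For contrast with the restored format, the deleted PipelineAck serialized a seqno, then a short reply count, then each reply, so a receiver could parse the whole message without knowing the pipeline length. A minimal buffer-based sketch of the two encodings side by side — illustration only, with a hypothetical class name:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustration only: the two ack encodings for seqno 7 with replies {0, 0}.
public class AckEncodings {
  public static void main(String[] args) throws IOException {
    short[] replies = {0, 0}; // OP_STATUS_SUCCESS per datanode

    // Version 18 (removed PipelineAck): seqno, count, replies...
    ByteArrayOutputStream v18 = new ByteArrayOutputStream();
    DataOutputStream out18 = new DataOutputStream(v18);
    out18.writeLong(7);
    out18.writeShort((short) replies.length);
    for (short r : replies) out18.writeShort(r);

    // Version 14 (restored): seqno, then replies with no count prefix;
    // the reader must already know the pipeline length.
    ByteArrayOutputStream v14 = new ByteArrayOutputStream();
    DataOutputStream out14 = new DataOutputStream(v14);
    out14.writeLong(7);
    for (short r : replies) out14.writeShort(r);

    System.out.println("v18 bytes: " + v18.size()); // 8 + 2 + 2*2 = 14
    System.out.println("v14 bytes: " + v14.size()); // 8 + 2*2 = 12
  }
}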

+ 50 - 40
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -774,13 +773,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               // send a heartbeat if it is time.
               now = System.currentTimeMillis();
               if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                PipelineAck.HEART_BEAT.write(replyOut);  // send heart beat
+                replyOut.writeLong(-1); // send heartbeat
                 replyOut.flush();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets +
-                            " for block " + block + 
-                            " sent a heartbeat");
-                }
                 lastHeartbeat = now;
               }
             }
@@ -820,8 +814,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               lastPacket = true;
             }
 
-            new PipelineAck(expected, new short[]{
-                DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
+            replyOut.writeLong(expected);
+            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
             replyOut.flush();
         } catch (Exception e) {
           if (running) {
@@ -851,21 +845,23 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
+            short op = DataTransferProtocol.OP_STATUS_SUCCESS;
             boolean didRead = false;
             long expected = -2;
-            PipelineAck ack = new PipelineAck();
             try { 
-              // read an ack from downstream datanode
-              ack.readFields(mirrorIn);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("PacketResponder " + numTargets + " got " + ack);
-              }
-              long seqno = ack.getSeqno();
+              // read seqno from downstream datanode
+              long seqno = mirrorIn.readLong();
              didRead = true;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut); // send keepalive
+              if (seqno == -1) {
+                replyOut.writeLong(-1); // send keepalive
                 replyOut.flush();
-              } else if (seqno >= 0) {
+                LOG.debug("PacketResponder " + numTargets + " got -1");
+                continue;
+              } else if (seqno == -2) {
+                LOG.debug("PacketResponder " + numTargets + " got -2");
+              } else {
+                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
+                    seqno);
                Packet pkt = null;
                synchronized (this) {
                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -880,6 +876,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                  pkt = ackQueue.removeFirst();
                  expected = pkt.seqno;
                  notifyAll();
+                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                  if (seqno != expected) {
                    throw new IOException("PacketResponder " + numTargets +
                                          " for block " + block +
@@ -912,6 +909,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
              continue;
            }
            
+            if (!didRead) {
+              op = DataTransferProtocol.OP_STATUS_ERROR;
+            }
+            
            // If this is the last packet in block, then close block
            // file and finalize the block before responding success
            if (lastPacketInBlock && !receiver.finalized) {
@@ -934,34 +935,43 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
              }
            }
 
-            // construct my ack message
-            short[] replies = null;
-            if (!didRead) { // no ack is read
-              replies = new short[2];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
-            } else {
-              replies = new short[1+ack.getNumOfReplies()];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              for (int i=0; i<ack.getNumOfReplies(); i++) {
-                replies[i+1] = ack.getReply(i);
+            // send my status back to upstream datanode
+            replyOut.writeLong(expected); // send seqno upstream
+            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+
+            LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block +
+                      " responded my status " +
+                      " for seqno " + expected);
+
+            // forward responses from downstream datanodes.
+            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
+              try {
+                if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
+                  op = mirrorIn.readShort();
+                  if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                    LOG.debug("PacketResponder for block " + block +
+                              ": error code received from downstream " +
+                              " datanode[" + i + "] " + op);
+                  }
+                }
+              } catch (Throwable e) {
+                op = DataTransferProtocol.OP_STATUS_ERROR;
              }
+              replyOut.writeShort(op);
            }
-            PipelineAck replyAck = new PipelineAck(expected, replies);
- 
-            // send my ack back to upstream datanode
-            replyAck.write(replyOut);
             replyOut.flush();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("PacketResponder " + numTargets +
-                        " for block " + block +
-                        " responded an ack: " + replyAck);
-            }
+            LOG.debug("PacketResponder " + block + " " + numTargets + 
+                      " responded other status " + " for seqno " + expected);
 
+            // If we were unable to read the seqno from downstream, then stop.
+            if (expected == -2) {
+              running = false;
+            }
            // If we forwarded an error response from a downstream datanode
            // and we are acting on behalf of a client, then we quit. The 
            // client will drive the recovery mechanism.
-            if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
+            if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
              running = false;
            }
        } catch (IOException e) {
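
The restored PacketResponder streams its ack rather than buffering it: it writes the seqno and its own SUCCESS status upstream first, then relays each downstream status as it arrives, substituting OP_STATUS_ERROR once a read from the mirror fails. A condensed, hypothetical rendering of that forwarding loop, with status constants from DataTransferProtocol:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical condensation of the restored PacketResponder ack forwarding.
public class ForwardSketch {
  static final short OP_STATUS_SUCCESS = 0; // from DataTransferProtocol
  static final short OP_STATUS_ERROR   = 1;

  static void forwardAck(long seqno, DataInputStream mirrorIn,
                         DataOutputStream replyOut, int numTargets)
      throws IOException {
    replyOut.writeLong(seqno);               // seqno goes upstream first
    replyOut.writeShort(OP_STATUS_SUCCESS);  // this datanode's own status

    short op = OP_STATUS_SUCCESS;
    for (int i = 0; i < numTargets; i++) {
      try {
        if (op == OP_STATUS_SUCCESS) {
          op = mirrorIn.readShort();         // status from downstream node i
        }
      } catch (Throwable t) {
        op = OP_STATUS_ERROR;                // downstream died; mark the rest
      }
      replyOut.writeShort(op);               // relay (or substitute) status
    }
    replyOut.flush();
  }
}

Note the design consequence, visible in the diff: an upstream node sends its own status before the whole downstream ack has arrived, which is exactly the behavior HDFS-793 had changed and this commit restores.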

+ 3 - 3
src/test/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -250,9 +250,9 @@ public class TestDataTransferProtocol extends TestCase {
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
-    Text.writeString(recvOut, "");
-    new DataTransferProtocol.PipelineAck(100, 
-        new short[]{DataTransferProtocol.OP_STATUS_SUCCESS}).write(recvOut);
+    Text.writeString(recvOut, ""); // first bad node
+    recvOut.writeLong(100);        // sequencenumber
+    recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */
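
The test now fakes the datanode's reply stream byte-for-byte in the restored format: an empty "first bad node" string, the long seqno, and a single status short. A hedged sketch of building such an expected-reply buffer — the builder class is hypothetical, but Text.writeString is the Hadoop helper used in the test itself:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;

// Builds the expected single-datanode write reply, mirroring the test diff.
public class ExpectedReplyBuilder {
  static byte[] buildExpectedReply(long seqno) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream recvOut = new DataOutputStream(buf);
    Text.writeString(recvOut, "");   // empty "first bad node" name
    recvOut.writeLong(seqno);        // sequence number, e.g. 100
    recvOut.writeShort((short) 0);   // OP_STATUS_SUCCESS
    return buf.toByteArray();
  }
}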