Browse Source

commit fb88642e8acee8dacb5c8146b3b70344f6021a84
Author: Hairong Kuang <hairong@ucdev21.inktomisearch.com>
Date: Sun Mar 21 01:55:11 2010 +0000

HDFS:101 from https://issues.apache.org/jira/secure/attachment/12439387/pipelineHeartbeat_yahoo.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077351 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
2e9546764d

+ 2 - 2
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -2553,7 +2553,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             // read an ack from the pipeline
             ack.readFields(blockReplyStream);
             if (LOG.isDebugEnabled()) {
-              LOG.debug("DFSClient " + ack);
+              LOG.debug("DFSClient for block " + block + " " + ack);
             }
             long seqno = ack.getSeqno();
             if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
@@ -2567,7 +2567,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               }
               if (one.seqno != seqno) {
                 throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block +
+                                      " for block " + block + " " +
                                       one.seqno + " but received " + seqno);
               }
               lastPacketInBlock = one.lastPacketInBlock;

+ 25 - 22
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -873,18 +873,16 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             PipelineAck ack = new PipelineAck();
             long seqno = -2;
             try { 
-               if (!mirrorError) {
+              if (!mirrorError) {
                 // read an ack from downstream datanode
-                 ack.readFields(mirrorIn);
-                 if (LOG.isDebugEnabled()) {
-                   LOG.debug("PacketResponder " + numTargets + " got " + ack);
-                 }
-                 seqno = ack.getSeqno();
-               }
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut); // send keepalive
-                replyOut.flush();
-              } else if (seqno >= 0 || mirrorError) {
+                ack.readFields(mirrorIn);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block + " got " + ack);
+                }
+                seqno = ack.getSeqno();
+              }
+              if (seqno >= 0 || mirrorError) {
                 Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -963,20 +961,25 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               }
             }
 
-            // construct my ack message
-            short[] replies = null;
-            if (mirrorError) { // no ack is read
-              replies = new short[2];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+            PipelineAck replyAck;
+            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+              replyAck = ack;  // continue to send keep alive
             } else {
-              replies = new short[1+ack.getNumOfReplies()];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              for (int i=0; i<ack.getNumOfReplies(); i++) {
-                replies[i+1] = ack.getReply(i);
+              // construct my ack message
+              short[] replies = null;
+              if (mirrorError) { // no ack is read
+                replies = new short[2];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+              } else {
+                replies = new short[1+ack.getNumOfReplies()];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                for (int i=0; i<ack.getNumOfReplies(); i++) {
+                  replies[i+1] = ack.getReply(i);
+                }
               }
+              replyAck = new PipelineAck(expected, replies);
             }
-            PipelineAck replyAck = new PipelineAck(expected, replies);
  
             // send my ack back to upstream datanode
             replyAck.write(replyOut);