浏览代码

HDFS-673. BlockReceiver#PacketResponder should not remove a packet from the ack queue before its ack is sent. Contributed by Hairong Kuang.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@824552 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 年之前
父节点
当前提交
7d68dfbe2f
共有 2 个文件被更改,包括 17 次插入9 次删除
  1. 3 0
      CHANGES.txt
  2. 14 9
      src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

+ 3 - 0
CHANGES.txt

@@ -392,6 +392,9 @@ Release 0.21.0 - Unreleased
 
     HDFS-676. Fix NPE in FSDataset.updateReplicaUnderRecovery() (shv)
 
+    HDFS-673. BlockReceiver#PacketResponder should not remove a packet from
+    the ack queue before its ack is sent. (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

+ 14 - 9
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -517,6 +517,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
           }
           replicaInfo.setBytesOnDisk(offsetInBlock);
           datanode.myMetrics.bytesWritten.inc(len);
+          /// flush entire packet
+          flush();
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -524,9 +526,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
     }
 
-    /// flush entire packet before sending ack
-    flush();
-
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -804,7 +803,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             if (!running || !datanode.shouldRun) {
               break;
             }
-            Packet pkt = ackQueue.removeFirst();
+            Packet pkt = ackQueue.getFirst();
             long expected = pkt.seqno;
             notifyAll();
             LOG.debug("PacketResponder " + numTargets +
@@ -837,6 +836,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             replyOut.writeLong(expected);
             SUCCESS.write(replyOut);
             replyOut.flush();
+            ackQueue.removeFirst();
             if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
               replicaInfo.setBytesAcked(pkt.lastByteInBlock);
             }
@@ -872,7 +872,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
 
       boolean lastPacketInBlock = false;
-      Packet pkt = null;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
@@ -880,6 +879,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
         try {
             DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
+            Packet pkt = null;
             long expected = -2;
             try { 
               // read seqno from downstream datanode
@@ -910,7 +910,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                       throw e;
                     }
                   }
-                  pkt = ackQueue.removeFirst();
+                  pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
                   notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
@@ -1001,12 +1001,17 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               op.write(replyOut);
             }
             replyOut.flush();
+            
             LOG.debug("PacketResponder " + block + " " + numTargets + 
                       " responded other status " + " for seqno " + expected);
 
-            if (pkt != null && success && 
-                pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+            if (pkt != null) {
+              // remove the packet from the queue
+              ackQueue.removeFirst();
+              // update bytes acked
+              if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+              }
             }
             // If we were unable to read the seqno from downstream, then stop.
             if (expected == -2) {