Browse Source

HDFS-690. TestAppend2#testComplexAppend failed on "Too many open files". Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@828926 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 15 years ago
parent
commit
6dbf300ac9
2 changed files with 18 additions and 5 deletions
  1. 3 0
      CHANGES.txt
  2. 15 5
      src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

+ 3 - 0
CHANGES.txt

@@ -437,6 +437,9 @@ Release 0.21.0 - Unreleased
     HDFS-722. Fix callCreateBlockWriteStream pointcut in FSDatasetAspects.
     (szetszwo)
 
+    HDFS-690. TestAppend2#testComplexAppend failed on "Too many open files".
+    (hairong)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

+ 15 - 5
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -816,7 +816,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             }
             Packet pkt = ackQueue.getFirst();
             long expected = pkt.seqno;
-            notifyAll();
             LOG.debug("PacketResponder " + numTargets +
                       " for block " + block + 
                       " acking for packet " + expected);
@@ -847,7 +846,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             replyOut.writeLong(expected);
             SUCCESS.write(replyOut);
             replyOut.flush();
-            ackQueue.removeFirst();
+            // remove the packet from the ack queue
+            removeAckHead();
+            // update the bytes acked
             if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
               replicaInfo.setBytesAcked(pkt.lastByteInBlock);
             }
@@ -923,7 +924,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
@@ -1017,8 +1017,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                       " responded other status " + " for seqno " + expected);
 
             if (pkt != null) {
-              // remove the packet from the queue
-              ackQueue.removeFirst();
+              // remove the packet from the ack queue
+              removeAckHead();
               // update bytes acked
               if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
                 replicaInfo.setBytesAcked(pkt.lastByteInBlock);
@@ -1057,6 +1057,16 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       LOG.info("PacketResponder " + numTargets + 
                " for block " + block + " terminating");
     }
+    
+    /**
+     * Remove a packet from the head of the ack queue
+     * 
+     * This should be called only when the ack queue is not empty
+     */
+    private synchronized void removeAckHead() {
+      ackQueue.removeFirst();
+      notifyAll();
+    }
   }
   
   /**