浏览代码

HADOOP-3006. Fix wrong packet size reported by DataNode when a block is being replicated. (rangadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@637995 13f79535-47bb-0310-9956-ffa450edef68
Raghu Angadi 17 年之前
父节点
当前提交
d7f2be7fe9
共有 2 个文件被更改,包括 30 次插入8 次删除
  1. 3 0
      CHANGES.txt
  2. 27 8
      src/java/org/apache/hadoop/dfs/DataNode.java

+ 3 - 0
CHANGES.txt

@@ -260,6 +260,9 @@ Trunk (unreleased changes)
     HADOOP-3008. SocketIOWithTimeout throws InterruptedIOException if the
     thread is interrupted while it is waiting. (rangadi)
     
+    HADOOP-3006. Fix wrong packet size reported by DataNode when a block
+    is being replicated. (rangadi)
+    
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 27 - 8
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -2327,11 +2327,32 @@ public class DataNode implements FSConstants, Runnable {
                 " offsetInBlock " + offsetInBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
       setBlockPosition(offsetInBlock);
+      
+      int len = in.readInt();
+      curPacketSize += 4;            // read an integer in previous line
 
       // send packet header to next datanode in pipeline
       if (mirrorOut != null) {
         try {
-          mirrorOut.writeInt(packetSize);
+          int mirrorPacketSize = packetSize;
+          if (len > bytesPerChecksum) {
+            /* 
+             * This is a packet with non-interleaved checksum. 
+             * But we are sending interleaving checksums to mirror, 
+             * which changes packet len. Adjust the packet size for mirror.
+             * 
+             * As mentioned above, this is mismatch is 
+             * temporary till HADOOP-1702.
+             */
+            
+            //find out how many chunks are in this patcket :
+            int chunksInPkt = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+            
+            // we send 4 more bytes for for each of the extra 
+            // checksum chunks. so :
+            mirrorPacketSize += (chunksInPkt - 1) * 4;
+          }
+          mirrorOut.writeInt(mirrorPacketSize);
           mirrorOut.writeLong(offsetInBlock);
           mirrorOut.writeLong(seqno);
           mirrorOut.writeBoolean(lastPacketInBlock);
@@ -2357,9 +2378,6 @@ public class DataNode implements FSConstants, Runnable {
         }
       }
 
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
       if (len == 0) {
         LOG.info("Receiving empty packet for block " + block);
         if (mirrorOut != null) {
@@ -2388,16 +2406,17 @@ public class DataNode implements FSConstants, Runnable {
           
           int toRecv = Math.min(len, bytesPerChecksum);
           
-          receiveChunk(toRecv, checksumBuf, checksumOff);
-          
-          len -= toRecv;
-          checksumOff += checksumSize;       
           curPacketSize += (toRecv + checksumSize);
           if (curPacketSize > packetSize) {
             throw new IOException("Packet size for block " + block +
                                   " too long " + curPacketSize +
                                   " was expecting " + packetSize);
           } 
+          
+          receiveChunk(toRecv, checksumBuf, checksumOff);
+          
+          len -= toRecv;
+          checksumOff += checksumSize;       
         }
         
         if (curPacketSize == packetSize) {