Explorar el Código

HADOOP-3033. The BlockReceiver thread in the datanode writes data to
the block file, changes file position (if needed) and flushes all by
itself. The PacketResponder thread does not flush block file. (dhruba)
svn merge -c 638716 from trunk.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.16@638718 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur hace 17 años
padre
commit
a3f51dca8d
Se han modificado 2 ficheros con 17 adiciones y 48 borrados
  1. 4 0
      CHANGES.txt
  2. 13 48
      src/java/org/apache/hadoop/dfs/DataNode.java

+ 4 - 0
CHANGES.txt

@@ -7,6 +7,10 @@ Release 0.16.2 - Unreleased
     HADOOP-3011. Prohibit distcp from overwriting directories on the
     destination filesystem with files. (cdouglas)
 
+    HADOOP-3033. The BlockReceiver thread in the datanode writes data to 
+    the block file, changes file position (if needed) and flushes all by
+    itself. The PacketResponder thread does not flush block file. (dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 13 - 48
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -1777,11 +1777,6 @@ public class DataNode implements FSConstants, Runnable {
                          " from " + receiver.inAddr);
               }
               lastPacket = true;
-            } else {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
             }
 
             replyOut.writeLong(expected);
@@ -1830,6 +1825,15 @@ public class DataNode implements FSConstants, Runnable {
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + seqno);
                 Packet pkt = null;
                 synchronized (this) {
+                  while (running && shouldRun && ackQueue.size() == 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
+                                " waiting for local datanode to finish write.");
+                    }
+                    wait();
+                  }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
@@ -1866,12 +1870,6 @@ public class DataNode implements FSConstants, Runnable {
                        " of size " + block.getNumBytes() + 
                        " from " + receiver.inAddr);
             }
-            else if (!lastPacketInBlock) {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
-            }
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
@@ -1969,8 +1967,6 @@ public class DataNode implements FSConstants, Runnable {
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private String clientName;
-    private Object currentWriteLock;
-    volatile private boolean currentWrite;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
                   boolean isRecovery, String clientName)
@@ -1982,8 +1978,6 @@ public class DataNode implements FSConstants, Runnable {
         this.isRecovery = isRecovery;
         this.clientName = clientName;
         this.offsetInBlock = 0;
-        this.currentWriteLock = new Object();
-        this.currentWrite = false;
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
@@ -2009,18 +2003,6 @@ public class DataNode implements FSConstants, Runnable {
     // close files
     public void close() throws IOException {
 
-      synchronized (currentWriteLock) {
-        while (currentWrite) {
-          try {
-            LOG.info("BlockReceiver for block " + block +
-                     " waiting for last write to drain.");
-            currentWriteLock.wait();
-          } catch (InterruptedException e) {
-            throw new IOException("BlockReceiver for block " + block +
-                                  " interrupted drain of last io.");
-          }
-        }
-      }
       IOException ioe = null;
       // close checksum file
       try {
@@ -2089,11 +2071,6 @@ public class DataNode implements FSConstants, Runnable {
       }
 
       checksum.reset();
-
-      // record the fact that the current write is still in progress
-      synchronized (currentWriteLock) {
-        currentWrite = true;
-      }
       offsetInBlock += len;
 
       // First write to remote node before writing locally.
@@ -2113,10 +2090,6 @@ public class DataNode implements FSConstants, Runnable {
           // recovery.
           //
           if (clientName.length() > 0) {
-            synchronized (currentWriteLock) {
-              currentWrite = false;
-              currentWriteLock.notifyAll();
-            }
             throw ioe;
           }
         }
@@ -2132,11 +2105,6 @@ public class DataNode implements FSConstants, Runnable {
       } catch (IOException iex) {
         checkDiskError(iex);
         throw iex;
-      } finally {
-        synchronized (currentWriteLock) {
-          currentWrite = false;
-          currentWriteLock.notifyAll();
-        }
       }
 
       if (throttler != null) { // throttle I/O
@@ -2184,12 +2152,6 @@ public class DataNode implements FSConstants, Runnable {
             throw e;
           }
         }
-        // first enqueue the ack packet to avoid a race with the response coming
-        // from downstream datanode.
-        if (responder != null) {
-          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
-                                          lastPacketInBlock); 
-        }
       }
 
       if (len == 0) {
@@ -2220,8 +2182,11 @@ public class DataNode implements FSConstants, Runnable {
         curPacketSize += 4;
       }
 
+      /// flush entire packet before sending ack
+      flush();
+
       // put in queue for pending acks
-      if (responder != null && mirrorOut == null) {
+      if (responder != null) {
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                         lastPacketInBlock); 
       }