
HADOOP-2883. Write failures and data corruptions on HDFS files.
The write timeout is back to what it was in the 0.15 release. Also, the
datanode flushes the block file buffered output stream before
sending a positive ack for the packet back to the client. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@633779 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur, 17 years ago
Parent
Current commit
5bfe5d2004
3 changed files with 50 additions and 4 deletions
  1. CHANGES.txt  (+5 -0)
  2. src/java/org/apache/hadoop/dfs/DataNode.java  (+23 -2)
  3. src/java/org/apache/hadoop/dfs/FSDataset.java  (+22 -2)
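
For orientation before the diff: the DataNode.java hunk below reverts the mirror-connection timeout from the 0.16.0 formula back to the 0.15 one. A rough worked example of the difference, assuming a default socket timeout of 60 seconds and two downstream targets (both numbers are illustrative, not taken from this commit):

    // Illustrative arithmetic only -- socketTimeout and numTargets are assumed values.
    int socketTimeout = 60 * 1000;  // assumed default read timeout, in milliseconds
    int numTargets    = 2;          // assumed number of downstream datanodes in the pipeline

    int removedFormula  = 3000 * numTargets + socketTimeout;  // 0.16.0 formula:        66,000 ms
    int restoredFormula = numTargets * socketTimeout;         // restored 0.15 formula: 120,000 ms

With these assumed values, the restored formula gives the pipeline roughly twice as long before the connection attempt is declared dead.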

CHANGES.txt  (+5 -0)

@@ -242,6 +242,11 @@ Release 0.16.1 - Unreleased
     HADOOP-2931. IOException thrown by DFSOutputStream had wrong stack
     trace in some cases. (Michael Bieniosek via rangadi)
 
+    HADOOP-2883. Write failures and data corruptions on HDFS files.
+    The write timeout is back to what it was in the 0.15 release. Also, the
+    datanode flushes the block file buffered output stream before
+    sending a positive ack for the packet back to the client. (dhruba)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

src/java/org/apache/hadoop/dfs/DataNode.java  (+23 -2)

@@ -1097,7 +1097,7 @@ public class DataNode implements FSConstants, Runnable {
           mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
           mirrorSock = new Socket();
           try {
-            int timeoutValue = 3000 * numTargets + socketTimeout;
+            int timeoutValue = numTargets * socketTimeout;
             mirrorSock.connect(mirrorTarget, timeoutValue);
             mirrorSock.setSoTimeout(timeoutValue);
             mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -1898,6 +1898,11 @@ public class DataNode implements FSConstants, Runnable {
                          " from " + receiver.inAddr);
               }
               lastPacket = true;
+            } else {
+              // flush packet to disk before sending ack
+              if (!receiver.finalized) {
+                receiver.flush();
+              }
             }
 
             replyOut.writeLong(expected);
@@ -1982,6 +1987,12 @@ public class DataNode implements FSConstants, Runnable {
                        " of size " + block.getNumBytes() + 
                        " from " + receiver.inAddr);
             }
+            else if (!lastPacketInBlock) {
+              // flush packet to disk before sending ack
+              if (!receiver.finalized) {
+                receiver.flush();
+              }
+            }
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
@@ -2159,6 +2170,16 @@ public class DataNode implements FSConstants, Runnable {
       }
     }
 
+    // flush block data and metadata files to disk.
+    void flush() throws IOException {
+      if (checksumOut != null) {
+        checksumOut.flush();
+      }
+      if (out != null) {
+        out.flush();
+      }
+    }
+
     /* receive a chunk: write it to disk & mirror it to another stream */
     private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
                               throws IOException {
@@ -2481,7 +2502,7 @@ public class DataNode implements FSConstants, Runnable {
       if (checksumOut != null) {
         checksumOut.flush();
       }
-      LOG.info("Changing block file offset from " + 
+      LOG.info("Changing block file offset of block " + block + " from " + 
                data.getChannelPosition(block, streams) +
                " to " + offsetInBlock +
                " meta file offset to " + offsetInChecksum);

src/java/org/apache/hadoop/dfs/FSDataset.java  (+22 -2)

@@ -330,6 +330,14 @@ class FSDataset implements FSConstants, FSDatasetInterface {
       return createTmpFile(b, f);
     }
 
+    /**
+     * Returns the name of the temporary file for this block.
+     */
+    File getTmpFile(Block b) throws IOException {
+      File f = new File(tmpDir, b.getBlockName());
+      return f;
+    }
+
     /**
      * Files used for copy-on-write. They need recovery when datanode
      * restarts.
@@ -747,6 +755,18 @@ class FSDataset implements FSConstants, FSDatasetInterface {
   public void setChannelPosition(Block b, BlockWriteStreams streams, 
                                  long dataOffset, long ckOffset) 
                                  throws IOException {
+    long size = 0;
+    synchronized (this) {
+      FSVolume vol = volumeMap.get(b).getVolume();
+      size = vol.getTmpFile(b).length();
+    }
+    if (size < dataOffset) {
+      String msg = "Trying to change block file offset of block " + b +
+                     " to " + dataOffset +
+                     " but actual size of file is " +
+                     size;
+      throw new IOException(msg);
+    }
     FileOutputStream file = (FileOutputStream) streams.dataOut;
     file.getChannel().position(dataOffset);
     file = (FileOutputStream) streams.checksumOut;
@@ -762,7 +782,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
     }
     return vol.createTmpFile(blk);
   }
-  
+
   //
   // REMIND - mjc - eventually we should have a timeout system
   // in place to clean up block files left by abandoned clients.
@@ -898,7 +918,7 @@ class FSDataset implements FSConstants, FSDatasetInterface {
   }
 
   /**
-   * check if a data diretory is healthy
+   * check if a data directory is healthy
    * @throws DiskErrorException
    */
   public void checkDataDir() throws DiskErrorException {
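
The FSDataset change above guards setChannelPosition: before repositioning the block file channel it looks up the block's temporary file and refuses to seek past the bytes actually on disk, so a bad offset now fails loudly instead of leaving a hole in the block file. A minimal sketch of that guard, with the volume-map lookup omitted and names simplified (positionBlockFile and tmpBlockFile are illustrative):

    // Sketch of the offset sanity check; the real method also repositions the checksum stream.
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class ChannelPositionSketch {
      static void positionBlockFile(File tmpBlockFile, FileOutputStream dataOut,
                                    long dataOffset) throws IOException {
        long size = tmpBlockFile.length();  // bytes actually written to disk so far
        if (size < dataOffset) {
          throw new IOException("Trying to change block file offset to " + dataOffset +
                                " but actual size of file is " + size);
        }
        dataOut.getChannel().position(dataOffset);  // safe: never seeks past end of file
      }
    }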