浏览代码

HADOOP-3339. Some of the failures on 3rd datanode in DFS write pipeline
are not detected properly. This could lead to hard failure of client's
write operation. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@657903 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 年之前
父节点
当前提交
04d678ed8f
共有 3 个文件被更改,包括 22 次插入4 次删除
  1. 4 0
      CHANGES.txt
  2. 4 1
      src/java/org/apache/hadoop/dfs/DFSClient.java
  3. 14 3
      src/java/org/apache/hadoop/dfs/DataNode.java

+ 4 - 0
CHANGES.txt

@@ -278,6 +278,10 @@ Trunk (unreleased changes)
     HADOOP-3396. TestDatanodeBlockScanner occationally fails. 
     (Lohit Vijayarenu via rangadi)
 
+    HADOOP-3339. Some of the failures on 3rd datanode in DFS write pipeline 
+    are not detected properly. This could lead to hard failure of client's
+    write operation. (rangadi)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 4 - 1
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -72,6 +72,7 @@ class DFSClient implements FSConstants {
   private short defaultReplication;
   private SocketFactory socketFactory;
   private int socketTimeout;
+  private int datanodeWriteTimeout;
   final int writePacketSize;
   private FileSystem.Statistics stats;
     
@@ -145,6 +146,8 @@ class DFSClient implements FSConstants {
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
                                      FSConstants.READ_TIMEOUT);
+    this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
+                                            FSConstants.WRITE_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
@@ -2279,7 +2282,7 @@ class DFSClient implements FSConstants {
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
         LOG.debug("Send buf size " + s.getSendBufferSize());
         long writeTimeout = WRITE_TIMEOUT_EXTENSION * nodes.length +
-                            WRITE_TIMEOUT;
+                            datanodeWriteTimeout;
 
         //
         // Xmit header info to datanode

+ 14 - 3
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -2137,12 +2137,23 @@ public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
                 LOG.info("PacketResponder " + block + " " + numTargets + 
                          " Exception " + StringUtils.stringifyException(e));
                 running = false;
-                if (!didRead) {
-                  op = OP_STATUS_ERROR;
-                }
               }
             }
 
+            if (Thread.interrupted()) {
+              /* The receiver thread cancelled this thread. 
+               * We could also check any other status updates from the 
+               * receiver thread (e.g. if it is ok to write to replyOut). 
+               */
+              LOG.info("PacketResponder " + block +  " " + numTargets +
+                       " : Thread is interrupted.");
+              running = false;
+            }
+            
+            if (!didRead) {
+              op = OP_STATUS_ERROR;
+            }
+            
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock && !receiver.finalized) {