
HADOOP-3673. Avoid deadlock caused by DataNode RPC recoverBlock().
(Tsz Wo (Nicholas), SZE via rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@674910 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi, 17 years ago · commit 8667309c87
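
The deadlock being fixed arises, roughly, as follows: each DataNode serves recoverBlock() from a bounded pool of RPC handlers, and a handler processing recoverBlock() makes blocking inter-datanode RPCs to the other replicas. If concurrent recoveries pick different datanodes from overlapping pipelines as primaries (or a primary RPCs back into itself), the handlers can all end up blocked waiting on one another, a circular wait. The following self-contained sketch is hypothetical, plain java.util.concurrent rather than Hadoop code; each single-thread executor stands in for a DataNode running with dfs.datanode.handler.count = 1, and submit() stands in for an RPC.

    import java.util.concurrent.*;

    public class RecoverBlockDeadlockSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService dnA = Executors.newSingleThreadExecutor();
        ExecutorService dnB = Executors.newSingleThreadExecutor();

        // Client 1 picks A as primary; A's only handler then calls into B.
        Future<?> r1 = dnA.submit(() -> callInto(dnB));
        // Client 2 picks B as primary; B's only handler then calls into A.
        Future<?> r2 = dnB.submit(() -> callInto(dnA));

        Thread.sleep(500);  // give the circular wait time to form
        // Neither completes: each handler is parked waiting for the other's.
        System.out.println("r1 done=" + r1.isDone() + ", r2 done=" + r2.isDone());
        dnA.shutdownNow();
        dnB.shutdownNow();
      }

      // Stand-in for a blocking inter-datanode RPC.
      static void callInto(ExecutorService other) {
        try {
          other.submit(() -> { }).get();  // blocks until the peer frees a handler
        } catch (Exception e) {
          Thread.currentThread().interrupt();
        }
      }
    }

The patch breaks the cycle in two ways, visible in the hunks below: every client derives the same primary from a total order over the pipeline, and the primary calls itself directly instead of going through its own RPC server.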

+ 3 - 0
CHANGES.txt

@@ -174,6 +174,9 @@ Release 0.18.0 - Unreleased
     a RawComparator for NullWritable and permit it to be written as a key
     to SequenceFiles. (cdouglas)
 
+    HADOOP-3673. Avoid deadlock caused by DataNode RPC recoverBlock().
+    (Tsz Wo (Nicholas), SZE via rangadi)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

+ 6 - 4
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -2183,10 +2183,12 @@ public class DFSClient implements FSConstants {
         // by stamping appropriate generation stamps.
         //
         Block newBlock = null;
-        ClientDatanodeProtocol datanodeRPC =  null;
+        ClientDatanodeProtocol primary =  null;
         try {
-          datanodeRPC = createClientDatanodeProtocolProxy(newnodes[0], conf);
-          newBlock = datanodeRPC.recoverBlock(block, newnodes);
+          // Pick the "least" datanode as the primary datanode to avoid deadlock.
+          primary = createClientDatanodeProtocolProxy(
+              Collections.min(Arrays.asList(newnodes)), conf);
+          newBlock = primary.recoverBlock(block, newnodes);
         } catch (IOException e) {
           recoveryErrorCount++;
           if (recoveryErrorCount > maxRecoveryErrorCount) {
@@ -2206,7 +2208,7 @@ public class DFSClient implements FSConstants {
                    " times. Will retry...");
           return true;          // sleep when we return from here
         } finally {
-          RPC.stopProxy(datanodeRPC);
+          RPC.stopProxy(primary);
         }
         recoveryErrorCount = 0; // block recovery successful
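
The key line is the Collections.min call: as long as every client derives the primary from the same total order over the pipeline's datanodes, concurrent recoveries over overlapping nodes agree on a single coordinator and cannot cross-call each other. A minimal illustration of the idea, using plain strings (hypothetical names) in place of DatanodeInfo:

    import java.util.Arrays;
    import java.util.Collections;

    public class PrimarySelectionSketch {
      // Collections.min needs a total order; in the patch, DatanodeID
      // supplies it via compareTo on the "host:port" name.
      static String pickPrimary(String... pipeline) {
        return Collections.min(Arrays.asList(pipeline));
      }

      public static void main(String[] args) {
        // Two clients may see overlapping pipelines in different orders...
        System.out.println(pickPrimary("dnB:50010", "dnA:50010", "dnC:50010"));
        System.out.println(pickPrimary("dnC:50010", "dnA:50010", "dnB:50010"));
        // ...but both converge on dnA:50010 as the primary.
      }
    }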
 

+ 4 - 4
src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableComparable;
  * which it currently represents.
  * 
  */
-public class DatanodeID implements WritableComparable {
+public class DatanodeID implements WritableComparable<DatanodeID> {
   public static final DatanodeID[] EMPTY_ARRAY = {}; 
 
   public String name;      /// hostname:portNumber
@@ -158,11 +158,11 @@ public class DatanodeID implements WritableComparable {
     
   /** Comparable.
    * Basis of compare is the String name (host:portNumber) only.
-   * @param o
+   * @param that
    * @return as specified by Comparable.
    */
-  public int compareTo(Object o) {
-    return name.compareTo(((DatanodeID)o).getName());
+  public int compareTo(DatanodeID that) {
+    return name.compareTo(that.getName());
   }
 
   /////////////////////////////////////////////////
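
Generifying DatanodeID is what lets the Collections.min call in DFSClient compile cleanly: Collections.min requires its element type to be Comparable<? super T>, which the raw compareTo(Object) signature does not satisfy without casts. A stand-in sketch with a hypothetical NodeId class, using the same host:port basis as DatanodeID:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class NodeId implements Comparable<NodeId> {
      final String name;  // "host:port", as in DatanodeID
      NodeId(String name) { this.name = name; }
      @Override public int compareTo(NodeId that) { return name.compareTo(that.name); }
      @Override public String toString() { return name; }

      public static void main(String[] args) {
        List<NodeId> pipeline = Arrays.asList(
            new NodeId("dnB:50010"), new NodeId("dnA:50010"));
        System.out.println(Collections.min(pipeline));  // dnA:50010, no casts
      }
    }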

+ 2 - 2
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -3113,7 +3113,7 @@ public class DataNode extends Configured
   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
       public void run() {
-        LeaseManager.recoverBlocks(blocks, targets, namenode, getConf());
+        LeaseManager.recoverBlocks(blocks, targets, DataNode.this, namenode, getConf());
       }
     });
     d.start();
@@ -3151,7 +3151,7 @@ public class DataNode extends Configured
   public Block recoverBlock(Block block, DatanodeInfo[] targets
       ) throws IOException {
     LOG.info("Client invoking recoverBlock for block " + block);
-    return LeaseManager.recoverBlock(block, targets, namenode, 
+    return LeaseManager.recoverBlock(block, targets, this, namenode, 
                                      getConf(), false);
   }
 }

+ 3 - 30
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.CheckpointStates;
 import org.apache.hadoop.hdfs.protocol.FSConstants.StartupOption;
 import org.apache.hadoop.hdfs.protocol.FSConstants.NodeType;
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
@@ -1258,35 +1258,8 @@ public class FSImage extends Storage {
    * DatanodeImage is used to store persistent information
    * about datanodes into the fsImage.
    */
-  static class DatanodeImage implements WritableComparable {
-    DatanodeDescriptor              node;
-
-    DatanodeImage() {
-      node = new DatanodeDescriptor();
-    }
-
-    DatanodeImage(DatanodeDescriptor from) {
-      node = from;
-    }
-
-    /** 
-     * Returns the underlying Datanode Descriptor
-     */
-    DatanodeDescriptor getDatanodeDescriptor() { 
-      return node; 
-    }
-
-    public int compareTo(Object o) {
-      return node.compareTo(o);
-    }
-
-    public boolean equals(Object o) {
-      return node.equals(o);
-    }
-
-    public int hashCode() {
-      return node.hashCode();
-    }
+  static class DatanodeImage implements Writable {
+    DatanodeDescriptor node = new DatanodeDescriptor();
 
     /////////////////////////////////////////////////
     // Writable
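
A side effect of the typed compareTo: DatanodeImage's old compareTo(Object) delegation would no longer compile as written, and since the class exists only to serialize datanodes into the fsimage, plain Writable suffices. For reference, a minimal sketch of the Writable contract it keeps (assumes hadoop-core on the classpath):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // Serialization only: comparison, equality, and hashing now live solely
    // on the wrapped DatanodeDescriptor (via DatanodeID).
    class ImageEntrySketch implements Writable {
      int field;
      public void write(DataOutput out) throws IOException { out.writeInt(field); }
      public void readFields(DataInput in) throws IOException { field = in.readInt(); }
    }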

+ 5 - 5
src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -414,10 +414,10 @@ public class LeaseManager {
    * This method is invoked by the primary datanode.
    */
   public static void recoverBlocks(Block[] blocks, DatanodeID[][] targets,
-      DatanodeProtocol namenode, Configuration conf) {
+      DataNode primary, DatanodeProtocol namenode, Configuration conf) {
     for(int i = 0; i < blocks.length; i++) {
       try {
-        recoverBlock(blocks[i], targets[i], namenode, conf, true);
+        recoverBlock(blocks[i], targets[i], primary, namenode, conf, true);
       } catch (IOException e) {
         LOG.warn("recoverBlocks, i=" + i, e);
       }
@@ -426,7 +426,7 @@ public class LeaseManager {
 
   /** Recover a block */
   public static Block recoverBlock(Block block, DatanodeID[] datanodeids,
-      DatanodeProtocol namenode, Configuration conf,
+      DataNode primary, DatanodeProtocol namenode, Configuration conf,
       boolean closeFile) throws IOException {
 
     // If the block is already being recovered, then skip recovering it.
@@ -455,8 +455,8 @@ public class LeaseManager {
       //check generation stamps
       for(DatanodeID id : datanodeids) {
         try {
-          InterDatanodeProtocol datanode
-              = DataNode.createInterDataNodeProtocolProxy(id, conf);
+          InterDatanodeProtocol datanode = primary.dnRegistration.equals(id)?
+              primary: DataNode.createInterDataNodeProtocolProxy(id, conf);
           BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
           if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
             syncList.add(new BlockRecord(id, datanode, new Block(info)));
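
The second half of the fix is in this hunk: when an inter-datanode target is the primary itself (primary.dnRegistration.equals(id)), the primary uses its local DataNode object directly instead of opening an RPC proxy back into its own handler pool. Otherwise, with dfs.datanode.handler.count = 1, a primary whose own replica is in the target list would deadlock against itself. A rough stand-in with hypothetical interfaces:

    // Hypothetical stand-ins for InterDatanodeProtocol and DataNode;
    // only the local-vs-proxy decision is modeled.
    interface InterNode {
      long getGenerationStamp(String blockId);
    }

    class PrimaryNode implements InterNode {
      final String registration;  // stands in for dnRegistration
      PrimaryNode(String registration) { this.registration = registration; }

      public long getGenerationStamp(String blockId) { return 42L; }

      // The patch's pattern: primary.dnRegistration.equals(id) ? primary : proxy.
      InterNode connectTo(String targetId) {
        return registration.equals(targetId) ? this : rpcProxyTo(targetId);
      }

      InterNode rpcProxyTo(String targetId) {
        // Stand-in for DataNode.createInterDataNodeProtocolProxy(id, conf);
        // a real proxy would consume an RPC handler on the target node.
        throw new UnsupportedOperationException("remote case not modeled");
      }

      public static void main(String[] args) {
        PrimaryNode primary = new PrimaryNode("dnA:50010");
        // Local call: no RPC round-trip back into our own handler queue.
        System.out.println(primary.connectTo("dnA:50010").getGenerationStamp("blk_1"));
      }
    }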

+ 96 - 1
src/test/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -47,7 +47,7 @@ public class TestFileCreation extends junit.framework.TestCase {
   static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";
 
   {
-    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
   }
@@ -696,4 +696,99 @@ public class TestFileCreation extends junit.framework.TestCase {
 
     System.out.println("testLeaseExpireHardLimit successful");
   }
+
+  /** Test lease recovery triggered by DFSClient. */
+  public void testClientTriggeredLeaseRecovery() throws Exception {
+    final int REPLICATION = 3;
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path("/wrwelkj");
+      
+      SlowWriter[] slowwriters = new SlowWriter[10];
+      for(int i = 0; i < slowwriters.length; i++) {
+        slowwriters[i] = new SlowWriter(fs, new Path(dir, "file" + i));
+      }
+
+      try {
+        for(int i = 0; i < slowwriters.length; i++) {
+          slowwriters[i].start();
+        }
+
+        //stop a datanode so that lease recovery is triggered.
+        cluster.stopDataNode(new Random().nextInt(REPLICATION));
+        
+        //let the slow writers write for a few more seconds
+        System.out.println("Wait a few seconds");
+        Thread.sleep(5000);
+      }
+      finally {
+        for(int i = 0; i < slowwriters.length; i++) {
+          if (slowwriters[i] != null) {
+            slowwriters[i].interrupt();
+          }
+        }
+        for(int i = 0; i < slowwriters.length; i++) {
+          if (slowwriters[i] != null) {
+            slowwriters[i].join();
+          }
+        }
+      }
+
+      //Verify the file
+      System.out.println("Verify the file");
+      for(int i = 0; i < slowwriters.length; i++) {
+        System.out.println(slowwriters[i].filepath + ": length="
+            + fs.getFileStatus(slowwriters[i].filepath).getLen());
+        FSDataInputStream in = null;
+        try {
+          in = fs.open(slowwriters[i].filepath);
+          for(int j = 0, x; (x = in.read()) != -1; j++) {
+            assertEquals(j, x);
+          }
+        }
+        finally {
+          IOUtils.closeStream(in);
+        }
+      }
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  static class SlowWriter extends Thread {
+    final FileSystem fs;
+    final Path filepath;
+    
+    SlowWriter(FileSystem fs, Path filepath) {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.fs = fs;
+      this.filepath = filepath;
+    }
+
+    public void run() {
+      FSDataOutputStream out = null;
+      int i = 0;
+      try {
+        out = fs.create(filepath);
+        for(; ; i++) {
+          System.out.println(getName() + " writes " + i);
+          out.write(i);
+          out.sync();
+          sleep(100);
+        }
+      }
+      catch(Exception e) {
+        System.out.println(getName() + " dies: e=" + e);
+      }
+      finally {
+        System.out.println(getName() + ": i=" + i);
+        IOUtils.closeStream(out);
+      }
+  }
+  }
 }