
HDFS-3622. Backport HDFS-3541 to branch-0.23 (bobby via daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1362150 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 13 years ago
Parent
Current commit
e58f368e00

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -64,6 +64,8 @@ Release 0.23.3 - UNRELEASED
     HDFS-3486. offlineimageviewer can't read fsimage files that contain
     persistent delegation tokens. (Colin Patrick McCabe via eli)
 
+    HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay via umamahesh)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -237,10 +237,10 @@ class BPOfferService implements Runnable {
   }
 
   private void connectToNNAndHandshake() throws IOException {
+    
     // get NN proxy
-    bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-          DatanodeProtocol.versionID, nnAddr, dn.getConf());
-
+    bpNamenode = dn.connectToNN(nnAddr);
+    
     // First phase of the handshake with NN - get the namespace
     // info.
     bpNSInfo = retrieveNamespaceInfo();
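
The change above routes NameNode proxy creation through a new DataNode.connectToNN method (added later in this commit) instead of calling RPC.waitForProxy inline. Since RPC.waitForProxy blocks and retries until the target address answers, a unit test with no live NameNode would hang at this point; the indirection gives tests a seam to hand back a mock proxy. A minimal sketch of how the seam can be used, mirroring the TestBlockRecovery change further down (mockNamenode here stands for any pre-built mock):

    // Sketch only: a test substitutes a mock NameNode proxy by overriding
    // the new seam instead of standing up a real RPC server.
    DataNode dn = new DataNode(conf, dirs, null) {
      @Override
      DatanodeProtocol connectToNN(InetSocketAddress nnAddr) throws IOException {
        return mockNamenode; // e.g. a Mockito mock of DatanodeProtocol
      }
    };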

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -790,6 +790,7 @@ class BlockReceiver implements Closeable {
         try {
           responder.join();
         } catch (InterruptedException e) {
+          responder.interrupt();
           throw new IOException("Interrupted receiveBlock");
         }
         responder = null;
@@ -963,6 +964,7 @@ class BlockReceiver implements Closeable {
           wait();
         } catch (InterruptedException e) {
           running = false;
+          Thread.currentThread().interrupt();
         }
       }
       if(LOG.isDebugEnabled()) {
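
Both hunks above harden BlockReceiver's interrupt handling, which is central to this deadlock fix. In the first, a receiver whose join() on the PacketResponder is interrupted now interrupts the responder thread as well, so it shuts down rather than being left running. In the second, the responder restores its own interrupt status after catching InterruptedException, because catching the exception clears the flag. The latter is the standard restore-the-interrupt idiom; a generic sketch, not code from this patch:

    // InterruptedException clears the thread's interrupt flag, so code that
    // swallows the exception should re-set the flag to keep the interrupt
    // visible to callers up the stack.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status
    }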

+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -995,6 +995,15 @@ public class DataNode extends Configured
     dnId.storageID = createNewStorageId(dnId.getPort());
   }
   
+  
+  /**
+   * Connect to the NN. This is separated out for easier testing.
+   */
+  DatanodeProtocol connectToNN(InetSocketAddress nnAddr) throws IOException {
+    return (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+          DatanodeProtocol.versionID, nnAddr, conf);
+  }
+  
   static String createNewStorageId(int port) {
     /* Return 
      * "DS-randInt-ipaddr-currentTimeMillis"
@@ -1911,8 +1920,10 @@ public class DataNode extends Configured
    */
   public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos == null || bpos.bpNamenode == null) {
-      throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
+    if (bpos == null) {
+      throw new IOException("No block pool offer service for bpid=" + bpid);
+    } else if (bpos.bpNamenode == null) {
+      throw new IOException("cannot find a namenode proxy for bpid=" + bpid);
     }
     return bpos.bpNamenode;
   }
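
The reworked getBPNamenode now distinguishes a missing block-pool offer service (unknown bpid) from a proxy that has not been established yet (handshake not done), and fixes the "namnode" typo in the old message. An illustrative sketch of the caller-side effect, not code from this patch:

    // Illustrative only: the split check yields a more precise diagnostic.
    try {
      DatanodeProtocol nn = dn.getBPNamenode("BP-unknown");
    } catch (IOException e) {
      // "No block pool offer service for bpid=..."  -> bpid is not registered
      // "cannot find a namenode proxy for bpid=..." -> handshake not complete
    }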

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -1793,6 +1793,10 @@ class FSDataset implements FSDatasetInterface {
    */
   @Override // FSDatasetInterface
   public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+    if (Thread.interrupted()) {
+      // Don't allow data modifications from interrupted threads
+      throw new IOException("Cannot finalize block from Interrupted Thread");
+    }
     ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo.getState() == ReplicaState.FINALIZED) {
       // this is legal, when recovery happens on a file that has
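
This guard makes finalizeBlock refuse to run on a thread that block recovery has already interrupted, which breaks the cycle between recovery, the xceiver, and the packet responder. Note that Thread.interrupted() both tests and clears the current thread's interrupt status, unlike isInterrupted(); a generic sketch of the difference, not code from this patch:

    // Thread.interrupted() reports the current thread's interrupt flag AND
    // clears it; Thread.currentThread().isInterrupted() only reports it.
    Thread.currentThread().interrupt();
    System.out.println(Thread.interrupted());  // true, and clears the flag
    System.out.println(Thread.interrupted());  // false: flag already cleared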

+ 134 - 24
hadoop-hdfs-project/hadoop-hdfs/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -18,47 +18,70 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * This tests if sync all replicas in block recovery works correctly
@@ -72,6 +95,8 @@ public class TestBlockRecovery {
   private final static long RECOVERY_ID = 3000L;
   private final static String CLUSTER_ID = "testClusterID";
   private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
   private final static long BLOCK_ID = 1000L;
   private final static long GEN_STAMP = 2000L;
   private final static long BLOCK_LEN = 3000L;
@@ -79,9 +104,6 @@ public class TestBlockRecovery {
   private final static long REPLICA_LEN2 = 5000L;
   private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
       BLOCK_ID, BLOCK_LEN, GEN_STAMP);
-  
-  private final NamespaceInfo nsifno = 
-    new NamespaceInfo(1,CLUSTER_ID, POOL_ID, 2, 3);
 
   static {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
@@ -99,21 +121,43 @@ public class TestBlockRecovery {
     conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
     ArrayList<File> dirs = new ArrayList<File>();
     File dataDir = new File(DATA_DIR);
     FileUtil.fullyDelete(dataDir);
     dataDir.mkdirs();
     dirs.add(dataDir);
-    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    final DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class));
+    
     when(namenode.versionRequest()).thenReturn(new NamespaceInfo
         (1, CLUSTER_ID, POOL_ID, 1L, 1));
+    
     when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(), 
         anyLong(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt()))
         .thenReturn(new DatanodeCommand[0]);
-    dn = new DataNode(conf, dirs, null);
-    
-    DataNodeTestUtils.setBPNamenodeByIndex(dn, nsifno, POOL_ID, namenode);
+
+    dn = new DataNode(conf, dirs, null) {
+      @Override
+      DatanodeProtocol connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+    dn.runDatanodeDaemon();
+    // Trigger a heartbeat so that it acknowledges the NN as active.
+    dn.getAllBpOs()[0].triggerHeartbeatForTests();
   }
 
   /**
@@ -142,7 +186,6 @@ public class TestBlockRecovery {
       InterDatanodeProtocol dn1,
       InterDatanodeProtocol dn2,
       long expectLen) throws IOException {
-    
     DatanodeInfo[] locs = new DatanodeInfo[]{
         mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
     RecoveringBlock rBlock = new RecoveringBlock(block, 
@@ -301,7 +344,7 @@ public class TestBlockRecovery {
     long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
     testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
-    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
   }
   
   /**
@@ -325,7 +368,7 @@ public class TestBlockRecovery {
     testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
     verify(dn2, never()).updateReplicaUnderRecovery(
-        block, RECOVERY_ID, REPLICA_LEN1);    
+        block, RECOVERY_ID, REPLICA_LEN1);
   }
   
   /**
@@ -350,7 +393,7 @@ public class TestBlockRecovery {
     testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
     
     verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
-    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);    
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
   }  
 
   private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
@@ -495,7 +538,8 @@ public class TestBlockRecovery {
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
     BlockWriteStreams streams = null;
     try {
-      streams = replicaInfo.createStreams(true, 0, 0);
+      streams = replicaInfo.createStreams(true,
+          DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
       streams.checksumOut.write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       try {
@@ -512,4 +556,70 @@ public class TestBlockRecovery {
       streams.close();
     }
   }
+  
+  /**
+   * Test to verify the race between finalizeBlock and Lease recovery
+   * 
+   * @throws Exception
+   */
+  @Test(timeout = 20000)
+  public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
+    tearDown();// Stop the Mocked DN started in startup()
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .federation(false)
+        .nameNodePort(50070)
+        .nameNodeHttpPort(8020)
+        .numDataNodes(1).build();
+    try {
+      cluster.waitClusterUp();
+      FileSystem fs = cluster.getFileSystem();
+      Path path = new Path("/test");
+      FSDataOutputStream out = fs.create(path);
+      out.writeBytes("data");
+      out.hsync();
+      
+      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
+      final LocatedBlock block = blocks.get(0);
+      final DataNode dataNode = cluster.getDataNodes().get(0);
+      
+      final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
+      Thread recoveryThread = new Thread() {
+        public void run() {
+          try {
+            DatanodeInfo[] locations = block.getLocations();
+            final RecoveringBlock recoveringBlock = new RecoveringBlock(
+                block.getBlock(), locations, block.getBlock()
+                    .getGenerationStamp() + 1);
+            synchronized (dataNode.data) {
+              Thread.sleep(2000);
+              dataNode.initReplicaRecovery(recoveringBlock);
+            }
+          } catch (Exception e) {
+            recoveryInitResult.set(false);
+          }
+        }
+      };
+      recoveryThread.start();
+      try {
+        out.close();
+      } catch (IOException e) {
+        Assert.assertTrue("Writing should fail",
+            e.getMessage().contains("are bad. Aborting..."));
+      } finally {
+        recoveryThread.join();
+      }
+      Assert.assertTrue("Recovery should be initiated successfully",
+          recoveryInitResult.get());
+      
+      dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
+          .getGenerationStamp() + 1, block.getBlockSize());
+    } finally {
+      if (null != cluster) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    }
+  }
 }
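
Roughly, the interleaving the new test forces looks like this (an approximate sketch of the intended schedule, not code from the patch):

    // recovery thread                      client writer (out.close())
    // ----------------                     ----------------------------
    // synchronized (dataNode.data) {       flush, then try to finalize
    //   Thread.sleep(2000);                ... blocks on the dataset lock
    //   initReplicaRecovery(rBlock);       ...
    //     -> interrupts the writer         ...
    // }                                    finalizeBlock() sees the interrupt
    //                                      flag and throws IOException instead
    //                                      of finalizing a replica under recovery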