Browse Source

HDFS-7009. Active NN and standby NN have different live nodes. Contributed by Ming Ma.

cnauroth 10 years ago
parent
commit
769507bd7a

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -25,6 +25,7 @@ import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -280,7 +281,7 @@ public class Client {
   /** Check the rpc response header. */
   void checkResponse(RpcResponseHeaderProto header) throws IOException {
     if (header == null) {
-      throw new IOException("Response is null.");
+      throw new EOFException("Response is null.");
     }
     if (header.hasClientId()) {
       // check client IDs

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1006,6 +1006,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7813. TestDFSHAAdminMiniCluster#testFencer testcase is failing
     frequently. (Rakesh R via cnauroth)
 
+    HDFS-7009. Active NN and standby NN have different live nodes.
+    (Ming Ma via cnauroth)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 233 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java

@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This tests DatanodeProtocol retry policy
+ */
+public class TestDatanodeProtocolRetryPolicy {
+  private static final Log LOG = LogFactory.getLog(
+      TestDatanodeProtocolRetryPolicy.class);
+  private static final String DATA_DIR =
+      MiniDFSCluster.getBaseDirectory() + "data";
+  private DataNode dn;
+  private Configuration conf;
+  private boolean tearDownDone;
+  ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
+  private final static String CLUSTER_ID = "testClusterID";
+  private final static String POOL_ID = "BP-TEST";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+  private static DatanodeRegistration datanodeRegistration =
+      DFSTestUtil.getLocalDatanodeRegistration();
+
+  static {
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * Starts an instance of DataNode
+   * @throws IOException
+   */
+  @Before
+  public void startUp() throws IOException, URISyntaxException {
+    tearDownDone = false;
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    FileSystem.setDefaultUri(conf,
+        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
+    File dataDir = new File(DATA_DIR);
+    FileUtil.fullyDelete(dataDir);
+    dataDir.mkdirs();
+    StorageLocation location = StorageLocation.parse(dataDir.getPath());
+    locations.add(location);
+  }
+
+  /**
+   * Cleans the resources and closes the instance of datanode
+   * @throws IOException if an error occurred
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (!tearDownDone && dn != null) {
+      try {
+        dn.shutdown();
+      } catch(Exception e) {
+        LOG.error("Cannot close: ", e);
+      } finally {
+        File dir = new File(DATA_DIR);
+        if (dir.exists())
+          Assert.assertTrue(
+              "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
+      }
+      tearDownDone = true;
+    }
+  }
+
+  private void waitForBlockReport(
+      final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          Mockito.verify(mockNN).blockReport(
+              Mockito.eq(datanodeRegistration),
+              Mockito.eq(POOL_ID),
+              Mockito.<StorageBlockReport[]>anyObject());
+          return true;
+        } catch (Throwable t) {
+          LOG.info("waiting on block report: " + t.getMessage());
+          return false;
+        }
+      }
+    }, 500, 100000);
+  }
+
+  /**
+   * Verify the following scenario.
+   * 1. The initial DatanodeProtocol.registerDatanode succeeds.
+   * 2. DN starts heartbeat process.
+   * 3. In the first heartbeat, NN asks DN to reregister.
+   * 4. DN calls DatanodeProtocol.registerDatanode.
+   * 5. DatanodeProtocol.registerDatanode throws EOFException.
+   * 6. DN retries.
+   * 7. DatanodeProtocol.registerDatanode succeeds.
+   */
+  @Test(timeout = 60000)
+  public void testDatanodeRegistrationRetry() throws Exception {
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+        mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      int i = 0;
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        i++;
+        if ( i > 1 && i < 5) {
+          LOG.info("mockito exception " + i);
+          throw new EOFException("TestDatanodeProtocolRetryPolicy");
+        } else {
+          DatanodeRegistration dr =
+              (DatanodeRegistration) invocation.getArguments()[0];
+          datanodeRegistration.setDatanodeUuidForTesting(dr.getDatanodeUuid());
+          LOG.info("mockito succeeded " + datanodeRegistration);
+          return datanodeRegistration;
+        }
+      }
+    }).when(namenode).registerDatanode(
+        Mockito.any(DatanodeRegistration.class));
+
+    when(namenode.versionRequest()).thenReturn(
+        new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L));
+
+    Mockito.doAnswer(new Answer<HeartbeatResponse>() {
+      int i = 0;
+      @Override
+      public HeartbeatResponse answer(InvocationOnMock invocation)
+          throws Throwable {
+        i++;
+        HeartbeatResponse heartbeatResponse;
+        if ( i == 1 ) {
+          LOG.info("mockito heartbeatResponse registration " + i);
+          heartbeatResponse = new HeartbeatResponse(
+              new DatanodeCommand[]{RegisterCommand.REGISTER},
+              new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
+              null);
+        } else {
+          LOG.info("mockito heartbeatResponse " + i);
+          heartbeatResponse = new HeartbeatResponse(
+              new DatanodeCommand[0],
+              new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
+              null);
+        }
+        return heartbeatResponse;
+      }
+    }).when(namenode).sendHeartbeat(
+           Mockito.any(DatanodeRegistration.class),
+           Mockito.any(StorageReport[].class),
+           Mockito.anyLong(),
+           Mockito.anyLong(),
+           Mockito.anyInt(),
+           Mockito.anyInt(),
+           Mockito.anyInt(),
+           Mockito.any(VolumeFailureSummary.class));
+
+    dn = new DataNode(conf, locations, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(NN_ADDR, nnAddr);
+        return namenode;
+      }
+    };
+
+    // Trigger a heartbeat so that it acknowledges the NN as active.
+    dn.getAllBpOs()[0].triggerHeartbeatForTests();
+
+    waitForBlockReport(namenode);
+  }
+}