@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * This tests that syncing all replicas during block recovery works correctly.
+ */
+public class TestBlockRecovery {
+  private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
+  private static final String DATA_DIR =
+      MiniDFSCluster.getBaseDirectory() + "data";
+  private DataNode dn;
+  private Configuration conf;
+  private final static long RECOVERY_ID = 3000L;
+  private final static long BLOCK_ID = 1000L;
+  private final static long GEN_STAMP = 2000L;
+  private final static long BLOCK_LEN = 3000L;
+  private final static long REPLICA_LEN1 = 6000L;
+  private final static long REPLICA_LEN2 = 5000L;
+  private final static Block block = new Block(BLOCK_ID, BLOCK_LEN, GEN_STAMP);
+
+  static {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * Starts an instance of DataNode.
+   * @throws IOException if the DataNode cannot be started
+   */
+  @Before
+  public void startUp() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+    ArrayList<File> dirs = new ArrayList<File>();
+    File dataDir = new File(DATA_DIR);
+    FileUtil.fullyDelete(dataDir);
+    dataDir.mkdirs();
+    dirs.add(dataDir);
+    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+    when(namenode.versionRequest()).thenReturn(new NamespaceInfo(1, 1L, 1));
+    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
+        anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
+            new DatanodeCommand[0]);
+    dn = new DataNode(conf, dirs, namenode);
+  }
+
+  /**
+   * Cleans up resources and shuts down the DataNode instance.
+   * @throws IOException if an error occurs
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (dn != null) {
+      try {
+        dn.shutdown();
+      } catch (Exception e) {
+        LOG.error("Cannot close: ", e);
+      } finally {
+        File dir = new File(DATA_DIR);
+        if (dir.exists())
+          Assert.assertTrue(
+              "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
+      }
+    }
+  }
+
+  /** Sync two replicas */
+  private void testSyncReplicas(ReplicaRecoveryInfo replica1,
+      ReplicaRecoveryInfo replica2,
+      InterDatanodeProtocol dn1,
+      InterDatanodeProtocol dn2) throws IOException {
+
+    DatanodeInfo[] locs = new DatanodeInfo[]{
+        mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
+    RecoveringBlock rBlock = new RecoveringBlock(block,
+        locs, RECOVERY_ID);
+    ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
+    BlockRecord record1 = new BlockRecord(
+        new DatanodeID("xx", "yy", 44, 55), dn1, replica1);
+    BlockRecord record2 = new BlockRecord(
+        new DatanodeID("aa", "bb", 11, 22), dn2, replica2);
+    syncList.add(record1);
+    syncList.add(record2);
+    dn.syncBlock(rBlock, syncList);
+  }
+
+  /**
+   * BlockRecovery_02.8.
+   * Two replicas are in Finalized state.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+
+    // two finalized replicas have different lengths
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+    try {
+      testSyncReplicas(replica1, replica2, dn1, dn2);
+      Assert.fail("Two finalized replicas should not have different lengths!");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().startsWith(
+          "Inconsistent size of finalized replicas. "));
+    }
+  }
+
+  /**
+   * BlockRecovery_02.9.
+   * One replica is Finalized and another is RBW.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedRbwReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+
+    // rbw and finalized replicas have the same length
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+
+    // rbw replica has a different length from the finalized one
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    dn1 = mock(InterDatanodeProtocol.class);
+    dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+  }
+
+  /**
+   * BlockRecovery_02.10.
+   * One replica is Finalized and another is RWR.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testFinalizedRwrReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+
+    // rwr and finalized replicas have the same length
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+
+    // rbw replica has a different length from the finalized one
+    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    dn1 = mock(InterDatanodeProtocol.class);
+    dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+  }
+
+  /**
+   * BlockRecovery_02.11.
+   * Two replicas are RBW.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRBWReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+  }
+
+  /**
+   * BlockRecovery_02.12.
+   * One replica is RBW and another is RWR.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRBW_RWRReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+    verify(dn2, never()).updateReplicaUnderRecovery(
+        block, RECOVERY_ID, REPLICA_LEN1);
+  }
+
+  /**
+   * BlockRecovery_02.13.
+   * Two replicas are RWR.
+   * @throws IOException in case of an error
+   */
+  @Test
+  public void testRWRReplicas() throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
+    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
+
+    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+    testSyncReplicas(replica1, replica2, dn1, dn2);
+
+    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+  }
+}