@@ -22,10 +22,19 @@ import static org.junit.Assert.assertNull;
 
 import java.util.Queue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Test;
 
@@ -67,4 +76,41 @@ public class TestPendingDataNodeMessages {
     assertNull(msgs.takeBlockQueue(block1Gs1));
     assertEquals(0, msgs.count());
   }
+
+  @Test
+  public void testPendingDataNodeMessagesWithEC() throws Exception {
+    ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getPolicies()
+        .get(3);
+    Path dirPath = new Path("/testPendingDataNodeMessagesWithEC");
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60000);
+
+    int numDn = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDn).nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .build();
+    try {
+      cluster.transitionToActive(0);
+
+      DistributedFileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      fs.enableErasureCodingPolicy(ecPolicy.getName());
+      fs.mkdirs(dirPath);
+      fs.setErasureCodingPolicy(dirPath, ecPolicy.getName());
+
+      DFSTestUtil.createFile(fs, new Path(dirPath, "file"),
+          ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(), (short) 1, 0);
+
+      cluster.getNameNode(0).getRpcServer().rollEditLog();
+      cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
+
+      // The pending DataNode message queue on the standby should be empty
+      // once the tailed edits are processed and the queued IBRs are applied.
+      int pendingIBRMsg = cluster.getNameNode(1).getNamesystem()
+          .getBlockManager().getPendingDataNodeMessageCount();
+      assertEquals("All DN messages should be processed after tailing edits",
+          0, pendingIBRMsg);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
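
For readers unfamiliar with the queue the new assertion checks, the sketch below is not part of the patch; it is a minimal, standalone illustration (the class name `PendingMessagesSketch` and the storage/block values are made up) of the `PendingDataNodeMessages` behaviour the HA/EC scenario exercises end to end: a block report that arrives before the namesystem knows the block is parked in the queue, and draining that queue is what brings `getPendingDataNodeMessageCount()` back to zero once the standby has tailed the edits.

```java
// Illustrative sketch only, not part of the patch. It has to live in the
// blockmanagement package because PendingDataNodeMessages and its enqueue
// method are not public; the real test file sits in the same package.
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;

public class PendingMessagesSketch {
  public static void main(String[] args) {
    PendingDataNodeMessages msgs = new PendingDataNodeMessages();
    DatanodeStorageInfo storage =
        DFSTestUtil.createDatanodeStorageInfo("storageID1", "127.0.0.1");

    // Queue a report the way BlockManager does when a DataNode reports a
    // block the namesystem does not know yet (e.g. a future genstamp).
    Block reported = new Block(1, 0, 2);
    msgs.enqueueReportedBlock(storage, reported, ReplicaState.FINALIZED);
    System.out.println("queued messages: " + msgs.count()); // prints 1

    // Once the namespace catches up (on the standby: after tailing edits),
    // the per-block queue is taken and the count drops back to 0, which is
    // the value getPendingDataNodeMessageCount() exposes to the test.
    msgs.takeBlockQueue(reported);
    System.out.println("queued messages: " + msgs.count()); // prints 0
  }
}
```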