@@ -19,40 +19,34 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -61,6 +55,8 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -372,68 +368,4 @@ public class TestNameNodePrunesMissingStorages {
       cluster.shutdown();
     }
   }
-
-  @Test(timeout=300000)
-  public void testInterleavedFullBlockReports() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-        36000000L);
-    int numStoragesPerDatanode = 6;
-    final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(conf).numDataNodes(1)
-        .storagesPerDatanode(numStoragesPerDatanode)
-        .build();
-    try {
-      LOG.info("waiting for cluster to become active...");
-      cluster.waitActive();
-      // Get the datanode registration and the block reports
-      DataNode dn = cluster.getDataNodes().get(0);
-      final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
-      LOG.info("Block pool id: " + blockPoolId);
-      final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(blockPoolId);
-      final StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-      int reportIndex = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
-          perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        reports[reportIndex++] =
-            new StorageBlockReport(dnStorage, blockList);
-      }
-      // Get the list of storage ids associated with the datanode
-      // before the test
-      BlockManager bm =
-          cluster.getNameNode().getNamesystem().getBlockManager();
-      final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
-          getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
-      DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
-      // Send the full block report concurrently using
-      // numThreads=numStoragesPerDatanode
-      ExecutorService executorService = Executors.
-          newFixedThreadPool(numStoragesPerDatanode);
-      List<Future<DatanodeCommand>> futureList =
-          new ArrayList<>(numStoragesPerDatanode);
-      for (int i = 0; i < numStoragesPerDatanode; i++) {
-        futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
-          @Override
-          public DatanodeCommand call() throws IOException {
-            return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
-                reports, new BlockReportContext(1, 0, System.nanoTime(),
-                0L, true));
-          }
-        }));
-      }
-      for (Future<DatanodeCommand> future: futureList) {
-        future.get();
-      }
-      executorService.shutdown();
-      // Verify that the storages match before and after the test
-      Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
-    } finally {
-      cluster.shutdown();
-    }
-  }
 }