@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.base.Supplier;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

+
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
@@ -36,15 +37,27 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;

 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -81,16 +94,28 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }

+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int WAIT_FOR_FBR_MS = 10 * 1000;
+  private static final int WAIT_FOR_STORAGE_TYPES_MS = 30 * 1000;
+  private static final int WAIT_FOR_ASYNC_DELETE_MS = 10 * 1000;
+  private static final int WAIT_FOR_DN_SHUTDOWN_MS = 30 * 1000;
+  private static final int WAIT_FOR_REDUNDANCY_MS =
+      2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000;
+  private static final int WAIT_FOR_LAZY_SCRUBBER_MS =
+      2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000;
+  private static final int WAIT_POLL_INTERVAL_MS = 10;
+  private static final int WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();

@@ -154,7 +179,7 @@ public abstract class LazyPersistTestCase {
           return false;
         }
       }
-    }, 100, 30 * 1000);
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);

     return client.getLocatedBlocks(path.toString(), 0, fileLength);
   }
@@ -429,11 +454,38 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }

+  /**
+   * Force a full block report (FBR) on all the DataNodes.
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCounts = new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
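+    // Snapshot the current block report count of each DN's first storage
+    // so the wait below can detect when a new report has been processed.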
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCounts.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // wait for block reports to be received.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (Entry<DatanodeStorageInfo, Integer> repCntEntry : reportCounts
+            .entrySet()) {
+          if (repCntEntry.getValue() == repCntEntry.getKey()
+              .getBlockReportCount()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }

   protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +497,65 @@ public abstract class LazyPersistTestCase {

       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
       File metaFile = new File(targetDir,
           DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
               lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-            " exists after deletion.");
         return false;
       }
     }
     return true;
   }

-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {

     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);

-    while(
-      cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
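+    // Wait for the pending async block deletions on every DN to drain.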
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (DataNode dn1 : cluster.getDataNodes()) {
+          if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+              > 0) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);

     final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
-
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
     // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
+    // transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-          return false;
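+      // Re-check each volume until the replica and its meta file are gone.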
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            for (FsVolumeSpi vol : volumes) {
+              FsVolumeImpl volume = (FsVolumeImpl) vol;
+              File targetDir = (volume.isTransientStorage()) ?
+                  volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+              if (!LazyPersistTestCase.this
+                  .verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+                return false;
+              }
+            }
+            return true;
+          } catch (IOException ie) {
+            return false;
+          }
         }
-      }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
@@ -534,4 +600,104 @@ public abstract class LazyPersistTestCase {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shut down the DataNodes and wait for the time it takes the NN to
+   * detect the DNs as being dead.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
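+    // Poll the live DataNode report until the NN sees no live DNs.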
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          DatanodeInfo[] info = client.datanodeReport(
+              HdfsConstants.DatanodeReportType.LIVE);
+          return info.length == 0;
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+            .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
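+        // Count the blocks currently marked corrupt by the BlockManager.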
+        int count = 0;
+        while (bInfoIter.hasNext()) {
+          bInfoIter.next();
+          count++;
+        }
+        return corruptCnt == count;
+      }
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the lazy persist file scrubber to run another cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // scrubber is disabled
+      return;
+    }
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastTimeStamp != fsn.getLazyPersistFileScrubberTS();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to run another cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS =
+        bm.getLastReplicationCycleTS();
+
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return lastRedundancyTS != bm.getLastReplicationCycleTS();
+          }
+        },
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
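+    // Wait until the NN reports exactly cnt blocks as under-replicated.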
+    GenericTestUtils.waitFor(
+        new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return cnt == bm.getUnderReplicatedBlocksCount();
+          }
+        }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
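+    // Poll until the file's existence matches the expected value.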
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return expected == fs.exists(path);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }