|
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
|
|
|
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
@@ -623,45 +624,111 @@ public class TestCacheDirectives {
|
|
|
}, 500, 60000);
|
|
|
}
|
|
|
|
|
|
- private static void waitForCachedStats(final DistributedFileSystem dfs,
|
|
|
- final long targetFilesAffected, final long targetBytesNeeded,
|
|
|
- final long targetBytesCached,
|
|
|
- final CacheDirectiveInfo filter, final String infoString)
|
|
|
+ private static void waitForCacheDirectiveStats(final DistributedFileSystem dfs,
|
|
|
+ final long targetBytesNeeded, final long targetBytesCached,
|
|
|
+ final long targetFilesNeeded, final long targetFilesCached,
|
|
|
+ final CacheDirectiveInfo filter, final String infoString)
|
|
|
throws Exception {
|
|
|
- LOG.info("Polling listDirectives{" +
|
|
|
- ((filter == null) ? "ALL" : filter.toString()) +
|
|
|
- " for " + targetFilesAffected + " targetFilesAffected, " +
|
|
|
- targetBytesNeeded + " targetBytesNeeded, " +
|
|
|
- targetBytesCached + " targetBytesCached");
|
|
|
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
- @Override
|
|
|
- public Boolean get() {
|
|
|
- RemoteIterator<CacheDirectiveEntry> iter = null;
|
|
|
- CacheDirectiveEntry entry = null;
|
|
|
+ LOG.info("Polling listCacheDirectives " +
|
|
|
+ ((filter == null) ? "ALL" : filter.toString()) + " for " +
|
|
|
+ targetBytesNeeded + " targetBytesNeeded, " +
|
|
|
+ targetBytesCached + " targetBytesCached, " +
|
|
|
+ targetFilesNeeded + " targetFilesNeeded, " +
|
|
|
+ targetFilesCached + " targetFilesCached");
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ RemoteIterator<CacheDirectiveEntry> iter = null;
|
|
|
+ CacheDirectiveEntry entry = null;
|
|
|
+ try {
|
|
|
+ iter = dfs.listCacheDirectives(filter);
|
|
|
+ entry = iter.next();
|
|
|
+ } catch (IOException e) {
|
|
|
+ fail("got IOException while calling " +
|
|
|
+ "listCacheDirectives: " + e.getMessage());
|
|
|
+ }
|
|
|
+ Assert.assertNotNull(entry);
|
|
|
+ CacheDirectiveStats stats = entry.getStats();
|
|
|
+ if ((targetBytesNeeded == stats.getBytesNeeded()) &&
|
|
|
+ (targetBytesCached == stats.getBytesCached()) &&
|
|
|
+ (targetFilesNeeded == stats.getFilesNeeded()) &&
|
|
|
+ (targetFilesCached == stats.getFilesCached())) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ LOG.info(infoString + ": " +
|
|
|
+ "filesNeeded: " +
|
|
|
+ stats.getFilesNeeded() + "/" + targetFilesNeeded +
|
|
|
+ ", filesCached: " +
|
|
|
+ stats.getFilesCached() + "/" + targetFilesCached +
|
|
|
+ ", bytesNeeded: " +
|
|
|
+ stats.getBytesNeeded() + "/" + targetBytesNeeded +
|
|
|
+ ", bytesCached: " +
|
|
|
+ stats.getBytesCached() + "/" + targetBytesCached);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 500, 60000);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void waitForCachePoolStats(final DistributedFileSystem dfs,
|
|
|
+ final long targetBytesNeeded, final long targetBytesCached,
|
|
|
+ final long targetFilesNeeded, final long targetFilesCached,
|
|
|
+ final CachePoolInfo pool, final String infoString)
|
|
|
+ throws Exception {
|
|
|
+ LOG.info("Polling listCachePools " + pool.toString() + " for " +
|
|
|
+ targetBytesNeeded + " targetBytesNeeded, " +
|
|
|
+ targetBytesCached + " targetBytesCached, " +
|
|
|
+ targetFilesNeeded + " targetFilesNeeded, " +
|
|
|
+ targetFilesCached + " targetFilesCached");
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ RemoteIterator<CachePoolEntry> iter = null;
|
|
|
+ try {
|
|
|
+ iter = dfs.listCachePools();
|
|
|
+ } catch (IOException e) {
|
|
|
+ fail("got IOException while calling " +
|
|
|
+ "listCachePools: " + e.getMessage());
|
|
|
+ }
|
|
|
+ while (true) {
|
|
|
+ CachePoolEntry entry = null;
|
|
|
try {
|
|
|
- iter = dfs.listCacheDirectives(filter);
|
|
|
+ if (!iter.hasNext()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
entry = iter.next();
|
|
|
} catch (IOException e) {
|
|
|
- fail("got IOException while calling " +
|
|
|
- "listCacheDirectives: " + e.getMessage());
|
|
|
+ fail("got IOException while iterating through " +
|
|
|
+ "listCachePools: " + e.getMessage());
|
|
|
+ }
|
|
|
+ if (entry == null) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (!entry.getInfo().getPoolName().equals(pool.getPoolName())) {
|
|
|
+ continue;
|
|
|
}
|
|
|
- Assert.assertNotNull(entry);
|
|
|
- CacheDirectiveStats stats = entry.getStats();
|
|
|
- if ((targetFilesAffected == stats.getFilesAffected()) &&
|
|
|
- (targetBytesNeeded == stats.getBytesNeeded()) &&
|
|
|
- (targetBytesCached == stats.getBytesCached())) {
|
|
|
+ CachePoolStats stats = entry.getStats();
|
|
|
+ if ((targetBytesNeeded == stats.getBytesNeeded()) &&
|
|
|
+ (targetBytesCached == stats.getBytesCached()) &&
|
|
|
+ (targetFilesNeeded == stats.getFilesNeeded()) &&
|
|
|
+ (targetFilesCached == stats.getFilesCached())) {
|
|
|
return true;
|
|
|
} else {
|
|
|
- LOG.info(infoString + ": filesAffected: " +
|
|
|
- stats.getFilesAffected() + "/" + targetFilesAffected +
|
|
|
- ", bytesNeeded: " +
|
|
|
+ LOG.info(infoString + ": " +
|
|
|
+ "filesNeeded: " +
|
|
|
+ stats.getFilesNeeded() + "/" + targetFilesNeeded +
|
|
|
+ ", filesCached: " +
|
|
|
+ stats.getFilesCached() + "/" + targetFilesCached +
|
|
|
+ ", bytesNeeded: " +
|
|
|
stats.getBytesNeeded() + "/" + targetBytesNeeded +
|
|
|
- ", bytesCached: " +
|
|
|
+ ", bytesCached: " +
|
|
|
stats.getBytesCached() + "/" + targetBytesCached);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
- }, 500, 60000);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, 500, 60000);
|
|
|
}
|
|
|
|
|
|
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
|
|
@@ -837,7 +904,8 @@ public class TestCacheDirectives {
|
|
|
NameNode namenode = cluster.getNameNode();
|
|
|
// Create the pool
|
|
|
final String pool = "friendlyPool";
|
|
|
- dfs.addCachePool(new CachePoolInfo(pool));
|
|
|
+ final CachePoolInfo poolInfo = new CachePoolInfo(pool);
|
|
|
+ dfs.addCachePool(poolInfo);
|
|
|
// Create some test files
|
|
|
final List<Path> paths = new LinkedList<Path>();
|
|
|
paths.add(new Path("/foo/bar"));
|
|
@@ -853,6 +921,7 @@ public class TestCacheDirectives {
|
|
|
}
|
|
|
waitForCachedBlocks(namenode, 0, 0,
|
|
|
"testWaitForCachedReplicasInDirectory:0");
|
|
|
+
|
|
|
// cache entire directory
|
|
|
long id = dfs.addCacheDirective(
|
|
|
new CacheDirectiveInfo.Builder().
|
|
@@ -861,14 +930,20 @@ public class TestCacheDirectives {
|
|
|
setPool(pool).
|
|
|
build());
|
|
|
waitForCachedBlocks(namenode, 4, 8,
|
|
|
- "testWaitForCachedReplicasInDirectory:1");
|
|
|
+ "testWaitForCachedReplicasInDirectory:1:blocks");
|
|
|
// Verify that listDirectives gives the stats we want.
|
|
|
- waitForCachedStats(dfs, 2,
|
|
|
- 8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
|
|
|
+ waitForCacheDirectiveStats(dfs,
|
|
|
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ 2, 2,
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
setPath(new Path("/foo")).
|
|
|
build(),
|
|
|
- "testWaitForCachedReplicasInDirectory:2");
|
|
|
+ "testWaitForCachedReplicasInDirectory:1:directive");
|
|
|
+ waitForCachePoolStats(dfs,
|
|
|
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ 2, 2,
|
|
|
+ poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
|
|
|
+
|
|
|
long id2 = dfs.addCacheDirective(
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
setPath(new Path("/foo/bar")).
|
|
@@ -877,28 +952,42 @@ public class TestCacheDirectives {
|
|
|
build());
|
|
|
// wait for an additional 2 cached replicas to come up
|
|
|
waitForCachedBlocks(namenode, 4, 10,
|
|
|
- "testWaitForCachedReplicasInDirectory:3");
|
|
|
+ "testWaitForCachedReplicasInDirectory:2:blocks");
|
|
|
// the directory directive's stats are unchanged
|
|
|
- waitForCachedStats(dfs, 2,
|
|
|
- 8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
|
|
|
+ waitForCacheDirectiveStats(dfs,
|
|
|
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ 2, 2,
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
setPath(new Path("/foo")).
|
|
|
build(),
|
|
|
- "testWaitForCachedReplicasInDirectory:4");
|
|
|
+ "testWaitForCachedReplicasInDirectory:2:directive-1");
|
|
|
// verify /foo/bar's stats
|
|
|
- waitForCachedStats(dfs, 1,
|
|
|
+ waitForCacheDirectiveStats(dfs,
|
|
|
4 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
// only 3 because the file only has 3 replicas, not 4 as requested.
|
|
|
3 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ 1,
|
|
|
+ // only 0 because the file can't be fully cached
|
|
|
+ 0,
|
|
|
new CacheDirectiveInfo.Builder().
|
|
|
setPath(new Path("/foo/bar")).
|
|
|
build(),
|
|
|
- "testWaitForCachedReplicasInDirectory:5");
|
|
|
+ "testWaitForCachedReplicasInDirectory:2:directive-2");
|
|
|
+ waitForCachePoolStats(dfs,
|
|
|
+ (4+4) * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ (4+3) * numBlocksPerFile * BLOCK_SIZE,
|
|
|
+ 3, 2,
|
|
|
+ poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
|
|
|
+
|
|
|
// remove and watch numCached go to 0
|
|
|
dfs.removeCacheDirective(id);
|
|
|
dfs.removeCacheDirective(id2);
|
|
|
waitForCachedBlocks(namenode, 0, 0,
|
|
|
- "testWaitForCachedReplicasInDirectory:6");
|
|
|
+ "testWaitForCachedReplicasInDirectory:3:blocks");
|
|
|
+ waitForCachePoolStats(dfs,
|
|
|
+ 0, 0,
|
|
|
+ 0, 0,
|
|
|
+ poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
|
|
|
} finally {
|
|
|
cluster.shutdown();
|
|
|
}
|