|
@@ -614,6 +614,47 @@ public class TestCacheDirectives {
|
|
}, 500, 60000);
|
|
}, 500, 60000);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static void waitForCachedStats(final DistributedFileSystem dfs,
|
|
|
|
+ final long targetFilesAffected, final long targetBytesNeeded,
|
|
|
|
+ final long targetBytesCached,
|
|
|
|
+ final CacheDirectiveInfo filter, final String infoString)
|
|
|
|
+ throws Exception {
|
|
|
|
+ LOG.info("Polling listDirectives{" +
|
|
|
|
+ ((filter == null) ? "ALL" : filter.toString()) +
|
|
|
|
+ " for " + targetFilesAffected + " targetFilesAffected, " +
|
|
|
|
+ targetBytesNeeded + " targetBytesNeeded, " +
|
|
|
|
+ targetBytesCached + " targetBytesCached");
|
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean get() {
|
|
|
|
+ RemoteIterator<CacheDirectiveEntry> iter = null;
|
|
|
|
+ CacheDirectiveEntry entry = null;
|
|
|
|
+ try {
|
|
|
|
+ iter = dfs.listCacheDirectives(filter);
|
|
|
|
+ entry = iter.next();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ fail("got IOException while calling " +
|
|
|
|
+ "listCacheDirectives: " + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ Assert.assertNotNull(entry);
|
|
|
|
+ CacheDirectiveStats stats = entry.getStats();
|
|
|
|
+ if ((targetFilesAffected == stats.getFilesAffected()) &&
|
|
|
|
+ (targetBytesNeeded == stats.getBytesNeeded()) &&
|
|
|
|
+ (targetBytesCached == stats.getBytesCached())) {
|
|
|
|
+ return true;
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info(infoString + ": filesAffected: " +
|
|
|
|
+ stats.getFilesAffected() + "/" + targetFilesAffected +
|
|
|
|
+ ", bytesNeeded: " +
|
|
|
|
+ stats.getBytesNeeded() + "/" + targetBytesNeeded +
|
|
|
|
+ ", bytesCached: " +
|
|
|
|
+ stats.getBytesCached() + "/" + targetBytesCached);
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }, 500, 60000);
|
|
|
|
+ }
|
|
|
|
+
|
|
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
|
|
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
|
|
final List<Path> paths, final int expectedBlocks,
|
|
final List<Path> paths, final int expectedBlocks,
|
|
final int expectedReplicas)
|
|
final int expectedReplicas)
|
|
@@ -804,21 +845,12 @@ public class TestCacheDirectives {
|
|
waitForCachedBlocks(namenode, 4, 8,
|
|
waitForCachedBlocks(namenode, 4, 8,
|
|
"testWaitForCachedReplicasInDirectory:1");
|
|
"testWaitForCachedReplicasInDirectory:1");
|
|
// Verify that listDirectives gives the stats we want.
|
|
// Verify that listDirectives gives the stats we want.
|
|
- RemoteIterator<CacheDirectiveEntry> iter =
|
|
|
|
- dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().
|
|
|
|
- setPath(new Path("/foo")).
|
|
|
|
- build());
|
|
|
|
- CacheDirectiveEntry entry = iter.next();
|
|
|
|
- CacheDirectiveStats stats = entry.getStats();
|
|
|
|
- Assert.assertEquals(Long.valueOf(2),
|
|
|
|
- stats.getFilesAffected());
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 2 * numBlocksPerFile * BLOCK_SIZE * 2),
|
|
|
|
- stats.getBytesNeeded());
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 2 * numBlocksPerFile * BLOCK_SIZE * 2),
|
|
|
|
- stats.getBytesCached());
|
|
|
|
-
|
|
|
|
|
|
+ waitForCachedStats(dfs, 2,
|
|
|
|
+ 8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
|
|
|
|
+ new CacheDirectiveInfo.Builder().
|
|
|
|
+ setPath(new Path("/foo")).
|
|
|
|
+ build(),
|
|
|
|
+ "testWaitForCachedReplicasInDirectory:2");
|
|
long id2 = dfs.addCacheDirective(
|
|
long id2 = dfs.addCacheDirective(
|
|
new CacheDirectiveInfo.Builder().
|
|
new CacheDirectiveInfo.Builder().
|
|
setPath(new Path("/foo/bar")).
|
|
setPath(new Path("/foo/bar")).
|
|
@@ -827,44 +859,28 @@ public class TestCacheDirectives {
|
|
build());
|
|
build());
|
|
// wait for an additional 2 cached replicas to come up
|
|
// wait for an additional 2 cached replicas to come up
|
|
waitForCachedBlocks(namenode, 4, 10,
|
|
waitForCachedBlocks(namenode, 4, 10,
|
|
- "testWaitForCachedReplicasInDirectory:2");
|
|
|
|
|
|
+ "testWaitForCachedReplicasInDirectory:3");
|
|
// the directory directive's stats are unchanged
|
|
// the directory directive's stats are unchanged
|
|
- iter = dfs.listCacheDirectives(
|
|
|
|
|
|
+ waitForCachedStats(dfs, 2,
|
|
|
|
+ 8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
|
|
new CacheDirectiveInfo.Builder().
|
|
new CacheDirectiveInfo.Builder().
|
|
- setPath(new Path("/foo")).
|
|
|
|
- build());
|
|
|
|
- entry = iter.next();
|
|
|
|
- stats = entry.getStats();
|
|
|
|
- Assert.assertEquals(Long.valueOf(2),
|
|
|
|
- stats.getFilesAffected());
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 2 * numBlocksPerFile * BLOCK_SIZE * 2),
|
|
|
|
- stats.getBytesNeeded());
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 2 * numBlocksPerFile * BLOCK_SIZE * 2),
|
|
|
|
- stats.getBytesCached());
|
|
|
|
|
|
+ setPath(new Path("/foo")).
|
|
|
|
+ build(),
|
|
|
|
+ "testWaitForCachedReplicasInDirectory:4");
|
|
// verify /foo/bar's stats
|
|
// verify /foo/bar's stats
|
|
- iter = dfs.listCacheDirectives(
|
|
|
|
|
|
+ waitForCachedStats(dfs, 1,
|
|
|
|
+ 4 * numBlocksPerFile * BLOCK_SIZE,
|
|
|
|
+ // only 3 because the file only has 3 replicas, not 4 as requested.
|
|
|
|
+ 3 * numBlocksPerFile * BLOCK_SIZE,
|
|
new CacheDirectiveInfo.Builder().
|
|
new CacheDirectiveInfo.Builder().
|
|
- setPath(new Path("/foo/bar")).
|
|
|
|
- build());
|
|
|
|
- entry = iter.next();
|
|
|
|
- stats = entry.getStats();
|
|
|
|
- Assert.assertEquals(Long.valueOf(1),
|
|
|
|
- stats.getFilesAffected());
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 4 * numBlocksPerFile * BLOCK_SIZE),
|
|
|
|
- stats.getBytesNeeded());
|
|
|
|
- // only 3 because the file only has 3 replicas, not 4 as requested.
|
|
|
|
- Assert.assertEquals(Long.valueOf(
|
|
|
|
- 3 * numBlocksPerFile * BLOCK_SIZE),
|
|
|
|
- stats.getBytesCached());
|
|
|
|
-
|
|
|
|
|
|
+ setPath(new Path("/foo/bar")).
|
|
|
|
+ build(),
|
|
|
|
+ "testWaitForCachedReplicasInDirectory:5");
|
|
// remove and watch numCached go to 0
|
|
// remove and watch numCached go to 0
|
|
dfs.removeCacheDirective(id);
|
|
dfs.removeCacheDirective(id);
|
|
dfs.removeCacheDirective(id2);
|
|
dfs.removeCacheDirective(id2);
|
|
waitForCachedBlocks(namenode, 0, 0,
|
|
waitForCachedBlocks(namenode, 0, 0,
|
|
- "testWaitForCachedReplicasInDirectory:3");
|
|
|
|
|
|
+ "testWaitForCachedReplicasInDirectory:6");
|
|
} finally {
|
|
} finally {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|