|
@@ -52,6 +52,7 @@ import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -958,13 +959,12 @@ public class TestMover {
|
|
|
new String[] { "-p", barDir });
|
|
|
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
|
|
|
|
|
|
- // verify storage types and locations
|
|
|
+ // Verify storage types and locations.
|
|
|
+ // Wait until Namenode confirms ARCHIVE storage type for all blocks of
|
|
|
+ // fooFile.
|
|
|
+ waitForUpdatedStorageType(client, fooFile, fileLen, StorageType.ARCHIVE);
|
|
|
+
|
|
|
locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
|
|
|
- for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
|
|
|
- for( StorageType type : lb.getStorageTypes()){
|
|
|
- Assert.assertEquals(StorageType.ARCHIVE, type);
|
|
|
- }
|
|
|
- }
|
|
|
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
|
|
|
dataBlocks + parityBlocks);
|
|
|
|
|
@@ -1005,6 +1005,43 @@ public class TestMover {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Wait until Namenode reports expected storage type for all blocks of
|
|
|
+ * given file.
|
|
|
+ *
|
|
|
+ * @param client handle all RPC calls to Namenode.
|
|
|
+ * @param file file for which we are expecting same storage type of all
|
|
|
+ * located blocks.
|
|
|
+ * @param fileLen length of the file.
|
|
|
+ * @param expectedStorageType storage type to expect for all blocks of the
|
|
|
+ * given file.
|
|
|
+ * @throws TimeoutException if the wait timed out.
|
|
|
+ * @throws InterruptedException if interrupted while waiting for the response.
|
|
|
+ */
|
|
|
+ private void waitForUpdatedStorageType(ClientProtocol client, String file,
|
|
|
+ long fileLen, StorageType expectedStorageType)
|
|
|
+ throws TimeoutException, InterruptedException {
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ LocatedBlocks blocks;
|
|
|
+ try {
|
|
|
+ blocks = client.getBlockLocations(file, 0, fileLen);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ for (LocatedBlock lb : blocks.getLocatedBlocks()) {
|
|
|
+ for (StorageType type : lb.getStorageTypes()) {
|
|
|
+ if (!expectedStorageType.equals(type)) {
|
|
|
+ LOG.info("Block {} has StorageType: {}. It might not have been "
|
|
|
+ + "updated yet, awaiting the latest update.",
|
|
|
+ lb.getBlock().toString(), type);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }, 500, 5000, "Blocks storage type must be ARCHIVE");
|
|
|
+ }
|
|
|
+
|
|
|
private void initSecureConf(Configuration conf) throws Exception {
|
|
|
String username = "mover";
|
|
|
File baseDir = GenericTestUtils.getTestDir(TestMover.class.getSimpleName());
|