|
@@ -25,13 +25,16 @@ import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
import com.google.common.base.Supplier;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
|
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
|
|
@@ -97,6 +100,8 @@ public class TestBootstrapStandby {
|
|
"storage directory does not exist or is not accessible",
|
|
"storage directory does not exist or is not accessible",
|
|
ioe);
|
|
ioe);
|
|
}
|
|
}
|
|
|
|
+ int expectedCheckpointTxId = (int)NameNodeAdapter.getNamesystem(nn0)
|
|
|
|
+ .getFSImage().getMostRecentCheckpointTxId();
|
|
|
|
|
|
int rc = BootstrapStandby.run(
|
|
int rc = BootstrapStandby.run(
|
|
new String[]{"-nonInteractive"},
|
|
new String[]{"-nonInteractive"},
|
|
@@ -105,7 +110,7 @@ public class TestBootstrapStandby {
|
|
|
|
|
|
// Should have copied over the namespace from the active
|
|
// Should have copied over the namespace from the active
|
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
|
- ImmutableList.of(0));
|
|
|
|
|
|
+ ImmutableList.of(expectedCheckpointTxId));
|
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
|
|
|
|
|
// We should now be able to start the standby successfully.
|
|
// We should now be able to start the standby successfully.
|
|
@@ -214,7 +219,7 @@ public class TestBootstrapStandby {
|
|
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY}
|
|
* {@link DFSConfigKeys#DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY}
|
|
* created by HDFS-8808.
|
|
* created by HDFS-8808.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=30000)
|
|
public void testRateThrottling() throws Exception {
|
|
public void testRateThrottling() throws Exception {
|
|
cluster.getConfiguration(0).setLong(
|
|
cluster.getConfiguration(0).setLong(
|
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1);
|
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 1);
|
|
@@ -222,23 +227,29 @@ public class TestBootstrapStandby {
|
|
cluster.waitActive();
|
|
cluster.waitActive();
|
|
nn0 = cluster.getNameNode(0);
|
|
nn0 = cluster.getNameNode(0);
|
|
cluster.transitionToActive(0);
|
|
cluster.transitionToActive(0);
|
|
- // Each edit has at least 1 byte. So the lowRate definitely should cause
|
|
|
|
- // a timeout, if enforced. If lowRate is not enforced, any reasonable test
|
|
|
|
- // machine should at least download an image with 5 edits in 5 seconds.
|
|
|
|
- for (int i = 0; i < 5; i++) {
|
|
|
|
- nn0.getRpcServer().rollEditLog();
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ int timeOut = updatePrimaryNNAndGetTimeout();
|
|
// A very low DFS_IMAGE_TRANSFER_RATE_KEY value won't affect bootstrapping
|
|
// A very low DFS_IMAGE_TRANSFER_RATE_KEY value won't affect bootstrapping
|
|
|
|
+ final AtomicBoolean bootStrapped = new AtomicBoolean(false);
|
|
|
|
+ new Thread(
|
|
|
|
+ new Runnable() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ testSuccessfulBaseCase();
|
|
|
|
+ bootStrapped.set(true);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ fail(e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ ).start();
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
public Boolean get() {
|
|
public Boolean get() {
|
|
- try {
|
|
|
|
- testSuccessfulBaseCase();
|
|
|
|
- return true;
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ return bootStrapped.get();
|
|
}
|
|
}
|
|
- }, 500, 5000);
|
|
|
|
|
|
+ }, 50, timeOut);
|
|
|
|
|
|
shutdownCluster();
|
|
shutdownCluster();
|
|
setupCluster();
|
|
setupCluster();
|
|
@@ -250,22 +261,61 @@ public class TestBootstrapStandby {
|
|
cluster.transitionToActive(0);
|
|
cluster.transitionToActive(0);
|
|
// A very low DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY value should
|
|
// A very low DFS_IMAGE_TRANSFER_BOOTSTRAP_STANDBY_RATE_KEY value should
|
|
// cause timeout
|
|
// cause timeout
|
|
|
|
+ timeOut = updatePrimaryNNAndGetTimeout();
|
|
|
|
+ bootStrapped.set(false);
|
|
|
|
+ new Thread(
|
|
|
|
+ new Runnable() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ testSuccessfulBaseCase();
|
|
|
|
+ bootStrapped.set(true);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.info(e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ ).start();
|
|
try {
|
|
try {
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
public Boolean get() {
|
|
public Boolean get() {
|
|
- try {
|
|
|
|
- testSuccessfulBaseCase();
|
|
|
|
- return true;
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
|
|
+ return bootStrapped.get();
|
|
}
|
|
}
|
|
- }, 500, 5000);
|
|
|
|
|
|
+ }, 50, timeOut);
|
|
fail("Did not timeout");
|
|
fail("Did not timeout");
|
|
} catch (TimeoutException e) {
|
|
} catch (TimeoutException e) {
|
|
LOG.info("Encountered expected timeout.");
|
|
LOG.info("Encountered expected timeout.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Add enough content to the primary NN's fsimage so that it's larger than
|
|
|
|
+ * the IO transfer buffer size of bootstrapping. Then return the correct
|
|
|
|
+ * timeout duration in milliseconds (image length at 1 byte per ms, plus 1).
|
|
|
|
+ */
|
|
|
|
+ private int updatePrimaryNNAndGetTimeout() throws IOException{
|
|
|
|
+ // Any reasonable test machine should be able to transfer 1 byte per ms
|
|
|
|
+ // (which is ~1 KB/s)
|
|
|
|
+ final int minXferRatePerMS = 1;
|
|
|
|
+ int imageXferBufferSize = DFSUtilClient.getIoFileBufferSize(
|
|
|
|
+ new Configuration());
|
|
|
|
+ File imageFile = null;
|
|
|
|
+ int dirIdx = 0;
|
|
|
|
+ while (imageFile == null || imageFile.length() < imageXferBufferSize) {
|
|
|
|
+ for (int i = 0; i < 5; i++) {
|
|
|
|
+ cluster.getFileSystem(0).mkdirs(new Path("/foo" + dirIdx++));
|
|
|
|
+ }
|
|
|
|
+ nn0.getRpcServer().rollEditLog();
|
|
|
|
+ NameNodeAdapter.enterSafeMode(nn0, false);
|
|
|
|
+ NameNodeAdapter.saveNamespace(nn0);
|
|
|
|
+ NameNodeAdapter.leaveSafeMode(nn0);
|
|
|
|
+ imageFile = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
|
|
|
|
+ .getFSImage(nn0).getStorage().getStorageDir(0));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return (int)(imageFile.length() / minXferRatePerMS) + 1;
|
|
|
|
+ }
|
|
|
|
+
|
|
private void removeStandbyNameDirs() {
|
|
private void removeStandbyNameDirs() {
|
|
for (URI u : cluster.getNameDirs(1)) {
|
|
for (URI u : cluster.getNameDirs(1)) {
|
|
assertTrue(u.getScheme().equals("file"));
|
|
assertTrue(u.getScheme().equals("file"));
|