@@ -17,21 +17,47 @@
  */
 package org.apache.hadoop.hdfs.server.mover;

+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KEYTAB_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MOVER_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.*;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;

-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -47,14 +73,27 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-public class TestMover {
+import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;

-  static final int DEFAULT_BLOCK_SIZE = 100;
+public class TestMover {
+  private static final Logger LOG = LoggerFactory.getLogger(TestMover.class);
+  private static final int DEFAULT_BLOCK_SIZE = 100;
+  private File keytabFile;
+  private String principal;

   static {
     TestBalancer.initTestSetup();
@@ -116,14 +155,11 @@ public class TestMover {
     }
   }

-  @Test
-  public void testScheduleBlockWithinSameNode() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
+  private void testWithinSameNode(Configuration conf) throws Exception {
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(3)
         .storageTypes(
-            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+            new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
         .build();
     try {
       cluster.waitActive();
@@ -133,13 +169,11 @@ public class TestMover {
       dfs.mkdirs(dir);
       // write to DISK
       dfs.setStoragePolicy(dir, "HOT");
-      {
-        final FSDataOutputStream out = dfs.create(new Path(file));
-        out.writeChars("testScheduleWithinSameNode");
-        out.close();
-      }
+      final FSDataOutputStream out = dfs.create(new Path(file));
+      out.writeChars("testScheduleWithinSameNode");
+      out.close();

-      //verify before movement
+      // verify before movement
       LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
       StorageType[] storageTypes = lb.getStorageTypes();
       for (StorageType storageType : storageTypes) {
@@ -148,21 +182,54 @@
       // move to ARCHIVE
       dfs.setStoragePolicy(dir, "COLD");
       int rc = ToolRunner.run(conf, new Mover.Cli(),
-          new String[] { "-p", dir.toString() });
-      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+          new String[] {"-p", dir.toString()});
+      Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);

-      // Wait till namenode notified
-      Thread.sleep(3000);
-      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-      storageTypes = lb.getStorageTypes();
-      for (StorageType storageType : storageTypes) {
-        Assert.assertTrue(StorageType.ARCHIVE == storageType);
-      }
+      // Wait till the namenode is notified about the block location details
+      waitForLocatedBlockWithArchiveStorageType(dfs, file, 3);
     } finally {
       cluster.shutdown();
     }
   }

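+  /**
+   * Polls the NameNode until the first block of the given file reports the
+   * expected number of replicas on ARCHIVE storage.
+   */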
+  private void waitForLocatedBlockWithArchiveStorageType(
+      final DistributedFileSystem dfs, final String file,
+      int expectedArchiveCount) throws Exception {
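+    // Check every 100 ms, for up to 3 seconds (the arguments to waitFor below).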
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int archiveCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (StorageType.ARCHIVE == storageType) {
+            archiveCount++;
+          }
+        }
+        LOG.info("Archive replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, 3000);
+  }
+
+  @Test
+  public void testScheduleBlockWithinSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    testWithinSameNode(conf);
+  }
+
   private void checkMovePaths(List<Path> actual, Path... expected) {
     Assert.assertEquals(expected.length, actual.size());
     for (Path p : expected) {
@@ -334,19 +401,10 @@ public class TestMover {
       dfs.setStoragePolicy(new Path(file), "COLD");
       int rc = ToolRunner.run(conf, new Mover.Cli(),
           new String[] { "-p", file.toString() });
-      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+      Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);

-      // Wait till namenode notified
-      Thread.sleep(3000);
-      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-      storageTypes = lb.getStorageTypes();
-      int archiveCount = 0;
-      for (StorageType storageType : storageTypes) {
-        if (StorageType.ARCHIVE == storageType) {
-          archiveCount++;
-        }
-      }
-      Assert.assertEquals(archiveCount, 2);
+      // Wait till the namenode is notified about the block location details
+      waitForLocatedBlockWithArchiveStorageType(dfs, file, 2);
     } finally {
       cluster.shutdown();
     }
@@ -514,7 +572,7 @@ public class TestMover {
       // run Mover
       int rc = ToolRunner.run(conf, new Mover.Cli(),
           new String[] { "-p", barDir });
-      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+      Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);

       // verify storage types and locations
       locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
@@ -562,4 +620,100 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
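+  /**
+   * Sets up a secure (Kerberos + SSL) test configuration, backed by an
+   * embedded MiniKdc, for the keytab-based Mover test below.
+   */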
+  private void initSecureConf(Configuration conf) throws Exception {
+    String username = "mover";
+    File baseDir = GenericTestUtils.getTestDir(TestMover.class.getSimpleName());
+    FileUtil.fullyDelete(baseDir);
+    Assert.assertTrue(baseDir.mkdirs());
+
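+    // Start an in-process KDC; the principals and keytab created below
+    // belong to its test realm.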
+    Properties kdcConf = MiniKdc.createConf();
+    MiniKdc kdc = new MiniKdc(kdcConf, baseDir);
+    kdc.start();
+
+    SecurityUtil.setAuthenticationMethod(
+        UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+    KerberosName.resetDefaultRealm();
+    Assert.assertTrue("Expected configuration to enable security",
+        UserGroupInformation.isSecurityEnabled());
+
+    keytabFile = new File(baseDir, username + ".keytab");
+    String keytab = keytabFile.getAbsolutePath();
+    // Windows will not reverse name lookup "127.0.0.1" to "localhost".
+    String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
+    principal = username + "/" + krbInstance + "@" + kdc.getRealm();
+    String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
+    kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
+        "HTTP/" + krbInstance);
+
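+    // Point the NameNode, DataNode and web endpoints at the shared test
+    // principal and keytab, and require HTTPS-only on ephemeral ports.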
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
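+    // The Mover keytab settings exercised by this test.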
+    conf.setBoolean(DFS_MOVER_KEYTAB_ENABLED_KEY, true);
+    conf.set(DFS_MOVER_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_MOVER_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_MOVER_KERBEROS_PRINCIPAL_KEY, principal);
+
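+    // Generate test keystores and point the client and server at the
+    // generated SSL configuration resources.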
+    String keystoresDir = baseDir.getAbsolutePath();
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestMover.class);
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+
+    conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+        KeyStoreTestUtil.getClientSSLConfigFileName());
+    conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+        KeyStoreTestUtil.getServerSSLConfigFileName());
+    initConf(conf);
+  }
+
+  /**
+   * Test that Mover runs fine when logging in with a keytab in a kerberized
+   * environment. Reuses testWithinSameNode for basic functionality testing.
+   */
+  @Test(timeout = 300000)
+  public void testMoverWithKeytabs() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    try {
+      initSecureConf(conf);
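+      // Log in from the keytab generated by initSecureConf and run the
+      // Mover as that Kerberos principal.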
+      final UserGroupInformation ugi = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(principal,
+              keytabFile.getAbsolutePath());
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          // verify that mover runs Ok.
+          testWithinSameNode(conf);
+          // verify that UGI was logged in using keytab.
+          Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
+          return null;
+        }
+      });
+    } finally {
+      // Reset UGI so that other tests are not affected.
+      UserGroupInformation.reset();
+      UserGroupInformation.setConfiguration(new Configuration());
+    }
+  }
 }