|
@@ -16,6 +16,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.fs.s3a.auth.delegation;
|
|
|
|
|
|
+import java.net.URI;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
|
|
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.examples.WordCount;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
|
@@ -47,6 +49,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
|
|
import static java.util.Objects.requireNonNull;
|
|
|
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;
|
|
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
|
|
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
|
|
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
|
@@ -72,7 +75,12 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopClust
|
|
|
* of org.apache.hadoop.mapreduce.protocol.ClientProtocol is mock.
|
|
|
*
|
|
|
* It's still an ITest though, as it does use S3A as the source and
|
|
|
- * dest so as to collect URLs.
|
|
|
+ * dest so as to collect delegation tokens.
|
|
|
+ *
|
|
|
+ * It also uses the open street map open bucket, so that there's an extra
|
|
|
+ * S3 URL in job submission which can be added as a job resource.
|
|
|
+ * This is needed to verify that job resources have their tokens extracted
|
|
|
+ * too.
|
|
|
*/
|
|
|
@RunWith(Parameterized.class)
|
|
|
public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
@@ -99,6 +107,11 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
|
|
|
|
private Path destPath;
|
|
|
|
|
|
+ private static final Path EXTRA_JOB_RESOURCE_PATH
|
|
|
+ = new Path("s3a://osm-pds/planet/planet-latest.orc");
|
|
|
+
|
|
|
+ public static final URI jobResource = EXTRA_JOB_RESOURCE_PATH.toUri();
|
|
|
+
|
|
|
/**
|
|
|
* Test array for parameterized test runs.
|
|
|
* @return a list of parameter tuples.
|
|
@@ -149,6 +162,15 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
|
conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
|
|
10_000);
|
|
|
|
|
|
+ // turn off DDB for the job resource bucket
|
|
|
+ String host = jobResource.getHost();
|
|
|
+ conf.set(
|
|
|
+ String.format("fs.s3a.bucket.%s.metadatastore.impl", host),
|
|
|
+ S3GUARD_METASTORE_NULL);
|
|
|
+ // and fix to the main endpoint if the caller has moved
|
|
|
+ conf.set(
|
|
|
+ String.format("fs.s3a.bucket.%s.endpoint", host), "");
|
|
|
+
|
|
|
// set up DTs
|
|
|
enableDelegationTokens(conf, tokenBinding);
|
|
|
return conf;
|
|
@@ -210,6 +232,15 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
|
return getTestTimeoutSeconds() * 1000;
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCommonCrawlLookup() throws Throwable {
|
|
|
+ FileSystem resourceFS = EXTRA_JOB_RESOURCE_PATH.getFileSystem(
|
|
|
+ getConfiguration());
|
|
|
+ FileStatus status = resourceFS.getFileStatus(EXTRA_JOB_RESOURCE_PATH);
|
|
|
+ LOG.info("Extra job resource is {}", status);
|
|
|
+ assertTrue("Not encrypted: " + status, status.isEncrypted());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testJobSubmissionCollectsTokens() throws Exception {
|
|
|
describe("Mock Job test");
|
|
@@ -242,6 +273,14 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
|
job.setMaxMapAttempts(1);
|
|
|
job.setMaxReduceAttempts(1);
|
|
|
|
|
|
+ // and a file for a different store.
|
|
|
+ // This is to actually stress the terasort code for which
|
|
|
+ // the yarn ResourceLocalizationService was having problems with
|
|
|
+ // fetching resources from.
|
|
|
+ URI partitionUri = new URI(EXTRA_JOB_RESOURCE_PATH.toString() +
|
|
|
+ "#_partition.lst");
|
|
|
+ job.addCacheFile(partitionUri);
|
|
|
+
|
|
|
describe("Executing Mock Job Submission to %s", output);
|
|
|
|
|
|
job.submit();
|
|
@@ -267,6 +306,8 @@ public class ITestDelegatedMRJob extends AbstractDelegationIT {
|
|
|
lookupToken(submittedCredentials, sourceFS.getUri(), tokenKind);
|
|
|
// look up the destination token
|
|
|
lookupToken(submittedCredentials, fs.getUri(), tokenKind);
|
|
|
+ lookupToken(submittedCredentials,
|
|
|
+ EXTRA_JOB_RESOURCE_PATH.getFileSystem(conf).getUri(), tokenKind);
|
|
|
}
|
|
|
|
|
|
}
|