|
@@ -24,22 +24,48 @@ import static org.apache.hadoop.test.MockitoMaker.make;
|
|
|
import static org.apache.hadoop.test.MockitoMaker.stub;
|
|
|
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assume.assumeTrue;
|
|
|
+
|
|
|
import java.io.DataInputStream;
|
|
|
+import java.io.EOFException;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.SocketException;
|
|
|
import java.net.URL;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
+import java.util.zip.CheckedOutputStream;
|
|
|
+import java.util.zip.Checksum;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
|
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
import org.apache.hadoop.metrics2.MetricsSource;
|
|
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.util.PureJavaCrc32;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
|
+import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.jboss.netty.channel.Channel;
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
@@ -245,4 +271,131 @@ public class TestShuffleHandler {
|
|
|
|
|
|
shuffleHandler.stop();
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout = 100000)
|
|
|
+ public void testMapFileAccess() throws IOException {
|
|
|
+ // This will run only in NativeIO is enabled as SecureIOUtils need it
|
|
|
+ assumeTrue(NativeIO.isAvailable());
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
|
|
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
|
|
|
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
+ "kerberos");
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
+ File absLogDir = new File("target",
|
|
|
+ TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
|
|
|
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
|
|
|
+ ApplicationId appId = BuilderUtils.newApplicationId(12345, 1);
|
|
|
+ System.out.println(appId.toString());
|
|
|
+ String appAttemptId = "attempt_12345_1_m_1_0";
|
|
|
+ String user = "randomUser";
|
|
|
+ String reducerId = "0";
|
|
|
+ List<File> fileMap = new ArrayList<File>();
|
|
|
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
|
|
|
+ conf, fileMap);
|
|
|
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Shuffle getShuffle(Configuration conf) {
|
|
|
+ // replace the shuffle handler with one stubbed for testing
|
|
|
+ return new Shuffle(conf) {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
|
|
+ HttpRequest request, HttpResponse response, URL requestUri)
|
|
|
+ throws IOException {
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ shuffleHandler.init(conf);
|
|
|
+ try {
|
|
|
+ shuffleHandler.start();
|
|
|
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
|
|
|
+ outputBuffer.reset();
|
|
|
+ Token<JobTokenIdentifier> jt =
|
|
|
+ new Token<JobTokenIdentifier>("identifier".getBytes(),
|
|
|
+ "password".getBytes(), new Text(user), new Text("shuffleService"));
|
|
|
+ jt.write(outputBuffer);
|
|
|
+ shuffleHandler.initApp(user, appId,
|
|
|
+ ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()));
|
|
|
+ URL url =
|
|
|
+ new URL(
|
|
|
+ "http://127.0.0.1:"
|
|
|
+ + shuffleHandler.getConfig().get(
|
|
|
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
|
|
|
+ + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
|
|
|
+ + "&map=attempt_12345_1_m_1_0");
|
|
|
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
|
|
+ conn.connect();
|
|
|
+ byte[] byteArr = new byte[10000];
|
|
|
+ try {
|
|
|
+ DataInputStream is = new DataInputStream(conn.getInputStream());
|
|
|
+ is.readFully(byteArr);
|
|
|
+ } catch (EOFException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+ // Retrieve file owner name
|
|
|
+ FileInputStream is = new FileInputStream(fileMap.get(0));
|
|
|
+ String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner();
|
|
|
+ is.close();
|
|
|
+
|
|
|
+ String message =
|
|
|
+ "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
|
|
|
+ + " did not match expected owner '" + user + "'";
|
|
|
+ Assert.assertTrue((new String(byteArr)).contains(message));
|
|
|
+ } finally {
|
|
|
+ shuffleHandler.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void createShuffleHandlerFiles(File logDir, String user,
|
|
|
+ String appId, String appAttemptId, Configuration conf,
|
|
|
+ List<File> fileMap) throws IOException {
|
|
|
+ String attemptDir =
|
|
|
+ StringUtils.join(Path.SEPARATOR,
|
|
|
+ new String[] { logDir.getAbsolutePath(),
|
|
|
+ ContainerLocalizer.USERCACHE, user,
|
|
|
+ ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
|
|
|
+ File appAttemptDir = new File(attemptDir);
|
|
|
+ appAttemptDir.mkdirs();
|
|
|
+ System.out.println(appAttemptDir.getAbsolutePath());
|
|
|
+ File indexFile = new File(appAttemptDir, "file.out.index");
|
|
|
+ fileMap.add(indexFile);
|
|
|
+ createIndexFile(indexFile, conf);
|
|
|
+ File mapOutputFile = new File(appAttemptDir, "file.out");
|
|
|
+ fileMap.add(mapOutputFile);
|
|
|
+ createMapOutputFile(mapOutputFile, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void
|
|
|
+ createMapOutputFile(File mapOutputFile, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ FileOutputStream out = new FileOutputStream(mapOutputFile);
|
|
|
+ out.write("Creating new dummy map output file. Used only for testing"
|
|
|
+ .getBytes());
|
|
|
+ out.flush();
|
|
|
+ out.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void createIndexFile(File indexFile, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ if (indexFile.exists()) {
|
|
|
+ System.out.println("Deleting existing file");
|
|
|
+ indexFile.delete();
|
|
|
+ }
|
|
|
+ indexFile.createNewFile();
|
|
|
+ FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
|
|
|
+ new Path(indexFile.getAbsolutePath()));
|
|
|
+ Checksum crc = new PureJavaCrc32();
|
|
|
+ crc.reset();
|
|
|
+ CheckedOutputStream chk = new CheckedOutputStream(output, crc);
|
|
|
+ String msg = "Writing new index file. This file will be used only " +
|
|
|
+ "for the testing.";
|
|
|
+ chk.write(Arrays.copyOf(msg.getBytes(),
|
|
|
+ MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
|
|
|
+ output.writeLong(chk.getChecksum().getValue());
|
|
|
+ output.close();
|
|
|
+ }
|
|
|
}
|