|
@@ -0,0 +1,157 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.mapred;
|
|
|
+
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.RandomAccessFile;
|
|
|
+import java.nio.channels.WritableByteChannel;
|
|
|
+import java.util.Random;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+public class TestFadvisedFileRegion {
|
|
|
+ private final int FILE_SIZE = 16*1024*1024;
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestFadvisedFileRegion.class);
|
|
|
+
|
|
|
+ @Test(timeout = 100000)
|
|
|
+ public void testCustomShuffleTransfer() throws IOException {
|
|
|
+ File absLogDir = new File("target",
|
|
|
+ TestFadvisedFileRegion.class.getSimpleName() +
|
|
|
+ "LocDir").getAbsoluteFile();
|
|
|
+
|
|
|
+ String testDirPath =
|
|
|
+ StringUtils.join(Path.SEPARATOR,
|
|
|
+ new String[] { absLogDir.getAbsolutePath(),
|
|
|
+ "testCustomShuffleTransfer"});
|
|
|
+ File testDir = new File(testDirPath);
|
|
|
+ testDir.mkdirs();
|
|
|
+
|
|
|
+ System.out.println(testDir.getAbsolutePath());
|
|
|
+
|
|
|
+ File inFile = new File(testDir, "fileIn.out");
|
|
|
+ File outFile = new File(testDir, "fileOut.out");
|
|
|
+
|
|
|
+
|
|
|
+ //Initialize input file
|
|
|
+ byte [] initBuff = new byte[FILE_SIZE];
|
|
|
+ Random rand = new Random();
|
|
|
+ rand.nextBytes(initBuff);
|
|
|
+
|
|
|
+ FileOutputStream out = new FileOutputStream(inFile);
|
|
|
+ try{
|
|
|
+ out.write(initBuff);
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(LOG, out);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ //define position and count to read from a file region.
|
|
|
+ int position = 2*1024*1024;
|
|
|
+ int count = 4*1024*1024 - 1;
|
|
|
+
|
|
|
+ RandomAccessFile inputFile = null;
|
|
|
+ RandomAccessFile targetFile = null;
|
|
|
+ WritableByteChannel target = null;
|
|
|
+ FadvisedFileRegion fileRegion = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ inputFile = new RandomAccessFile(inFile.getAbsolutePath(), "r");
|
|
|
+ targetFile = new RandomAccessFile(outFile.getAbsolutePath(), "rw");
|
|
|
+ target = targetFile.getChannel();
|
|
|
+
|
|
|
+ Assert.assertEquals(FILE_SIZE, inputFile.length());
|
|
|
+
|
|
|
+ //create FadvisedFileRegion
|
|
|
+ fileRegion = new FadvisedFileRegion(
|
|
|
+ inputFile, position, count, false, 0, null, null, 1024, false);
|
|
|
+
|
|
|
+ //test corner cases
|
|
|
+ customShuffleTransferCornerCases(fileRegion, target, count);
|
|
|
+
|
|
|
+ long pos = 0;
|
|
|
+ long size;
|
|
|
+ while((size = fileRegion.customShuffleTransfer(target, pos)) > 0) {
|
|
|
+ pos += size;
|
|
|
+ }
|
|
|
+
|
|
|
+ //assert size
|
|
|
+ Assert.assertEquals(count, (int)pos);
|
|
|
+ Assert.assertEquals(count, targetFile.length());
|
|
|
+ } finally {
|
|
|
+ if (fileRegion != null) {
|
|
|
+ fileRegion.releaseExternalResources();
|
|
|
+ }
|
|
|
+ IOUtils.cleanup(LOG, target);
|
|
|
+ IOUtils.cleanup(LOG, targetFile);
|
|
|
+ IOUtils.cleanup(LOG, inputFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ //Read the target file and verify that copy is done correctly
|
|
|
+ byte [] buff = new byte[FILE_SIZE];
|
|
|
+ FileInputStream in = new FileInputStream(outFile);
|
|
|
+ try {
|
|
|
+ int total = in.read(buff, 0, count);
|
|
|
+
|
|
|
+ Assert.assertEquals(count, total);
|
|
|
+
|
|
|
+ for(int i = 0; i < count; i++) {
|
|
|
+ Assert.assertEquals(initBuff[position+i], buff[i]);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ IOUtils.cleanup(LOG, in);
|
|
|
+ }
|
|
|
+
|
|
|
+ //delete files and folders
|
|
|
+ inFile.delete();
|
|
|
+ outFile.delete();
|
|
|
+ testDir.delete();
|
|
|
+ absLogDir.delete();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void customShuffleTransferCornerCases(
|
|
|
+ FadvisedFileRegion fileRegion, WritableByteChannel target, int count) {
|
|
|
+ try {
|
|
|
+ fileRegion.customShuffleTransfer(target, -1);
|
|
|
+ Assert.fail("Expected a IllegalArgumentException");
|
|
|
+ } catch (IllegalArgumentException ie) {
|
|
|
+ LOG.info("Expected - illegal argument is passed.");
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Expected a IllegalArgumentException");
|
|
|
+ }
|
|
|
+
|
|
|
+ //test corner cases
|
|
|
+ try {
|
|
|
+ fileRegion.customShuffleTransfer(target, count + 1);
|
|
|
+ Assert.fail("Expected a IllegalArgumentException");
|
|
|
+ } catch (IllegalArgumentException ie) {
|
|
|
+ LOG.info("Expected - illegal argument is passed.");
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Expected a IllegalArgumentException");
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|