|
@@ -0,0 +1,206 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
|
+ * limitations under the License.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+package org.apache.hadoop.mapred;
|
|
|
|
+
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.util.Random;
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.fs.*;
|
|
|
|
+import org.apache.hadoop.io.*;
|
|
|
|
+import org.apache.hadoop.mapred.*;
|
|
|
|
+import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
|
+
|
|
|
|
+import junit.framework.TestCase;
|
|
|
|
+import org.apache.commons.logging.*;
|
|
|
|
+
|
|
|
|
+public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
|
|
|
|
+ private static final Log LOG =
|
|
|
|
+ LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
|
|
|
|
+
|
|
|
|
+ private static final int RECORDS = 10000;
|
|
|
|
+
|
|
|
|
+ public void testBinary() throws IOException {
|
|
|
|
+ JobConf job = new JobConf();
|
|
|
|
+ FileSystem fs = FileSystem.getLocal(job);
|
|
|
|
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
|
|
|
|
+ Path file = new Path(dir, "testbinary.seq");
|
|
|
|
+ Random r = new Random();
|
|
|
|
+ long seed = r.nextLong();
|
|
|
|
+ r.setSeed(seed);
|
|
|
|
+
|
|
|
|
+ fs.delete(dir, true);
|
|
|
|
+ if (!fs.mkdirs(dir)) {
|
|
|
|
+ fail("Failed to create output directory");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FileOutputFormat.setWorkOutputPath(job, dir);
|
|
|
|
+
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
|
|
|
|
+ IntWritable.class );
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
|
|
|
|
+ DoubleWritable.class );
|
|
|
|
+
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
|
|
|
|
+ CompressionType.BLOCK);
|
|
|
|
+
|
|
|
|
+ BytesWritable bkey = new BytesWritable();
|
|
|
|
+ BytesWritable bval = new BytesWritable();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ RecordWriter <BytesWritable, BytesWritable> writer =
|
|
|
|
+ new SequenceFileAsBinaryOutputFormat().getRecordWriter(fs,
|
|
|
|
+ job, file.toString(),
|
|
|
|
+ Reporter.NULL);
|
|
|
|
+
|
|
|
|
+ IntWritable iwritable = new IntWritable();
|
|
|
|
+ DoubleWritable dwritable = new DoubleWritable();
|
|
|
|
+ DataOutputBuffer outbuf = new DataOutputBuffer();
|
|
|
|
+ LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < RECORDS; ++i) {
|
|
|
|
+ iwritable = new IntWritable(r.nextInt());
|
|
|
|
+ iwritable.write(outbuf);
|
|
|
|
+ bkey.set(outbuf.getData(), 0, outbuf.getLength());
|
|
|
|
+ outbuf.reset();
|
|
|
|
+ dwritable = new DoubleWritable(r.nextDouble());
|
|
|
|
+ dwritable.write(outbuf);
|
|
|
|
+ bval.set(outbuf.getData(), 0, outbuf.getLength());
|
|
|
|
+ outbuf.reset();
|
|
|
|
+ writer.write(bkey, bval);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ writer.close(Reporter.NULL);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ InputFormat<IntWritable,DoubleWritable> iformat =
|
|
|
|
+ new SequenceFileInputFormat<IntWritable,DoubleWritable>();
|
|
|
|
+ int count = 0;
|
|
|
|
+ r.setSeed(seed);
|
|
|
|
+ DataInputBuffer buf = new DataInputBuffer();
|
|
|
|
+ final int NUM_SPLITS = 3;
|
|
|
|
+ SequenceFileInputFormat.addInputPath(job, file);
|
|
|
|
+ LOG.info("Reading data by SequenceFileInputFormat");
|
|
|
|
+ for (InputSplit split : iformat.getSplits(job, NUM_SPLITS)) {
|
|
|
|
+ RecordReader<IntWritable,DoubleWritable> reader =
|
|
|
|
+ iformat.getRecordReader(split, job, Reporter.NULL);
|
|
|
|
+ try {
|
|
|
|
+ int sourceInt;
|
|
|
|
+ double sourceDouble;
|
|
|
|
+ while (reader.next(iwritable, dwritable)) {
|
|
|
|
+ sourceInt = r.nextInt();
|
|
|
|
+ sourceDouble = r.nextDouble();
|
|
|
|
+ assertEquals(
|
|
|
|
+ "Keys don't match: " + "*" + iwritable.get() + ":" +
|
|
|
|
+ sourceInt + "*",
|
|
|
|
+ sourceInt, iwritable.get());
|
|
|
|
+ assertTrue(
|
|
|
|
+ "Vals don't match: " + "*" + dwritable.get() + ":" +
|
|
|
|
+ sourceDouble + "*",
|
|
|
|
+ Double.compare(dwritable.get(), sourceDouble) == 0 );
|
|
|
|
+ ++count;
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ reader.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals("Some records not found", RECORDS, count);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void testSequenceOutputClassDefaultsToMapRedOutputClass()
|
|
|
|
+ throws IOException {
|
|
|
|
+ JobConf job = new JobConf();
|
|
|
|
+ FileSystem fs = FileSystem.getLocal(job);
|
|
|
|
+
|
|
|
|
+ // Setting Random class to test getSequenceFileOutput{Key,Value}Class
|
|
|
|
+ job.setOutputKeyClass(FloatWritable.class);
|
|
|
|
+ job.setOutputValueClass(BooleanWritable.class);
|
|
|
|
+
|
|
|
|
+ assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass",
|
|
|
|
+ FloatWritable.class,
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
|
|
|
|
+ job));
|
|
|
|
+ assertEquals("SequenceFileOutputValueClass should default to "
|
|
|
|
+ + "ouputValueClass",
|
|
|
|
+ BooleanWritable.class,
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
|
|
|
|
+ job));
|
|
|
|
+
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
|
|
|
|
+ IntWritable.class );
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
|
|
|
|
+ DoubleWritable.class );
|
|
|
|
+
|
|
|
|
+ assertEquals("SequenceFileOutputKeyClass not updated",
|
|
|
|
+ IntWritable.class,
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
|
|
|
|
+ job));
|
|
|
|
+ assertEquals("SequenceFileOutputValueClass not updated",
|
|
|
|
+ DoubleWritable.class,
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
|
|
|
|
+ job));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void testcheckOutputSpecsForbidRecordCompression() throws IOException {
|
|
|
|
+ JobConf job = new JobConf();
|
|
|
|
+ FileSystem fs = FileSystem.getLocal(job);
|
|
|
|
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
|
|
|
|
+ Path outputdir = new Path(System.getProperty("test.build.data",".")
|
|
|
|
+ + "/output");
|
|
|
|
+
|
|
|
|
+ fs.delete(dir, true);
|
|
|
|
+ fs.delete(outputdir, true);
|
|
|
|
+ if (!fs.mkdirs(dir)) {
|
|
|
|
+ fail("Failed to create output directory");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FileOutputFormat.setWorkOutputPath(job, dir);
|
|
|
|
+
|
|
|
|
+ // Without outputpath, FileOutputFormat.checkoutputspecs will throw
|
|
|
|
+ // InvalidJobConfException
|
|
|
|
+ FileOutputFormat.setOutputPath(job, outputdir);
|
|
|
|
+
|
|
|
|
+ // SequenceFileAsBinaryOutputFormat doesn't support record compression
|
|
|
|
+ // It should throw an exception when checked by checkOutputSpecs
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
|
|
|
|
+
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
|
|
|
|
+ CompressionType.BLOCK);
|
|
|
|
+ try {
|
|
|
|
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ fail("Block compression should be allowed for "
|
|
|
|
+ + "SequenceFileAsBinaryOutputFormat:"
|
|
|
|
+ + "Caught " + e.getClass().getName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
|
|
|
|
+ CompressionType.RECORD);
|
|
|
|
+ try {
|
|
|
|
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
|
|
|
|
+ fail("Record compression should not be allowed for "
|
|
|
|
+ +"SequenceFileAsBinaryOutputFormat");
|
|
|
|
+ } catch (InvalidJobConfException ie) {
|
|
|
|
+ // expected
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ fail("Expected " + InvalidJobConfException.class.getName()
|
|
|
|
+ + "but caught " + e.getClass().getName() );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|