瀏覽代碼

HADOOP-5277. Added new InputFormat. Thanks Ari.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@748023 13f79535-47bb-0310-9956-ffa450edef68
Eric Yang 16 年之前
父節點
當前提交
a98963a798

+ 133 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/ChukwaInputFormat.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools;
+
+import java.io.IOException;
+
+import java.util.regex.*;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.log4j.Logger;
+
+/***
+ * An InputFormat for processing logfiles in Chukwa.
+ * Designed to be a nearly drop-in replacement for the Hadoop default
+ * TextInputFormat so that code can be ported to use Chukwa with minimal
+ * modification.
+ *
+ * Has an optional configuration option, chukwa.inputfilter.datatype
+ * which can be used to filter the input by datatype. If need
+ * exists, this mechanism could be extended to also filter by
+ * other fields.
+ * 
+ */
+public class ChukwaInputFormat extends SequenceFileInputFormat<LongWritable, Text> {
+
+  public static class ChukwaRecordReader implements RecordReader<LongWritable, Text> {
+    
+    static Logger LOG = Logger.getLogger(ChukwaInputFormat.class);
+    
+    private SequenceFileRecordReader<ChukwaArchiveKey, Chunk> sfrr;
+    private long lineInFile =0;
+    private Chunk curChunk = null;
+    private int lineInChunk; //outside of next, it's the array offset of next line to be returned
+    private int[] lineOffsets = null;
+    private int byteOffsetOfLastLine = 0;
+    Pattern dtPattern;
+    
+    
+    public ChukwaRecordReader(Configuration conf, FileSplit split)
+    throws IOException {
+      sfrr = new SequenceFileRecordReader<ChukwaArchiveKey, Chunk>(conf, split);
+      dtPattern = Pattern.compile(conf.get("chukwa.inputfilter.datatype", ".*"));
+    }
+  
+    @Override
+    public void close() throws IOException {
+      sfrr.close();
+    }
+  
+    @Override
+    public LongWritable createKey() {
+      return new LongWritable();
+    }
+  
+    @Override
+    public Text createValue() {
+      return new Text();
+    }
+  
+    @Override
+    public long getPos() throws IOException {
+      return sfrr.getPos();
+    }
+  
+    @Override
+    public float getProgress() throws IOException {
+      return sfrr.getProgress();
+    }
+  
+    private boolean passesFilters(Chunk c) {
+      return dtPattern.matcher(c.getDataType()).matches();
+    }
+    
+    @Override
+    public boolean next(LongWritable key, Text value) throws IOException {
+      if(curChunk == null) {
+        ChukwaArchiveKey k = new ChukwaArchiveKey();
+        curChunk = ChunkImpl.getBlankChunk();
+        boolean unfilteredChunk = false;
+        while(!unfilteredChunk) {
+          boolean readOK = sfrr.next(k, curChunk);
+          if(!readOK) {
+            curChunk = null;
+            return false;
+          }
+          unfilteredChunk = passesFilters(curChunk);
+        }
+        lineOffsets = curChunk.getRecordOffsets();
+        lineInChunk = 0;
+        byteOffsetOfLastLine = 0;
+      } //end curChunk == null
+      value.set(curChunk.getData(), byteOffsetOfLastLine , lineOffsets[lineInChunk]-byteOffsetOfLastLine);
+      if(lineInChunk >= lineOffsets.length - 1) { //end of chunk
+        curChunk = null;
+      } else
+        byteOffsetOfLastLine = lineOffsets[lineInChunk++]+1;
+      
+      key.set(lineInFile);
+      lineInFile++;
+      return true;
+    }
+  } //end ChukwaRecordReader
+
+  @Override
+  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter)
+  throws IOException {
+    reporter.setStatus(split.toString());
+    LOG.info("returning a new chukwa record reader");
+    return new ChukwaRecordReader(job, (FileSplit) split);
+  }
+
+}

+ 93 - 0
src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/inputtools/TestInputFormat.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.Reporter;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+
+public class TestInputFormat extends TestCase {
+  
+  String[] lines = {
+    "the rain",
+    "in spain",
+    "falls mainly",
+    "in the plain"
+  };
+  
+  public void testInputFormat() {
+    
+    try {
+    JobConf conf = new JobConf();
+    String TMP_DIR = System.getProperty("test.build.data", "/tmp");
+    Path filename = new Path("file:///"+TMP_DIR+"/tmpSeqFile");
+    SequenceFile.Writer sfw = SequenceFile.createWriter(FileSystem.getLocal(conf),
+        conf, filename, ChukwaArchiveKey.class, ChunkImpl.class,
+        SequenceFile.CompressionType.NONE, Reporter.NULL);
+    
+    
+    StringBuilder buf = new StringBuilder();
+    int offsets[] = new int[lines.length];
+    for(int i= 0; i < lines.length; ++i) {
+      buf.append(lines[i]);
+      buf.append("\n");
+      offsets[i] = buf.length()-1;
+    }
+    ChukwaArchiveKey key = new ChukwaArchiveKey(0, "datatype", "sname", 0);
+    ChunkImpl val = new ChunkImpl("datatype", "sname", 0, buf.toString().getBytes(), null);
+    val.setRecordOffsets(offsets);
+    sfw.append(key, val);
+    sfw.append(key, val); //write it twice
+    sfw.close();
+    
+    
+    long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
+    InputSplit split = new FileSplit(filename, 0, len, (String[] ) null);
+    ChukwaInputFormat in = new ChukwaInputFormat();
+    RecordReader<LongWritable, Text> r= in.getRecordReader(split, conf, Reporter.NULL);
+    
+
+    LongWritable l = r.createKey();
+    Text line = r.createValue();
+    for(int i =0 ; i < lines.length * 2; ++i) {
+      boolean succeeded = r.next(l, line);
+      assertTrue(succeeded);
+      assertEquals(i, l.get());
+      assertEquals(lines[i % lines.length] , line.toString());
+      System.out.println("read line: "+ l.get() + " "+ line);
+    }
+    boolean succeeded = r.next(l, line);
+    assertFalse(succeeded);
+    
+    } catch(IOException e) {
+      e.printStackTrace();
+      fail("IO exception "+ e);
+    }
+  }
+
+}