浏览代码

HADOOP-918. Add an example of Abacus use with Python. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@498809 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
d0682d5d46

+ 34 - 0
src/contrib/abacus/examples/pyAbacus/JyAbacusWCPlugIN.py

@@ -0,0 +1,34 @@
+#
+# Copyright 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from org.apache.hadoop.fs import Path
+from org.apache.hadoop.io import *
+from org.apache.hadoop.mapred import *
+
+from org.apache.hadoop.abacus import *;
+
+from java.util import *;
+
+import sys
+
+class AbacusWordCount(ValueAggregatorBaseDescriptor):
+    # Abacus plug-in descriptor for word counting: for each input record it
+    # emits one LONG_VALUE_SUM aggregation entry per whitespace-separated
+    # word, each with the count ONE, so the framework sums occurrences.
+    def generateKeyValPairs(self, key, val):
+        # Returns a java.util.ArrayList of (aggregation-type, word, 1) entries
+        # built from the record's string value.
+        retv = ArrayList();
+        for w in val.toString().split():
+            en =  ValueAggregatorBaseDescriptor.generateEntry(ValueAggregatorBaseDescriptor.LONG_VALUE_SUM, w, ValueAggregatorBaseDescriptor.ONE);
+            retv.add(en);
+        return retv;
+

+ 80 - 0
src/contrib/abacus/examples/pyAbacus/JythonAbacus.py

@@ -0,0 +1,80 @@
+#
+# Copyright 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from org.apache.hadoop.fs import Path
+from org.apache.hadoop.io import *
+from org.apache.hadoop.mapred import *
+
+from org.apache.hadoop.abacus import *
+
+from java.util import *;
+
+import sys
+
+class AbacusMapper(ValueAggregatorMapper):
+    # Thin Jython subclass so jythonc can emit a concrete Java class usable
+    # as the job's Mapper; it simply delegates to the Java superclass.
+    def map(self, key, value, output, reporter):
+        ValueAggregatorMapper.map(self, key, value, output, reporter);
+
+class AbacusReducer(ValueAggregatorReducer):
+    # Thin Jython subclass used as the job's Reducer; delegates straight to
+    # the Java ValueAggregatorReducer implementation.
+    def reduce(self, key, values, output, reporter):
+        ValueAggregatorReducer.reduce(self, key, values, output, reporter);
+
+class AbacusCombiner(ValueAggregatorCombiner):
+    # Thin Jython subclass used as the job's Combiner; delegates straight to
+    # the Java ValueAggregatorCombiner implementation.
+    def reduce(self, key, values, output, reporter):
+        ValueAggregatorCombiner.reduce(self, key, values, output, reporter);
+
+def printUsage(code):
+    # Print the command-line usage string and exit with the given status code.
+    print "Abacus <input> <output> <numOfReducers> <inputformat> <specfile>"
+    sys.exit(code)
+
+def main(args):
+    # Entry point: configure and submit an Abacus value-aggregation job.
+    # Expected argv layout: [script, inDir, outDir, numOfReducers,
+    # inputformat ("textinputformat" or anything else for sequence files),
+    # specfile (aggregator descriptor XML)].
+    if len(args) < 6:
+        printUsage(1);
+
+    inDir = args[1];
+    outDir = args[2];
+    numOfReducers = int(args[3]);
+    theInputFormat = args[4];
+    specFile = args[5];
+                                        
+    print "numOfReducers: ", numOfReducers, "theInputFormat: ", theInputFormat, "specFile: ", specFile
+
+    # The spec file is loaded as a default resource so the aggregator
+    # descriptors (aggregator.descriptor.*) become part of the job config.
+    conf = JobConf(AbacusMapper);
+    conf.setJobName("recordcount");
+    conf.addDefaultResource(Path(specFile));
+ 
+    if theInputFormat=="textinputformat":
+        conf.setInputFormat(TextInputFormat);
+    else:
+        conf.setInputFormat(SequenceFileInputFormat);
+    conf.setOutputFormat(TextOutputFormat);
+    # Both intermediate and final key/value types are plain Text.
+    conf.setMapOutputKeyClass(Text);
+    conf.setMapOutputValueClass(Text);
+    conf.setOutputKeyClass(Text);
+    conf.setOutputValueClass(Text);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(numOfReducers);
+
+    conf.setMapperClass(AbacusMapper);        
+    conf.setCombinerClass(AbacusCombiner);
+    conf.setReducerClass(AbacusReducer);
+    conf.setInputPath(Path(args[1]))
+    conf.setOutputPath(Path(args[2]))
+
+    # Submit the job and block until it completes.
+    JobClient.runJob(conf);
+
+if __name__ == "__main__":
+    main(sys.argv)

+ 25 - 0
src/contrib/abacus/examples/pyAbacus/compile

@@ -0,0 +1,25 @@
+#!/bin/bash
+
+# Build script: compiles the Jython Abacus example into Java classes and
+# packages them, together with the abacus contrib classes, into jwc.jar.
+export HADOOP_HOME=../../../../..
+
+export CLASSPATH="$HADOOP_HOME/build/classes"
+export CLASSPATH=${CLASSPATH}:"$HADOOP_HOME/build/contrib/abacus/classes"
+
+# so that filenames w/ spaces are handled correctly in loops below
+IFS=
+
+# add libs to CLASSPATH
+for f in $HADOOP_HOME/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+# restore ordinary behaviour
+unset IFS
+# Translate the Jython sources to Java bytecode and jar them under the
+# org.apache.hadoop.abacus.examples package.
+jythonc -p org.apache.hadoop.abacus.examples -d -j jwc.jar -c JythonAbacus.py JyAbacusWCPlugIN.py
+
+# Bundle the abacus contrib classes into the jar so it is self-contained.
+jar -uvf jwc.jar -C $HADOOP_HOME/build/contrib/abacus/classes .
+

+ 15 - 0
src/contrib/abacus/examples/pyAbacus/wordcountaggregator.spec

@@ -0,0 +1,15 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+<property>
+  <name>aggregator.descriptor.num</name>
+  <value>1</value>
+</property>
+
+<property>
+   <name>aggregator.descriptor.0</name>
+   <value>UserDefined,org.apache.hadoop.abacus.examples.JyAbacusWCPlugIN$AbacusWordCount</value>
+</property>
+</configuration>