|
@@ -0,0 +1,543 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.mapred.lib;
|
|
|
+
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.io.Stringifier;
|
|
|
+import org.apache.hadoop.io.DefaultStringifier;
|
|
|
+import org.apache.hadoop.io.serializer.Deserializer;
|
|
|
+import org.apache.hadoop.io.serializer.Serialization;
|
|
|
+import org.apache.hadoop.io.serializer.SerializationFactory;
|
|
|
+import org.apache.hadoop.io.serializer.Serializer;
|
|
|
+import org.apache.hadoop.mapred.*;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
+import org.apache.hadoop.util.GenericsUtil;
|
|
|
+
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * The Chain class provides all the common functionality for the
|
|
|
+ * {@link ChainMapper} and the {@link ChainReducer} classes.
|
|
|
+ */
|
|
|
+class Chain {
|
|
|
+ private static final String CHAIN_MAPPER = "chain.mapper";
|
|
|
+ private static final String CHAIN_REDUCER = "chain.reducer";
|
|
|
+
|
|
|
+ private static final String CHAIN_MAPPER_SIZE = ".size";
|
|
|
+ private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
|
|
|
+ private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
|
|
|
+ private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
|
|
|
+ private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
|
|
|
+
|
|
|
+ private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
|
|
|
+ private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
|
|
|
+
|
|
|
+ private static final String MAPPER_INPUT_KEY_CLASS =
|
|
|
+ "chain.mapper.input.key.class";
|
|
|
+ private static final String MAPPER_INPUT_VALUE_CLASS =
|
|
|
+ "chain.mapper.input.value.class";
|
|
|
+ private static final String MAPPER_OUTPUT_KEY_CLASS =
|
|
|
+ "chain.mapper.output.key.class";
|
|
|
+ private static final String MAPPER_OUTPUT_VALUE_CLASS =
|
|
|
+ "chain.mapper.output.value.class";
|
|
|
+ private static final String REDUCER_INPUT_KEY_CLASS =
|
|
|
+ "chain.reducer.input.key.class";
|
|
|
+ private static final String REDUCER_INPUT_VALUE_CLASS =
|
|
|
+ "chain.reducer.input.value.class";
|
|
|
+ private static final String REDUCER_OUTPUT_KEY_CLASS =
|
|
|
+ "chain.reducer.output.key.class";
|
|
|
+ private static final String REDUCER_OUTPUT_VALUE_CLASS =
|
|
|
+ "chain.reducer.output.value.class";
|
|
|
+
|
|
|
+ private boolean isMap;
|
|
|
+
|
|
|
+ private JobConf chainJobConf;
|
|
|
+
|
|
|
+ private List<Mapper> mappers = new ArrayList<Mapper>();
|
|
|
+ private Reducer reducer;
|
|
|
+
|
|
|
+ // to cache the key/value output class serializations for each chain element
|
|
|
+ // to avoid everytime lookup.
|
|
|
+ private List<Serialization> mappersKeySerialization =
|
|
|
+ new ArrayList<Serialization>();
|
|
|
+ private List<Serialization> mappersValueSerialization =
|
|
|
+ new ArrayList<Serialization>();
|
|
|
+ private Serialization reducerKeySerialization;
|
|
|
+ private Serialization reducerValueSerialization;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates a Chain instance configured for a Mapper or a Reducer.
|
|
|
+ *
|
|
|
+ * @param isMap TRUE indicates the chain is for a Mapper, FALSE that is for a
|
|
|
+ * Reducer.
|
|
|
+ */
|
|
|
+ Chain(boolean isMap) {
|
|
|
+ this.isMap = isMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the prefix to use for the configuration of the chain depending
|
|
|
+ * if it is for a Mapper or a Reducer.
|
|
|
+ *
|
|
|
+ * @param isMap TRUE for Mapper, FALSE for Reducer.
|
|
|
+ * @return the prefix to use.
|
|
|
+ */
|
|
|
+ private static String getPrefix(boolean isMap) {
|
|
|
+ return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
|
|
|
+ * <p/>
|
|
|
+ * It creates a new JobConf using the chain job's JobConf as base and adds to
|
|
|
+ * it the configuration properties for the chain element. The keys of the
|
|
|
+ * chain element jobConf have precedence over the given JobConf.
|
|
|
+ *
|
|
|
+ * @param jobConf the chain job's JobConf.
|
|
|
+ * @param confKey the key for chain element configuration serialized in the
|
|
|
+ * chain job's JobConf.
|
|
|
+ * @return a new JobConf aggregating the chain job's JobConf with the chain
|
|
|
+ * element configuration properties.
|
|
|
+ */
|
|
|
+ private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
|
|
|
+ JobConf conf;
|
|
|
+ try {
|
|
|
+ Stringifier<WritableJobConf> stringifier =
|
|
|
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
|
|
|
+ conf = stringifier.fromString(jobConf.get(confKey, null));
|
|
|
+ } catch (IOException ioex) {
|
|
|
+ throw new RuntimeException(ioex);
|
|
|
+ }
|
|
|
+ // we have to do this because the Writable desearialization clears all
|
|
|
+ // values set in the conf making not possible do do a new JobConf(jobConf)
|
|
|
+ // in the creation of the conf above
|
|
|
+ jobConf = new JobConf(jobConf);
|
|
|
+
|
|
|
+ for(Map.Entry<String, String> entry : conf) {
|
|
|
+ jobConf.set(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ return jobConf;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Adds a Mapper class to the chain job's JobConf.
|
|
|
+ * <p/>
|
|
|
+ * The configuration properties of the chain job have precedence over the
|
|
|
+ * configuration properties of the Mapper.
|
|
|
+ *
|
|
|
+ * @param isMap indicates if the Chain is for a Mapper or for a
|
|
|
+ * Reducer.
|
|
|
+ * @param jobConf chain job's JobConf to add the Mapper class.
|
|
|
+ * @param klass the Mapper class to add.
|
|
|
+ * @param inputKeyClass mapper input key class.
|
|
|
+ * @param inputValueClass mapper input value class.
|
|
|
+ * @param outputKeyClass mapper output key class.
|
|
|
+ * @param outputValueClass mapper output value class.
|
|
|
+ * @param byValue indicates if key/values should be passed by value
|
|
|
+ * to the next Mapper in the chain, if any.
|
|
|
+ * @param mapperConf a JobConf with the configuration for the Mapper
|
|
|
+ * class. It is recommended to use a JobConf without default values using the
|
|
|
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
|
|
|
+ */
|
|
|
+ public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
|
|
|
+ Class<? extends Mapper<K1, V1, K2, V2>> klass,
|
|
|
+ Class<? extends K1> inputKeyClass,
|
|
|
+ Class<? extends V1> inputValueClass,
|
|
|
+ Class<? extends K2> outputKeyClass,
|
|
|
+ Class<? extends V2> outputValueClass,
|
|
|
+ boolean byValue, JobConf mapperConf) {
|
|
|
+ String prefix = getPrefix(isMap);
|
|
|
+
|
|
|
+ // if a reducer chain check the Reducer has been already set
|
|
|
+ if (!isMap) {
|
|
|
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
|
|
|
+ Reducer.class) == null) {
|
|
|
+ throw new IllegalStateException(
|
|
|
+ "A Mapper can be added to the chain only after the Reducer has " +
|
|
|
+ "been set");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
|
|
|
+ jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
|
|
|
+
|
|
|
+ // if it is a reducer chain and the first Mapper is being added check the
|
|
|
+ // key and value input classes of the mapper match those of the reducer
|
|
|
+ // output.
|
|
|
+ if (!isMap && index == 0) {
|
|
|
+ JobConf reducerConf =
|
|
|
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
|
|
|
+ if (! inputKeyClass.isAssignableFrom(
|
|
|
+ reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
|
|
|
+ throw new IllegalArgumentException("The Reducer output key class does" +
|
|
|
+ " not match the Mapper input key class");
|
|
|
+ }
|
|
|
+ if (! inputValueClass.isAssignableFrom(
|
|
|
+ reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
|
|
|
+ throw new IllegalArgumentException("The Reducer output value class" +
|
|
|
+ " does not match the Mapper input value class");
|
|
|
+ }
|
|
|
+ } else if (index > 0) {
|
|
|
+ // check the that the new Mapper in the chain key and value input classes
|
|
|
+ // match those of the previous Mapper output.
|
|
|
+ JobConf previousMapperConf =
|
|
|
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
|
|
|
+ (index - 1));
|
|
|
+ if (! inputKeyClass.isAssignableFrom(
|
|
|
+ previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
|
|
|
+ throw new IllegalArgumentException("The Mapper output key class does" +
|
|
|
+ " not match the previous Mapper input key class");
|
|
|
+ }
|
|
|
+ if (! inputValueClass.isAssignableFrom(
|
|
|
+ previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
|
|
|
+ throw new IllegalArgumentException("The Mapper output value class" +
|
|
|
+ " does not match the previous Mapper input value class");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // if the Mapper does not have a private JobConf create an empty one
|
|
|
+ if (mapperConf == null) {
|
|
|
+ // using a JobConf without defaults to make it lightweight.
|
|
|
+ // still the chain JobConf may have all defaults and this conf is
|
|
|
+ // overlapped to the chain JobConf one.
|
|
|
+ mapperConf = new JobConf(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ // store in the private mapper conf the input/output classes of the mapper
|
|
|
+ // and if it works by value or by reference
|
|
|
+ mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
|
|
|
+ mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
|
|
|
+ mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
|
|
|
+ Object.class);
|
|
|
+ mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
|
|
|
+ mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
|
|
|
+ Object.class);
|
|
|
+
|
|
|
+ // serialize the private mapper jobconf in the chain jobconf.
|
|
|
+ Stringifier<WritableJobConf> stringifier =
|
|
|
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
|
|
|
+ try {
|
|
|
+ jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
|
|
|
+ stringifier.toString(new WritableJobConf(mapperConf)));
|
|
|
+ }
|
|
|
+ catch (IOException ioEx) {
|
|
|
+ throw new RuntimeException(ioEx);
|
|
|
+ }
|
|
|
+
|
|
|
+ // increment the chain counter
|
|
|
+ jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets the Reducer class to the chain job's JobConf.
|
|
|
+ * <p/>
|
|
|
+ * The configuration properties of the chain job have precedence over the
|
|
|
+ * configuration properties of the Reducer.
|
|
|
+ *
|
|
|
+ * @param jobConf chain job's JobConf to add the Reducer class.
|
|
|
+ * @param klass the Reducer class to add.
|
|
|
+ * @param inputKeyClass reducer input key class.
|
|
|
+ * @param inputValueClass reducer input value class.
|
|
|
+ * @param outputKeyClass reducer output key class.
|
|
|
+ * @param outputValueClass reducer output value class.
|
|
|
+ * @param byValue indicates if key/values should be passed by value
|
|
|
+ * to the next Mapper in the chain, if any.
|
|
|
+ * @param reducerConf a JobConf with the configuration for the Reducer
|
|
|
+ * class. It is recommended to use a JobConf without default values using the
|
|
|
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
|
|
|
+ */
|
|
|
+ public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
|
|
|
+ Class<? extends Reducer<K1, V1, K2, V2>> klass,
|
|
|
+ Class<? extends K1> inputKeyClass,
|
|
|
+ Class<? extends V1> inputValueClass,
|
|
|
+ Class<? extends K2> outputKeyClass,
|
|
|
+ Class<? extends V2> outputValueClass,
|
|
|
+ boolean byValue, JobConf reducerConf) {
|
|
|
+ String prefix = getPrefix(false);
|
|
|
+
|
|
|
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
|
|
|
+ throw new IllegalStateException("Reducer has been already set");
|
|
|
+ }
|
|
|
+
|
|
|
+ jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
|
|
|
+
|
|
|
+ // if the Reducer does not have a private JobConf create an empty one
|
|
|
+ if (reducerConf == null) {
|
|
|
+ // using a JobConf without defaults to make it lightweight.
|
|
|
+ // still the chain JobConf may have all defaults and this conf is
|
|
|
+ // overlapped to the chain JobConf one.
|
|
|
+ reducerConf = new JobConf(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ // store in the private reducer conf the input/output classes of the reducer
|
|
|
+ // and if it works by value or by reference
|
|
|
+ reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
|
|
|
+ reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
|
|
|
+ reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
|
|
|
+ Object.class);
|
|
|
+ reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
|
|
|
+ Object.class);
|
|
|
+ reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
|
|
|
+ Object.class);
|
|
|
+
|
|
|
+ // serialize the private mapper jobconf in the chain jobconf.
|
|
|
+ Stringifier<WritableJobConf> stringifier =
|
|
|
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
|
|
|
+ try {
|
|
|
+ jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
|
|
|
+ stringifier.toString(new WritableJobConf(reducerConf)));
|
|
|
+ }
|
|
|
+ catch (IOException ioEx) {
|
|
|
+ throw new RuntimeException(ioEx);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Configures all the chain elements for the task.
|
|
|
+ *
|
|
|
+ * @param jobConf chain job's JobConf.
|
|
|
+ */
|
|
|
+ public void configure(JobConf jobConf) {
|
|
|
+ String prefix = getPrefix(isMap);
|
|
|
+ chainJobConf = jobConf;
|
|
|
+ SerializationFactory serializationFactory =
|
|
|
+ new SerializationFactory(chainJobConf);
|
|
|
+ int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
|
|
|
+ for (int i = 0; i < index; i++) {
|
|
|
+ Class<? extends Mapper> klass =
|
|
|
+ jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
|
|
|
+ JobConf mConf =
|
|
|
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
|
|
|
+ Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
|
|
|
+ mappers.add(mapper);
|
|
|
+
|
|
|
+ if (mConf.getBoolean(MAPPER_BY_VALUE, true)) {
|
|
|
+ mappersKeySerialization.add(serializationFactory.getSerialization(
|
|
|
+ mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null)));
|
|
|
+ mappersValueSerialization.add(serializationFactory.getSerialization(
|
|
|
+ mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null)));
|
|
|
+ } else {
|
|
|
+ mappersKeySerialization.add(null);
|
|
|
+ mappersValueSerialization.add(null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Class<? extends Reducer> klass =
|
|
|
+ jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
|
|
|
+ if (klass != null) {
|
|
|
+ JobConf rConf =
|
|
|
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
|
|
|
+ reducer = ReflectionUtils.newInstance(klass, rConf);
|
|
|
+ if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
|
|
|
+ reducerKeySerialization = serializationFactory
|
|
|
+ .getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
|
|
|
+ reducerValueSerialization = serializationFactory
|
|
|
+ .getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
|
|
|
+ } else {
|
|
|
+ reducerKeySerialization = null;
|
|
|
+ reducerValueSerialization = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the chain job conf.
|
|
|
+ *
|
|
|
+ * @return the chain job conf.
|
|
|
+ */
|
|
|
+ protected JobConf getChainJobConf() {
|
|
|
+ return chainJobConf;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the first Mapper instance in the chain.
|
|
|
+ *
|
|
|
+ * @return the first Mapper instance in the chain or NULL if none.
|
|
|
+ */
|
|
|
+ public Mapper getFirstMap() {
|
|
|
+ return (mappers.size() > 0) ? mappers.get(0) : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the Reducer instance in the chain.
|
|
|
+ *
|
|
|
+ * @return the Reducer instance in the chain or NULL if none.
|
|
|
+ */
|
|
|
+ public Reducer getReducer() {
|
|
|
+ return reducer;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the OutputCollector to be used by a Mapper instance in the chain.
|
|
|
+ *
|
|
|
+ * @param mapperIndex index of the Mapper instance to get the OutputCollector.
|
|
|
+ * @param output the original OutputCollector of the task.
|
|
|
+ * @param reporter the reporter of the task.
|
|
|
+ * @return the OutputCollector to be used in the chain.
|
|
|
+ */
|
|
|
+ @SuppressWarnings({"unchecked"})
|
|
|
+ public OutputCollector getMapperCollector(int mapperIndex,
|
|
|
+ OutputCollector output,
|
|
|
+ Reporter reporter) {
|
|
|
+ Serialization keySerialization = mappersKeySerialization.get(mapperIndex);
|
|
|
+ Serialization valueSerialization =
|
|
|
+ mappersValueSerialization.get(mapperIndex);
|
|
|
+ return new ChainOutputCollector(mapperIndex, keySerialization,
|
|
|
+ valueSerialization, output, reporter);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the OutputCollector to be used by a Mapper instance in the chain.
|
|
|
+ *
|
|
|
+ * @param output the original OutputCollector of the task.
|
|
|
+ * @param reporter the reporter of the task.
|
|
|
+ * @return the OutputCollector to be used in the chain.
|
|
|
+ */
|
|
|
+ @SuppressWarnings({"unchecked"})
|
|
|
+ public OutputCollector getReducerCollector(OutputCollector output,
|
|
|
+ Reporter reporter) {
|
|
|
+ return new ChainOutputCollector(reducerKeySerialization,
|
|
|
+ reducerValueSerialization, output,
|
|
|
+ reporter);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Closes all the chain elements.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown if any of the chain elements threw an
|
|
|
+ * IOException exception.
|
|
|
+ */
|
|
|
+ public void close() throws IOException {
|
|
|
+ for (Mapper map : mappers) {
|
|
|
+ map.close();
|
|
|
+ }
|
|
|
+ if (reducer != null) {
|
|
|
+ reducer.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser
|
|
|
+ // it has to be a thread local because if not it would break if used from a
|
|
|
+ // MultiThreadedMapRunner.
|
|
|
+ private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
|
|
|
+ new ThreadLocal<DataOutputBuffer>() {
|
|
|
+ protected DataOutputBuffer initialValue() {
|
|
|
+ return new DataOutputBuffer(1024);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ /**
|
|
|
+ * OutputCollector implementation used by the chain tasks.
|
|
|
+ * <p/>
|
|
|
+ * If it is not the end of the chain, a {@link #collect} invocation invokes
|
|
|
+ * the next Mapper in the chain. If it is the end of the chain the task
|
|
|
+ * OutputCollector is called.
|
|
|
+ */
|
|
|
+ private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
|
|
|
+ private int nextMapperIndex;
|
|
|
+ private Serialization<K> keySerialization;
|
|
|
+ private Serialization<V> valueSerialization;
|
|
|
+ private OutputCollector output;
|
|
|
+ private Reporter reporter;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Constructor for Mappers
|
|
|
+ */
|
|
|
+ public ChainOutputCollector(int index, Serialization<K> keySerialization,
|
|
|
+ Serialization<V> valueSerialization,
|
|
|
+ OutputCollector output, Reporter reporter) {
|
|
|
+ this.nextMapperIndex = index + 1;
|
|
|
+ this.keySerialization = keySerialization;
|
|
|
+ this.valueSerialization = valueSerialization;
|
|
|
+ this.output = output;
|
|
|
+ this.reporter = reporter;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Constructor for Reducer
|
|
|
+ */
|
|
|
+ public ChainOutputCollector(Serialization<K> keySerialization,
|
|
|
+ Serialization<V> valueSerialization,
|
|
|
+ OutputCollector output, Reporter reporter) {
|
|
|
+ this.nextMapperIndex = 0;
|
|
|
+ this.keySerialization = keySerialization;
|
|
|
+ this.valueSerialization = valueSerialization;
|
|
|
+ this.output = output;
|
|
|
+ this.reporter = reporter;
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings({"unchecked"})
|
|
|
+ public void collect(K key, V value) throws IOException {
|
|
|
+ if (nextMapperIndex < mappers.size()) {
|
|
|
+ // there is a next mapper in chain
|
|
|
+
|
|
|
+ // only need to ser/deser if there is next mapper in the chain
|
|
|
+ if (keySerialization != null) {
|
|
|
+ key = makeCopyForPassByValue(keySerialization, key);
|
|
|
+ value = makeCopyForPassByValue(valueSerialization, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ // gets ser/deser and mapper of next in chain
|
|
|
+ Serialization nextKeySerialization =
|
|
|
+ mappersKeySerialization.get(nextMapperIndex);
|
|
|
+ Serialization nextValueSerialization =
|
|
|
+ mappersValueSerialization.get(nextMapperIndex);
|
|
|
+ Mapper nextMapper = mappers.get(nextMapperIndex);
|
|
|
+
|
|
|
+ // invokes next mapper in chain
|
|
|
+ nextMapper.map(key, value,
|
|
|
+ new ChainOutputCollector(nextMapperIndex,
|
|
|
+ nextKeySerialization,
|
|
|
+ nextValueSerialization,
|
|
|
+ output, reporter),
|
|
|
+ reporter);
|
|
|
+ } else {
|
|
|
+ // end of chain, user real output collector
|
|
|
+ output.collect(key, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private <E> E makeCopyForPassByValue(Serialization<E> serialization,
|
|
|
+ E obj) throws IOException {
|
|
|
+ Serializer<E> ser =
|
|
|
+ serialization.getSerializer(GenericsUtil.getClass(obj));
|
|
|
+ Deserializer<E> deser =
|
|
|
+ serialization.getDeserializer(GenericsUtil.getClass(obj));
|
|
|
+
|
|
|
+ DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
|
|
|
+
|
|
|
+ dof.reset();
|
|
|
+ ser.open(dof);
|
|
|
+ ser.serialize(obj);
|
|
|
+ ser.close();
|
|
|
+ obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
|
|
|
+ getChainJobConf());
|
|
|
+ ByteArrayInputStream bais =
|
|
|
+ new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
|
|
|
+ deser.open(bais);
|
|
|
+ deser.deserialize(obj);
|
|
|
+ deser.close();
|
|
|
+ return obj;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|