@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 
+import java.util.Iterator;
+
 /**
  * Reduces a set of intermediate values which share a key to a smaller set of
  * values.
@@ -162,14 +164,15 @@ public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
    * control how the reduce task works.
    */
-  @SuppressWarnings("unchecked")
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     while (context.nextKey()) {
       reduce(context.getCurrentKey(), context.getValues(), context);
       // If a back up store is used, reset it
-      ((ReduceContext.ValueIterator)
-          (context.getValues().iterator())).resetBackupStore();
+      Iterator<VALUEIN> iter = context.getValues().iterator();
+      if(iter instanceof ReduceContext.ValueIterator) {
+        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
+      }
     }
     cleanup(context);
   }