|
@@ -31,10 +31,13 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.filecache.DistributedCache;
|
|
import org.apache.hadoop.filecache.DistributedCache;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
|
+import org.apache.hadoop.io.FloatWritable;
|
|
|
|
+import org.apache.hadoop.io.NullWritable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
|
+import org.apache.hadoop.mapred.RecordReader;
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
import org.apache.hadoop.mapred.TaskAttemptID;
|
|
import org.apache.hadoop.mapred.TaskAttemptID;
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
@@ -59,6 +62,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
|
/**
|
|
/**
|
|
* Start the child process to handle the task for us.
|
|
* Start the child process to handle the task for us.
|
|
* @param conf the task's configuration
|
|
* @param conf the task's configuration
|
|
|
|
+ * @param recordReader the fake record reader to update progress with
|
|
* @param output the collector to send output to
|
|
* @param output the collector to send output to
|
|
* @param reporter the reporter for the task
|
|
* @param reporter the reporter for the task
|
|
* @param outputKeyClass the class of the output keys
|
|
* @param outputKeyClass the class of the output keys
|
|
@@ -66,7 +70,9 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
|
* @throws IOException
|
|
* @throws IOException
|
|
* @throws InterruptedException
|
|
* @throws InterruptedException
|
|
*/
|
|
*/
|
|
- Application(JobConf conf, OutputCollector<K2, V2> output, Reporter reporter,
|
|
|
|
|
|
+ Application(JobConf conf,
|
|
|
|
+ RecordReader<FloatWritable, NullWritable> recordReader,
|
|
|
|
+ OutputCollector<K2,V2> output, Reporter reporter,
|
|
Class<? extends K2> outputKeyClass,
|
|
Class<? extends K2> outputKeyClass,
|
|
Class<? extends V2> outputValueClass
|
|
Class<? extends V2> outputValueClass
|
|
) throws IOException, InterruptedException {
|
|
) throws IOException, InterruptedException {
|
|
@@ -89,7 +95,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
|
|
|
|
|
process = runClient(cmd, env);
|
|
process = runClient(cmd, env);
|
|
clientSocket = serverSocket.accept();
|
|
clientSocket = serverSocket.accept();
|
|
- handler = new OutputHandler<K2, V2>(output, reporter);
|
|
|
|
|
|
+ handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
|
|
K2 outputKey = (K2)
|
|
K2 outputKey = (K2)
|
|
ReflectionUtils.newInstance(outputKeyClass, conf);
|
|
ReflectionUtils.newInstance(outputKeyClass, conf);
|
|
V2 outputValue = (V2)
|
|
V2 outputValue = (V2)
|