|
@@ -18,6 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.DataOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
@@ -69,8 +73,6 @@ public class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
|
|
|
private static final String jobDir = "localRunner/";
|
|
|
|
|
|
- private static final Counters EMPTY_COUNTERS = new Counters();
|
|
|
-
|
|
|
public long getProtocolVersion(String protocol, long clientVersion) {
|
|
|
return JobSubmissionProtocol.versionID;
|
|
|
}
|
|
@@ -263,10 +265,10 @@ public class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
this.partialMapProgress = new float[numMaps];
|
|
|
this.mapCounters = new Counters[numMaps];
|
|
|
for (int i = 0; i < numMaps; i++) {
|
|
|
- this.mapCounters[i] = EMPTY_COUNTERS;
|
|
|
+ this.mapCounters[i] = new Counters();
|
|
|
}
|
|
|
|
|
|
- this.reduceCounters = EMPTY_COUNTERS;
|
|
|
+ this.reduceCounters = new Counters();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -453,6 +455,14 @@ public class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
|
|
|
public synchronized boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,
|
|
|
JvmContext context) throws IOException, InterruptedException {
|
|
|
+ // Serialize as we would if distributed in order to make deep copy
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ DataOutputStream dos = new DataOutputStream(baos);
|
|
|
+ TaskStatus.writeTaskStatus(dos, taskStatus);
|
|
|
+ dos.close();
|
|
|
+ taskStatus = TaskStatus.readTaskStatus(new DataInputStream(
|
|
|
+ new ByteArrayInputStream(baos.toByteArray())));
|
|
|
+
|
|
|
LOG.info(taskStatus.getStateString());
|
|
|
int taskIndex = mapIds.indexOf(taskId);
|
|
|
if (taskIndex >= 0) { // mapping
|
|
@@ -477,13 +487,13 @@ public class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
/** Return the current values of the counters for this job,
|
|
|
* including tasks that are in progress.
|
|
|
*/
|
|
|
- public synchronized Counters getCurrentCounters() {
|
|
|
+ public synchronized Counters GetCurrentCounters() {
|
|
|
if (null == mapCounters) {
|
|
|
// Counters not yet initialized for job.
|
|
|
- return EMPTY_COUNTERS;
|
|
|
+ return new Counters();
|
|
|
}
|
|
|
|
|
|
- Counters current = EMPTY_COUNTERS;
|
|
|
+ Counters current = new Counters();
|
|
|
for (Counters c : mapCounters) {
|
|
|
current = Counters.sum(current, c);
|
|
|
}
|