|
@@ -21,7 +21,6 @@ package org.apache.hadoop.streaming;
|
|
import java.io.*;
|
|
import java.io.*;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
-import java.nio.channels.*;
|
|
|
|
import java.nio.charset.CharacterCodingException;
|
|
import java.nio.charset.CharacterCodingException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.Date;
|
|
import java.util.Date;
|
|
@@ -34,6 +33,7 @@ import java.util.regex.*;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
|
|
+import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.mapred.FileSplit;
|
|
import org.apache.hadoop.mapred.FileSplit;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.PhasedFileSystem;
|
|
import org.apache.hadoop.mapred.PhasedFileSystem;
|
|
@@ -45,10 +45,8 @@ import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.Writable;
|
|
|
|
|
|
-import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
-import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
|
|
|
|
|
/** Shared functionality for PipeMapper, PipeReducer.
|
|
/** Shared functionality for PipeMapper, PipeReducer.
|
|
* @author Michel Tourn
|
|
* @author Michel Tourn
|
|
@@ -229,13 +227,12 @@ public abstract class PipeMapRed {
|
|
}
|
|
}
|
|
String[] argvSplit = splitArgs(argv);
|
|
String[] argvSplit = splitArgs(argv);
|
|
String prog = argvSplit[0];
|
|
String prog = argvSplit[0];
|
|
- String userdir = System.getProperty("user.dir");
|
|
|
|
File currentDir = new File(".").getAbsoluteFile();
|
|
File currentDir = new File(".").getAbsoluteFile();
|
|
File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
|
|
File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
|
|
if (new File(prog).isAbsolute()) {
|
|
if (new File(prog).isAbsolute()) {
|
|
// we don't own it. Hope it is executable
|
|
// we don't own it. Hope it is executable
|
|
} else {
|
|
} else {
|
|
- new MustangFile(new File(jobCacheDir, prog).toString()).setExecutable(true, true);
|
|
|
|
|
|
+ FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
|
|
}
|
|
}
|
|
|
|
|
|
if (job_.getInputValueClass().equals(BytesWritable.class)) {
|
|
if (job_.getInputValueClass().equals(BytesWritable.class)) {
|
|
@@ -628,7 +625,6 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
String numRecInfo() {
|
|
String numRecInfo() {
|
|
long elapsed = (System.currentTimeMillis() - startTime_) / 1000;
|
|
long elapsed = (System.currentTimeMillis() - startTime_) / 1000;
|
|
- long total = numRecRead_ + numRecWritten_ + numRecSkipped_;
|
|
|
|
return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:"
|
|
return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:"
|
|
+ safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed)
|
|
+ safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed)
|
|
+ " [rec/s]";
|
|
+ " [rec/s]";
|