|
@@ -87,9 +87,15 @@ public:
|
|
const HadoopPipes::JobConf* job = context.getJobConf();
|
|
const HadoopPipes::JobConf* job = context.getJobConf();
|
|
int part = job->getInt("mapred.task.partition");
|
|
int part = job->getInt("mapred.task.partition");
|
|
std::string outDir = job->get("mapred.output.dir");
|
|
std::string outDir = job->get("mapred.output.dir");
|
|
|
|
+ // remove the file: schema substring
|
|
|
|
+ std::string::size_type posn = outDir.find(":");
|
|
|
|
+ HADOOP_ASSERT(posn != std::string::npos,
|
|
|
|
+ "no schema found in output dir: " + outDir);
|
|
|
|
+ outDir.erase(0, posn+1);
|
|
mkdir(outDir.c_str(), 0777);
|
|
mkdir(outDir.c_str(), 0777);
|
|
std::string outFile = outDir + "/part-" + HadoopUtils::toString(part);
|
|
std::string outFile = outDir + "/part-" + HadoopUtils::toString(part);
|
|
file = fopen(outFile.c_str(), "wt");
|
|
file = fopen(outFile.c_str(), "wt");
|
|
|
|
+ HADOOP_ASSERT(file != NULL, "can't open file for writing: " + outFile);
|
|
}
|
|
}
|
|
|
|
|
|
~WordCountWriter() {
|
|
~WordCountWriter() {
|