wordcount-nopipe.cc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include "hadoop/SerialUtils.hh"
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>   // std::string, used throughout (also reachable via the Hadoop headers)
#include <vector>   // std::vector, used by the mapper
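
// Counter group and counter names reported by the mapper and reducer below.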
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
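
// Mapper: splits each input value on single spaces and emits (word, "1") for
// every token, incrementing the INPUT_WORDS counter by the number of tokens.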
class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for (unsigned int i = 0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};
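
// Reducer: sums the "1" values emitted for each word and emits (word, total),
// incrementing the OUTPUT_WORDS counter once per distinct word.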
class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
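
// RecordReader that bypasses the Java-side input format: it deserializes a
// plain file name from the input split, then emits one record per line of
// that file, keyed by the byte offset at which the line starts.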
class WordCountReader: public HadoopPipes::RecordReader {
private:
  int64_t bytesTotal;
  int64_t bytesRead;
  FILE* file;

public:
  WordCountReader(HadoopPipes::MapContext& context) {
    std::string filename;
    HadoopUtils::StringInStream stream(context.getInputSplit());
    HadoopUtils::deserializeString(filename, stream);
    struct stat statResult;
    // Check the stat result so bytesTotal is never left uninitialized.
    int statRC = stat(filename.c_str(), &statResult);
    HADOOP_ASSERT(statRC == 0, "failed to stat " + filename);
    bytesTotal = statResult.st_size;
    bytesRead = 0;
    file = fopen(filename.c_str(), "rt");
    HADOOP_ASSERT(file != NULL, "failed to open " + filename);
  }

  ~WordCountReader() {
    fclose(file);
  }

  virtual bool next(std::string& key, std::string& value) {
    key = HadoopUtils::toString(ftell(file));
    int ch = getc(file);
    bytesRead += 1;
    value.clear();
    while (ch != EOF && ch != '\n') {
      value += (char) ch;
      ch = getc(file);
      bytesRead += 1;
    }
    return ch != EOF;
  }

  /**
   * The progress of the record reader through the split as a value between
   * 0.0 and 1.0.
   */
  virtual float getProgress() {
    if (bytesTotal > 0) {
      return (float) bytesRead / bytesTotal;
    } else {
      return 1.0f;
    }
  }
};
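
// RecordWriter that bypasses the Java-side output format: it writes
// "key -> value" lines directly to a local part-<N> file under the task's
// output directory.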
class WordCountWriter: public HadoopPipes::RecordWriter {
private:
  FILE* file;

public:
  WordCountWriter(HadoopPipes::ReduceContext& context) {
    const HadoopPipes::JobConf* job = context.getJobConf();
    int part = job->getInt("mapreduce.task.partition");
    std::string outDir = job->get("mapreduce.task.output.dir");
    // Strip the "file:" scheme prefix from the output directory URI.
    std::string::size_type posn = outDir.find(":");
    HADOOP_ASSERT(posn != std::string::npos,
                  "no scheme found in output dir: " + outDir);
    outDir.erase(0, posn + 1);
    mkdir(outDir.c_str(), 0777);  // may fail harmlessly if the directory exists
    std::string outFile = outDir + "/part-" + HadoopUtils::toString(part);
    file = fopen(outFile.c_str(), "wt");
    HADOOP_ASSERT(file != NULL, "can't open file for writing: " + outFile);
  }

  ~WordCountWriter() {
    fclose(file);
  }

  void emit(const std::string& key, const std::string& value) {
    fprintf(file, "%s -> %s\n", key.c_str(), value.c_str());
  }
};
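
// Wire the pieces together. The two void parameters of TemplateFactory are
// the (unused) partitioner and combiner slots.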
int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(
      HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce, void, void,
                                   WordCountReader, WordCountWriter>());
}
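
// Usage sketch: since this program supplies its own RecordReader and
// RecordWriter, it is normally submitted with the Java-side reader/writer
// disabled, along the lines of:
//
//   hadoop pipes -D hadoop.pipes.java.recordreader=false \
//                -D hadoop.pipes.java.recordwriter=false \
//                -input <input-dir> -output <output-dir> \
//                -program <path-to-wordcount-nopipe-binary>
//
// (Property and flag names follow the Hadoop Pipes documentation; verify
// them against the Hadoop version in use.)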