wordcount-part.cc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

// Counter group and counter names used by the mapper and reducer below.
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
// Mapper: splits each input value (a line of text) on spaces and emits
// each word with a count of "1", tracking the total words seen in the
// INPUT_WORDS counter.
class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for (unsigned int i = 0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};
// Reducer: sums the counts for each word and emits the total, bumping the
// OUTPUT_WORDS counter once per distinct key.
class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
// Partitioner: routes every key to reduce task 0, so all output lands in
// a single part file no matter how many reducers the job runs with.
class WordCountPartitioner: public HadoopPipes::Partitioner {
public:
  WordCountPartitioner(HadoopPipes::TaskContext& context) {}

  virtual int partition(const std::string& key, int numOfReduces) {
    return 0;
  }
};
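
// For contrast, a more typical partitioner spreads keys across reducers by
// hashing, roughly what Hadoop's default Java-side HashPartitioner does.
// A minimal, hypothetical sketch (not part of the original example;
// std::hash needs <functional> and C++11):
//
//   virtual int partition(const std::string& key, int numOfReduces) {
//     return static_cast<int>(std::hash<std::string>()(key) % numOfReduces);
//   }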
int main(int argc, char *argv[]) {
  // TemplateFactory arguments are mapper, reducer, partitioner, combiner.
  // WordCountReduce appears twice on purpose: it serves as both the
  // reducer and the combiner.
  return HadoopPipes::runTask(
      HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce,
                                   WordCountPartitioner, WordCountReduce>());
}
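
// A job built from this file is typically launched with the `hadoop pipes`
// command. A sketch, assuming the compiled binary has been pushed to HDFS
// as bin/wordcount-part and that in-dir/out-dir are placeholder paths:
//
//   hadoop pipes \
//     -D hadoop.pipes.java.recordreader=true \
//     -D hadoop.pipes.java.recordwriter=true \
//     -input in-dir -output out-dir \
//     -program bin/wordcount-part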