HadoopPipes.cc

#include "hadoop/Pipes.hh"
#include "hadoop/SerialUtils.hh"
#include "hadoop/StringUtils.hh"
#include <map>
#include <vector>
#include <errno.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>   // strchr, strerror (previously missing)
#include <strings.h>
#include <sys/socket.h>
#include <unistd.h>   // close (previously missing)
using std::map;
using std::string;
using std::vector;

using namespace HadoopUtils;

namespace HadoopPipes {

class JobConfImpl: public JobConf {
private:
  map<string, string> values;
public:
  void set(const string& key, const string& value) {
    values[key] = value;
  }

  virtual bool hasKey(const string& key) const {
    return values.find(key) != values.end();
  }

  virtual const string& get(const string& key) const {
    map<string, string>::const_iterator itr = values.find(key);
    if (itr == values.end()) {
      throw Error("Key " + key + " not found in JobConf");
    }
    return itr->second;
  }

  virtual int getInt(const string& key) const {
    const string& val = get(key);
    return toInt(val);
  }

  virtual float getFloat(const string& key) const {
    const string& val = get(key);
    return toFloat(val);
  }

  virtual bool getBoolean(const string& key) const {
    const string& val = get(key);
    return toBool(val);
  }
};
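
/*
 * Usage sketch (editor's illustration, not part of the original file):
 * since get() throws Error on a missing key, callers guard optional keys
 * with hasKey() before a typed lookup, as runMap() below does:
 *
 *   int64_t spillMb = 100;                    // default when key is absent
 *   if (conf->hasKey("io.sort.mb")) {
 *     spillMb = conf->getInt("io.sort.mb");
 *   }
 */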
class DownwardProtocol {
public:
  virtual void start(int protocol) = 0;
  virtual void setJobConf(vector<string> values) = 0;
  virtual void setInputTypes(string keyType, string valueType) = 0;
  virtual void runMap(string inputSplit, int numReduces, bool pipedInput) = 0;
  virtual void mapItem(const string& key, const string& value) = 0;
  virtual void runReduce(int reduce, bool pipedOutput) = 0;
  virtual void reduceKey(const string& key) = 0;
  virtual void reduceValue(const string& value) = 0;
  virtual void close() = 0;
  virtual void abort() = 0;
  virtual ~DownwardProtocol() {}
};

class UpwardProtocol {
public:
  virtual void output(const string& key, const string& value) = 0;
  virtual void partitionedOutput(int reduce, const string& key,
                                 const string& value) = 0;
  virtual void status(const string& message) = 0;
  virtual void progress(float progress) = 0;
  virtual void done() = 0;
  virtual ~UpwardProtocol() {}
};

class Protocol {
public:
  virtual void nextEvent() = 0;
  virtual UpwardProtocol* getUplink() = 0;
  virtual ~Protocol() {}
};
class TextUpwardProtocol: public UpwardProtocol {
private:
  FILE* stream;
  static const char fieldSeparator = '\t';
  static const char lineSeparator = '\n';

  void writeBuffer(const string& buffer) {
    // Pass the quoted buffer as data, not as the format string, so '%'
    // characters in user keys/values cannot be misinterpreted by fprintf.
    fprintf(stream, "%s", quoteString(buffer, "\t\n").c_str());
  }
public:
  TextUpwardProtocol(FILE* _stream): stream(_stream) {}

  virtual void output(const string& key, const string& value) {
    fprintf(stream, "output%c", fieldSeparator);
    writeBuffer(key);
    fprintf(stream, "%c", fieldSeparator);
    writeBuffer(value);
    fprintf(stream, "%c", lineSeparator);
  }

  virtual void partitionedOutput(int reduce, const string& key,
                                 const string& value) {
    fprintf(stream, "partitionedOutput%c%d%c", fieldSeparator, reduce,
            fieldSeparator);
    writeBuffer(key);
    fprintf(stream, "%c", fieldSeparator);
    writeBuffer(value);
    fprintf(stream, "%c", lineSeparator);
  }

  virtual void status(const string& message) {
    fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(),
            lineSeparator);
  }

  virtual void progress(float progress) {
    fprintf(stream, "progress%c%f%c", fieldSeparator, progress,
            lineSeparator);
  }

  virtual void done() {
    fprintf(stream, "done%c", lineSeparator);
  }
};
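
/*
 * Wire format produced by TextUpwardProtocol (derived from the code above;
 * shown here for illustration). Fields are tab-separated, records are
 * newline-terminated, and keys/values are escaped with quoteString():
 *
 *   output\t<key>\t<value>\n
 *   partitionedOutput\t<reduce>\t<key>\t<value>\n
 *   status\t<message>\n
 *   progress\t<float>\n
 *   done\n
 */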
class TextProtocol: public Protocol {
private:
  FILE* downStream;
  DownwardProtocol* handler;
  UpwardProtocol* uplink;
  string key;
  string value;

  int readUpto(string& buffer, const char* limit) {
    int ch;
    buffer.clear();
    while ((ch = getc(downStream)) != -1) {
      if (strchr(limit, ch) != NULL) {
        return ch;
      }
      buffer += ch;
    }
    return -1;
  }

  static const char* delim;
public:
  TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
    downStream = down;
    uplink = new TextUpwardProtocol(up);
    handler = _handler;
  }

  UpwardProtocol* getUplink() {
    return uplink;
  }

  virtual void nextEvent() {
    string command;
    string arg;
    int sep;
    sep = readUpto(command, delim);
    if (command == "mapItem") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(key, delim);
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(value, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->mapItem(key, value);
    } else if (command == "reduceValue") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(value, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->reduceValue(value);
    } else if (command == "reduceKey") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(key, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->reduceKey(key);
    } else if (command == "start") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(arg, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->start(toInt(arg));
    } else if (command == "setJobConf") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(arg, delim);
      int len = toInt(arg);
      // Reserve instead of size-constructing: the original created len
      // empty strings and then push_back'ed len more, leaving len blank
      // leading entries in the job conf.
      vector<string> values;
      values.reserve(len);
      for (int i = 0; i < len; ++i) {
        HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
        sep = readUpto(arg, delim);
        values.push_back(arg);
      }
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->setJobConf(values);
    } else if (command == "setInputTypes") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(key, delim);
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(value, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->setInputTypes(key, value);
    } else if (command == "runMap") {
      string split;
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(split, delim);
      string reduces;
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(reduces, delim);
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(arg, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->runMap(split, toInt(reduces), toBool(arg));
    } else if (command == "runReduce") {
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      sep = readUpto(arg, delim);
      HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
      string piped;
      sep = readUpto(piped, delim);
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->runReduce(toInt(arg), toBool(piped));
    } else if (command == "abort") {
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->abort();
    } else if (command == "close") {
      HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
      handler->close();
    } else {
      throw Error("Illegal text protocol command " + command);
    }
  }

  ~TextProtocol() {
    delete uplink;
  }
};

const char* TextProtocol::delim = "\t\n";
enum MESSAGE_TYPE { START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP,
                    MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE,
                    CLOSE, ABORT,
                    OUTPUT = 50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE };
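
/*
 * Binary framing, as implemented by the two classes below: every message
 * starts with its MESSAGE_TYPE code written via serializeInt, followed by
 * the arguments in the order the handlers read them (e.g. OUTPUT is
 * followed by two serialized strings, key then value). Codes 0-9 flow
 * downward from the framework to the task; codes 50 and up flow upward.
 */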
class BinaryUpwardProtocol: public UpwardProtocol {
private:
  FileOutStream* stream;
public:
  BinaryUpwardProtocol(FILE* _stream) {
    stream = new FileOutStream();
    HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
  }

  virtual void output(const string& key, const string& value) {
    serializeInt(OUTPUT, *stream);
    serializeString(key, *stream);
    serializeString(value, *stream);
  }

  virtual void partitionedOutput(int reduce, const string& key,
                                 const string& value) {
    serializeInt(PARTITIONED_OUTPUT, *stream);
    serializeInt(reduce, *stream);
    serializeString(key, *stream);
    serializeString(value, *stream);
  }

  virtual void status(const string& message) {
    serializeInt(STATUS, *stream);
    serializeString(message, *stream);
  }

  virtual void progress(float progress) {
    serializeInt(PROGRESS, *stream);
    serializeFloat(progress, *stream);
    stream->flush();
  }

  virtual void done() {
    serializeInt(DONE, *stream);
  }

  ~BinaryUpwardProtocol() {
    delete stream;
  }
};
class BinaryProtocol: public Protocol {
private:
  FileInStream* downStream;
  DownwardProtocol* handler;
  BinaryUpwardProtocol* uplink;
  string key;
  string value;
public:
  BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
    downStream = new FileInStream();
    downStream->open(down);
    uplink = new BinaryUpwardProtocol(up);
    handler = _handler;
  }

  UpwardProtocol* getUplink() {
    return uplink;
  }

  virtual void nextEvent() {
    int32_t cmd;
    cmd = deserializeInt(*downStream);
    switch (cmd) {
    case START_MESSAGE: {
      int32_t prot;
      prot = deserializeInt(*downStream);
      handler->start(prot);
      break;
    }
    case SET_JOB_CONF: {
      int32_t entries;
      entries = deserializeInt(*downStream);
      // Reserve instead of size-constructing, mirroring the fix in the
      // text protocol above; a sized vector plus push_back doubles the
      // entry count with empty strings at the front.
      vector<string> result;
      result.reserve(entries);
      for (int i = 0; i < entries; ++i) {
        string item;
        deserializeString(item, *downStream);
        result.push_back(item);
      }
      handler->setJobConf(result);
      break;
    }
    case SET_INPUT_TYPES: {
      string keyType;
      string valueType;
      deserializeString(keyType, *downStream);
      deserializeString(valueType, *downStream);
      handler->setInputTypes(keyType, valueType);
      break;
    }
    case RUN_MAP: {
      string split;
      int32_t numReduces;
      int32_t piped;
      deserializeString(split, *downStream);
      numReduces = deserializeInt(*downStream);
      piped = deserializeInt(*downStream);
      handler->runMap(split, numReduces, piped);
      break;
    }
    case MAP_ITEM: {
      deserializeString(key, *downStream);
      deserializeString(value, *downStream);
      handler->mapItem(key, value);
      break;
    }
    case RUN_REDUCE: {
      int32_t reduce;
      int32_t piped;
      reduce = deserializeInt(*downStream);
      piped = deserializeInt(*downStream);
      handler->runReduce(reduce, piped);
      break;
    }
    case REDUCE_KEY: {
      deserializeString(key, *downStream);
      handler->reduceKey(key);
      break;
    }
    case REDUCE_VALUE: {
      deserializeString(value, *downStream);
      handler->reduceValue(value);
      break;
    }
    case CLOSE:
      handler->close();
      break;
    case ABORT:
      handler->abort();
      break;
    default:
      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
    }
  }

  virtual ~BinaryProtocol() {
    delete downStream;
    delete uplink;
  }
};
/**
 * Define a context object to give to combiners that will let them
 * go through the values and emit their results correctly.
 */
class CombineContext: public ReduceContext {
private:
  ReduceContext* baseContext;
  Partitioner* partitioner;
  int numReduces;
  UpwardProtocol* uplink;
  bool firstKey;
  bool firstValue;
  map<string, vector<string> >::iterator keyItr;
  map<string, vector<string> >::iterator endKeyItr;
  vector<string>::iterator valueItr;
  vector<string>::iterator endValueItr;
public:
  CombineContext(ReduceContext* _baseContext,
                 Partitioner* _partitioner,
                 int _numReduces,
                 UpwardProtocol* _uplink,
                 map<string, vector<string> >& data) {
    baseContext = _baseContext;
    partitioner = _partitioner;
    numReduces = _numReduces;
    uplink = _uplink;
    keyItr = data.begin();
    endKeyItr = data.end();
    firstKey = true;
    firstValue = true;
  }

  virtual const JobConf* getJobConf() {
    return baseContext->getJobConf();
  }

  virtual const std::string& getInputKey() {
    return keyItr->first;
  }

  virtual const std::string& getInputValue() {
    return *valueItr;
  }

  virtual void emit(const std::string& key, const std::string& value) {
    if (partitioner != NULL) {
      uplink->partitionedOutput(partitioner->partition(key, numReduces),
                                key, value);
    } else {
      uplink->output(key, value);
    }
  }

  virtual void progress() {
    baseContext->progress();
  }

  virtual void setStatus(const std::string& status) {
    baseContext->setStatus(status);
  }

  bool nextKey() {
    if (firstKey) {
      firstKey = false;
    } else {
      ++keyItr;
    }
    if (keyItr != endKeyItr) {
      valueItr = keyItr->second.begin();
      endValueItr = keyItr->second.end();
      firstValue = true;
      return true;
    }
    return false;
  }

  virtual bool nextValue() {
    if (firstValue) {
      firstValue = false;
    } else {
      ++valueItr;
    }
    return valueItr != endValueItr;
  }
};
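
/*
 * Combiner usage sketch (editor's illustration; SumCombiner is hypothetical,
 * not part of this file). CombineRunner below positions the context on each
 * buffered key via nextKey() and hands it to the user's Reducer, which walks
 * the values with nextValue() and emits through the context:
 *
 *   class SumCombiner: public HadoopPipes::Reducer {
 *   public:
 *     void reduce(HadoopPipes::ReduceContext& context) {
 *       int sum = 0;
 *       while (context.nextValue()) {
 *         sum += HadoopUtils::toInt(context.getInputValue());
 *       }
 *       context.emit(context.getInputKey(), HadoopUtils::toString(sum));
 *     }
 *   };
 */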
/**
 * A RecordWriter that will take the map outputs, buffer them up, and then
 * combine them when the buffer is full.
 */
class CombineRunner: public RecordWriter {
private:
  map<string, vector<string> > data;
  int64_t spillSize;
  int64_t numBytes;
  ReduceContext* baseContext;
  Partitioner* partitioner;
  int numReduces;
  UpwardProtocol* uplink;
  Reducer* combiner;
public:
  CombineRunner(int64_t _spillSize, ReduceContext* _baseContext,
                Reducer* _combiner, UpwardProtocol* _uplink,
                Partitioner* _partitioner, int _numReduces) {
    numBytes = 0;
    spillSize = _spillSize;
    baseContext = _baseContext;
    partitioner = _partitioner;
    numReduces = _numReduces;
    uplink = _uplink;
    combiner = _combiner;
  }

  virtual void emit(const std::string& key,
                    const std::string& value) {
    numBytes += key.length() + value.length();
    data[key].push_back(value);
    if (numBytes >= spillSize) {
      spillAll();
    }
  }

  virtual void close() {
    spillAll();
  }

private:
  void spillAll() {
    CombineContext context(baseContext, partitioner, numReduces,
                           uplink, data);
    while (context.nextKey()) {
      combiner->reduce(context);
    }
    data.clear();
    numBytes = 0;  // reset so the next spill again waits for a full buffer
                   // (the original never reset it, so every emit after the
                   // first spill triggered another spill)
  }
};
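
/*
 * Worked example of the spill threshold (values taken from runMap() below):
 * with the default io.sort.mb of 100, the CombineRunner is constructed with
 * 100 * 1024 * 1024 = 104857600 bytes, and buffered map output is combined
 * and flushed once the summed key/value lengths reach that size.
 */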
class TaskContextImpl: public MapContext, public ReduceContext,
                       public DownwardProtocol {
private:
  bool done;
  JobConf* jobConf;
  string key;
  const string* newKey;
  const string* value;
  bool hasTask;
  bool isNewKey;
  bool isNewValue;
  string* inputKeyClass;
  string* inputValueClass;
  string status;
  float progressFloat;
  uint64_t lastProgress;
  bool statusSet;
  Protocol* protocol;
  UpwardProtocol* uplink;
  string* inputSplit;
  RecordReader* reader;
  Mapper* mapper;
  Reducer* reducer;
  RecordWriter* writer;
  Partitioner* partitioner;
  int numReduces;
  const Factory* factory;
public:
  TaskContextImpl(const Factory& _factory) {
    statusSet = false;
    done = false;
    newKey = NULL;
    value = NULL;
    factory = &_factory;
    jobConf = NULL;
    inputKeyClass = NULL;
    inputValueClass = NULL;
    inputSplit = NULL;
    mapper = NULL;
    reducer = NULL;
    reader = NULL;
    writer = NULL;
    partitioner = NULL;
    protocol = NULL;
    uplink = NULL;  // assigned later by setProtocol(); initialized here so
                    // the uplink check in progress() is well defined
    isNewKey = false;
    isNewValue = false;
    lastProgress = 0;
    progressFloat = 0.0f;
    hasTask = false;
  }
  void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
    protocol = _protocol;
    uplink = _uplink;
  }

  virtual void start(int protocol) {
    if (protocol != 0) {
      throw Error("Protocol version " + toString(protocol) +
                  " not supported");
    }
  }

  virtual void setJobConf(vector<string> values) {
    int len = values.size();
    JobConfImpl* result = new JobConfImpl();
    HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
    for (int i = 0; i < len; i += 2) {
      result->set(values[i], values[i + 1]);
    }
    jobConf = result;
  }

  virtual void setInputTypes(string keyType, string valueType) {
    inputKeyClass = new string(keyType);
    inputValueClass = new string(valueType);
  }

  virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) {
    inputSplit = new string(_inputSplit);
    reader = factory->createRecordReader(*this);
    HADOOP_ASSERT((reader == NULL) == pipedInput,
                  pipedInput ? "RecordReader defined when not needed." :
                               "RecordReader not defined");
    if (reader != NULL) {
      value = new string();
    }
    mapper = factory->createMapper(*this);
    numReduces = _numReduces;
    if (numReduces != 0) {
      reducer = factory->createCombiner(*this);
      partitioner = factory->createPartitioner(*this);
    }
    if (reducer != NULL) {
      int64_t spillSize = 100;
      if (jobConf->hasKey("io.sort.mb")) {
        spillSize = jobConf->getInt("io.sort.mb");
      }
      writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer,
                                 uplink, partitioner, numReduces);
    }
    hasTask = true;
  }

  virtual void mapItem(const string& _key, const string& _value) {
    newKey = &_key;
    value = &_value;
    isNewKey = true;
  }

  virtual void runReduce(int reduce, bool pipedOutput) {
    reducer = factory->createReducer(*this);
    writer = factory->createRecordWriter(*this);
    HADOOP_ASSERT((writer == NULL) == pipedOutput,
                  pipedOutput ? "RecordWriter defined when not needed." :
                                "RecordWriter not defined");
    hasTask = true;
  }

  virtual void reduceKey(const string& _key) {
    isNewKey = true;
    newKey = &_key;
  }

  virtual void reduceValue(const string& _value) {
    isNewValue = true;
    value = &_value;
  }

  virtual bool isDone() {
    return done;
  }

  virtual void close() {
    done = true;
  }

  virtual void abort() {
    throw Error("Aborted by driver");
  }

  void waitForTask() {
    while (!done && !hasTask) {
      protocol->nextEvent();
    }
  }

  bool nextKey() {
    if (reader == NULL) {
      while (!isNewKey) {
        nextValue();
        if (done) {
          return false;
        }
      }
      key = *newKey;
    } else {
      if (!reader->next(key, const_cast<string&>(*value))) {
        done = true;
        return false;
      }
      progressFloat = reader->getProgress();
    }
    isNewKey = false;
    if (mapper != NULL) {
      mapper->map(*this);
    } else {
      reducer->reduce(*this);
    }
    return true;
  }

  /**
   * Advance to the next value.
   */
  virtual bool nextValue() {
    if (isNewKey || done) {
      return false;
    }
    isNewValue = false;
    progress();
    protocol->nextEvent();
    return isNewValue;
  }

  /**
   * Get the JobConf for the current task.
   */
  virtual JobConf* getJobConf() {
    return jobConf;
  }
  /**
   * Get the current key.
   * @return the current key; only valid once the first map or reduce item
   * has been received
   */
  virtual const string& getInputKey() {
    return key;
  }

  /**
   * Get the current value.
   * @return the current value; only valid once the first map or reduce
   * item has been received
   */
  virtual const string& getInputValue() {
    return *value;
  }
  /**
   * Mark your task as having made progress without changing the status
   * message.
   */
  virtual void progress() {
    if (uplink != 0) {
      uint64_t now = getCurrentMillis();
      if (now - lastProgress > 1000) {
        lastProgress = now;
        if (statusSet) {
          uplink->status(status);
          statusSet = false;
        }
        uplink->progress(progressFloat);
      }
    }
  }

  /**
   * Set the status message and call progress.
   */
  virtual void setStatus(const string& status) {
    this->status = status;
    statusSet = true;
    progress();
  }

  /**
   * Get the name of the key class of the input to this task.
   */
  virtual const string& getInputKeyClass() {
    return *inputKeyClass;
  }

  /**
   * Get the name of the value class of the input to this task.
   */
  virtual const string& getInputValueClass() {
    return *inputValueClass;
  }

  /**
   * Access the InputSplit of the mapper.
   */
  virtual const std::string& getInputSplit() {
    return *inputSplit;
  }

  virtual void emit(const string& key, const string& value) {
    progress();
    if (writer != NULL) {
      writer->emit(key, value);
    } else if (partitioner != NULL) {
      int part = partitioner->partition(key, numReduces);
      uplink->partitionedOutput(part, key, value);
    } else {
      uplink->output(key, value);
    }
  }

  void closeAll() {
    if (reader) {
      reader->close();
    }
    if (mapper) {
      mapper->close();
    }
    if (reducer) {
      reducer->close();
    }
    if (writer) {
      writer->close();
    }
  }

  virtual ~TaskContextImpl() {
    delete jobConf;
    delete inputKeyClass;
    delete inputValueClass;
    delete inputSplit;
    if (reader) {
      delete value;
    }
    delete reader;
    delete mapper;
    delete reducer;
    delete writer;
    delete partitioner;
  }
};
/**
 * Run the assigned task in the framework.
 * The user's main function should construct a Factory supplying the
 * application's Mapper, Reducer, and friends, and then call this with it.
 * @return true, if the task succeeded.
 */
bool runTask(const Factory& factory) {
  try {
    TaskContextImpl* context = new TaskContextImpl(factory);
    Protocol* connection;
    char* portStr = getenv("hadoop.pipes.command.port");
    int sock = -1;
    FILE* stream = NULL;
    FILE* outStream = NULL;
    char* bufin = NULL;
    char* bufout = NULL;
    if (portStr) {
      sock = socket(PF_INET, SOCK_STREAM, 0);
      HADOOP_ASSERT(sock != -1,
                    string("problem creating socket: ") + strerror(errno));
      sockaddr_in addr;
      addr.sin_family = AF_INET;
      addr.sin_port = htons(toInt(portStr));
      addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
      HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                    string("problem connecting command socket: ") +
                    strerror(errno));
      stream = fdopen(sock, "r");
      outStream = fdopen(sock, "w");
      // increase buffer size
      int bufsize = 128 * 1024;
      int setbuf;
      bufin = new char[bufsize];
      bufout = new char[bufsize];
      setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
      HADOOP_ASSERT(setbuf == 0,
                    string("problem with setvbuf for inStream: ") +
                    strerror(errno));
      setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
      HADOOP_ASSERT(setbuf == 0,
                    string("problem with setvbuf for outStream: ") +
                    strerror(errno));
      connection = new BinaryProtocol(stream, context, outStream);
    } else if (getenv("hadoop.pipes.command.file")) {
      char* filename = getenv("hadoop.pipes.command.file");
      string outFilename = filename;
      outFilename += ".out";
      stream = fopen(filename, "r");
      outStream = fopen(outFilename.c_str(), "w");
      connection = new BinaryProtocol(stream, context, outStream);
    } else {
      connection = new TextProtocol(stdin, context, stdout);
    }
    context->setProtocol(connection, connection->getUplink());
    context->waitForTask();
    while (!context->isDone()) {
      context->nextKey();
    }
    context->closeAll();
    connection->getUplink()->done();
    delete context;
    delete connection;
    if (stream != NULL) {
      fflush(stream);
    }
    if (outStream != NULL) {
      fflush(outStream);
    }
    fflush(stdout);
    if (sock != -1) {
      int result = shutdown(sock, SHUT_RDWR);
      HADOOP_ASSERT(result == 0, "problem shutting socket");
      result = close(sock);
      HADOOP_ASSERT(result == 0, "problem closing socket");
    }
    // fclose left disabled as in the original: in socket mode both FILE*s
    // wrap the socket fd that is already closed above.
    if (stream != NULL) {
      //fclose(stream);
    }
    if (outStream != NULL) {
      //fclose(outStream);
    }
    delete[] bufin;   // allocated with new[], so delete[] (the original used
    delete[] bufout;  // plain delete, which is undefined behavior)
    return true;
  } catch (Error& err) {
    fprintf(stderr, "Hadoop Pipes Exception: %s\n",
            err.getMessage().c_str());
    return false;
  }
}
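
/*
 * Usage sketch (editor's illustration; WordCountMapper and WordCountReducer
 * are hypothetical, not part of this file). A Pipes application defines its
 * Mapper and Reducer, wraps them in a TemplateFactory, and hands the factory
 * to runTask() from main():
 *
 *   #include "hadoop/TemplateFactory.hh"
 *
 *   int main(int argc, char** argv) {
 *     return HadoopPipes::runTask(
 *         HadoopPipes::TemplateFactory<WordCountMapper,
 *                                      WordCountReducer>());
 *   }
 */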
}  // namespace HadoopPipes