HadoopPipes.cc 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "hadoop/Pipes.hh"
  19. #include "hadoop/SerialUtils.hh"
  20. #include "hadoop/StringUtils.hh"
  21. #include <map>
  22. #include <vector>
  23. #include <errno.h>
  24. #include <netinet/in.h>
  25. #include <stdint.h>
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <string.h>
  29. #include <strings.h>
  30. #include <unistd.h>
  31. #include <sys/socket.h>
  32. #include <pthread.h>
  33. #include <iostream>
  34. #include <fstream>
  35. #include <openssl/hmac.h>
  36. #include <openssl/buffer.h>
  37. using std::map;
  38. using std::string;
  39. using std::vector;
  40. using namespace HadoopUtils;
  41. namespace HadoopPipes {
  42. class JobConfImpl: public JobConf {
  43. private:
  44. map<string, string> values;
  45. public:
  46. void set(const string& key, const string& value) {
  47. values[key] = value;
  48. }
  49. virtual bool hasKey(const string& key) const {
  50. return values.find(key) != values.end();
  51. }
  52. virtual const string& get(const string& key) const {
  53. map<string,string>::const_iterator itr = values.find(key);
  54. if (itr == values.end()) {
  55. throw Error("Key " + key + " not found in JobConf");
  56. }
  57. return itr->second;
  58. }
  59. virtual int getInt(const string& key) const {
  60. const string& val = get(key);
  61. return toInt(val);
  62. }
  63. virtual float getFloat(const string& key) const {
  64. const string& val = get(key);
  65. return toFloat(val);
  66. }
  67. virtual bool getBoolean(const string&key) const {
  68. const string& val = get(key);
  69. return toBool(val);
  70. }
  71. };
  /**
   * Callbacks for commands travelling from the framework down to the task.
   * A Protocol decodes each incoming command and invokes the matching
   * method on its handler. (Declaration order is kept stable.)
   */
  class DownwardProtocol {
  public:
    /** Protocol handshake carrying the protocol version number. */
    virtual void start(int version) = 0;
    /** Flattened job configuration entries. */
    virtual void setJobConf(vector<string> entries) = 0;
    /** Class names of the input key and value types. */
    virtual void setInputTypes(string keyClass, string valueClass) = 0;
    /** Begin a map task over the given split. */
    virtual void runMap(string split, int reduceCount, bool inputIsPiped) = 0;
    /** One input record for the map task. */
    virtual void mapItem(const string& itemKey, const string& itemValue) = 0;
    /** Begin a reduce task for the given reduce index. */
    virtual void runReduce(int reduceIndex, bool outputIsPiped) = 0;
    /** Start of a new key group for the reduce task. */
    virtual void reduceKey(const string& groupKey) = 0;
    /** One value belonging to the current key group. */
    virtual void reduceValue(const string& groupValue) = 0;
    /** No more commands will follow. */
    virtual void close() = 0;
    /** The framework asked the task to abort. */
    virtual void abort() = 0;
    virtual ~DownwardProtocol() {}
  };
  86. class UpwardProtocol {
  87. public:
  88. virtual void output(const string& key, const string& value) = 0;
  89. virtual void partitionedOutput(int reduce, const string& key,
  90. const string& value) = 0;
  91. virtual void status(const string& message) = 0;
  92. virtual void progress(float progress) = 0;
  93. virtual void done() = 0;
  94. virtual void registerCounter(int id, const string& group,
  95. const string& name) = 0;
  96. virtual void
  97. incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
  98. virtual ~UpwardProtocol() {}
  99. };
  100. class Protocol {
  101. public:
  102. virtual void nextEvent() = 0;
  103. virtual UpwardProtocol* getUplink() = 0;
  104. virtual ~Protocol() {}
  105. };
  106. class TextUpwardProtocol: public UpwardProtocol {
  107. private:
  108. FILE* stream;
  109. static const char fieldSeparator = '\t';
  110. static const char lineSeparator = '\n';
  111. void writeBuffer(const string& buffer) {
  112. fprintf(stream, quoteString(buffer, "\t\n").c_str());
  113. }
  114. public:
  115. TextUpwardProtocol(FILE* _stream): stream(_stream) {}
  116. virtual void output(const string& key, const string& value) {
  117. fprintf(stream, "output%c", fieldSeparator);
  118. writeBuffer(key);
  119. fprintf(stream, "%c", fieldSeparator);
  120. writeBuffer(value);
  121. fprintf(stream, "%c", lineSeparator);
  122. }
  123. virtual void partitionedOutput(int reduce, const string& key,
  124. const string& value) {
  125. fprintf(stream, "parititionedOutput%c%d%c", fieldSeparator, reduce,
  126. fieldSeparator);
  127. writeBuffer(key);
  128. fprintf(stream, "%c", fieldSeparator);
  129. writeBuffer(value);
  130. fprintf(stream, "%c", lineSeparator);
  131. }
  132. virtual void status(const string& message) {
  133. fprintf(stream, "status%c%s%c", fieldSeparator, message.c_str(),
  134. lineSeparator);
  135. }
  136. virtual void progress(float progress) {
  137. fprintf(stream, "progress%c%f%c", fieldSeparator, progress,
  138. lineSeparator);
  139. }
  140. virtual void registerCounter(int id, const string& group,
  141. const string& name) {
  142. fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
  143. fieldSeparator, group.c_str(), fieldSeparator, name.c_str(),
  144. lineSeparator);
  145. }
  146. virtual void incrementCounter(const TaskContext::Counter* counter,
  147. uint64_t amount) {
  148. fprintf(stream, "incrCounter%c%d%c%ld%c", fieldSeparator, counter->getId(),
  149. fieldSeparator, (long)amount, lineSeparator);
  150. }
  151. virtual void done() {
  152. fprintf(stream, "done%c", lineSeparator);
  153. }
  154. };
  155. class TextProtocol: public Protocol {
  156. private:
  157. FILE* downStream;
  158. DownwardProtocol* handler;
  159. UpwardProtocol* uplink;
  160. string key;
  161. string value;
  162. int readUpto(string& buffer, const char* limit) {
  163. int ch;
  164. buffer.clear();
  165. while ((ch = getc(downStream)) != -1) {
  166. if (strchr(limit, ch) != NULL) {
  167. return ch;
  168. }
  169. buffer += ch;
  170. }
  171. return -1;
  172. }
  173. static const char* delim;
  174. public:
  175. TextProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
  176. downStream = down;
  177. uplink = new TextUpwardProtocol(up);
  178. handler = _handler;
  179. }
  180. UpwardProtocol* getUplink() {
  181. return uplink;
  182. }
  183. virtual void nextEvent() {
  184. string command;
  185. string arg;
  186. int sep;
  187. sep = readUpto(command, delim);
  188. if (command == "mapItem") {
  189. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  190. sep = readUpto(key, delim);
  191. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  192. sep = readUpto(value, delim);
  193. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  194. handler->mapItem(key, value);
  195. } else if (command == "reduceValue") {
  196. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  197. sep = readUpto(value, delim);
  198. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  199. handler->reduceValue(value);
  200. } else if (command == "reduceKey") {
  201. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  202. sep = readUpto(key, delim);
  203. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  204. handler->reduceKey(key);
  205. } else if (command == "start") {
  206. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  207. sep = readUpto(arg, delim);
  208. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  209. handler->start(toInt(arg));
  210. } else if (command == "setJobConf") {
  211. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  212. sep = readUpto(arg, delim);
  213. int len = toInt(arg);
  214. vector<string> values(len);
  215. for(int i=0; i < len; ++i) {
  216. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  217. sep = readUpto(arg, delim);
  218. values.push_back(arg);
  219. }
  220. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  221. handler->setJobConf(values);
  222. } else if (command == "setInputTypes") {
  223. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  224. sep = readUpto(key, delim);
  225. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  226. sep = readUpto(value, delim);
  227. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  228. handler->setInputTypes(key, value);
  229. } else if (command == "runMap") {
  230. string split;
  231. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  232. sep = readUpto(split, delim);
  233. string reduces;
  234. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  235. sep = readUpto(reduces, delim);
  236. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  237. sep = readUpto(arg, delim);
  238. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  239. handler->runMap(split, toInt(reduces), toBool(arg));
  240. } else if (command == "runReduce") {
  241. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  242. sep = readUpto(arg, delim);
  243. HADOOP_ASSERT(sep == '\t', "Short text protocol command " + command);
  244. string piped;
  245. sep = readUpto(piped, delim);
  246. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  247. handler->runReduce(toInt(arg), toBool(piped));
  248. } else if (command == "abort") {
  249. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  250. handler->abort();
  251. } else if (command == "close") {
  252. HADOOP_ASSERT(sep == '\n', "Long text protocol command " + command);
  253. handler->close();
  254. } else {
  255. throw Error("Illegal text protocol command " + command);
  256. }
  257. }
  258. ~TextProtocol() {
  259. delete uplink;
  260. }
  261. };
  262. const char* TextProtocol::delim = "\t\n";
  /**
   * Binary protocol message codes. The numeric values are part of the wire
   * format, so each one is spelled out explicitly.
   * Codes 0-10 travel down to the task; 50-57 travel up from it.
   */
  enum MESSAGE_TYPE {
    // downward (framework -> task)
    START_MESSAGE = 0,
    SET_JOB_CONF = 1,
    SET_INPUT_TYPES = 2,
    RUN_MAP = 3,
    MAP_ITEM = 4,
    RUN_REDUCE = 5,
    REDUCE_KEY = 6,
    REDUCE_VALUE = 7,
    CLOSE = 8,
    ABORT = 9,
    AUTHENTICATION_REQ = 10,
    // upward (task -> framework)
    OUTPUT = 50,
    PARTITIONED_OUTPUT = 51,
    STATUS = 52,
    PROGRESS = 53,
    DONE = 54,
    REGISTER_COUNTER = 55,
    INCREMENT_COUNTER = 56,
    AUTHENTICATION_RESP = 57
  };
  268. class BinaryUpwardProtocol: public UpwardProtocol {
  269. private:
  270. FileOutStream* stream;
  271. public:
  272. BinaryUpwardProtocol(FILE* _stream) {
  273. stream = new FileOutStream();
  274. HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
  275. }
  276. virtual void authenticate(const string &responseDigest) {
  277. serializeInt(AUTHENTICATION_RESP, *stream);
  278. serializeString(responseDigest, *stream);
  279. stream->flush();
  280. }
  281. virtual void output(const string& key, const string& value) {
  282. serializeInt(OUTPUT, *stream);
  283. serializeString(key, *stream);
  284. serializeString(value, *stream);
  285. }
  286. virtual void partitionedOutput(int reduce, const string& key,
  287. const string& value) {
  288. serializeInt(PARTITIONED_OUTPUT, *stream);
  289. serializeInt(reduce, *stream);
  290. serializeString(key, *stream);
  291. serializeString(value, *stream);
  292. }
  293. virtual void status(const string& message) {
  294. serializeInt(STATUS, *stream);
  295. serializeString(message, *stream);
  296. }
  297. virtual void progress(float progress) {
  298. serializeInt(PROGRESS, *stream);
  299. serializeFloat(progress, *stream);
  300. stream->flush();
  301. }
  302. virtual void done() {
  303. serializeInt(DONE, *stream);
  304. }
  305. virtual void registerCounter(int id, const string& group,
  306. const string& name) {
  307. serializeInt(REGISTER_COUNTER, *stream);
  308. serializeInt(id, *stream);
  309. serializeString(group, *stream);
  310. serializeString(name, *stream);
  311. }
  312. virtual void incrementCounter(const TaskContext::Counter* counter,
  313. uint64_t amount) {
  314. serializeInt(INCREMENT_COUNTER, *stream);
  315. serializeInt(counter->getId(), *stream);
  316. serializeLong(amount, *stream);
  317. }
  318. ~BinaryUpwardProtocol() {
  319. delete stream;
  320. }
  321. };
  322. class BinaryProtocol: public Protocol {
  323. private:
  324. FileInStream* downStream;
  325. DownwardProtocol* handler;
  326. BinaryUpwardProtocol * uplink;
  327. string key;
  328. string value;
  329. string password;
  330. bool authDone;
  331. void getPassword(string &password) {
  332. const char *passwordFile = getenv("hadoop.pipes.shared.secret.location");
  333. if (passwordFile == NULL) {
  334. return;
  335. }
  336. std::ifstream fstr(passwordFile, std::fstream::binary);
  337. if (fstr.fail()) {
  338. std::cerr << "Could not open the password file" << std::endl;
  339. return;
  340. }
  341. unsigned char * passBuff = new unsigned char [512];
  342. fstr.read((char *)passBuff, 512);
  343. int passwordLength = fstr.gcount();
  344. fstr.close();
  345. passBuff[passwordLength] = 0;
  346. password.replace(0, passwordLength, (const char *) passBuff, passwordLength);
  347. delete [] passBuff;
  348. return;
  349. }
  350. void verifyDigestAndRespond(string& digest, string& challenge) {
  351. if (password.empty()) {
  352. //password can be empty if process is running in debug mode from
  353. //command file.
  354. authDone = true;
  355. return;
  356. }
  357. if (!verifyDigest(password, digest, challenge)) {
  358. std::cerr << "Server failed to authenticate. Exiting" << std::endl;
  359. exit(-1);
  360. }
  361. authDone = true;
  362. string responseDigest = createDigest(password, digest);
  363. uplink->authenticate(responseDigest);
  364. }
  365. bool verifyDigest(string &password, string& digest, string& challenge) {
  366. string expectedDigest = createDigest(password, challenge);
  367. if (digest == expectedDigest) {
  368. return true;
  369. } else {
  370. return false;
  371. }
  372. }
  373. string createDigest(string &password, string& msg) {
  374. HMAC_CTX ctx;
  375. unsigned char digest[EVP_MAX_MD_SIZE];
  376. HMAC_Init(&ctx, (const unsigned char *)password.c_str(),
  377. password.length(), EVP_sha1());
  378. HMAC_Update(&ctx, (const unsigned char *)msg.c_str(), msg.length());
  379. unsigned int digestLen;
  380. HMAC_Final(&ctx, digest, &digestLen);
  381. HMAC_cleanup(&ctx);
  382. //now apply base64 encoding
  383. BIO *bmem, *b64;
  384. BUF_MEM *bptr;
  385. b64 = BIO_new(BIO_f_base64());
  386. bmem = BIO_new(BIO_s_mem());
  387. b64 = BIO_push(b64, bmem);
  388. BIO_write(b64, digest, digestLen);
  389. BIO_flush(b64);
  390. BIO_get_mem_ptr(b64, &bptr);
  391. char digestBuffer[bptr->length];
  392. memcpy(digestBuffer, bptr->data, bptr->length-1);
  393. digestBuffer[bptr->length-1] = 0;
  394. BIO_free_all(b64);
  395. return string(digestBuffer);
  396. }
  397. public:
  398. BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
  399. downStream = new FileInStream();
  400. downStream->open(down);
  401. uplink = new BinaryUpwardProtocol(up);
  402. handler = _handler;
  403. authDone = false;
  404. getPassword(password);
  405. }
  406. UpwardProtocol* getUplink() {
  407. return uplink;
  408. }
  409. virtual void nextEvent() {
  410. int32_t cmd;
  411. cmd = deserializeInt(*downStream);
  412. if (!authDone && cmd != AUTHENTICATION_REQ) {
  413. //Authentication request must be the first message if
  414. //authentication is not complete
  415. std::cerr << "Command:" << cmd << "received before authentication. "
  416. << "Exiting.." << std::endl;
  417. exit(-1);
  418. }
  419. switch (cmd) {
  420. case AUTHENTICATION_REQ: {
  421. string digest;
  422. string challenge;
  423. deserializeString(digest, *downStream);
  424. deserializeString(challenge, *downStream);
  425. verifyDigestAndRespond(digest, challenge);
  426. break;
  427. }
  428. case START_MESSAGE: {
  429. int32_t prot;
  430. prot = deserializeInt(*downStream);
  431. handler->start(prot);
  432. break;
  433. }
  434. case SET_JOB_CONF: {
  435. int32_t entries;
  436. entries = deserializeInt(*downStream);
  437. vector<string> result(entries);
  438. for(int i=0; i < entries; ++i) {
  439. string item;
  440. deserializeString(item, *downStream);
  441. result.push_back(item);
  442. }
  443. handler->setJobConf(result);
  444. break;
  445. }
  446. case SET_INPUT_TYPES: {
  447. string keyType;
  448. string valueType;
  449. deserializeString(keyType, *downStream);
  450. deserializeString(valueType, *downStream);
  451. handler->setInputTypes(keyType, valueType);
  452. break;
  453. }
  454. case RUN_MAP: {
  455. string split;
  456. int32_t numReduces;
  457. int32_t piped;
  458. deserializeString(split, *downStream);
  459. numReduces = deserializeInt(*downStream);
  460. piped = deserializeInt(*downStream);
  461. handler->runMap(split, numReduces, piped);
  462. break;
  463. }
  464. case MAP_ITEM: {
  465. deserializeString(key, *downStream);
  466. deserializeString(value, *downStream);
  467. handler->mapItem(key, value);
  468. break;
  469. }
  470. case RUN_REDUCE: {
  471. int32_t reduce;
  472. int32_t piped;
  473. reduce = deserializeInt(*downStream);
  474. piped = deserializeInt(*downStream);
  475. handler->runReduce(reduce, piped);
  476. break;
  477. }
  478. case REDUCE_KEY: {
  479. deserializeString(key, *downStream);
  480. handler->reduceKey(key);
  481. break;
  482. }
  483. case REDUCE_VALUE: {
  484. deserializeString(value, *downStream);
  485. handler->reduceValue(value);
  486. break;
  487. }
  488. case CLOSE:
  489. handler->close();
  490. break;
  491. case ABORT:
  492. handler->abort();
  493. break;
  494. default:
  495. HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
  496. }
  497. }
  498. virtual ~BinaryProtocol() {
  499. delete downStream;
  500. delete uplink;
  501. }
  502. };
  503. /**
  504. * Define a context object to give to combiners that will let them
  505. * go through the values and emit their results correctly.
  506. */
  507. class CombineContext: public ReduceContext {
  508. private:
  509. ReduceContext* baseContext;
  510. Partitioner* partitioner;
  511. int numReduces;
  512. UpwardProtocol* uplink;
  513. bool firstKey;
  514. bool firstValue;
  515. map<string, vector<string> >::iterator keyItr;
  516. map<string, vector<string> >::iterator endKeyItr;
  517. vector<string>::iterator valueItr;
  518. vector<string>::iterator endValueItr;
  519. public:
  520. CombineContext(ReduceContext* _baseContext,
  521. Partitioner* _partitioner,
  522. int _numReduces,
  523. UpwardProtocol* _uplink,
  524. map<string, vector<string> >& data) {
  525. baseContext = _baseContext;
  526. partitioner = _partitioner;
  527. numReduces = _numReduces;
  528. uplink = _uplink;
  529. keyItr = data.begin();
  530. endKeyItr = data.end();
  531. firstKey = true;
  532. firstValue = true;
  533. }
  534. virtual const JobConf* getJobConf() {
  535. return baseContext->getJobConf();
  536. }
  537. virtual const std::string& getInputKey() {
  538. return keyItr->first;
  539. }
  540. virtual const std::string& getInputValue() {
  541. return *valueItr;
  542. }
  543. virtual void emit(const std::string& key, const std::string& value) {
  544. if (partitioner != NULL) {
  545. uplink->partitionedOutput(partitioner->partition(key, numReduces),
  546. key, value);
  547. } else {
  548. uplink->output(key, value);
  549. }
  550. }
  551. virtual void progress() {
  552. baseContext->progress();
  553. }
  554. virtual void setStatus(const std::string& status) {
  555. baseContext->setStatus(status);
  556. }
  557. bool nextKey() {
  558. if (firstKey) {
  559. firstKey = false;
  560. } else {
  561. ++keyItr;
  562. }
  563. if (keyItr != endKeyItr) {
  564. valueItr = keyItr->second.begin();
  565. endValueItr = keyItr->second.end();
  566. firstValue = true;
  567. return true;
  568. }
  569. return false;
  570. }
  571. virtual bool nextValue() {
  572. if (firstValue) {
  573. firstValue = false;
  574. } else {
  575. ++valueItr;
  576. }
  577. return valueItr != endValueItr;
  578. }
  579. virtual Counter* getCounter(const std::string& group,
  580. const std::string& name) {
  581. return baseContext->getCounter(group, name);
  582. }
  583. virtual void incrementCounter(const Counter* counter, uint64_t amount) {
  584. baseContext->incrementCounter(counter, amount);
  585. }
  586. };
  587. /**
  588. * A RecordWriter that will take the map outputs, buffer them up and then
  589. * combine then when the buffer is full.
  590. */
  591. class CombineRunner: public RecordWriter {
  592. private:
  593. map<string, vector<string> > data;
  594. int64_t spillSize;
  595. int64_t numBytes;
  596. ReduceContext* baseContext;
  597. Partitioner* partitioner;
  598. int numReduces;
  599. UpwardProtocol* uplink;
  600. Reducer* combiner;
  601. public:
  602. CombineRunner(int64_t _spillSize, ReduceContext* _baseContext,
  603. Reducer* _combiner, UpwardProtocol* _uplink,
  604. Partitioner* _partitioner, int _numReduces) {
  605. numBytes = 0;
  606. spillSize = _spillSize;
  607. baseContext = _baseContext;
  608. partitioner = _partitioner;
  609. numReduces = _numReduces;
  610. uplink = _uplink;
  611. combiner = _combiner;
  612. }
  613. virtual void emit(const std::string& key,
  614. const std::string& value) {
  615. numBytes += key.length() + value.length();
  616. data[key].push_back(value);
  617. if (numBytes >= spillSize) {
  618. spillAll();
  619. }
  620. }
  621. virtual void close() {
  622. spillAll();
  623. }
  624. private:
  625. void spillAll() {
  626. CombineContext context(baseContext, partitioner, numReduces,
  627. uplink, data);
  628. while (context.nextKey()) {
  629. combiner->reduce(context);
  630. }
  631. data.clear();
  632. numBytes = 0;
  633. }
  634. };
  635. class TaskContextImpl: public MapContext, public ReduceContext,
  636. public DownwardProtocol {
  637. private:
  638. bool done;
  639. JobConf* jobConf;
  640. string key;
  641. const string* newKey;
  642. const string* value;
  643. bool hasTask;
  644. bool isNewKey;
  645. bool isNewValue;
  646. string* inputKeyClass;
  647. string* inputValueClass;
  648. string status;
  649. float progressFloat;
  650. uint64_t lastProgress;
  651. bool statusSet;
  652. Protocol* protocol;
  653. UpwardProtocol *uplink;
  654. string* inputSplit;
  655. RecordReader* reader;
  656. Mapper* mapper;
  657. Reducer* reducer;
  658. RecordWriter* writer;
  659. Partitioner* partitioner;
  660. int numReduces;
  661. const Factory* factory;
  662. pthread_mutex_t mutexDone;
  663. std::vector<int> registeredCounterIds;
  664. public:
  665. TaskContextImpl(const Factory& _factory) {
  666. statusSet = false;
  667. done = false;
  668. newKey = NULL;
  669. factory = &_factory;
  670. jobConf = NULL;
  671. inputKeyClass = NULL;
  672. inputValueClass = NULL;
  673. inputSplit = NULL;
  674. mapper = NULL;
  675. reducer = NULL;
  676. reader = NULL;
  677. writer = NULL;
  678. partitioner = NULL;
  679. protocol = NULL;
  680. isNewKey = false;
  681. isNewValue = false;
  682. lastProgress = 0;
  683. progressFloat = 0.0f;
  684. hasTask = false;
  685. pthread_mutex_init(&mutexDone, NULL);
  686. }
  687. void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
  688. protocol = _protocol;
  689. uplink = _uplink;
  690. }
  691. virtual void start(int protocol) {
  692. if (protocol != 0) {
  693. throw Error("Protocol version " + toString(protocol) +
  694. " not supported");
  695. }
  696. }
  697. virtual void setJobConf(vector<string> values) {
  698. int len = values.size();
  699. JobConfImpl* result = new JobConfImpl();
  700. HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
  701. for(int i=0; i < len; i += 2) {
  702. result->set(values[i], values[i+1]);
  703. }
  704. jobConf = result;
  705. }
  706. virtual void setInputTypes(string keyType, string valueType) {
  707. inputKeyClass = new string(keyType);
  708. inputValueClass = new string(valueType);
  709. }
  710. virtual void runMap(string _inputSplit, int _numReduces, bool pipedInput) {
  711. inputSplit = new string(_inputSplit);
  712. reader = factory->createRecordReader(*this);
  713. HADOOP_ASSERT((reader == NULL) == pipedInput,
  714. pipedInput ? "RecordReader defined when not needed.":
  715. "RecordReader not defined");
  716. if (reader != NULL) {
  717. value = new string();
  718. }
  719. mapper = factory->createMapper(*this);
  720. numReduces = _numReduces;
  721. if (numReduces != 0) {
  722. reducer = factory->createCombiner(*this);
  723. partitioner = factory->createPartitioner(*this);
  724. }
  725. if (reducer != NULL) {
  726. int64_t spillSize = 100;
  727. if (jobConf->hasKey("mapreduce.task.io.sort.mb")) {
  728. spillSize = jobConf->getInt("mapreduce.task.io.sort.mb");
  729. }
  730. writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer,
  731. uplink, partitioner, numReduces);
  732. }
  733. hasTask = true;
  734. }
  735. virtual void mapItem(const string& _key, const string& _value) {
  736. newKey = &_key;
  737. value = &_value;
  738. isNewKey = true;
  739. }
  740. virtual void runReduce(int reduce, bool pipedOutput) {
  741. reducer = factory->createReducer(*this);
  742. writer = factory->createRecordWriter(*this);
  743. HADOOP_ASSERT((writer == NULL) == pipedOutput,
  744. pipedOutput ? "RecordWriter defined when not needed.":
  745. "RecordWriter not defined");
  746. hasTask = true;
  747. }
  748. virtual void reduceKey(const string& _key) {
  749. isNewKey = true;
  750. newKey = &_key;
  751. }
  752. virtual void reduceValue(const string& _value) {
  753. isNewValue = true;
  754. value = &_value;
  755. }
  756. virtual bool isDone() {
  757. pthread_mutex_lock(&mutexDone);
  758. bool doneCopy = done;
  759. pthread_mutex_unlock(&mutexDone);
  760. return doneCopy;
  761. }
  762. virtual void close() {
  763. pthread_mutex_lock(&mutexDone);
  764. done = true;
  765. pthread_mutex_unlock(&mutexDone);
  766. }
  767. virtual void abort() {
  768. throw Error("Aborted by driver");
  769. }
  770. void waitForTask() {
  771. while (!done && !hasTask) {
  772. protocol->nextEvent();
  773. }
  774. }
  775. bool nextKey() {
  776. if (reader == NULL) {
  777. while (!isNewKey) {
  778. nextValue();
  779. if (done) {
  780. return false;
  781. }
  782. }
  783. key = *newKey;
  784. } else {
  785. if (!reader->next(key, const_cast<string&>(*value))) {
  786. pthread_mutex_lock(&mutexDone);
  787. done = true;
  788. pthread_mutex_unlock(&mutexDone);
  789. return false;
  790. }
  791. progressFloat = reader->getProgress();
  792. }
  793. isNewKey = false;
  794. if (mapper != NULL) {
  795. mapper->map(*this);
  796. } else {
  797. reducer->reduce(*this);
  798. }
  799. return true;
  800. }
  801. /**
  802. * Advance to the next value.
  803. */
  804. virtual bool nextValue() {
  805. if (isNewKey || done) {
  806. return false;
  807. }
  808. isNewValue = false;
  809. progress();
  810. protocol->nextEvent();
  811. return isNewValue;
  812. }
  813. /**
  814. * Get the JobConf for the current task.
  815. */
  816. virtual JobConf* getJobConf() {
  817. return jobConf;
  818. }
  819. /**
  820. * Get the current key.
  821. * @return the current key or NULL if called before the first map or reduce
  822. */
  823. virtual const string& getInputKey() {
  824. return key;
  825. }
  826. /**
  827. * Get the current value.
  828. * @return the current value or NULL if called before the first map or
  829. * reduce
  830. */
  831. virtual const string& getInputValue() {
  832. return *value;
  833. }
  834. /**
  835. * Mark your task as having made progress without changing the status
  836. * message.
  837. */
  838. virtual void progress() {
  839. if (uplink != 0) {
  840. uint64_t now = getCurrentMillis();
  841. if (now - lastProgress > 1000) {
  842. lastProgress = now;
  843. if (statusSet) {
  844. uplink->status(status);
  845. statusSet = false;
  846. }
  847. uplink->progress(progressFloat);
  848. }
  849. }
  850. }
  851. /**
  852. * Set the status message and call progress.
  853. */
  854. virtual void setStatus(const string& status) {
  855. this->status = status;
  856. statusSet = true;
  857. progress();
  858. }
  859. /**
  860. * Get the name of the key class of the input to this task.
  861. */
  862. virtual const string& getInputKeyClass() {
  863. return *inputKeyClass;
  864. }
  865. /**
  866. * Get the name of the value class of the input to this task.
  867. */
  868. virtual const string& getInputValueClass() {
  869. return *inputValueClass;
  870. }
  871. /**
  872. * Access the InputSplit of the mapper.
  873. */
  874. virtual const std::string& getInputSplit() {
  875. return *inputSplit;
  876. }
  877. virtual void emit(const string& key, const string& value) {
  878. progress();
  879. if (writer != NULL) {
  880. writer->emit(key, value);
  881. } else if (partitioner != NULL) {
  882. int part = partitioner->partition(key, numReduces);
  883. uplink->partitionedOutput(part, key, value);
  884. } else {
  885. uplink->output(key, value);
  886. }
  887. }
  888. /**
  889. * Register a counter with the given group and name.
  890. */
  891. virtual Counter* getCounter(const std::string& group,
  892. const std::string& name) {
  893. int id = registeredCounterIds.size();
  894. registeredCounterIds.push_back(id);
  895. uplink->registerCounter(id, group, name);
  896. return new Counter(id);
  897. }
  898. /**
  899. * Increment the value of the counter with the given amount.
  900. */
  901. virtual void incrementCounter(const Counter* counter, uint64_t amount) {
  902. uplink->incrementCounter(counter, amount);
  903. }
  904. void closeAll() {
  905. if (reader) {
  906. reader->close();
  907. }
  908. if (mapper) {
  909. mapper->close();
  910. }
  911. if (reducer) {
  912. reducer->close();
  913. }
  914. if (writer) {
  915. writer->close();
  916. }
  917. }
  918. virtual ~TaskContextImpl() {
  919. delete jobConf;
  920. delete inputKeyClass;
  921. delete inputValueClass;
  922. delete inputSplit;
  923. if (reader) {
  924. delete value;
  925. }
  926. delete reader;
  927. delete mapper;
  928. delete reducer;
  929. delete writer;
  930. delete partitioner;
  931. pthread_mutex_destroy(&mutexDone);
  932. }
  933. };
  934. /**
  935. * Ping the parent every 5 seconds to know if it is alive
  936. */
  937. void* ping(void* ptr) {
  938. TaskContextImpl* context = (TaskContextImpl*) ptr;
  939. char* portStr = getenv("mapreduce.pipes.command.port");
  940. int MAX_RETRIES = 3;
  941. int remaining_retries = MAX_RETRIES;
  942. while (!context->isDone()) {
  943. try{
  944. sleep(5);
  945. int sock = -1;
  946. if (portStr) {
  947. sock = socket(PF_INET, SOCK_STREAM, 0);
  948. HADOOP_ASSERT(sock != - 1,
  949. string("problem creating socket: ") + strerror(errno));
  950. sockaddr_in addr;
  951. addr.sin_family = AF_INET;
  952. addr.sin_port = htons(toInt(portStr));
  953. addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  954. HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
  955. string("problem connecting command socket: ") +
  956. strerror(errno));
  957. }
  958. if (sock != -1) {
  959. int result = shutdown(sock, SHUT_RDWR);
  960. HADOOP_ASSERT(result == 0, "problem shutting socket");
  961. result = close(sock);
  962. HADOOP_ASSERT(result == 0, "problem closing socket");
  963. }
  964. remaining_retries = MAX_RETRIES;
  965. } catch (Error& err) {
  966. if (!context->isDone()) {
  967. fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n",
  968. err.getMessage().c_str());
  969. remaining_retries -= 1;
  970. if (remaining_retries == 0) {
  971. exit(1);
  972. }
  973. } else {
  974. return NULL;
  975. }
  976. }
  977. }
  978. return NULL;
  979. }
  980. /**
  981. * Run the assigned task in the framework.
  982. * The user's main function should set the various functions using the
  983. * set* functions above and then call this.
  984. * @return true, if the task succeeded.
  985. */
  986. bool runTask(const Factory& factory) {
  987. try {
  988. TaskContextImpl* context = new TaskContextImpl(factory);
  989. Protocol* connection;
  990. char* portStr = getenv("mapreduce.pipes.command.port");
  991. int sock = -1;
  992. FILE* stream = NULL;
  993. FILE* outStream = NULL;
  994. char *bufin = NULL;
  995. char *bufout = NULL;
  996. if (portStr) {
  997. sock = socket(PF_INET, SOCK_STREAM, 0);
  998. HADOOP_ASSERT(sock != - 1,
  999. string("problem creating socket: ") + strerror(errno));
  1000. sockaddr_in addr;
  1001. addr.sin_family = AF_INET;
  1002. addr.sin_port = htons(toInt(portStr));
  1003. addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  1004. HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
  1005. string("problem connecting command socket: ") +
  1006. strerror(errno));
  1007. stream = fdopen(sock, "r");
  1008. outStream = fdopen(sock, "w");
  1009. // increase buffer size
  1010. int bufsize = 128*1024;
  1011. int setbuf;
  1012. bufin = new char[bufsize];
  1013. bufout = new char[bufsize];
  1014. setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
  1015. HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
  1016. + strerror(errno));
  1017. setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
  1018. HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
  1019. + strerror(errno));
  1020. connection = new BinaryProtocol(stream, context, outStream);
  1021. } else if (getenv("mapreduce.pipes.commandfile")) {
  1022. char* filename = getenv("mapreduce.pipes.commandfile");
  1023. string outFilename = filename;
  1024. outFilename += ".out";
  1025. stream = fopen(filename, "r");
  1026. outStream = fopen(outFilename.c_str(), "w");
  1027. connection = new BinaryProtocol(stream, context, outStream);
  1028. } else {
  1029. connection = new TextProtocol(stdin, context, stdout);
  1030. }
  1031. context->setProtocol(connection, connection->getUplink());
  1032. pthread_t pingThread;
  1033. pthread_create(&pingThread, NULL, ping, (void*)(context));
  1034. context->waitForTask();
  1035. while (!context->isDone()) {
  1036. context->nextKey();
  1037. }
  1038. context->closeAll();
  1039. connection->getUplink()->done();
  1040. pthread_join(pingThread,NULL);
  1041. delete context;
  1042. delete connection;
  1043. if (stream != NULL) {
  1044. fflush(stream);
  1045. }
  1046. if (outStream != NULL) {
  1047. fflush(outStream);
  1048. }
  1049. fflush(stdout);
  1050. if (sock != -1) {
  1051. int result = shutdown(sock, SHUT_RDWR);
  1052. HADOOP_ASSERT(result == 0, "problem shutting socket");
  1053. result = close(sock);
  1054. HADOOP_ASSERT(result == 0, "problem closing socket");
  1055. }
  1056. if (stream != NULL) {
  1057. //fclose(stream);
  1058. }
  1059. if (outStream != NULL) {
  1060. //fclose(outStream);
  1061. }
  1062. delete bufin;
  1063. delete bufout;
  1064. return true;
  1065. } catch (Error& err) {
  1066. fprintf(stderr, "Hadoop Pipes Exception: %s\n",
  1067. err.getMessage().c_str());
  1068. return false;
  1069. }
  1070. }
  1071. }