|
@@ -30,6 +30,7 @@
|
|
|
#include <stdlib.h>
|
|
|
#include <strings.h>
|
|
|
#include <sys/socket.h>
|
|
|
+#include <pthread.h>
|
|
|
|
|
|
using std::map;
|
|
|
using std::string;
|
|
@@ -584,6 +585,7 @@ namespace HadoopPipes {
|
|
|
Partitioner* partitioner;
|
|
|
int numReduces;
|
|
|
const Factory* factory;
|
|
|
+ pthread_mutex_t mutexDone;
|
|
|
|
|
|
public:
|
|
|
|
|
@@ -607,6 +609,7 @@ namespace HadoopPipes {
|
|
|
lastProgress = 0;
|
|
|
progressFloat = 0.0f;
|
|
|
hasTask = false;
|
|
|
+ pthread_mutex_init(&mutexDone, NULL);
|
|
|
}
|
|
|
|
|
|
void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
|
|
@@ -689,11 +692,16 @@ namespace HadoopPipes {
|
|
|
}
|
|
|
|
|
|
virtual bool isDone() {
|
|
|
- return done;
|
|
|
+ pthread_mutex_lock(&mutexDone);
|
|
|
+ bool doneCopy = done;
|
|
|
+ pthread_mutex_unlock(&mutexDone);
|
|
|
+ return doneCopy;
|
|
|
}
|
|
|
|
|
|
virtual void close() {
|
|
|
+ pthread_mutex_lock(&mutexDone);
|
|
|
done = true;
|
|
|
+ pthread_mutex_unlock(&mutexDone);
|
|
|
}
|
|
|
|
|
|
virtual void abort() {
|
|
@@ -717,7 +725,9 @@ namespace HadoopPipes {
|
|
|
key = *newKey;
|
|
|
} else {
|
|
|
if (!reader->next(key, const_cast<string&>(*value))) {
|
|
|
+ pthread_mutex_lock(&mutexDone);
|
|
|
done = true;
|
|
|
+ pthread_mutex_unlock(&mutexDone);
|
|
|
return false;
|
|
|
}
|
|
|
progressFloat = reader->getProgress();
|
|
@@ -856,9 +866,58 @@ namespace HadoopPipes {
|
|
|
delete reducer;
|
|
|
delete writer;
|
|
|
delete partitioner;
|
|
|
+ pthread_mutex_destroy(&mutexDone);
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ /**
|
|
|
+ * Ping the parent every 5 seconds to know if it is alive
|
|
|
+ */
|
|
|
+ void* ping(void* ptr) {
|
|
|
+ TaskContextImpl* context = (TaskContextImpl*) ptr;
|
|
|
+ char* portStr = getenv("hadoop.pipes.command.port");
|
|
|
+ int MAX_RETRIES = 3;
|
|
|
+ int remaining_retries = MAX_RETRIES;
|
|
|
+ while (!context->isDone()) {
|
|
|
+ try{
|
|
|
+ sleep(5);
|
|
|
+ int sock = -1;
|
|
|
+ if (portStr) {
|
|
|
+ sock = socket(PF_INET, SOCK_STREAM, 0);
|
|
|
+ HADOOP_ASSERT(sock != - 1,
|
|
|
+ string("problem creating socket: ") + strerror(errno));
|
|
|
+ sockaddr_in addr;
|
|
|
+ addr.sin_family = AF_INET;
|
|
|
+ addr.sin_port = htons(toInt(portStr));
|
|
|
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
|
|
+ HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
|
|
|
+ string("problem connecting command socket: ") +
|
|
|
+ strerror(errno));
|
|
|
+
|
|
|
+ }
|
|
|
+ if (sock != -1) {
|
|
|
+ int result = shutdown(sock, SHUT_RDWR);
|
|
|
+ HADOOP_ASSERT(result == 0, "problem shutting socket");
|
|
|
+ result = close(sock);
|
|
|
+ HADOOP_ASSERT(result == 0, "problem closing socket");
|
|
|
+ }
|
|
|
+ remaining_retries = MAX_RETRIES;
|
|
|
+ } catch (Error& err) {
|
|
|
+ if (!context->isDone()) {
|
|
|
+ fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n",
|
|
|
+ err.getMessage().c_str());
|
|
|
+ remaining_retries -= 1;
|
|
|
+ if (remaining_retries == 0) {
|
|
|
+ exit(1);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Run the assigned task in the framework.
|
|
|
* The user's main function should set the various functions using the
|
|
@@ -914,12 +973,15 @@ namespace HadoopPipes {
|
|
|
connection = new TextProtocol(stdin, context, stdout);
|
|
|
}
|
|
|
context->setProtocol(connection, connection->getUplink());
|
|
|
+ pthread_t pingThread;
|
|
|
+ pthread_create(&pingThread, NULL, ping, (void*)(context));
|
|
|
context->waitForTask();
|
|
|
while (!context->isDone()) {
|
|
|
context->nextKey();
|
|
|
}
|
|
|
context->closeAll();
|
|
|
connection->getUplink()->done();
|
|
|
+ pthread_join(pingThread,NULL);
|
|
|
delete context;
|
|
|
delete connection;
|
|
|
if (stream != NULL) {
|