Browse Source

HADOOP-1983. Make pipes flush the command socket when the application calls
progress.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@582391 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 18 years ago
parent
commit
348eab09cc

+ 3 - 0
CHANGES.txt

@@ -244,6 +244,9 @@ Trunk (unreleased changes)
     HADOOP-1695.  The SecondaryNamenode waits for the Primary NameNode to
     start up.  (Dhruba Borthakur)
 
+    HADOOP-1983.  Have Pipes flush the command socket when progress is sent
+    to prevent timeouts during long computations. (omalley)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

+ 2 - 1
src/c++/pipes/impl/HadoopPipes.cc

@@ -287,6 +287,7 @@ namespace HadoopPipes {
     virtual void progress(float progress) {
       serializeInt(PROGRESS, *stream);
       serializeFloat(progress, *stream);
+      stream->flush();
     }
 
     virtual void done() {
@@ -758,11 +759,11 @@ namespace HadoopPipes {
         uint64_t now = getCurrentMillis();
         if (now - lastProgress > 1000) {
           lastProgress = now;
-          uplink->progress(progressFloat);
           if (statusSet) {
             uplink->status(status);
             statusSet = false;
           }
+          uplink->progress(progressFloat);
         }
       }
     }

+ 7 - 0
src/c++/utils/api/hadoop/SerialUtils.hh

@@ -73,6 +73,7 @@ namespace HadoopUtils {
      * @throws Error if there are problems reading
      */
     virtual void read(void *buf, size_t len) = 0;
+    virtual ~InStream() {}
   };
 
   /**
@@ -87,6 +88,11 @@ namespace HadoopUtils {
      * @throws Error if there are problems writing
      */
     virtual void write(const void *buf, size_t len) = 0;
+    /**
+     * Flush the data to the underlying store.
+     */
+    virtual void flush() = 0;
+    virtual ~OutStream() {}
   };
 
   /**
@@ -130,6 +136,7 @@ namespace HadoopUtils {
     bool open(FILE* file);
     void write(const void* buf, size_t len);
     bool advance(size_t nbytes);
+    void flush();
     bool close();
     virtual ~FileOutStream();
   private:

+ 5 - 0
src/c++/utils/impl/SerialUtils.cc

@@ -144,6 +144,11 @@ namespace HadoopUtils {
     return (ret == 0);
   }
 
+  void FileOutStream::flush()
+  {
+    fflush(mFile);
+  }
+
   FileOutStream::~FileOutStream()
   {
     if (mFile != NULL) {

+ 4 - 5
src/examples/pipes/Makefile.in

@@ -39,11 +39,10 @@ POST_UNINSTALL = :
 host_triplet = @host@
 bin_PROGRAMS = wordcount-simple$(EXEEXT) wordcount-part$(EXEEXT) \
 	wordcount-nopipe$(EXEEXT)
-DIST_COMMON = config.guess config.guess config.sub config.sub \
-	$(srcdir)/Makefile.in $(srcdir)/Makefile.am \
-	$(top_srcdir)/configure $(am__configure_deps) \
-	$(top_srcdir)/impl/config.h.in depcomp depcomp ltmain.sh \
-	ltmain.sh config.guess config.guess config.sub config.sub
+DIST_COMMON = config.guess config.sub $(srcdir)/Makefile.in \
+	$(srcdir)/Makefile.am $(top_srcdir)/configure \
+	$(am__configure_deps) $(top_srcdir)/impl/config.h.in depcomp \
+	ltmain.sh config.guess config.sub
 subdir = .
 ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
 am__aclocal_m4_deps =  \