Browse Source

HADOOP-3617. Removed redundant checks of accounting space in MapTask and makes the spill thread persistent so as to avoid creating a new one for each spill. Contributed by Chris Douglas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@677470 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
parent
commit
023c5eccf1
2 changed files with 151 additions and 77 deletions
  1. 4 0
      CHANGES.txt
  2. 147 77
      src/mapred/org/apache/hadoop/mapred/MapTask.java

+ 4 - 0
CHANGES.txt

@@ -90,6 +90,10 @@ Trunk (unreleased changes)
     randomization is by the hosts (and not the map outputs themselves). 
     randomization is by the hosts (and not the map outputs themselves). 
     (Jothi Padmanabhan via ddas)
     (Jothi Padmanabhan via ddas)
 
 
+    HADOOP-3617. Removed redundant checks of accounting space in MapTask and
+    makes the spill thread persistent so as to avoid creating a new one for
+    each spill. (Chris Douglas via acmurthy)  
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 147 - 77
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -32,6 +32,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -326,8 +328,12 @@ class MapTask extends Task {
     private final int softBufferLimit;
     private final int softBufferLimit;
     private final int minSpillsForCombine;
     private final int minSpillsForCombine;
     private final IndexedSorter sorter;
     private final IndexedSorter sorter;
-    private final Object spillLock = new Object();
+    private final ReentrantLock spillLock = new ReentrantLock();
+    private final Condition spillDone = spillLock.newCondition();
+    private final Condition spillReady = spillLock.newCondition();
     private final BlockingBuffer bb = new BlockingBuffer();
     private final BlockingBuffer bb = new BlockingBuffer();
+    private volatile boolean spillThreadRunning = false;
+    private final SpillThread spillThread = new SpillThread();
 
 
     private final FileSystem localFs;
     private final FileSystem localFs;
 
 
@@ -403,6 +409,24 @@ class MapTask extends Task {
         ? new CombineOutputCollector(combineOutputCounter)
         ? new CombineOutputCollector(combineOutputCounter)
         : null;
         : null;
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
+      spillThread.setDaemon(true);
+      spillThread.setName("SpillThread");
+      spillLock.lock();
+      try {
+        spillThread.start();
+        while (!spillThreadRunning) {
+          spillDone.await();
+        }
+      } catch (InterruptedException e) {
+        throw (IOException)new IOException("Spill thread failed to initialize"
+            ).initCause(sortSpillException);
+      } finally {
+        spillLock.unlock();
+      }
+      if (sortSpillException != null) {
+        throw (IOException)new IOException("Spill thread failed to initialize"
+            ).initCause(sortSpillException);
+      }
     }
     }
 
 
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
@@ -419,12 +443,43 @@ class MapTask extends Task {
                               + valClass.getName() + ", recieved "
                               + valClass.getName() + ", recieved "
                               + value.getClass().getName());
                               + value.getClass().getName());
       }
       }
-      if (sortSpillException != null) {
-        throw (IOException)new IOException("Spill failed"
-            ).initCause(sortSpillException);
+      final int kvnext = (kvindex + 1) % kvoffsets.length;
+      spillLock.lock();
+      try {
+        boolean kvfull;
+        do {
+          if (sortSpillException != null) {
+            throw (IOException)new IOException("Spill failed"
+                ).initCause(sortSpillException);
+          }
+          // sufficient acct space
+          kvfull = kvnext == kvstart;
+          final boolean kvsoftlimit = ((kvnext > kvend)
+              ? kvnext - kvend > softRecordLimit
+              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+          if (kvstart == kvend && kvsoftlimit) {
+            LOG.info("Spilling map output: record full = " + kvsoftlimit);
+            startSpill();
+          }
+          if (kvfull) {
+            try {
+              while (kvstart != kvend) {
+                reporter.progress();
+                spillDone.await();
+              }
+            } catch (InterruptedException e) {
+              throw (IOException)new IOException(
+                  "Collector interrupted while waiting for the writer"
+                  ).initCause(e);
+            }
+          }
+        } while (kvfull);
+      } finally {
+        spillLock.unlock();
       }
       }
+
       try {
       try {
-          // serialize key bytes into buffer
+        // serialize key bytes into buffer
         int keystart = bufindex;
         int keystart = bufindex;
         keySerializer.serialize(key);
         keySerializer.serialize(key);
         if (bufindex < keystart) {
         if (bufindex < keystart) {
@@ -433,25 +488,20 @@ class MapTask extends Task {
           keystart = 0;
           keystart = 0;
         }
         }
         // serialize value bytes into buffer
         // serialize value bytes into buffer
-        int valstart = bufindex;
+        final int valstart = bufindex;
         valSerializer.serialize(value);
         valSerializer.serialize(value);
         int valend = bb.markRecord();
         int valend = bb.markRecord();
-        mapOutputByteCounter.increment(valend >= keystart
-            ? valend - keystart
-            : (bufvoid - keystart) + valend);
 
 
-        if (keystart == bufindex) {
-          // if emitted records make no writes, it's possible to wrap
-          // accounting space without notice
-          bb.write(new byte[0], 0, 0);
-        }
-
-        int partition = partitioner.getPartition(key, value, partitions);
+        final int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
           throw new IOException("Illegal partition for " + key + " (" +
               partition + ")");
               partition + ")");
         }
         }
+
         mapOutputRecordCounter.increment(1);
         mapOutputRecordCounter.increment(1);
+        mapOutputByteCounter.increment(valend >= keystart
+            ? valend - keystart
+            : (bufvoid - keystart) + valend);
 
 
         // update accounting info
         // update accounting info
         int ind = kvindex * ACCTSIZE;
         int ind = kvindex * ACCTSIZE;
@@ -459,7 +509,7 @@ class MapTask extends Task {
         kvindices[ind + PARTITION] = partition;
         kvindices[ind + PARTITION] = partition;
         kvindices[ind + KEYSTART] = keystart;
         kvindices[ind + KEYSTART] = keystart;
         kvindices[ind + VALSTART] = valstart;
         kvindices[ind + VALSTART] = valstart;
-        kvindex = (kvindex + 1) % kvoffsets.length;
+        kvindex = kvnext;
       } catch (MapBufferTooSmallException e) {
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         spillSingleRecord(key, value);
         spillSingleRecord(key, value);
@@ -578,19 +628,16 @@ class MapTask extends Task {
       @Override
       @Override
       public synchronized void write(byte b[], int off, int len)
       public synchronized void write(byte b[], int off, int len)
           throws IOException {
           throws IOException {
-        boolean kvfull = false;
         boolean buffull = false;
         boolean buffull = false;
         boolean wrap = false;
         boolean wrap = false;
-        synchronized(spillLock) {
+        spillLock.lock();
+        try {
           do {
           do {
             if (sortSpillException != null) {
             if (sortSpillException != null) {
               throw (IOException)new IOException("Spill failed"
               throw (IOException)new IOException("Spill failed"
                   ).initCause(sortSpillException);
                   ).initCause(sortSpillException);
             }
             }
 
 
-            // sufficient accounting space?
-            final int kvnext = (kvindex + 1) % kvoffsets.length;
-            kvfull = kvnext == kvstart;
             // sufficient buffer space?
             // sufficient buffer space?
             if (bufstart <= bufend && bufend <= bufindex) {
             if (bufstart <= bufend && bufend <= bufindex) {
               buffull = bufindex + len > bufvoid;
               buffull = bufindex + len > bufvoid;
@@ -606,26 +653,12 @@ class MapTask extends Task {
               // spill thread not running
               // spill thread not running
               if (kvend != kvindex) {
               if (kvend != kvindex) {
                 // we have records we can spill
                 // we have records we can spill
-                final boolean kvsoftlimit = (kvnext > kvend)
-                  ? kvnext - kvend > softRecordLimit
-                  : kvend - kvnext <= kvoffsets.length - softRecordLimit;
                 final boolean bufsoftlimit = (bufindex > bufend)
                 final boolean bufsoftlimit = (bufindex > bufend)
                   ? bufindex - bufend > softBufferLimit
                   ? bufindex - bufend > softBufferLimit
                   : bufend - bufindex < bufvoid - softBufferLimit;
                   : bufend - bufindex < bufvoid - softBufferLimit;
-                if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full = " + bufsoftlimit+
-                           " and record full = " + kvsoftlimit);
-                  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                           "; bufvoid = " + bufvoid);
-                  LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
-                           "; length = " + kvoffsets.length);
-                  kvend = kvindex;
-                  bufend = bufmark;
-                  // TODO No need to recreate this thread every time
-                  SpillThread t = new SpillThread();
-                  t.setDaemon(true);
-                  t.setName("SpillThread");
-                  t.start();
+                if (bufsoftlimit || (buffull && !wrap)) {
+                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
+                  startSpill();
                 }
                 }
               } else if (buffull && !wrap) {
               } else if (buffull && !wrap) {
                 // We have no buffered records, and this record is too large
                 // We have no buffered records, and this record is too large
@@ -641,19 +674,21 @@ class MapTask extends Task {
               }
               }
             }
             }
 
 
-            if (kvfull || (buffull && !wrap)) {
-              while (kvstart != kvend) {
-                reporter.progress();
-                try {
-                  spillLock.wait();
-                } catch (InterruptedException e) {
+            if (buffull && !wrap) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
                   throw (IOException)new IOException(
                   throw (IOException)new IOException(
                       "Buffer interrupted while waiting for the writer"
                       "Buffer interrupted while waiting for the writer"
                       ).initCause(e);
                       ).initCause(e);
-                }
               }
               }
             }
             }
-          } while (kvfull || (buffull && !wrap));
+          } while (buffull && !wrap);
+        } finally {
+          spillLock.unlock();
         }
         }
         // here, we know that we have sufficient space to write
         // here, we know that we have sufficient space to write
         if (buffull) {
         if (buffull) {
@@ -670,30 +705,41 @@ class MapTask extends Task {
 
 
     public synchronized void flush() throws IOException {
     public synchronized void flush() throws IOException {
       LOG.info("Starting flush of map output");
       LOG.info("Starting flush of map output");
-      synchronized (spillLock) {
+      spillLock.lock();
+      try {
         while (kvstart != kvend) {
         while (kvstart != kvend) {
-          try {
-            reporter.progress();
-            spillLock.wait();
-          } catch (InterruptedException e) {
-            throw (IOException)new IOException(
-                "Buffer interrupted while waiting for the writer"
-                ).initCause(e);
-          }
+          reporter.progress();
+          spillDone.await();
+        }
+        if (sortSpillException != null) {
+          throw (IOException)new IOException("Spill failed"
+              ).initCause(sortSpillException);
+        }
+        if (kvend != kvindex) {
+          kvend = kvindex;
+          bufend = bufmark;
+          sortAndSpill();
         }
         }
+      } catch (InterruptedException e) {
+        throw (IOException)new IOException(
+            "Buffer interrupted while waiting for the writer"
+            ).initCause(e);
+      } finally {
+        spillLock.unlock();
       }
       }
-      if (sortSpillException != null) {
+      assert !spillLock.isHeldByCurrentThread();
+      // shut down spill thread and wait for it to exit. Since the preceding
+      // ensures that it is finished with its work (and sortAndSpill did not
+      // throw), we elect to use an interrupt instead of setting a flag.
+      // Spilling simultaneously from this thread while the spill thread
+      // finishes its work might be both a useful way to extend this and also
+      // sufficient motivation for the latter approach.
+      try {
+        spillThread.interrupt();
+        spillThread.join();
+      } catch (InterruptedException e) {
         throw (IOException)new IOException("Spill failed"
         throw (IOException)new IOException("Spill failed"
-            ).initCause(sortSpillException);
-      }
-      if (kvend != kvindex) {
-        LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                 "; bufvoid = " + bufvoid);
-        LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
-                 "; length = " + kvoffsets.length);
-        kvend = kvindex;
-        bufend = bufmark;
-        sortAndSpill();
+            ).initCause(e);
       }
       }
       // release sort buffer before the merge
       // release sort buffer before the merge
       kvbuffer = null;
       kvbuffer = null;
@@ -706,23 +752,47 @@ class MapTask extends Task {
 
 
       @Override
       @Override
       public void run() {
       public void run() {
+        spillLock.lock();
+        spillThreadRunning = true;
         try {
         try {
-          sortAndSpill();
-        } catch (Throwable e) {
-          sortSpillException = e;
-        } finally {
-          synchronized(spillLock) {
-            if (bufend < bufindex && bufindex < bufstart) {
-              bufvoid = kvbuffer.length;
+          while (true) {
+            spillDone.signal();
+            while (kvstart == kvend) {
+              spillReady.await();
+            }
+            try {
+              spillLock.unlock();
+              sortAndSpill();
+            } catch (Throwable e) {
+              sortSpillException = e;
+            } finally {
+              spillLock.lock();
+              if (bufend < bufindex && bufindex < bufstart) {
+                bufvoid = kvbuffer.length;
+              }
+              kvstart = kvend;
+              bufstart = bufend;
             }
             }
-            kvstart = kvend;
-            bufstart = bufend;
-            spillLock.notify();
           }
           }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          spillLock.unlock();
+          spillThreadRunning = false;
         }
         }
       }
       }
     }
     }
 
 
+    private synchronized void startSpill() {
+      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+               "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
+               "; length = " + kvoffsets.length);
+      kvend = kvindex;
+      bufend = bufmark;
+      spillReady.signal();
+    }
+
     private void sortAndSpill() throws IOException {
     private void sortAndSpill() throws IOException {
       //approximate the length of the output file to be the length of the
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions
       //buffer + header lengths for the partitions