Selaa lähdekoodia

commit d5ea1fcd178fc408b2aab50fc44ad17c69e44c90
Author: Arun C Murthy <acmurthy@apache.org>
Date: Tue Nov 30 21:51:44 2010 -0800

. Reducing locking contention in TaskTracker.MapOutputServlet's LocalDirAllocator. Contributed by Rajesh Balamohan.

+++ b/YAHOO-CHANGES.txt
+ . Reducing locking contention in TaskTracker.MapOutputServlet's
+ LocalDirAllocator. (Rajesh Balamohan via acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077750 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 vuotta sitten
vanhempi
commit
2a48b3c7f7

+ 60 - 10
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -42,6 +42,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
@@ -177,6 +178,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   InterTrackerProtocol jobClient;
   
   private TrackerDistributedCacheManager distributedCacheManager;
+  static int FILE_CACHE_SIZE = 2000;
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
@@ -1233,6 +1235,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
    */
   public TaskTracker(JobConf conf) throws IOException, InterruptedException {
     originalConf = conf;
+    FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
     maxMapSlots = conf.getInt(
                   "mapred.tasktracker.map.tasks.maximum", 2);
     maxReduceSlots = conf.getInt(
@@ -1280,6 +1283,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
+    LOG.info("FILE_CACHE_SIZE for mapOutputServlet set to : " + FILE_CACHE_SIZE);
     mapRetainSize = conf.getLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 
         TaskLogsTruncater.DEFAULT_RETAIN_SIZE);
     reduceRetainSize = conf.getLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE,
@@ -3360,6 +3364,40 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       System.exit(-1);
     }
   }
+  
+  static class LRUCache<K, V> {
+    private int cacheSize;
+    private LinkedHashMap<K, V> map;
+	
+    public LRUCache(int cacheSize) {
+      this.cacheSize = cacheSize;
+      this.map = new LinkedHashMap<K, V>(cacheSize, 0.75f, true) {
+          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+	    return size() > LRUCache.this.cacheSize;
+	  }
+      };
+    }
+	
+    public synchronized V get(K key) {
+      return map.get(key);
+    }
+	
+    public synchronized void put(K key, V value) {
+      map.put(key, value);
+    }
+	
+    public synchronized int size() {
+      return map.size();
+    }
+	
+    public Iterator<Entry<K, V>> getIterator() {
+      return new LinkedList<Entry<K, V>>(map.entrySet()).iterator();
+    }
+   
+    public synchronized void clear() {
+      map.clear();
+    }
+  }
 
   /**
    * This class is used in TaskTracker's Jetty to serve the map outputs
@@ -3368,6 +3406,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   public static class MapOutputServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
     private static final int MAX_BYTES_TO_READ = 64 * 1024;
+    
+    private static LRUCache<String, Path> fileCache = new LRUCache<String, Path>(FILE_CACHE_SIZE);
+    private static LRUCache<String, Path> fileIndexCache = new LRUCache<String, Path>(FILE_CACHE_SIZE);
+    
     @Override
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
@@ -3422,16 +3464,22 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
       }
       // Index file
-      Path indexFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out.index", conf);
+      String intermediateOutputDir = TaskTracker.getIntermediateOutputDir(userName, jobId, mapId);
+      String indexKey = intermediateOutputDir + "/file.out.index";
+      Path indexFileName = fileIndexCache.get(indexKey);
+      if (indexFileName == null) {
+        indexFileName = lDirAlloc.getLocalPathToRead(indexKey, conf);
+        fileIndexCache.put(indexKey, indexFileName);
+      }
 
       // Map-output file
-      Path mapOutputFileName =
-          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-              userName, jobId, mapId)
-              + "/file.out", conf);
+      String fileKey = intermediateOutputDir + "/file.out";
+      Path mapOutputFileName = fileCache.get(fileKey);
+      if (mapOutputFileName == null) {
+        mapOutputFileName = lDirAlloc.getLocalPathToRead(fileKey, conf);
+        fileCache.put(fileKey, mapOutputFileName);
+      }
+       
 
         /**
          * Read the index file to get the information about where
@@ -3489,10 +3537,12 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           len =
             mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-
-        LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
+        
+        if (LOG.isDebugEnabled()) {
+          LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + info.partLength + "/" + 
                  info.rawLength);
+        }
       } catch (IOException ie) {
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 

+ 103 - 0
src/test/org/apache/hadoop/mapred/TestLRUCache.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+
+public class TestLRUCache extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestLRUCache.class);
+  
+  public void testPut() {
+    TaskTracker.LRUCache<String, Path> cache = new TaskTracker.LRUCache<String, Path>(200);
+  
+    for(int i=0;i<200;i++) {
+       cache.put(i+"", new Path("/foo"+i));
+    }
+    
+    Iterator<Map.Entry<String, Path>> iterator = cache.getIterator();
+    int i=0;
+    while(iterator.hasNext()) {
+      Map.Entry<String, Path> entry = iterator.next();
+      String key = entry.getKey();
+      Path val = entry.getValue();
+      assertEquals(i+"", key);
+      i++;
+    }
+    LOG.info("Completed testPut");
+  }
+  
+  
+  public void testGet() {
+    TaskTracker.LRUCache<String, Path> cache = new TaskTracker.LRUCache<String, Path>(200);
+  
+    for(int i=0;i<200;i++) {
+      cache.put(i+"", new Path("/foo"+i));
+    }
+    
+    for(int i=0;i<200;i++) {
+      Path path = cache.get(i+"");
+      assertEquals(path.toString(), (new Path("/foo"+i)).toString());
+    }
+    LOG.info("Completed testGet");
+  }
+  
+  
+  /**
+   * Test if cache can be cleared properly
+   */
+  public void testClear() {
+    TaskTracker.LRUCache<String, Path> cache = new TaskTracker.LRUCache<String, Path>(200);
+  
+    for(int i=0;i<200;i++) {
+      cache.put(i+"", new Path("/foo"+i));
+    }
+    
+    cache.clear();
+    assertTrue(cache.size() == 0);
+    LOG.info("Completed testClear");
+  }
+  
+  /**
+   * Test for cache overflow condition
+   */
+  public void testOverFlow() {
+    TaskTracker.LRUCache<String, Path> cache = new TaskTracker.LRUCache<String, Path>(200);
+  
+    int SIZE = 5000;
+  
+    for(int i=0;i<SIZE;i++) {
+      cache.put(i+"", new Path("/foo"+i));
+    }
+    
+    //Check if the items are removed properly when the cache size is exceeded
+    for(int i=SIZE-1;i>=SIZE-200;i--) {
+      Path path = cache.get(i+"");
+      assertEquals(path.toString(), (new Path("/foo"+i)).toString());
+    }
+    
+    LOG.info("Completed testOverFlow");
+  }
+}