Prechádzať zdrojové kódy

svn merge -c 1489012 FIXES: MAPREDUCE-5268. Improve history server startup performance. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1489016 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 rokov pred
rodič
commit
af7a0df5a4

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -10,6 +10,9 @@ Release 0.23.9 - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-5268. Improve history server startup performance (Karthik
+    Kambatla via jlowe)
+
   BUG FIXES
 
 Release 0.23.8 - UNRELEASED

+ 59 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java

@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -131,19 +133,73 @@ public class HistoryFileManager extends AbstractService {
     }
   }
 
-  static class JobListCache {
+  /**
+   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
+   * side for O(1) size() implementation for use in JobListCache.
+   *
+   * Note: The size is not updated atomically with changes additions/removals.
+   * This race can lead to size() returning an incorrect size at times.
+   */
+  static class JobIdHistoryFileInfoMap {
     private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
+    private AtomicInteger mapSize;
+
+    JobIdHistoryFileInfoMap() {
+      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+      mapSize = new AtomicInteger();
+    }
+
+    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
+      HistoryFileInfo ret = cache.putIfAbsent(key, value);
+      if (ret == null) {
+        mapSize.incrementAndGet();
+      }
+      return ret;
+    }
+
+    public HistoryFileInfo remove(JobId key) {
+      HistoryFileInfo ret = cache.remove(key);
+      if (ret != null) {
+        mapSize.decrementAndGet();
+      }
+      return ret;
+    }
+
+    /**
+     * Returns the recorded size of the internal map. Note that this could be out
+     * of sync with the actual size of the map
+     * @return "recorded" size
+     */
+    public int size() {
+      return mapSize.get();
+    }
+
+    public HistoryFileInfo get(JobId key) {
+      return cache.get(key);
+    }
+
+    public NavigableSet<JobId> navigableKeySet() {
+      return cache.navigableKeySet();
+    }
+
+    public Collection<HistoryFileInfo> values() {
+      return cache.values();
+    }
+  }
+
+  static class JobListCache {
+    private JobIdHistoryFileInfoMap cache;
     private int maxSize;
     private long maxAge;
 
     public JobListCache(int maxSize, long maxAge) {
       this.maxSize = maxSize;
       this.maxAge = maxAge;
-      this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
+      this.cache = new JobIdHistoryFileInfoMap();
     }
 
     public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
-      JobId jobId = fileInfo.getJobIndexInfo().getJobId();
+      JobId jobId = fileInfo.getJobId();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding " + jobId + " to job list cache with "
             + fileInfo.getJobIndexInfo());

+ 78 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobIdHistoryFileInfoMap.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.util.Collection;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobIdHistoryFileInfoMap;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestJobIdHistoryFileInfoMap {
+
+  private boolean checkSize(JobIdHistoryFileInfoMap map, int size)
+      throws InterruptedException {
+    for (int i = 0; i < 100; i++) {
+      if (map.size() != size)
+        Thread.sleep(20);
+      else
+        return true;
+    }
+    return false;
+  }
+
+  /**
+   * Trivial test case that verifies basic functionality of {@link
+   * JobIdHistoryFileInfoMap}
+   */
+  @Test(timeout = 2000)
+  public void testWithSingleElement() throws InterruptedException {
+    JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap();
+
+    JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo1.getJobId()).thenReturn(jobId);
+
+    // add it twice
+    assertEquals("Incorrect return on putIfAbsent()",
+        null, mapWithSize.putIfAbsent(jobId, fileInfo1));
+    assertEquals("Incorrect return on putIfAbsent()",
+        fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1));
+
+    // check get()
+    assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId));
+    assertTrue("Incorrect size()", checkSize(mapWithSize, 1));
+
+    // check navigableKeySet()
+    NavigableSet<JobId> set = mapWithSize.navigableKeySet();
+    assertEquals("Incorrect navigableKeySet()", 1, set.size());
+    assertTrue("Incorrect navigableKeySet()", set.contains(jobId));
+
+    // check values()
+    Collection<HistoryFileInfo> values = mapWithSize.values();
+    assertEquals("Incorrect values()", 1, values.size());
+    assertTrue("Incorrect values()", values.contains(fileInfo1));
+  }
+}

+ 82 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobListCache.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.lang.InterruptedException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+public class TestJobListCache {
+
+  @Test (timeout = 1000)
+  public void testAddExisting() {
+    JobListCache cache = new JobListCache(2, 1000);
+
+    JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo.getJobId()).thenReturn(jobId);
+
+    cache.addIfAbsent(fileInfo);
+    cache.addIfAbsent(fileInfo);
+    assertEquals("Incorrect number of cache entries", 1,
+        cache.values().size());
+  }
+
+  @Test (timeout = 1000)
+  public void testEviction() throws InterruptedException {
+    int maxSize = 2;
+    JobListCache cache = new JobListCache(maxSize, 1000);
+
+    JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1);
+    HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1);
+
+    JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2);
+    HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2);
+
+    JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3);
+    HistoryFileInfo fileInfo3 = Mockito.mock(HistoryFileInfo.class);
+    Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3);
+
+    cache.addIfAbsent(fileInfo1);
+    cache.addIfAbsent(fileInfo2);
+    cache.addIfAbsent(fileInfo3);
+
+    Collection <HistoryFileInfo> values;
+    for (int i = 0; i < 9; i++) {
+      values = cache.values();
+      if (values.size() > maxSize) {
+        Thread.sleep(100);
+      } else {
+        assertFalse("fileInfo1 should have been evicted",
+          values.contains(fileInfo1));
+        return;
+      }
+    }
+    fail("JobListCache didn't delete the extra entry");
+  }
+}