
Merge -c 1299098 from branch-1 to branch-1.0 to fix MAPREDUCE-3773. Add queue metrics with buckets for job run times. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1299099 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy, 13 years ago
Parent commit: 9b519e4d47

+ 3 - 0
CHANGES.txt

@@ -9,6 +9,9 @@ Release 1.0.2 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
+    via acmurthy)
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

+ 43 - 0
src/mapred/org/apache/hadoop/mapred/QueueMetrics.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.metrics2.MetricsBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
@@ -36,9 +38,14 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
  */
 @SuppressWarnings("deprecation")
 class QueueMetrics implements MetricsSource {
+
   private static final Log LOG =
     LogFactory.getLog(QueueMetrics.class);
 
+  public static final String BUCKET_PROPERTY = 
+    "mapred.queue.metrics.runtime.buckets";
+  private static final String DEFAULT_BUCKETS = "60,300,1440";
+
   final MetricsRegistry registry = new MetricsRegistry("Queue");
   final MetricMutableCounterInt mapsLaunched =
       registry.newCounter("maps_launched", "", 0);
@@ -76,6 +83,8 @@ class QueueMetrics implements MetricsSource {
       registry.newCounter("maps_killed", "", 0);
   final MetricMutableCounterInt redsKilled =
       registry.newCounter("reduces_killed", "", 0);
+  final MetricMutableGaugeInt[] runningTime;
+  TimeBucketMetrics<JobID> runBuckets;
 
   final String sessionId;
   private String queueName;
@@ -85,13 +94,45 @@ class QueueMetrics implements MetricsSource {
     sessionId = conf.get("session.id", "");
     registry.setContext("mapred").tag("sessionId", "", sessionId);
     registry.tag("Queue", "Metrics by queue", queueName);
+    runningTime = buildBuckets(conf);
   }
 
   public String getQueueName() {
     return this.queueName;
   }
 
+  private static ArrayList<Integer> parseInts(String value) {
+    ArrayList<Integer> result = new ArrayList<Integer>();
+    for(String word: value.split(",")) {
+      result.add(Integer.parseInt(word.trim()));
+    }
+    return result;
+  }
+
+  private MetricMutableGaugeInt[] buildBuckets(Configuration conf) {
+    ArrayList<Integer> buckets = 
+      parseInts(conf.get(BUCKET_PROPERTY, DEFAULT_BUCKETS));
+    MetricMutableGaugeInt[] result = 
+      new MetricMutableGaugeInt[buckets.size() + 1];
+    result[0] = registry.newGauge("running_0", "", 0);
+    long[] cuts = new long[buckets.size()];
+    for(int i=0; i < buckets.size(); ++i) {
+      result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
+      cuts[i] = buckets.get(i) * 1000 * 60; // convert from min to ms
+    }
+    this.runBuckets = new TimeBucketMetrics<JobID>(cuts);
+    return result;
+  }
+
+  private void updateRunningTime() {
+    int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
+    for(int i=0; i < counts.length; ++i) {
+      runningTime[i].set(counts[i]); 
+    }
+  }
+
   public void getMetrics(MetricsBuilder builder, boolean all) {
+    updateRunningTime();
     registry.snapshot(builder.addRecord(registry.name()), all);
   }
 
@@ -181,10 +222,12 @@ class QueueMetrics implements MetricsSource {
 
   public void addRunningJob(JobConf conf, JobID id) {
     jobsRunning.incr();
+    runBuckets.add(id, System.currentTimeMillis());
   }
 
   public void decRunningJob(JobConf conf, JobID id) {
     jobsRunning.decr();
+    runBuckets.remove(id);
   }
 
   public void killedMap(TaskAttemptID taskAttemptID) {

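The new gauges are driven by a single configuration knob. As a rough sketch (not part of the patch; the bucket values below are made up for illustration), an operator could override the default 60,300,1440 boundaries like this:

    // Illustrative only: override the run-time buckets (in minutes) that
    // QueueMetrics reads when registering its running_* gauges.
    // Assumes org.apache.hadoop.conf.Configuration is imported.
    Configuration conf = new Configuration();
    conf.set("mapred.queue.metrics.runtime.buckets", "30,120,600");
    // With these cut points QueueMetrics registers gauges running_0,
    // running_30, running_120 and running_600, counting jobs that have been
    // running for <30 min, 30-120 min, 120-600 min and >600 min respectively.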
+ 81 - 0
src/mapred/org/apache/hadoop/mapred/TimeBucketMetrics.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+
+/**
+ * Create a set of buckets that hold key-time pairs. When the buckets are
+ * queried, the number of objects whose time differences fall into each
+ * bucket is returned.
+ */
+class TimeBucketMetrics<OBJ> {
+
+  private final HashMap<OBJ, Long> map = new HashMap<OBJ, Long>();
+  private final int[] counts;
+  private final long[] cuts;
+
+  /**
+   * Create a set of buckets based on a set of time points. The number of 
+   * buckets is one more than the number of points.
+   */
+  TimeBucketMetrics(long[] cuts) {
+    this.cuts = cuts;
+    counts = new int[cuts.length + 1];
+  }
+
+  /**
+   * Add an object to be counted
+   */
+  synchronized void add(OBJ key, long time) {
+    map.put(key, time);
+  }
+
+  /**
+   * Remove an object to be counted
+   */
+  synchronized void remove(OBJ key) {
+    map.remove(key);
+  }
+
+  /**
+   * Find the bucket based on the cut points.
+   */
+  private int findBucket(long val) {
+    for(int i=0; i < cuts.length; ++i) {
+      if (val < cuts[i]) {
+        return i;
+      }
+    }
+    return cuts.length;
+  }
+
+  /**
+   * Get the counts of how many keys are in each bucket. The same array is
+   * returned by each call to this method.
+   */
+  synchronized int[] getBucketCounts(long now) {
+    for(int i=0; i < counts.length; ++i) {
+      counts[i] = 0;
+    }
+    for(Long time: map.values()) {
+      counts[findBucket(now - time)] += 1;
+    }
+    return counts;
+  }
+}
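A minimal usage sketch of the new helper, assuming a caller in the same package (the class is package-private); the cut points and keys below are illustrative, not taken from the patch:

    // Two cut points (1 min and 5 min, in ms) give three buckets.
    TimeBucketMetrics<String> buckets =
        new TimeBucketMetrics<String>(new long[] { 60000L, 300000L });

    long now = System.currentTimeMillis();
    buckets.add("job_1", now - 30000L);    // 30 s old   -> bucket 0 (< 1 min)
    buckets.add("job_2", now - 120000L);   // 2 min old  -> bucket 1 (1-5 min)
    buckets.add("job_3", now - 600000L);   // 10 min old -> bucket 2 (> 5 min)

    int[] counts = buckets.getBucketCounts(now);   // {1, 1, 1}
    buckets.remove("job_2");                       // job_2 is no longer counted

QueueMetrics follows the same pattern with JobID keys, calling add() in addRunningJob(), remove() in decRunningJob(), and getBucketCounts() on every metrics snapshot.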