Explorar el Código

HDFS-6982. nntop: top­-like tool for name node users. (Maysam Yabandeh via wang)

(cherry picked from commit dcb8e24427b02e2f3ff9a12d2eb1eb878e3443bb)
Andrew Wang hace 10 años
padre
commit
541172f2ee
Se han modificado 14 ficheros con 1200 adiciones y 5 borrados
  1. 16 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
  2. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. 15 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  4. 20 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  5. 74 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java
  6. 61 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java
  7. 265 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java
  8. 189 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java
  9. 265 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java
  10. 28 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  11. 26 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java
  12. 61 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
  13. 84 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java
  14. 93 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java

+ 16 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

@@ -775,7 +775,22 @@ public class UserGroupInformation {
     }
     return loginUser;
   }
-  
+
+  /**
+   * remove the login method that is followed by a space from the username
+   * e.g. "jack (auth:SIMPLE)" -> "jack"
+   *
+   * @param userName
+   * @return userName without login method
+   */
+  public static String trimLoginMethod(String userName) {
+    int spaceIndex = userName.indexOf(' ');
+    if (spaceIndex >= 0) {
+      userName = userName.substring(0, spaceIndex);
+    }
+    return userName;
+  }
+
   /**
    * Log in a user using the given subject
    * @parma subject the subject to use when logging in a user, or null to 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -12,6 +12,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6663. Admin command to track file and locations from block id.
     (Chen He via kihwal)
 
+    HDFS-6982. nntop: top­-like tool for name node users.
+    (Maysam Yabandeh via wang)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -746,4 +746,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY =
       "ignore.secure.ports.for.testing";
   public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false;
+
+  // nntop Configurations
+  public static final String NNTOP_ENABLED_KEY =
+      "dfs.namenode.top.enabled";
+  public static final boolean NNTOP_ENABLED_DEFAULT = true;
+  public static final String NNTOP_BUCKETS_PER_WINDOW_KEY =
+      "dfs.namenode.top.window.num.buckets";
+  public static final int NNTOP_BUCKETS_PER_WINDOW_DEFAULT = 10;
+  public static final String NNTOP_NUM_USERS_KEY =
+      "dfs.namenode.top.num.users";
+  public static final int NNTOP_NUM_USERS_DEFAULT = 10;
+  // comma separated list of nntop reporting periods in minutes
+  public static final String NNTOP_WINDOWS_MINUTES_KEY =
+      "dfs.namenode.top.windows.minutes";
+  public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
 }

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -243,6 +243,9 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -872,7 +875,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw re;
     }
   }
-  
+
+  @VisibleForTesting
+  public List<AuditLogger> getAuditLoggers() {
+    return auditLoggers;
+  }
+
   @VisibleForTesting
   public RetryCache getRetryCache() {
     return retryCache;
@@ -961,6 +969,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (auditLoggers.isEmpty()) {
       auditLoggers.add(new DefaultAuditLogger());
     }
+
+    // Add audit logger to calculate top users
+    if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
+        DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) {
+      String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+      TopConf nntopConf = new TopConf(conf);
+      TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId,
+          nntopConf.nntopReportingPeriodsMs);
+      auditLoggers.add(new TopAuditLogger());
+    }
+
     return Collections.unmodifiableList(auditLoggers);
   }
 

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top;
+
+import java.net.InetAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.server.namenode.AuditLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+
+/**
+ * An {@link AuditLogger} that sends logged data directly to the metrics
+ * systems. It is used when the top service is used directly by the name node
+ */
+@InterfaceAudience.Private
+public class TopAuditLogger implements AuditLogger {
+  public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  @Override
+  public void logAuditEvent(boolean succeeded, String userName,
+      InetAddress addr, String cmd, String src, String dst, FileStatus status) {
+
+    TopMetrics instance = TopMetrics.getInstance();
+    if (instance != null) {
+      instance.report(succeeded, userName, addr, cmd, src, dst, status);
+    } else {
+      LOG.error("TopMetrics is not initialized yet!");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("allowed=").append(succeeded).append("\t");
+      sb.append("ugi=").append(userName).append("\t");
+      sb.append("ip=").append(addr).append("\t");
+      sb.append("cmd=").append(cmd).append("\t");
+      sb.append("src=").append(src).append("\t");
+      sb.append("dst=").append(dst).append("\t");
+      if (null == status) {
+        sb.append("perm=null");
+      } else {
+        sb.append("perm=");
+        sb.append(status.getOwner()).append(":");
+        sb.append(status.getGroup()).append(":");
+        sb.append(status.getPermission());
+      }
+      LOG.debug("------------------- logged event for top service: " + sb);
+    }
+  }
+
+}

+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is a common place for NNTop configuration.
+ */
+@InterfaceAudience.Private
+public final class TopConf {
+
+  public static final String TOP_METRICS_REGISTRATION_NAME = "topusers";
+  public static final String TOP_METRICS_RECORD_NAME = "topparam";
+  /**
+   * A meta command representing the total number of commands
+   */
+  public static final String CMD_TOTAL = "total";
+  /**
+   * A meta user representing all users
+   */
+  public static String ALL_USERS = "ALL";
+
+  /**
+   * nntop reporting periods in milliseconds
+   */
+  public final long[] nntopReportingPeriodsMs;
+
+  public TopConf(Configuration conf) {
+    String[] periodsStr = conf.getTrimmedStrings(
+        DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
+        DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
+    nntopReportingPeriodsMs = new long[periodsStr.length];
+    for (int i = 0; i < periodsStr.length; i++) {
+      nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) *
+          60L * 1000L; //min to ms
+    }
+    for (long aPeriodMs: nntopReportingPeriodsMs) {
+      Preconditions.checkArgument(aPeriodMs >= 60L * 1000L,
+          "minimum reporting period is 1 min!");
+    }
+  }
+}

+ 265 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.metrics;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+
+/***
+ * The interface to the top metrics
+ * <p/>
+ * The producers use the {@link #report} method to report events and the
+ * consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the
+ * current top metrics. The default consumer is JMX but it could be any other
+ * user interface.
+ * <p/>
+ * Thread-safe: relies on thread-safety of RollingWindowManager
+ */
+@InterfaceAudience.Private
+public class TopMetrics implements MetricsSource {
+  public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
+
+  enum Singleton {
+    INSTANCE;
+
+    volatile TopMetrics impl = null;
+
+    synchronized TopMetrics init(Configuration conf, String processName,
+        String sessionId, long[] reportingPeriods) {
+      if (impl == null) {
+        impl =
+            create(conf, processName, sessionId, reportingPeriods,
+                DefaultMetricsSystem.instance());
+      }
+      logConf(conf);
+      return impl;
+    }
+  }
+
+  private static void logConf(Configuration conf) {
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_NUM_USERS_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_NUM_USERS_KEY));
+    LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY +
+        " = " +  conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY));
+  }
+
+  /**
+   * Return only the shortest periods for default
+   * TODO: make it configurable
+   */
+  final boolean smallestOnlyDefault = true;
+
+  /**
+   * The smallest of reporting periods
+   */
+  long smallestPeriod = Long.MAX_VALUE;
+
+  /**
+   * processName and sessionId might later be leveraged later when we aggregate
+   * report from multiple federated name nodes
+   */
+  final String processName, sessionId;
+
+  /**
+   * A map from reporting periods to WindowManager. Thread-safety is provided by
+   * the fact that the mapping is not changed after construction.
+   */
+  final Map<Long, RollingWindowManager> rollingWindowManagers =
+      new HashMap<Long, RollingWindowManager>();
+
+  TopMetrics(Configuration conf, String processName, String sessionId,
+      long[] reportingPeriods) {
+    this.processName = processName;
+    this.sessionId = sessionId;
+    for (int i = 0; i < reportingPeriods.length; i++) {
+      smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]);
+      rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
+          conf, reportingPeriods[i]));
+    }
+  }
+
+  public static TopMetrics create(Configuration conf, String processName,
+      String sessionId, long[] reportingPeriods, MetricsSystem ms) {
+    return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME,
+        "top metrics of the namenode in a last period of time", new TopMetrics(
+            conf, processName, sessionId, reportingPeriods));
+  }
+
+  public static TopMetrics initSingleton(Configuration conf,
+      String processName, String sessionId, long[] reportingPeriods) {
+    return Singleton.INSTANCE.init(conf, processName, sessionId,
+        reportingPeriods);
+  }
+
+  public static TopMetrics getInstance() {
+    TopMetrics topMetrics = Singleton.INSTANCE.impl;
+    Preconditions.checkArgument(topMetrics != null,
+          "The TopMetric singleton instance is not initialized."
+              + " Have you called initSingleton first?");
+    return topMetrics;
+  }
+
+  /**
+   * In testing, the previous initialization should be reset if the entire
+   * metric system is reinitialized
+   */
+  @VisibleForTesting
+  public static void reset() {
+    Singleton.INSTANCE.impl = null;
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    long realTime = Time.monotonicNow();
+    getMetrics(smallestOnlyDefault, realTime, collector, all);
+  }
+
+  public void getMetrics(boolean smallestOnly, long currTime,
+      MetricsCollector collector, boolean all) {
+    for (Entry<Long, RollingWindowManager> entry : rollingWindowManagers
+        .entrySet()) {
+      if (!smallestOnly || smallestPeriod == entry.getKey()) {
+        getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all);
+      }
+    }
+  }
+
+  /**
+   * Get metrics for a particular recording period and its corresponding
+   * {@link RollingWindowManager}
+   * <p/>
+   *
+   * @param collector the metric collector
+   * @param period the reporting period
+   * @param rollingWindowManager the window manager corresponding to the
+   *          reporting period
+   * @param all currently ignored
+   */
+  void getMetrics(long currTime, MetricsCollector collector, Long period,
+      RollingWindowManager rollingWindowManager, boolean all) {
+    MetricsRecordBuilder rb =
+        collector.addRecord(createTopMetricsRecordName(period))
+            .setContext("namenode").tag(ProcessName, processName)
+            .tag(SessionId, sessionId);
+
+    MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime);
+    LOG.debug("calling snapshot, result size is: " + snapshotMetrics.size());
+    for (Map.Entry<String, Number> entry : snapshotMetrics.entrySet()) {
+      String key = entry.getKey();
+      Number value = entry.getValue();
+      LOG.debug("checking an entry: key: {} value: {}", key, value);
+      long min = period / 1000L / 60L; //ms -> min
+      String desc = "top user of name node in the past " + min + " minutes";
+
+      if (value instanceof Integer) {
+        rb.addGauge(info(key, desc), (Integer) value);
+      } else if (value instanceof Long) {
+        rb.addGauge(info(key, desc), (Long) value);
+      } else if (value instanceof Float) {
+        rb.addGauge(info(key, desc), (Float) value);
+      } else if (value instanceof Double) {
+        rb.addGauge(info(key, desc), (Double) value);
+      } else {
+        LOG.warn("Unsupported metric type: " + value.getClass());
+      }
+    }
+    LOG.debug("END iterating over metrics, result size is: {}",
+        snapshotMetrics.size());
+  }
+
+  /**
+   * Pick the same information that DefaultAuditLogger does before writing to a
+   * log file. This is to be consistent when {@link TopMetrics} is charged with
+   * data read back from log files instead of being invoked directly by the
+   * FsNamesystem
+   *
+   * @param succeeded
+   * @param userName
+   * @param addr
+   * @param cmd
+   * @param src
+   * @param dst
+   * @param status
+   */
+  public void report(boolean succeeded, String userName, InetAddress addr,
+      String cmd, String src, String dst, FileStatus status) {
+    //currently we nntop makes use of only the username and the command
+    report(userName, cmd);
+  }
+
+  public void report(String userName, String cmd) {
+    long currTime = Time.monotonicNow();
+    report(currTime, userName, cmd);
+  }
+
+  public void report(long currTime, String userName, String cmd) {
+    LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
+    userName = UserGroupInformation.trimLoginMethod(userName);
+    try {
+      for (RollingWindowManager rollingWindowManager : rollingWindowManagers
+          .values()) {
+        rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
+        rollingWindowManager.recordMetric(currTime,
+            TopConf.CMD_TOTAL, userName, 1);
+      }
+    } catch (Throwable t) {
+      LOG.error("An error occurred while reflecting the event in top service, "
+          + "event: (time,cmd,userName)=(" + currTime + "," + cmd + ","
+          + userName);
+    }
+  }
+
+  /***
+   *
+   * @param period the reporting period length in ms
+   * @return
+   */
+  public static String createTopMetricsRecordName(Long period) {
+    return TopConf.TOP_METRICS_RECORD_NAME + "-" + period;
+  }
+
+}

+ 189 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java

@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for exposing a rolling window view on the event that occur over time.
+ * Events are reported based on occurrence time. The total number of events in
+ * the last period covered by the rolling window can be retrieved by the
+ * {@link #getSum(long)} method.
+ * <p/>
+ *
+ * Assumptions:
+ * <p/>
+ *
+ * (1) Concurrent invocation of {@link #incAt} method are possible
+ * <p/>
+ *
+ * (2) The time parameter of two consecutive invocation of {@link #incAt} could
+ * be in any given order
+ * <p/>
+ *
+ * (3) The buffering delays are not more than the window length, i.e., after two
+ * consecutive invocation {@link #incAt(long time1, long)} and
+ * {@link #incAt(long time2, long)}, time1 < time2 || time1 - time2 < windowLenMs.
+ * This assumption helps avoiding unnecessary synchronizations.
+ * <p/>
+ *
+ * Thread-safety is built in the {@link RollingWindow.Bucket}
+ */
+@InterfaceAudience.Private
+public class RollingWindow {
+  private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class);
+
+  /**
+   * Each window is composed of buckets, which offer a trade-off between
+   * accuracy and space complexity: the lower the number of buckets, the less
+   * memory is required by the rolling window but more inaccuracy is possible in
+   * reading window total values.
+   */
+  Bucket[] buckets;
+  final int windowLenMs;
+  final int bucketSize;
+
+  /**
+   * @param windowLenMs The period that is covered by the window. This period must
+   *          be more than the buffering delays.
+   * @param numBuckets number of buckets in the window
+   */
+  RollingWindow(int windowLenMs, int numBuckets) {
+    buckets = new Bucket[numBuckets];
+    for (int i = 0; i < numBuckets; i++) {
+      buckets[i] = new Bucket();
+    }
+    this.windowLenMs = windowLenMs;
+    this.bucketSize = windowLenMs / numBuckets;
+    if (this.bucketSize % bucketSize != 0) {
+      throw new IllegalArgumentException(
+          "The bucket size in the rolling window is not integer: windowLenMs= "
+              + windowLenMs + " numBuckets= " + numBuckets);
+    }
+  }
+
+  /**
+   * When an event occurs at the specified time, this method reflects that in
+   * the rolling window.
+   * <p/>
+   *
+   * @param time the time at which the event occurred
+   * @param delta the delta that will be added to the window
+   */
+  public void incAt(long time, long delta) {
+    int bi = computeBucketIndex(time);
+    Bucket bucket = buckets[bi];
+    // If the last time the bucket was updated is out of the scope of the
+    // rolling window, reset the bucket.
+    if (bucket.isStaleNow(time)) {
+      bucket.safeReset(time);
+    }
+    bucket.inc(delta);
+  }
+
+  private int computeBucketIndex(long time) {
+    int positionOnWindow = (int) (time % windowLenMs);
+    int bucketIndex = positionOnWindow * buckets.length / windowLenMs;
+    return bucketIndex;
+  }
+
+  /**
+   * Thread-safety is provided by synchronization when resetting the update time
+   * as well as atomic fields.
+   */
+  private class Bucket {
+    AtomicLong value = new AtomicLong(0);
+    AtomicLong updateTime = new AtomicLong(0);
+
+    /**
+     * Check whether the last time that the bucket was updated is no longer
+     * covered by rolling window.
+     *
+     * @param time the current time
+     * @return true if the bucket state is stale
+     */
+    boolean isStaleNow(long time) {
+      long utime = updateTime.get();
+      return time - utime >= windowLenMs;
+    }
+
+    /**
+     * Safely reset the bucket state considering concurrent updates (inc) and
+     * resets.
+     *
+     * @param time the current time
+     */
+    void safeReset(long time) {
+      // At any point in time, only one thread is allowed to reset the
+      // bucket
+      synchronized (this) {
+        if (isStaleNow(time)) {
+          // reset the value before setting the time, it allows other
+          // threads to safely assume that the value is updated if the
+          // time is not stale
+          value.set(0);
+          updateTime.set(time);
+        }
+        // else a concurrent thread has already reset it: do nothing
+      }
+    }
+
+    /**
+     * Increment the bucket. It assumes that staleness check is already
+     * performed. We do not need to update the {@link #updateTime} because as
+     * long as the {@link #updateTime} belongs to the current view of the
+     * rolling window, the algorithm works fine.
+     */
+    void inc(long delta) {
+      value.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Get value represented by this window at the specified time
+   * <p/>
+   *
+   * If time lags behind the latest update time, the new updates are still
+   * included in the sum
+   *
+   * @param time
+   * @return number of events occurred in the past period
+   */
+  public long getSum(long time) {
+    long sum = 0;
+    for (Bucket bucket : buckets) {
+      boolean stale = bucket.isStaleNow(time);
+      if (!stale) {
+        sum += bucket.value.get();
+      }
+      if (LOG.isDebugEnabled()) {
+        long bucketTime = bucket.updateTime.get();
+        String timeStr = new Date(bucketTime).toString();
+        LOG.debug("Sum: + " + sum + " Bucket: updateTime: " + timeStr + " ("
+            + bucketTime + ") isStale " + stale + " at " + time);
+      }
+    }
+    return sum;
+  }
+
+}

+ 265 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A class to manage the set of {@link RollingWindow}s. This class is the
+ * interface of metrics system to the {@link RollingWindow}s to retrieve the
+ * current top metrics.
+ * <p/>
+ * Thread-safety is provided by each {@link RollingWindow} being thread-safe as
+ * well as {@link ConcurrentHashMap} for the collection of them.
+ */
+@InterfaceAudience.Private
+public class RollingWindowManager {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RollingWindowManager.class);
+
+  private int windowLenMs;
+  private int bucketsPerWindow; // e.g., 10 buckets per minute
+  private int topUsersCnt; // e.g., report top 10 metrics
+
+  /**
+   * Create a metric name composed of the command and user
+   *
+   * @param command the command executed
+   * @param user    the user
+   * @return a composed metric name
+   */
+  @VisibleForTesting
+  public static String createMetricName(String command, String user) {
+    return command + "." + user;
+  }
+
+  static private class RollingWindowMap extends
+      ConcurrentHashMap<String, RollingWindow> {
+    private static final long serialVersionUID = -6785807073237052051L;
+  }
+
+  /**
+   * A mapping from each reported metric to its {@link RollingWindowMap} that
+   * maintains the set of {@link RollingWindow}s for the users that have
+   * operated on that metric.
+   */
+  public ConcurrentHashMap<String, RollingWindowMap> metricMap =
+      new ConcurrentHashMap<String, RollingWindowMap>();
+
+  public RollingWindowManager(Configuration conf, long reportingPeriodMs) {
+    windowLenMs = (int) reportingPeriodMs;
+    bucketsPerWindow =
+        conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
+            DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
+    Preconditions.checkArgument(bucketsPerWindow > 0,
+        "a window should have at least one bucket");
+    Preconditions.checkArgument(bucketsPerWindow <= windowLenMs,
+        "the minimum size of a bucket is 1 ms");
+    //same-size buckets
+    Preconditions.checkArgument(windowLenMs % bucketsPerWindow == 0,
+        "window size must be a multiplication of number of buckets");
+    topUsersCnt =
+        conf.getInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY,
+            DFSConfigKeys.NNTOP_NUM_USERS_DEFAULT);
+    Preconditions.checkArgument(topUsersCnt > 0,
+        "the number of requested top users must be at least 1");
+  }
+
+  /**
+   * Called when the metric command is changed by "delta" units at time "time"
+   * via user "user"
+   *
+   * @param time the time of the event
+   * @param command the metric that is updated, e.g., the operation name
+   * @param user the user that updated the metric
+   * @param delta the amount of change in the metric, e.g., +1
+   */
+  public void recordMetric(long time, String command, String user, long delta) {
+    RollingWindow window = getRollingWindow(command, user);
+    window.incAt(time, delta);
+  }
+
+  /**
+   * Take a snapshot of current top users in the past period.
+   *
+   * @param time the current time
+   * @return a map between the top metrics and their values. The user is encoded
+   * in the metric name. Refer to {@link RollingWindowManager#createMetricName} for
+   * the actual format.
+   */
+  public MetricValueMap snapshot(long time) {
+    MetricValueMap map = new MetricValueMap();
+    Set<String> metricNames = metricMap.keySet();
+    LOG.debug("iterating in reported metrics, size={} values={}",
+        metricNames.size(), metricNames);
+    for (Map.Entry<String,RollingWindowMap> rwEntry: metricMap.entrySet()) {
+      String metricName = rwEntry.getKey();
+      RollingWindowMap rollingWindows = rwEntry.getValue();
+      TopN topN = new TopN(topUsersCnt);
+      Iterator<Map.Entry<String, RollingWindow>> iterator =
+          rollingWindows.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, RollingWindow> entry = iterator.next();
+        String userName = entry.getKey();
+        RollingWindow aWindow = entry.getValue();
+        long windowSum = aWindow.getSum(time);
+        // do the gc here
+        if (windowSum == 0) {
+          LOG.debug("gc window of metric: {} userName: {}",
+              metricName, userName);
+          iterator.remove();
+          continue;
+        }
+        LOG.debug("offer window of metric: {} userName: {} sum: {}",
+            metricName, userName, windowSum);
+        topN.offer(new NameValuePair(userName, windowSum));
+      }
+      int n = topN.size();
+      LOG.info("topN size for command " + metricName + " is: " + n);
+      if (n == 0) {
+        continue;
+      }
+      String allMetricName =
+          createMetricName(metricName, TopConf.ALL_USERS);
+      map.put(allMetricName, Long.valueOf(topN.total));
+      for (int i = 0; i < n; i++) {
+        NameValuePair userEntry = topN.poll();
+        String userMetricName =
+            createMetricName(metricName, userEntry.name);
+        map.put(userMetricName, Long.valueOf(userEntry.value));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Get the rolling window specified by metric and user.
+   *
+   * @param metric the updated metric
+   * @param user the user that updated the metric
+   * @return the rolling window
+   */
+  private RollingWindow getRollingWindow(String metric, String user) {
+    RollingWindowMap rwMap = metricMap.get(metric);
+    if (rwMap == null) {
+      rwMap = new RollingWindowMap();
+      RollingWindowMap prevRwMap = metricMap.putIfAbsent(metric, rwMap);
+      if (prevRwMap != null) {
+        rwMap = prevRwMap;
+      }
+    }
+    RollingWindow window = rwMap.get(user);
+    if (window != null) {
+      return window;
+    }
+    window = new RollingWindow(windowLenMs, bucketsPerWindow);
+    RollingWindow prevWindow = rwMap.putIfAbsent(user, window);
+    if (prevWindow != null) {
+      window = prevWindow;
+    }
+    return window;
+  }
+
+  /**
+   * A pair of a name and its corresponding value
+   */
+  static private class NameValuePair implements Comparable<NameValuePair> {
+    String name;
+    long value;
+
+    public NameValuePair(String metricName, long value) {
+      this.name = metricName;
+      this.value = value;
+    }
+
+    @Override
+    public int compareTo(NameValuePair other) {
+      return (int) (value - other.value);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof NameValuePair) {
+        return compareTo((NameValuePair)other) == 0;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.valueOf(value).hashCode();
+    }
+  }
+
+  /**
+   * A fixed-size priority queue, used to retrieve top-n of offered entries.
+   */
+  static private class TopN extends PriorityQueue<NameValuePair> {
+    private static final long serialVersionUID = 5134028249611535803L;
+    int n; // > 0
+    private long total = 0;
+
+    TopN(int n) {
+      super(n);
+      this.n = n;
+    }
+
+    @Override
+    public boolean offer(NameValuePair entry) {
+      updateTotal(entry.value);
+      if (size() == n) {
+        NameValuePair smallest = peek();
+        if (smallest.value >= entry.value) {
+          return false;
+        }
+        poll(); // remove smallest
+      }
+      return super.offer(entry);
+    }
+
+    private void updateTotal(long value) {
+      total += value;
+    }
+
+    public long getTotal() {
+      return total;
+    }
+  }
+
+  /**
+   * A mapping from metric names to their absolute values and their percentage
+   */
+  @InterfaceAudience.Private
+  public static class MetricValueMap extends HashMap<String, Number> {
+    private static final long serialVersionUID = 8936732010242400171L;
+  }
+}

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2224,4 +2224,32 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.top.enabled</name>
+  <value>true</value>
+  <description>Enable nntop: reporting top users on namenode
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.window.num.buckets</name>
+  <value>10</value>
+  <description>Number of buckets in the rolling window implementation of nntop
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.num.users</name>
+  <value>10</value>
+  <description>Number of top users returned by the top tool
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.top.windows.minutes</name>
+  <value>1,5,25</value>
+  <description>comma separated list of nntop reporting periods in minutes
+  </description>
+</property>
+
 </configuration>

+ 26 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java

@@ -20,10 +20,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 
@@ -43,6 +42,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -94,6 +94,29 @@ public class TestAuditLogger {
     }
   }
 
+  /**
+   * Tests that TopAuditLogger can be disabled
+   */
+  @Test
+  public void testDisableTopAuditLogger() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(NNTOP_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      cluster.waitClusterUp();
+      List<AuditLogger> auditLoggers =
+          cluster.getNameNode().getNamesystem().getAuditLoggers();
+      for (AuditLogger auditLogger : auditLoggers) {
+        assertFalse(
+            "top audit logger is still hooked in after it is disabled",
+            auditLogger instanceof TopAuditLogger);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testWebHdfsAuditLogger() throws IOException, URISyntaxException {
     Configuration conf = new HdfsConfiguration();

+ 61 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.metrics;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
@@ -46,6 +47,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
+import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
+import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -53,6 +58,7 @@ import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -87,6 +93,11 @@ public class TestNameNodeMetrics {
     CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
       .getLogger().setLevel(Level.DEBUG);
+    /**
+     * need it to test {@link #testTopAuditLogger}
+     */
+    CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
+        TopAuditLogger.class.getName());
   }
   
   private MiniDFSCluster cluster;
@@ -101,6 +112,7 @@ public class TestNameNodeMetrics {
   
   @Before
   public void setUp() throws Exception {
+    TopMetrics.reset();//reset the static init done by prev test
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
@@ -454,4 +466,53 @@ public class TestNameNodeMetrics {
     assertQuantileGauges("Syncs1s", rb);
     assertQuantileGauges("BlockReport1s", rb);
   }
+
+  /**
+   * Test whether {@link TopMetrics} is registered with metrics system
+   * @throws Exception
+   */
+  @Test
+  public void testTopMetrics() throws Exception {
+    final String testUser = "NNTopTestUser";
+    final String testOp = "NNTopTestOp";
+    final String metricName =
+        RollingWindowManager.createMetricName(testOp, testUser);
+    TopMetrics.getInstance().report(testUser, testOp);
+    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
+    MetricsRecordBuilder rb = getMetrics(regName);
+    assertGauge(metricName, 1L, rb);
+  }
+
+  /**
+   * Test whether {@link TopAuditLogger} is registered as an audit logger
+   * @throws Exception
+   */
+  @Test
+  public void testTopAuditLogger() throws Exception {
+    //note: the top audit logger should already be set in conf
+    //issue one command, any command is fine
+    FileSystem fs = cluster.getFileSystem();
+    long time = System.currentTimeMillis();
+    fs.setTimes(new Path("/"), time, time);
+    //the command should be reflected in the total count of all users
+    final String testUser = TopConf.ALL_USERS;
+    final String testOp = TopConf.CMD_TOTAL;
+    final String metricName =
+        RollingWindowManager.createMetricName(testOp, testUser);
+    final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
+    MetricsRecordBuilder rb = getMetrics(regName);
+    assertGaugeGreaterThan(metricName, 1L, rb);
+  }
+
+  /**
+   * Assert a long gauge metric greater than
+   * @param name  of the metric
+   * @param expected  minimum expected value of the metric
+   * @param rb  the record builder mock used to getMetrics
+   */
+  public static void assertGaugeGreaterThan(String name, long expected,
+                                 MetricsRecordBuilder rb) {
+    Assert.assertTrue("Bad value for metric " + name,
+        expected <= MetricsAsserts.getLongGauge(name, rb));
+  }
 }

+ 84 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindow.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRollingWindow {
+
+  final int WINDOW_LEN = 60000;
+  final int BUCKET_CNT = 10;
+  final int BUCKET_LEN = WINDOW_LEN / BUCKET_CNT;
+
+  @Test
+  public void testBasics() {
+    RollingWindow window = new RollingWindow(WINDOW_LEN, BUCKET_CNT);
+    long time = 1;
+    Assert.assertEquals("The initial sum of rolling window must be 0", 0,
+        window.getSum(time));
+    time = WINDOW_LEN + BUCKET_LEN * 3 / 2;
+    Assert.assertEquals("The initial sum of rolling window must be 0", 0,
+        window.getSum(time));
+
+    window.incAt(time, 5);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 5,
+        window.getSum(time));
+
+    time += BUCKET_LEN;
+    window.incAt(time, 6);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 11,
+        window.getSum(time));
+
+    time += WINDOW_LEN - BUCKET_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 6,
+        window.getSum(time));
+
+    time += BUCKET_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 0,
+        window.getSum(time));
+  }
+
+  @Test
+  public void testReorderedAccess() {
+    RollingWindow window = new RollingWindow(WINDOW_LEN, BUCKET_CNT);
+    long time = 2 * WINDOW_LEN + BUCKET_LEN * 3 / 2;
+    window.incAt(time, 5);
+
+    time++;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the recent update", 5,
+        window.getSum(time));
+
+    long reorderedTime = time - 2 * BUCKET_LEN;
+    window.incAt(reorderedTime, 6);
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect the reordered update", 11,
+        window.getSum(time));
+
+    time = reorderedTime + WINDOW_LEN;
+    Assert.assertEquals(
+        "The sum of rolling window does not reflect rolling effect", 5,
+        window.getSum(time));
+  }
+
+}

+ 93 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.top.window;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
+
+public class TestRollingWindowManager {
+
+  Configuration conf;
+  RollingWindowManager manager;
+  String[] users;
+  final static int MIN_2_MS = 60000;
+
+  final int WINDOW_LEN_MS = 1 * MIN_2_MS;
+  final int BUCKET_CNT = 10;
+  final int N_TOP_USERS = 10;
+  final int BUCKET_LEN = WINDOW_LEN_MS / BUCKET_CNT;
+
+  @Before
+  public void init() {
+    conf = new Configuration();
+    conf.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, BUCKET_CNT);
+    conf.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
+    manager = new RollingWindowManager(conf, WINDOW_LEN_MS);
+    users = new String[2 * N_TOP_USERS];
+    for (int i = 0; i < users.length; i++) {
+      users[i] = "user" + i;
+    }
+  }
+
+  @Test
+  public void testTops() {
+    long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
+    for (int i = 0; i < users.length; i++)
+      manager.recordMetric(time, "open", users[i], (i + 1) * 2);
+    time++;
+    for (int i = 0; i < users.length; i++)
+      manager.recordMetric(time, "close", users[i], i + 1);
+    time++;
+    MetricValueMap tops = manager.snapshot(time);
+
+    assertEquals("The number of returned top metrics is invalid",
+        2 * (N_TOP_USERS + 1), tops.size());
+    int userIndex = users.length - 2;
+    String metricName = RollingWindowManager.createMetricName("open",
+        users[userIndex]);
+    boolean includes = tops.containsKey(metricName);
+    assertTrue("The order of entries in top metrics is wrong", includes);
+    assertEquals("The reported value by top is different from recorded one",
+        (userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue());
+
+    // move the window forward not to see the "open" results
+    time += WINDOW_LEN_MS - 2;
+    // top should not include only "close" results
+    tops = manager.snapshot(time);
+    assertEquals("The number of returned top metrics is invalid",
+        N_TOP_USERS + 1, tops.size());
+    includes = tops.containsKey(metricName);
+    assertFalse("After rolling, the top list still includes the stale metrics",
+        includes);
+
+    metricName = RollingWindowManager.createMetricName("close",
+        users[userIndex]);
+    includes = tops.containsKey(metricName);
+    assertTrue("The order of entries in top metrics is wrong", includes);
+    assertEquals("The reported value by top is different from recorded one",
+        (userIndex + 1), ((Long) tops.get(metricName)).longValue());
+  }
+}