Browse Source

HDFS-13055. Aggregate usage statistics from datanodes. Contributed by Ajay Kumar.

Arpit Agarwal 7 years ago
parent
commit
1c1ce63cda

+ 181 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/DataNodeUsageReport.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class that allows DataNode to communicate information about
+ * usage statistics/metrics to NameNode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DataNodeUsageReport {
+
+  // Every field is final: a report is an immutable snapshot, and final
+  // fields guarantee that the shared EMPTY_REPORT instance can never be
+  // altered after construction.
+  private final long bytesWrittenPerSec;
+  private final long bytesReadPerSec;
+  private final long writeTime;
+  private final long readTime;
+  private final long blocksWrittenPerSec;
+  private final long blocksReadPerSec;
+  private final long timestamp;
+
+  /**
+   * Creates a report whose values are all zero, i.e. the values of a
+   * freshly created {@link Builder}.
+   */
+  DataNodeUsageReport() {
+    this(new Builder());
+  }
+
+  /**
+   * Creates a report holding a copy of every value currently set on the
+   * given builder.
+   *
+   * @param builder builder to copy the values from.
+   */
+  private DataNodeUsageReport(Builder builder) {
+    this.bytesWrittenPerSec = builder.bytesWrittenPerSec;
+    this.bytesReadPerSec = builder.bytesReadPerSec;
+    this.writeTime = builder.writeTime;
+    this.readTime = builder.readTime;
+    this.blocksWrittenPerSec = builder.blocksWrittenPerSec;
+    this.blocksReadPerSec = builder.blocksReadPerSec;
+    this.timestamp = builder.timestamp;
+  }
+
+  /**
+   * An object representing a DataNodeUsageReport with default values. Should
+   * be used instead of null or creating new objects when there are
+   * no statistics to report.
+   */
+  public static final DataNodeUsageReport EMPTY_REPORT =
+      new DataNodeUsageReport();
+
+  /**
+   * Returns all values of this report as {@code name:value} pairs, each
+   * pair separated from the next by a single space.
+   *
+   * @return the textual form of this report.
+   */
+  @Override
+  public String toString() {
+    return "bytesWrittenPerSec:" + bytesWrittenPerSec
+        + " bytesReadPerSec:" + bytesReadPerSec
+        + " writeTime:" + writeTime
+        + " readTime:" + readTime
+        + " blocksWrittenPerSec:" + blocksWrittenPerSec
+        + " blocksReadPerSec:" + blocksReadPerSec
+        + " timestamp:" + timestamp;
+  }
+
+  /**
+   * Combines all seven values position-dependently, so that two reports
+   * holding the same numbers in different fields do not collide. Uses
+   * exactly the fields compared by {@link #equals(Object)}.
+   *
+   * @return the hash code of this report.
+   */
+  @Override
+  public int hashCode() {
+    int result = hashLong(timestamp);
+    result = 31 * result + hashLong(bytesWrittenPerSec);
+    result = 31 * result + hashLong(bytesReadPerSec);
+    result = 31 * result + hashLong(writeTime);
+    result = 31 * result + hashLong(readTime);
+    result = 31 * result + hashLong(blocksWrittenPerSec);
+    result = 31 * result + hashLong(blocksReadPerSec);
+    return result;
+  }
+
+  /**
+   * Folds a long into an int by XOR-ing its upper and lower halves.
+   *
+   * @param value value to fold.
+   * @return the folded value.
+   */
+  private static int hashLong(long value) {
+    return (int) (value ^ (value >>> 32));
+  }
+
+  /**
+   * Two reports are equal when all seven of their values are equal.
+   *
+   * @param o object to compare with.
+   * @return true if {@code o} is a DataNodeUsageReport with identical
+   *     values, false otherwise.
+   */
+  @Override
+  public boolean equals(Object o) {
+    // If the object is compared with itself then return true
+    if (o == this) {
+      return true;
+    }
+
+    // Also rejects null, since null is never an instance of anything.
+    if (!(o instanceof DataNodeUsageReport)) {
+      return false;
+    }
+
+    DataNodeUsageReport c = (DataNodeUsageReport) o;
+    return this.timestamp == c.timestamp
+        && this.readTime == c.readTime
+        && this.writeTime == c.writeTime
+        && this.bytesWrittenPerSec == c.bytesWrittenPerSec
+        && this.bytesReadPerSec == c.bytesReadPerSec
+        && this.blocksWrittenPerSec == c.blocksWrittenPerSec
+        && this.blocksReadPerSec == c.blocksReadPerSec;
+  }
+
+  /**
+   * @return the bytes-written-per-second value of this report.
+   */
+  public long getBytesWrittenPerSec() {
+    return bytesWrittenPerSec;
+  }
+
+  /**
+   * @return the bytes-read-per-second value of this report.
+   */
+  public long getBytesReadPerSec() {
+    return bytesReadPerSec;
+  }
+
+  /**
+   * @return the write time value of this report.
+   */
+  public long getWriteTime() {
+    return writeTime;
+  }
+
+  /**
+   * @return the read time value of this report.
+   */
+  public long getReadTime() {
+    return readTime;
+  }
+
+  /**
+   * @return the blocks-written-per-second value of this report.
+   */
+  public long getBlocksWrittenPerSec() {
+    return blocksWrittenPerSec;
+  }
+
+  /**
+   * @return the blocks-read-per-second value of this report.
+   */
+  public long getBlocksReadPerSec() {
+    return blocksReadPerSec;
+  }
+
+  /**
+   * @return the timestamp value of this report.
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * Builder class for {@link DataNodeUsageReport}. Every setter returns
+   * the builder itself so calls can be chained; a value that is never set
+   * stays at zero.
+   */
+  public static class Builder {
+
+    private long bytesWrittenPerSec;
+    private long bytesReadPerSec;
+    private long writeTime;
+    private long readTime;
+    private long blocksWrittenPerSec;
+    private long blocksReadPerSec;
+    private long timestamp;
+
+    /**
+     * Creates a builder with all values at zero.
+     */
+    public Builder() {
+    }
+
+    /**
+     * @return a new report holding the values currently set on this
+     *     builder.
+     */
+    public DataNodeUsageReport build() {
+      return new DataNodeUsageReport(this);
+    }
+
+    /**
+     * @param bWrittenPerSec bytes-written-per-second value to report.
+     * @return this builder.
+     */
+    public Builder setBytesWrittenPerSec(long bWrittenPerSec) {
+      this.bytesWrittenPerSec = bWrittenPerSec;
+      return this;
+    }
+
+    /**
+     * @param bReadPerSec bytes-read-per-second value to report.
+     * @return this builder.
+     */
+    public Builder setBytesReadPerSec(long bReadPerSec) {
+      this.bytesReadPerSec = bReadPerSec;
+      return this;
+    }
+
+    /**
+     * @param wTime write time value to report.
+     * @return this builder.
+     */
+    public Builder setWriteTime(long wTime) {
+      this.writeTime = wTime;
+      return this;
+    }
+
+    /**
+     * @param rTime read time value to report.
+     * @return this builder.
+     */
+    public Builder setReadTime(long rTime) {
+      this.readTime = rTime;
+      return this;
+    }
+
+    /**
+     * @param wBlock blocks-written-per-second value to report.
+     * @return this builder.
+     */
+    public Builder setBlocksWrittenPerSec(long wBlock) {
+      this.blocksWrittenPerSec = wBlock;
+      return this;
+    }
+
+    /**
+     * @param rBlock blocks-read-per-second value to report.
+     * @return this builder.
+     */
+    public Builder setBlocksReadPerSec(long rBlock) {
+      this.blocksReadPerSec = rBlock;
+      return this;
+    }
+
+    /**
+     * @param ts timestamp value to report.
+     * @return this builder.
+     */
+    public Builder setTimestamp(long ts) {
+      this.timestamp = ts;
+      return this;
+    }
+
+  }
+
+}

+ 101 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/DataNodeUsageReportUtil.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Helper class that generates a live usage report by calculating
+ * the delta between the current DataNode usage metrics and the usage
+ * metrics captured at the time of the last report.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DataNodeUsageReportUtil {
+
+  // Raw counter values saved by the most recent call that built a new
+  // report; the next call computes its deltas against these. The class
+  // keeps this mutable state without any synchronization.
+  private long bytesWritten;
+  private long bytesRead;
+  private long writeTime;
+  private long readTime;
+  private long blocksWritten;
+  private long blocksRead;
+  // Report returned by the previous call; null until the first call.
+  private DataNodeUsageReport lastReport;
+
+  /**
+   * Builds a usage report from the change in the supplied counter values
+   * since the previous call that built a report.
+   * <p>
+   * Byte and block deltas are divided by {@code timeSinceLastReport} using
+   * integer division to obtain per-second rates; the read and write times
+   * are reported as plain deltas. The supplied values are then saved as the
+   * baseline for the next call.
+   * <p>
+   * If {@code timeSinceLastReport} is zero or negative no rate can be
+   * computed, so the previously returned report is returned again (or
+   * {@link DataNodeUsageReport#EMPTY_REPORT} if there is none yet) and the
+   * saved baseline is left untouched.
+   *
+   * @param bWritten current total of bytes written.
+   * @param bRead current total of bytes read.
+   * @param wTime current total write time.
+   * @param rTime current total read time.
+   * @param wBlockOp current total of blocks written.
+   * @param rBlockOp current total of blocks read.
+   * @param timeSinceLastReport elapsed time used as the divisor for the
+   *     per-second rates.
+   * @return the newly built report, or the previous/empty report when the
+   *     elapsed time is not positive.
+   */
+  public DataNodeUsageReport getUsageReport(long bWritten, long
+      bRead, long wTime, long rTime, long wBlockOp, long
+      rBlockOp, long timeSinceLastReport) {
+    // A zero interval would divide by zero and a negative one would flip
+    // the sign of every rate, so both fall back to the last known report.
+    if (timeSinceLastReport <= 0) {
+      if (lastReport == null) {
+        lastReport = DataNodeUsageReport.EMPTY_REPORT;
+      }
+      return lastReport;
+    }
+    DataNodeUsageReport.Builder builder = new DataNodeUsageReport.Builder();
+    DataNodeUsageReport report = builder
+        .setBytesWrittenPerSec(
+            getBytesWrittenPerSec(bWritten, timeSinceLastReport))
+        .setBytesReadPerSec(getBytesReadPerSec(bRead, timeSinceLastReport))
+        .setWriteTime(getWriteTime(wTime))
+        .setReadTime(getReadTime(rTime))
+        .setBlocksWrittenPerSec(
+            getWriteBlockOpPerSec(wBlockOp, timeSinceLastReport))
+        .setBlocksReadPerSec(
+            getReadBlockOpPerSec(rBlockOp, timeSinceLastReport))
+        .setTimestamp(Time.monotonicNow())
+        .build();
+
+    // Save raw metrics; they must be stored only after the deltas above
+    // have been computed against the previous baseline.
+    this.bytesRead = bRead;
+    this.bytesWritten = bWritten;
+    this.blocksWritten = wBlockOp;
+    this.blocksRead = rBlockOp;
+    this.readTime = rTime;
+    this.writeTime = wTime;
+    lastReport = report;
+    return report;
+  }
+
+  /** Bytes read since the saved baseline, divided by the elapsed time. */
+  private long getBytesReadPerSec(long bRead, long
+      timeInSec) {
+    return (bRead - this.bytesRead) / timeInSec;
+  }
+
+  /** Bytes written since the saved baseline, divided by the elapsed time. */
+  private long getBytesWrittenPerSec(long
+      bWritten, long timeInSec) {
+    return (bWritten - this.bytesWritten) / timeInSec;
+  }
+
+  /** Blocks written since the saved baseline, divided by the elapsed time. */
+  private long getWriteBlockOpPerSec(
+      long totalWriteBlocks, long timeInSec) {
+    return (totalWriteBlocks - this.blocksWritten) / timeInSec;
+  }
+
+  /** Blocks read since the saved baseline, divided by the elapsed time. */
+  private long getReadBlockOpPerSec(long totalReadBlockOp,
+      long timeInSec) {
+    return (totalReadBlockOp - this.blocksRead) / timeInSec;
+  }
+
+  /** Read time accumulated since the saved baseline. */
+  private long getReadTime(long totalReadTime) {
+    return totalReadTime - this.readTime;
+  }
+
+  /** Write time accumulated since the saved baseline. */
+  private long getWriteTime(long totalWriteTime) {
+    return totalWriteTime - this.writeTime;
+  }
+
+}

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/package-info.java

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package contains classes that allow HDFS to communicate information
+ * between DataNode and NameNode.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.protocol;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -22,6 +22,8 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.protocol.DataNodeUsageReport;
+import org.apache.hadoop.hdfs.server.protocol.DataNodeUsageReportUtil;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -161,7 +163,8 @@ public class DataNodeMetrics {
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
   JvmMetrics jvmMetrics = null;
-  
+  private DataNodeUsageReportUtil dnUsageReportUtil;
+
   public DataNodeMetrics(String name, String sessionId, int[] intervals,
       final JvmMetrics jvmMetrics) {
     this.name = name;
@@ -169,6 +172,7 @@ public class DataNodeMetrics {
     registry.tag(SessionId, sessionId);
     
     final int len = intervals.length;
+    dnUsageReportUtil = new DataNodeUsageReportUtil();
     packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
     flushNanosQuantiles = new MutableQuantiles[len];
     fsyncNanosQuantiles = new MutableQuantiles[len];
@@ -521,4 +525,10 @@ public class DataNodeMetrics {
   public void incrECReconstructionDecodingTime(long millis) {
     ecReconstructionDecodingTimeMillis.incr(millis);
   }
+
+  /**
+   * Generates a usage report from the current values of the byte, time
+   * and block counters of this metrics object.
+   *
+   * @param timeSinceLastReport time elapsed since the previous report;
+   *     passed through unchanged to the report utility.
+   * @return the report produced by
+   *     {@link DataNodeUsageReportUtil#getUsageReport}.
+   */
+  public DataNodeUsageReport getDNUsageReport(long timeSinceLastReport) {
+    // Read the counters in the same order in which they are passed on.
+    final long writtenBytes = bytesWritten.value();
+    final long readBytes = bytesRead.value();
+    final long writeMillis = totalWriteTime.value();
+    final long readMillis = totalReadTime.value();
+    final long writtenBlocks = blocksWritten.value();
+    final long readBlocks = blocksRead.value();
+    return dnUsageReportUtil.getUsageReport(writtenBytes, readBytes,
+        writeMillis, readMillis, writtenBlocks, readBlocks,
+        timeSinceLastReport);
+  }
 }

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDNUsageReport.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DataNodeUsageReport;
+import org.apache.hadoop.hdfs.server.protocol.DataNodeUsageReportUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for {@link DataNodeUsageReport}.
+ */
+public class TestDNUsageReport {
+
+  // Utility under test; recreated for every test method.
+  private DataNodeUsageReportUtil dnUsageUtil;
+  // Counter values fed into the first non-empty report (Test2) and reused
+  // as the baseline for the expected deltas in Test4.
+  private long bytesWritten;
+  private long bytesRead;
+  private long writeTime;
+  private long readTime;
+  private long writeBlock;
+  private long readBlock;
+  private long timeSinceLastReport;
+
+  @Before
+  public void setup() throws IOException {
+    dnUsageUtil = new DataNodeUsageReportUtil();
+  }
+
+  @After
+  public void clear() throws IOException {
+    dnUsageUtil = null;
+  }
+
+  /**
+   * Ensure that {@link DataNodeUsageReportUtil} returns the empty report
+   * before any statistics exist, computes per-second rates and time deltas
+   * against the previously supplied counters, and returns the previous
+   * report again when no time has elapsed.
+   */
+  @Test(timeout = 60000)
+  public void testUsageReport() throws IOException {
+
+    // Test1: zero elapsed time with no earlier report yields EMPTY_REPORT.
+    DataNodeUsageReport report = dnUsageUtil.getUsageReport(0,
+        0, 0, 0, 0, 0, 0);
+    Assert.assertEquals(DataNodeUsageReport.EMPTY_REPORT, report);
+
+    // Test2: first real report; the baseline counters are all zero, so the
+    // deltas equal the supplied values.
+    bytesWritten = 200;
+    bytesRead = 200;
+    writeTime = 50;
+    readTime = 50;
+    writeBlock = 20;
+    readBlock = 10;
+    timeSinceLastReport = 5;
+    report = dnUsageUtil.getUsageReport(bytesWritten,
+        bytesRead, writeTime, readTime, writeBlock, readBlock,
+        timeSinceLastReport);
+
+    Assert.assertEquals(bytesWritten / timeSinceLastReport,
+        report.getBytesWrittenPerSec());
+    Assert.assertEquals(bytesRead / timeSinceLastReport,
+        report.getBytesReadPerSec());
+    Assert.assertEquals(writeTime, report.getWriteTime());
+    Assert.assertEquals(readTime, report.getReadTime());
+    Assert.assertEquals(writeBlock / timeSinceLastReport,
+        report.getBlocksWrittenPerSec());
+    Assert.assertEquals(readBlock / timeSinceLastReport,
+        report.getBlocksReadPerSec());
+
+    // Test3: zero elapsed time returns the previous report unchanged.
+    DataNodeUsageReport report2 = dnUsageUtil.getUsageReport(bytesWritten,
+        bytesRead, writeTime, readTime, writeBlock, readBlock,
+        0);
+    Assert.assertEquals(report, report2);
+
+    // Test4: a later report is computed from the difference between the
+    // new counters and the ones supplied in Test2.
+    long bytesWritten2 = 50000;
+    long bytesRead2 = 40000;
+    long writeTime2 = 5000;
+    long readTime2 = 1500;
+    long writeBlock2 = 1000;
+    long readBlock2 = 200;
+    timeSinceLastReport = 60;
+    report2 = dnUsageUtil.getUsageReport(bytesWritten2,
+        bytesRead2, writeTime2, readTime2, writeBlock2, readBlock2,
+        timeSinceLastReport);
+
+    Assert.assertEquals((bytesWritten2 - bytesWritten) / timeSinceLastReport,
+        report2.getBytesWrittenPerSec());
+    Assert.assertEquals((bytesRead2 - bytesRead) / timeSinceLastReport,
+        report2.getBytesReadPerSec());
+    Assert.assertEquals(writeTime2 - writeTime, report2.getWriteTime());
+    Assert.assertEquals(readTime2 - readTime, report2.getReadTime());
+    Assert.assertEquals((writeBlock2 - writeBlock) / timeSinceLastReport,
+        report2.getBlocksWrittenPerSec());
+    Assert.assertEquals((readBlock2 - readBlock) / timeSinceLastReport,
+        report2.getBlocksReadPerSec());
+  }
+}