Pārlūkot izejas kodu

HDFS-6685. Balancer should preserve storage type of replicas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1615015 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 10 gadi atpakaļ
vecāks
revīzija
b8597e6a10

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -338,6 +338,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6441. Add ability to exclude/include specific datanodes while
     balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
 
+    HDFS-6685. Balancer should preserve storage type of replicas.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -32,4 +35,11 @@ public enum StorageType {
   SSD;
 
   public static final StorageType DEFAULT = DISK;
+  public static final StorageType[] EMPTY_ARRAY = {};
+  
+  private static final StorageType[] VALUES = values();
+  
+  public static List<StorageType> asList() {
+    return Arrays.asList(VALUES);
+  }
 }

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -352,15 +352,19 @@ public class PBHelper {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
-        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
+        .build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
+    final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
     return new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
-        storageUuids.toArray(new String[storageUuids.size()]));
+        storageUuids.toArray(new String[storageUuids.size()]),
+        convertStorageTypes(storageTypes, storageUuids.size()));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 360 - 266
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java


+ 61 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java

@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
 
 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);
 
   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }
 
   /** Get the policy name. */
   abstract String getName();
 
   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);
 
   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }
 
-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *          or return null if the datanode does not have such storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
   
   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }
     
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }
 
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }
 
     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();  
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }
 
     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2826,12 +2826,15 @@ public class BlockManager {
     } else {
       final String[] datanodeUuids = new String[locations.size()];
       final String[] storageIDs = new String[datanodeUuids.length];
+      final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
       for(int i = 0; i < locations.size(); i++) {
         final DatanodeStorageInfo s = locations.get(i);
         datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
         storageIDs[i] = s.getStorageID();
+        storageTypes[i] = s.getStorageType();
       }
-      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+          storageTypes));
       return block.getNumBytes();
     }
   }

+ 19 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java

@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.util.Arrays;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
@@ -39,12 +38,15 @@ public class BlocksWithLocations {
     final Block block;
     final String[] datanodeUuids;
     final String[] storageIDs;
+    final StorageType[] storageTypes;
     
     /** constructor */
-    public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) {
+    public BlockWithLocations(Block block, String[] datanodeUuids,
+        String[] storageIDs, StorageType[] storageTypes) {
       this.block = block;
       this.datanodeUuids = datanodeUuids;
       this.storageIDs = storageIDs;
+      this.storageTypes = storageTypes;
     }
     
     /** get the block */
@@ -61,7 +63,12 @@ public class BlocksWithLocations {
     public String[] getStorageIDs() {
       return storageIDs;
     }
-    
+
+    /** @return the storage types */
+    public StorageType[] getStorageTypes() {
+      return storageTypes;
+    }
+
     @Override
     public String toString() {
       final StringBuilder b = new StringBuilder();
@@ -70,12 +77,18 @@ public class BlocksWithLocations {
         return b.append("[]").toString();
       }
       
-      b.append(storageIDs[0]).append('@').append(datanodeUuids[0]);
+      appendString(0, b.append("["));
       for(int i = 1; i < datanodeUuids.length; i++) {
-        b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]);
+        appendString(i, b.append(","));
       }
       return b.append("]").toString();
     }
+    
+    private StringBuilder appendString(int i, StringBuilder b) {
+      return b.append("[").append(storageTypes[i]).append("]")
+              .append(storageIDs[i])
+              .append("@").append(datanodeUuids[i]);
+    }
   }
 
   private final BlockWithLocations[] blocks;

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumCounters.java

@@ -37,7 +37,7 @@ import com.google.common.base.Preconditions;
 public class EnumCounters<E extends Enum<E>> {
   /** The class of the enum. */
   private final Class<E> enumClass;
-  /** The counter array, counters[i] corresponds to the enumConstants[i]. */
+  /** An array of longs corresponding to the enum type. */
   private final long[] counters;
 
   /**
@@ -75,6 +75,13 @@ public class EnumCounters<E extends Enum<E>> {
     }
   }
 
+  /** Reset all counters to zero. */
+  public final void reset() {
+    for(int i = 0; i < counters.length; i++) {
+      this.counters[i] = 0L;
+    }
+  }
+
   /** Add the given value to counter e. */
   public final void add(final E e, final long value) {
     counters[e.ordinal()] += value;

+ 128 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/EnumDoubles.java

@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.Arrays;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Similar to {@link EnumCounters} except that the value type is double.
+ *
+ * @param <E> the enum type
+ */
+public class EnumDoubles<E extends Enum<E>> {
+  /** The class of the enum. */
+  private final Class<E> enumClass;
+  /** An array of doubles corresponding to the enum type. */
+  private final double[] doubles;
+
+  /**
+   * Construct doubles for the given enum constants.
+   * @param enumClass the enum class.
+   */
+  public EnumDoubles(final Class<E> enumClass) {
+    final E[] enumConstants = enumClass.getEnumConstants();
+    Preconditions.checkNotNull(enumConstants);
+    this.enumClass = enumClass;
+    this.doubles = new double[enumConstants.length];
+  }
+  
+  /** @return the value corresponding to e. */
+  public final double get(final E e) {
+    return doubles[e.ordinal()];
+  }
+
+  /** Negate all values. */
+  public final void negation() {
+    for(int i = 0; i < doubles.length; i++) {
+      doubles[i] = -doubles[i];
+    }
+  }
+  
+  /** Set e to the given value. */
+  public final void set(final E e, final double value) {
+    doubles[e.ordinal()] = value;
+  }
+
+  /** Set the values of this object to that object. */
+  public final void set(final EnumDoubles<E> that) {
+    for(int i = 0; i < doubles.length; i++) {
+      this.doubles[i] = that.doubles[i];
+    }
+  }
+
+  /** Reset all values to zero. */
+  public final void reset() {
+    for(int i = 0; i < doubles.length; i++) {
+      this.doubles[i] = 0.0;
+    }
+  }
+
+  /** Add the given value to e. */
+  public final void add(final E e, final double value) {
+    doubles[e.ordinal()] += value;
+  }
+
+  /** Add the values of that object to this. */
+  public final void add(final EnumDoubles<E> that) {
+    for(int i = 0; i < doubles.length; i++) {
+      this.doubles[i] += that.doubles[i];
+    }
+  }
+
+  /** Subtract the given value from e. */
+  public final void subtract(final E e, final double value) {
+    doubles[e.ordinal()] -= value;
+  }
+
+  /** Subtract the values of this object from that object. */
+  public final void subtract(final EnumDoubles<E> that) {
+    for(int i = 0; i < doubles.length; i++) {
+      this.doubles[i] -= that.doubles[i];
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    } else if (obj == null || !(obj instanceof EnumDoubles)) {
+      return false;
+    }
+    final EnumDoubles<?> that = (EnumDoubles<?>)obj;
+    return this.enumClass == that.enumClass
+        && Arrays.equals(this.doubles, that.doubles);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(doubles);
+  }
+
+  @Override
+  public String toString() {
+    final E[] enumConstants = enumClass.getEnumConstants();
+    final StringBuilder b = new StringBuilder();
+    for(int i = 0; i < doubles.length; i++) {
+      final String name = enumConstants[i].name();
+      b.append(name).append("=").append(doubles[i]).append(", ");
+    }
+    return b.substring(0, b.length() - 2);
+  }
+}

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -404,6 +404,7 @@ message BlockWithLocationsProto {
   required BlockProto block = 1;   // Block
   repeated string datanodeUuids = 2; // Datanodes with replicas of the block
   repeated string storageUuids = 3;  // Storages with replicas of the block
+  repeated StorageTypeProto storageTypes = 4;
 }
 
 /**

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -184,8 +184,10 @@ public class TestPBHelper {
   private static BlockWithLocations getBlockWithLocations(int bid) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
+    final StorageType[] storageTypes = {
+        StorageType.DISK, StorageType.DISK, StorageType.DISK};
     return new BlockWithLocations(new Block(bid, 0, 1),
-        datanodeUuids, storageIDs);
+        datanodeUuids, storageIDs, storageTypes);
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels