
Merging change r1086458 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1089691 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
ce4a67947c

+ 3 - 0
CHANGES.txt

@@ -314,6 +314,9 @@ Trunk (unreleased changes)
     HDFS-1541. Not marking datanodes dead when namenode in safemode.
     (hairong)
 
+    HDFS-1120. Make DataNode's block-to-device placement policy pluggable.
+    (Harsh J Chouraria via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

+ 9 - 0
src/java/hdfs-default.xml

@@ -344,6 +344,15 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.block.volume.choice.policy</name>
+  <value>org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy</value>
+  <description>The policy class used to determine which of the
+  datanode's available volumes a block is written to. The default is a
+  simple round-robin policy that chooses volumes in cyclic order.
+  </description>
+</property>
+
 <property>
   <name>dfs.heartbeat.interval</name>
   <value>3</value>
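
For illustration, an operator could point the new key at a custom implementation in hdfs-site.xml; the key and XML format come from this change, but the class name below is purely hypothetical:

<property>
  <name>dfs.datanode.block.volume.choice.policy</name>
  <value>com.example.hdfs.MostAvailableSpaceVolumesPolicy</value>
</property>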

+ 3 - 0
src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -175,6 +175,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_DEFAULT = 2L<<40;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
+  public static final String  DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
+  public static final String  DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY_DEFAULT =
+    "org.apache.hadoop.hdfs.server.datanode.RoundRobinVolumesPolicy";
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
   public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";

+ 52 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**************************************************
+ * BlockVolumeChoosingPolicy allows a DataNode to
+ * specify what policy is to be used while choosing
+ * a volume for a block request.
+ * 
+ ***************************************************/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface BlockVolumeChoosingPolicy {
+
+  /**
+   * Chooses an FSVolume from the given list, applying the policy's
+   * choice algorithm to place a block of the given size.
+   * 
+   * (Policies that maintain state must be thread-safe.)
+   * 
+   * @param volumes - the array of FSVolumes that are available.
+   * @param blockSize - the size of the block for which a volume is sought.
+   * @return the chosen volume to store the block.
+   * @throws IOException when disks are unavailable or are full.
+   */
+  public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
+    throws IOException;
+
+}
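
As a sketch of what this interface makes possible, a hypothetical alternative policy that always picks the volume with the most free space could look like the following (illustrative only; this class is not part of the commit):

package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

/** Hypothetical example: choose the volume with the most available space. */
public class MostAvailableSpaceVolumesPolicy implements BlockVolumeChoosingPolicy {

  @Override
  public FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
      throws IOException {
    FSVolume best = null;
    long mostAvailable = blockSize; // a volume must offer more than the block needs
    for (FSVolume volume : volumes) {
      long available = volume.getAvailable();
      if (available > mostAvailable) {
        best = volume;
        mostAvailable = available;
      }
    }
    if (best == null) {
      throw new DiskOutOfSpaceException("Insufficient space for an additional block");
    }
    return best;
  }
}

Being stateless, such a policy is trivially thread-safe, satisfying the note in the interface Javadoc.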

+ 14 - 28
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -65,8 +65,8 @@ import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.IOUtils;
 
@@ -782,11 +782,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
      * This list is replaced on modification holding "this" lock.
      */
     private volatile List<FSVolume> volumes = null;
-    private int curVolume = 0; // Synchronized using "this"
+    BlockVolumeChoosingPolicy blockChooser;
       
-    FSVolumeSet(FSVolume[] volumes) {
+    FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
       List<FSVolume> list = Arrays.asList(volumes);
       this.volumes = Collections.unmodifiableList(list);
+      this.blockChooser = blockChooser;
     }
     
     private int numberOfVolumes() {
@@ -800,29 +801,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
      * @param blockSize free space needed on the volume
      * @return next volume to store the block in.
      */
-    FSVolume getNextVolume(long blockSize) throws IOException {
-      synchronized(this) {
-        if(volumes.size() < 1) {
-          throw new DiskOutOfSpaceException("No more available volumes");
-        }
-        // since volumes could've been removed because of the failure
-        // make sure we are not out of bounds
-        if(curVolume >= volumes.size()) {
-          curVolume = 0;
-        }
-        
-        int startVolume = curVolume;
-        
-        while (true) {
-          FSVolume volume = volumes.get(curVolume);
-          curVolume = (curVolume + 1) % volumes.size();
-          if (volume.getAvailable() > blockSize) { return volume; }
-          if (curVolume == startVolume) {
-            throw new DiskOutOfSpaceException(
-                "Insufficient space for an additional block");
-          }
-        }
-      }
+    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+      return blockChooser.chooseVolume(volumes, blockSize);
     }
       
     private long getDfsUsed() throws IOException {
@@ -1173,8 +1153,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
           + storage.getStorageDir(idx).getCurrentDir());
     }
     volumeMap = new ReplicasMap(this);
-    
-    volumes = new FSVolumeSet(volArray);
+
+    BlockVolumeChoosingPolicy blockChooserImpl =
+      (BlockVolumeChoosingPolicy) ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
+            RoundRobinVolumesPolicy.class,
+            BlockVolumeChoosingPolicy.class),
+        conf);
+    volumes = new FSVolumeSet(volArray, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
 
     File[] roots = new File[storage.getNumStorageDirs()];
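
Because the policy class is resolved with conf.getClass, a deployment or test can also swap in a custom implementation programmatically; a minimal sketch, reusing the hypothetical MostAvailableSpaceVolumesPolicy from above:

Configuration conf = new Configuration();
conf.setClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
    MostAvailableSpaceVolumesPolicy.class,  // hypothetical custom policy
    BlockVolumeChoosingPolicy.class);
// FSDataset then instantiates the configured class via
// ReflectionUtils.newInstance(..., conf), which also injects the
// Configuration if the class implements Configurable.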

+ 55 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+
+public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
+
+  private int curVolume = 0;
+
+  @Override
+  public synchronized FSVolume chooseVolume(List<FSVolume> volumes, long blockSize)
+      throws IOException {
+    if(volumes.size() < 1) {
+      throw new DiskOutOfSpaceException("No more available volumes");
+    }
+    
+    // since volumes could've been removed because of the failure
+    // make sure we are not out of bounds
+    if(curVolume >= volumes.size()) {
+      curVolume = 0;
+    }
+    
+    int startVolume = curVolume;
+    
+    while (true) {
+      FSVolume volume = volumes.get(curVolume);
+      curVolume = (curVolume + 1) % volumes.size();
+      if (volume.getAvailable() > blockSize) { return volume; }
+      if (curVolume == startVolume) {
+        throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+      }
+    }
+  }
+
+}

+ 67 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestRoundRobinVolumesPolicy {
+
+  // Test the Round-Robin block-volume choosing algorithm.
+  @Test
+  public void testRR() throws Exception {
+    final List<FSVolume> volumes = new ArrayList<FSVolume>();
+
+    // First volume, with 100 bytes of space.
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
+
+    // Second volume, with 200 bytes of space.
+    volumes.add(Mockito.mock(FSVolume.class));
+    Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
+
+    RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
+        RoundRobinVolumesPolicy.class, null);
+    
+    // Test two rounds of round-robin choosing
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
+
+    // The first volume has only 100 bytes available, so when we ask
+    // for 150 the policy should choose the second volume.
+    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150));
+
+    // Requesting more space than any volume can provide should throw.
+    try {
+      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      Assert.fail();
+    } catch (IOException e) {
+      // Passed.
+    }
+  }
+
+}