
HDFS-6727. Refresh data volumes on DataNode based on configuration changes (Lei Xu via Colin Patrick McCabe)
(cherry picked from commit fe38d2e9b5ac7e13f97cd2d3d2a984ab6bbaaf77)

Colin Patrick Mccabe 10 years ago
parent
commit
d329237990

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -222,6 +222,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7003. Add NFS Gateway support for reading and writing to
     encryption zones. (clamb via wang)
 
+    HDFS-6727. Refresh data volumes on DataNode based on configuration changes
+    (Lei Xu via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
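
The CHANGES entry above summarizes the feature: dfs.datanode.data.dir becomes reconfigurable while the DataNode is running. As a rough sketch of how the new code path is driven, mirroring the TestDataNodeHotSwapVolumes test added at the bottom of this commit (the helper class and the volume path below are hypothetical):

import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;

public class HotSwapSketch {
  /** Append one new volume to a running DataNode, the way the new test does. */
  static void addVolume(DataNode dn, String newVolumeDir)
      throws ReconfigurationException {
    String oldDirs = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
    // reconfigurePropertyImpl() diffs the old and new volume lists, formats and
    // adds the new directory, and deactivates any directory that was dropped.
    dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY,
        oldDirs + "," + newVolumeDir);
  }
}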

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java

@@ -94,7 +94,12 @@ public final class HdfsServerConstants {
     NONINTERACTIVE("-nonInteractive"),
     RENAMERESERVED("-renameReserved"),
     METADATAVERSION("-metadataVersion"),
-    UPGRADEONLY("-upgradeOnly");
+    UPGRADEONLY("-upgradeOnly"),
+    // The -hotswap constant should not be used as a startup option; it is
+    // only used by StorageDirectory.analyzeStorage() in the hot-swap drive scenario.
+    // TODO: refactor StorageDirectory.analyzeStorage() so that we can do away
+    // with this entry in StartupOption.
+    HOTSWAP("-hotswap");
 
     private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
         "(\\w+)\\((\\w+)\\)");

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -464,17 +464,20 @@ public abstract class Storage extends StorageInfo {
     public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
         throws IOException {
       assert root != null : "root is null";
+      boolean hadMkdirs = false;
       String rootPath = root.getCanonicalPath();
       try { // check that storage exists
         if (!root.exists()) {
           // storage directory does not exist
-          if (startOpt != StartupOption.FORMAT) {
+          if (startOpt != StartupOption.FORMAT &&
+              startOpt != StartupOption.HOTSWAP) {
             LOG.warn("Storage directory " + rootPath + " does not exist");
             return StorageState.NON_EXISTENT;
           }
           LOG.info(rootPath + " does not exist. Creating ...");
           if (!root.mkdirs())
             throw new IOException("Cannot create directory " + rootPath);
+          hadMkdirs = true;
         }
         // or is inaccessible
         if (!root.isDirectory()) {
@@ -492,7 +495,10 @@ public abstract class Storage extends StorageInfo {
 
       this.lock(); // lock storage if it exists
 
-      if (startOpt == HdfsServerConstants.StartupOption.FORMAT)
+      // For HOTSWAP, return NOT_FORMATTED only if the directory was just
+      // created; an existing directory still gets its layout version checked.
+      if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
+          (startOpt == StartupOption.HOTSWAP && hadMkdirs))
         return StorageState.NOT_FORMATTED;
 
       if (startOpt != HdfsServerConstants.StartupOption.IMPORT) {
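
Taken together, the two hunks above mean that HOTSWAP behaves like FORMAT only when the storage directory had to be created from scratch; a directory that already existed keeps its data and still goes through the usual VERSION and layout-version checks. A minimal sketch of just that decision (the helper class and method name are invented for illustration):

import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

class HotSwapStateSketch {
  /** True when analyzeStorage() should report NOT_FORMATTED after this patch. */
  static boolean treatAsNotFormatted(StartupOption startOpt, boolean hadMkdirs) {
    return startOpt == StartupOption.FORMAT
        || (startOpt == StartupOption.HOTSWAP && hadMkdirs);
  }
}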

+ 157 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -70,8 +70,10 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,11 +82,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.ReconfigurableBase;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -138,6 +142,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -223,7 +228,7 @@ import com.google.protobuf.BlockingService;
  *
  **********************************************************/
 @InterfaceAudience.Private
-public class DataNode extends Configured 
+public class DataNode extends ReconfigurableBase
     implements InterDatanodeProtocol, ClientDatanodeProtocol,
     DataNodeMXBean {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
@@ -308,6 +313,7 @@ public class DataNode extends Configured
   private JvmPauseMonitor pauseMonitor;
 
   private SecureResources secureResources = null;
+  // dataDirs must be accessed while holding the DataNode lock.
   private List<StorageLocation> dataDirs;
   private Configuration conf;
   private final String confVersion;
@@ -389,6 +395,149 @@ public class DataNode extends Configured
     }
   }
 
+  @Override
+  public void reconfigurePropertyImpl(String property, String newVal)
+      throws ReconfigurationException {
+    if (property.equals(DFS_DATANODE_DATA_DIR_KEY)) {
+      try {
+        LOG.info("Reconfiguring " + property + " to " + newVal);
+        this.refreshVolumes(newVal);
+      } catch (Exception e) {
+        throw new ReconfigurationException(property, newVal,
+            getConf().get(property), e);
+      }
+    } else {
+      throw new ReconfigurationException(
+          property, newVal, getConf().get(property));
+    }
+  }
+
+  /**
+   * Get a list of the keys of the re-configurable properties in configuration.
+   */
+  @Override
+  public Collection<String> getReconfigurableProperties() {
+    List<String> reconfigurable =
+        Collections.unmodifiableList(Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
+    return reconfigurable;
+  }
+
+  /**
+   * Contains the StorageLocations for changed data volumes.
+   */
+  @VisibleForTesting
+  static class ChangedVolumes {
+    List<StorageLocation> newLocations = Lists.newArrayList();
+    List<StorageLocation> deactivateLocations = Lists.newArrayList();
+  }
+
+  /**
+   * Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
+   * changed volumes.
+   * @return changed volumes.
+   * @throws IOException if no directories are specified in the
+   * configuration.
+   */
+  @VisibleForTesting
+  ChangedVolumes parseChangedVolumes() throws IOException {
+    List<StorageLocation> locations = getStorageLocations(getConf());
+
+    if (locations.isEmpty()) {
+      throw new IOException("No directory is specified.");
+    }
+
+    ChangedVolumes results = new ChangedVolumes();
+    results.newLocations.addAll(locations);
+
+    for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
+         it.hasNext(); ) {
+      Storage.StorageDirectory dir = it.next();
+      boolean found = false;
+      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
+           sl.hasNext(); ) {
+        if (sl.next().getFile().getCanonicalPath().equals(
+            dir.getRoot().getCanonicalPath())) {
+          sl.remove();
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        results.deactivateLocations.add(
+            StorageLocation.parse(dir.getRoot().toString()));
+      }
+    }
+
+    return results;
+  }
+
+  /**
+   * Attempts to reload data volumes with new configuration.
+   * @param newVolumes a comma-separated string that specifies the data volumes.
+   * @throws Exception if the volumes cannot be refreshed.
+   */
+  private synchronized void refreshVolumes(String newVolumes) throws Exception {
+    Configuration conf = getConf();
+    String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+    List<StorageLocation> locations = getStorageLocations(conf);
+
+    final int numOldDataDirs = dataDirs.size();
+    dataDirs = locations;
+    ChangedVolumes changedVolumes = parseChangedVolumes();
+
+    try {
+      if (numOldDataDirs + changedVolumes.newLocations.size() -
+          changedVolumes.deactivateLocations.size() <= 0) {
+        throw new IOException("Attempt to remove all volumes.");
+      }
+      if (!changedVolumes.newLocations.isEmpty()) {
+        LOG.info("Adding new volumes: " +
+            Joiner.on(",").join(changedVolumes.newLocations));
+
+        // Add volumes for each namespace.
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          NamespaceInfo nsInfo = bpos.getNamespaceInfo();
+          LOG.info("Loading volumes for namespace: " + nsInfo.getNamespaceID());
+          storage.addStorageLocations(
+              this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
+        }
+        List<String> bpids = Lists.newArrayList();
+        for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+          bpids.add(bpos.getBlockPoolId());
+        }
+        List<StorageLocation> succeedVolumes =
+            data.addVolumes(changedVolumes.newLocations, bpids);
+
+        if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
+          List<StorageLocation> failedVolumes = Lists.newArrayList();
+          // Clean all failed volumes.
+          for (StorageLocation location : changedVolumes.newLocations) {
+            if (!succeedVolumes.contains(location)) {
+              failedVolumes.add(location);
+            }
+          }
+          storage.removeVolumes(failedVolumes);
+          data.removeVolumes(failedVolumes);
+        }
+      }
+
+      if (!changedVolumes.deactivateLocations.isEmpty()) {
+        LOG.info("Deactivating volumes: " +
+            Joiner.on(",").join(changedVolumes.deactivateLocations));
+
+        data.removeVolumes(changedVolumes.deactivateLocations);
+        storage.removeVolumes(changedVolumes.deactivateLocations);
+      }
+    } catch (IOException e) {
+      LOG.warn("There is IOException when refreshing volumes! "
+          + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
+          + " = " + oldVolumes, e);
+      throw e;
+    }
+  }
+
   private synchronized void setClusterId(final String nsCid, final String bpid
       ) throws IOException {
     if(clusterId != null && !clusterId.equals(nsCid)) {
@@ -829,7 +978,9 @@ public class DataNode extends Configured
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
-    this.dataDirs = dataDirs;
+    synchronized (this) {
+      this.dataDirs = dataDirs;
+    }
     this.conf = conf;
     this.dnConf = new DNConf(conf);
     this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
@@ -1119,7 +1270,9 @@ public class DataNode extends Configured
       }
       final String bpid = nsInfo.getBlockPoolID();
       //read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      synchronized (this) {
+        storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      }
       final StorageInfo bpStorage = storage.getBPStorage(bpid);
       LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
           + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -94,8 +94,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   public List<V> getVolumes();
 
   /** Add an array of StorageLocation to FsDataset. */
-  public void addVolumes(Collection<StorageLocation> volumes)
-      throws IOException;
+  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
+      final Collection<String> bpids);
 
   /** Removes a collection of volumes from FsDataset. */
   public void removeVolumes(Collection<StorageLocation> volumes);
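
With the new signature, an implementation reports which volumes it actually managed to load instead of throwing on the first failure. A trivially conforming sketch for a test double that accepts everything, assuming the usual java.util collection imports (hypothetical; the real FsDatasetImpl change below loads block pools per volume and returns only the successes):

  @Override
  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
      final Collection<String> bpids) {
    // Pretend every volume loaded successfully; no real work is done here.
    return new ArrayList<StorageLocation>(volumes);
  }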

+ 115 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -28,19 +28,23 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +89,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -245,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
-    storageMap = new HashMap<String, DatanodeStorage>();
+    storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -275,45 +280,124 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // storageMap and asyncDiskService, consistent.
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
-    fsVolume.getVolumeMap(volumeMap);
+    ReplicaMap tempVolumeMap = new ReplicaMap(this);
+    fsVolume.getVolumeMap(tempVolumeMap);
 
+    volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
     storageMap.put(sd.getStorageUuid(),
         new DatanodeStorage(sd.getStorageUuid(),
-                            DatanodeStorage.State.NORMAL,
-                            storageType));
+            DatanodeStorage.State.NORMAL,
+            storageType));
     asyncDiskService.addVolume(sd.getCurrentDir());
 
     LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
   }
 
+  private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
+      Storage.StorageDirectory sd, final Collection<String> bpids)
+      throws IOException {
+    final File dir = sd.getCurrentDir();
+    final StorageType storageType =
+        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+    final FsVolumeImpl fsVolume = new FsVolumeImpl(
+        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+
+    List<IOException> exceptions = Lists.newArrayList();
+    for (final String bpid : bpids) {
+      try {
+        fsVolume.addBlockPool(bpid, this.conf);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+      } catch (IOException e) {
+        LOG.warn("Caught exception when adding " + fsVolume +
+            ". Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      // The state of FsDatasetImpl is not modified, so no rollback is needed.
+      throw MultipleIOException.createIOException(exceptions);
+    }
+
+    volumeMap.addAll(tempVolumeMap);
+    storageMap.put(sd.getStorageUuid(),
+        new DatanodeStorage(sd.getStorageUuid(),
+            DatanodeStorage.State.NORMAL,
+            storageType));
+    asyncDiskService.addVolume(sd.getCurrentDir());
+    volumes.addVolume(fsVolume);
+
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+  }
+
   /**
    * Add an array of StorageLocation to FsDataset.
    *
    * @pre dataStorage must have these volumes.
-   * @param volumes
-   * @throws IOException
+   * @param volumes a list of storage locations for the volumes to add.
+   * @param bpids block pool IDs.
+   * @return a list of successfully loaded volumes.
    */
   @Override
-  public synchronized void addVolumes(Collection<StorageLocation> volumes)
-      throws IOException {
+  public synchronized List<StorageLocation> addVolumes(
+      final List<StorageLocation> volumes, final Collection<String> bpids) {
     final Collection<StorageLocation> dataLocations =
         DataNode.getStorageLocations(this.conf);
-    Map<String, Storage.StorageDirectory> allStorageDirs =
+    final Map<String, Storage.StorageDirectory> allStorageDirs =
         new HashMap<String, Storage.StorageDirectory>();
-    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-      allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+    List<StorageLocation> succeedVolumes = Lists.newArrayList();
+    try {
+      for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+        Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+        allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Caught exception when parsing storage URL.", ioe);
+      return succeedVolumes;
+    }
+
+    final boolean[] successFlags = new boolean[volumes.size()];
+    Arrays.fill(successFlags, false);
+    List<Thread> volumeAddingThreads = Lists.newArrayList();
+    for (int i = 0; i < volumes.size(); i++) {
+      final int idx = i;
+      Thread t = new Thread() {
+        public void run() {
+          StorageLocation vol = volumes.get(idx);
+          try {
+            String key = vol.getFile().getCanonicalPath();
+            if (!allStorageDirs.containsKey(key)) {
+              LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+            } else {
+              addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
+                  bpids);
+              successFlags[idx] = true;
+            }
+          } catch (IOException e) {
+            LOG.warn("Caught exception when adding volume " + vol, e);
+          }
+        }
+      };
+      volumeAddingThreads.add(t);
+      t.start();
     }
 
-    for (StorageLocation vol : volumes) {
-      String key = vol.getFile().getAbsolutePath();
-      if (!allStorageDirs.containsKey(key)) {
-        LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
-      } else {
-        addVolume(dataLocations, allStorageDirs.get(key));
+    for (Thread t : volumeAddingThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Caught InterruptedException when adding volume.", e);
       }
     }
+
+    for (int i = 0; i < volumes.size(); i++) {
+      if (successFlags[i]) {
+        succeedVolumes.add(volumes.get(i));
+      }
+    }
+    return succeedVolumes;
   }
 
   /**
@@ -335,9 +419,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         String volume = sd.getRoot().toString();
         LOG.info("Removing " + volume + " from FsDataset.");
 
-        this.volumes.removeVolume(volume);
-        storageMap.remove(sd.getStorageUuid());
+        // Take the volume out of service.
         asyncDiskService.removeVolume(sd.getCurrentDir());
+        this.volumes.removeVolume(volume);
 
         // Removed all replica information for the blocks on the volume. Unlike
         // updating the volumeMap in addVolume(), this operation does not scan
@@ -348,7 +432,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               it.hasNext(); ) {
             ReplicaInfo block = it.next();
             if (block.getVolume().getBasePath().equals(volume)) {
-              invalidate(bpid, block.getBlockId());
+              invalidate(bpid, block);
               blocks.add(block);
               it.remove();
             }
@@ -357,6 +441,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           datanode.getBlockScanner().deleteBlocks(bpid,
               blocks.toArray(new Block[blocks.size()]));
         }
+
+        storageMap.remove(sd.getStorageUuid());
       }
     }
   }
@@ -1345,23 +1431,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /**
    * Invalidate a block but does not delete the actual on-disk block file.
    *
-   * It should only be used for decommissioning disks.
+   * It should only be used when deactivating disks.
    *
    * @param bpid the block pool ID.
-   * @param blockId the ID of the block.
+   * @param block The block to be invalidated.
    */
-  public void invalidate(String bpid, long blockId) {
+  public void invalidate(String bpid, ReplicaInfo block) {
     // If a DFSClient has the replica in its cache of short-circuit file
     // descriptors (and the client is using ShortCircuitShm), invalidate it.
     // The short-circuit registry is null in the unit tests, because the
     // datanode is mock object.
     if (datanode.getShortCircuitRegistry() != null) {
       datanode.getShortCircuitRegistry().processBlockInvalidation(
-          new ExtendedBlockId(blockId, bpid));
+          new ExtendedBlockId(block.getBlockId(), bpid));
 
       // If the block is cached, start uncaching it.
-      cacheManager.uncacheBlock(bpid, blockId);
+      cacheManager.uncacheBlock(bpid, block.getBlockId());
     }
+
+    datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
+        block.getStorageUuid());
   }
 
   /**
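
For reference, addVolumes() above fans out one raw Thread per new volume and joins them, tracking success in a boolean[]. The same structure can be expressed with an ExecutorService; the sketch below is only an equivalent formulation under that assumption (dirFor() stands in for the allStorageDirs lookup, and the java.util.concurrent imports are assumed), not what this commit uses:

    ExecutorService pool = Executors.newFixedThreadPool(volumes.size());
    List<Future<StorageLocation>> futures = new ArrayList<Future<StorageLocation>>();
    for (final StorageLocation vol : volumes) {
      futures.add(pool.submit(new Callable<StorageLocation>() {
        public StorageLocation call() throws IOException {
          // dirFor() is a placeholder for the allStorageDirs lookup done above.
          addVolumeAndBlockPool(dataLocations, dirFor(vol), bpids);
          return vol;
        }
      }));
    }
    pool.shutdown();
    List<StorageLocation> succeedVolumes = new ArrayList<StorageLocation>();
    for (Future<StorageLocation> f : futures) {
      try {
        succeedVolumes.add(f.get());  // a failed volume surfaces as ExecutionException
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        LOG.warn("Failed to add a volume.", e);
      }
    }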

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -1093,7 +1093,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public void addVolumes(Collection<StorageLocation> volumes) {
+  public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
+      final Collection<String> bpids) {
     throw new UnsupportedOperationException();
   }
 

+ 423 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestDataNodeHotSwapVolumes {
+  private static final int BLOCK_SIZE = 512;
+  private MiniDFSCluster cluster;
+
+  @After
+  public void tearDown() {
+    shutdown();
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes)
+      throws IOException {
+    shutdown();
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    /*
+     * Lower the DN heartbeat, DF rate, and recheck interval to one second
+     * so state about failures and datanode death propagates faster.
+     */
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        1000);
+
+    MiniDFSNNTopology nnTopology =
+        MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(nnTopology)
+        .numDataNodes(numDataNodes)
+        .build();
+    cluster.waitActive();
+  }
+
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(Path path, int numBlocks)
+      throws IOException, InterruptedException, TimeoutException {
+    final short replicateFactor = 1;
+    createFile(path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(Path path, int numBlocks, short replicateFactor)
+      throws IOException, InterruptedException, TimeoutException {
+    createFile(0, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks)
+      throws IOException, InterruptedException, TimeoutException {
+    final short replicateFactor = 1;
+    createFile(fsIdx, path, numBlocks, replicateFactor);
+  }
+
+  private void createFile(int fsIdx, Path path, int numBlocks,
+      short replicateFactor)
+      throws IOException, TimeoutException, InterruptedException {
+    final int seed = 0;
+    final DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks,
+        replicateFactor, seed);
+    DFSTestUtil.waitReplication(fs, path, replicateFactor);
+  }
+
+  /**
+   * Verify that a file has the expected length.
+   */
+  private static void verifyFileLength(FileSystem fs, Path path, int numBlocks)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    assertEquals(numBlocks * BLOCK_SIZE, status.getLen());
+  }
+
+  /** Return the number of replicas for a given block in the file. */
+  private static int getNumReplicas(FileSystem fs, Path file,
+      int blockIdx) throws IOException {
+    BlockLocation locs[] = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
+    return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length;
+  }
+
+  /**
+   * Wait for the block to have exactly the expected number of replicas.
+   */
+  private static void waitReplication(FileSystem fs, Path file, int blockIdx,
+      int numReplicas)
+      throws IOException, TimeoutException, InterruptedException {
+    int attempts = 50;  // Wait 5 seconds.
+    while (attempts > 0) {
+      if (getNumReplicas(fs, file, blockIdx) == numReplicas) {
+        return;
+      }
+      Thread.sleep(100);
+      attempts--;
+    }
+    throw new TimeoutException("Timed out waiting the " + blockIdx + "-th block"
+        + " of " + file + " to have " + numReplicas + " replicas.");
+  }
+
+  /** Parses data dirs from DataNode's configuration. */
+  private static Collection<String> getDataDirs(DataNode datanode) {
+    return datanode.getConf().getTrimmedStringCollection(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+  }
+
+  @Test
+  public void testParseChangedVolumes() throws IOException {
+    startDFSCluster(1, 1);
+    DataNode dn = cluster.getDataNodes().get(0);
+    Configuration conf = dn.getConf();
+
+    String oldPaths = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+    List<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
+    for (String path : oldPaths.split(",")) {
+      oldLocations.add(StorageLocation.parse(path));
+    }
+    assertFalse(oldLocations.isEmpty());
+
+    String newPaths = "/foo/path1,/foo/path2";
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
+
+    DataNode.ChangedVolumes changedVolumes = dn.parseChangedVolumes();
+    List<StorageLocation> newVolumes = changedVolumes.newLocations;
+    assertEquals(2, newVolumes.size());
+    assertEquals("/foo/path1", newVolumes.get(0).getFile().getAbsolutePath());
+    assertEquals("/foo/path2", newVolumes.get(1).getFile().getAbsolutePath());
+
+    List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
+    assertEquals(oldLocations.size(), removedVolumes.size());
+    for (int i = 0; i < removedVolumes.size(); i++) {
+      assertEquals(oldLocations.get(i).getFile(),
+          removedVolumes.get(i).getFile());
+    }
+  }
+
+  @Test
+  public void testParseChangedVolumesFailures() throws IOException {
+    startDFSCluster(1, 1);
+    DataNode dn = cluster.getDataNodes().get(0);
+    Configuration conf = dn.getConf();
+    try {
+      conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
+      dn.parseChangedVolumes();
+      fail("Should throw IOException: empty inputs.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("No directory is specified.", e);
+    }
+  }
+
+  /** Add volumes to the first DataNode. */
+  private void addVolumes(int numNewVolumes) throws ReconfigurationException {
+    File dataDir = new File(cluster.getDataDirectory());
+    DataNode dn = cluster.getDataNodes().get(0);  // First DataNode.
+    Configuration conf = dn.getConf();
+    String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
+
+    List<File> newVolumeDirs = new ArrayList<File>();
+    StringBuilder newDataDirBuf = new StringBuilder(oldDataDir);
+    int startIdx = oldDataDir.split(",").length + 1;
+    // Find the first available (non-taken) directory name for data volume.
+    while (true) {
+      File volumeDir = new File(dataDir, "data" + startIdx);
+      if (!volumeDir.exists()) {
+        break;
+      }
+      startIdx++;
+    }
+    for (int i = startIdx; i < startIdx + numNewVolumes; i++) {
+      File volumeDir = new File(dataDir, "data" + String.valueOf(i));
+      newVolumeDirs.add(volumeDir);
+      volumeDir.mkdirs();
+      newDataDirBuf.append(",");
+      newDataDirBuf.append(volumeDir.toURI());
+    }
+
+    String newDataDir = newDataDirBuf.toString();
+    dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
+    assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
+
+    // Check that all newly created volumes are appropriately formatted.
+    for (File volumeDir : newVolumeDirs) {
+      File curDir = new File(volumeDir, "current");
+      assertTrue(curDir.exists());
+      assertTrue(curDir.isDirectory());
+    }
+  }
+
+  private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
+    List<List<Integer>> results = new ArrayList<List<Integer>>();
+    final String bpid = cluster.getNamesystem(namesystemIdx).getBlockPoolId();
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    for (Map<DatanodeStorage, BlockListAsLongs> datanodeReport : blockReports) {
+      List<Integer> numBlocksPerDN = new ArrayList<Integer>();
+      for (BlockListAsLongs blocks : datanodeReport.values()) {
+        numBlocksPerDN.add(blocks.getNumberOfBlocks());
+      }
+      results.add(numBlocksPerDN);
+    }
+    return results;
+  }
+
+  /**
+   * Test adding one volume on a running MiniDFSCluster with only one NameNode.
+   */
+  @Test
+  public void testAddOneNewVolume()
+      throws IOException, ReconfigurationException,
+      InterruptedException, TimeoutException {
+    startDFSCluster(1, 1);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    final int numBlocks = 10;
+
+    addVolumes(1);
+
+    Path testFile = new Path("/test");
+    createFile(testFile, numBlocks);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals(1, blockReports.size());  // 1 DataNode
+    assertEquals(3, blockReports.get(0).size());  // 3 volumes
+
+    // FsVolumeList uses a round-robin volume choosing policy by default, so
+    // the new blocks should be evenly distributed across all volumes.
+    int minNumBlocks = Integer.MAX_VALUE;
+    int maxNumBlocks = Integer.MIN_VALUE;
+    for (BlockListAsLongs blockList : blockReports.get(0).values()) {
+      minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
+      maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
+    }
+    assertTrue(Math.abs(maxNumBlocks - minNumBlocks) <= 1);
+    verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
+  }
+
+  @Test(timeout = 60000)
+  public void testAddVolumesDuringWrite()
+      throws IOException, InterruptedException, TimeoutException,
+      ReconfigurationException {
+    startDFSCluster(1, 1);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Path testFile = new Path("/test");
+    createFile(testFile, 4);  // Each volume has 2 blocks.
+
+    addVolumes(2);
+
+    // Continue to write the same file, thus the new volumes will have blocks.
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
+    verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
+    // After appending data, there should be [2, 2, 4, 4] blocks in each volume
+    // respectively.
+    List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals(1, blockReports.size());  // 1 DataNode
+    assertEquals(4, blockReports.get(0).size());  // 4 volumes
+    Map<DatanodeStorage, BlockListAsLongs> dnReport =
+        blockReports.get(0);
+    List<Integer> actualNumBlocks = new ArrayList<Integer>();
+    for (BlockListAsLongs blockList : dnReport.values()) {
+      actualNumBlocks.add(blockList.getNumberOfBlocks());
+    }
+    Collections.sort(actualNumBlocks);
+    assertEquals(expectedNumBlocks, actualNumBlocks);
+  }
+
+  @Test
+  public void testAddVolumesToFederationNN()
+      throws IOException, TimeoutException, InterruptedException,
+      ReconfigurationException {
+    // Start a cluster with 2 NameNodes and 1 DataNode, where the DataNode
+    // has 2 volumes.
+    final int numNameNodes = 2;
+    final int numDataNodes = 1;
+    startDFSCluster(numNameNodes, numDataNodes);
+    Path testFile = new Path("/test");
+    // Create a file on the first namespace with 4 blocks.
+    createFile(0, testFile, 4);
+    // Create a file on the second namespace with 4 blocks.
+    createFile(1, testFile, 4);
+
+    // Add 2 volumes to the first DataNode.
+    final int numNewVolumes = 2;
+    addVolumes(numNewVolumes);
+
+    // Append to the file on the first namespace.
+    DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);
+
+    List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
+    assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
+    List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
+    Collections.sort(blocksOnFirstDN);
+    assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);
+
+    // Verify the second namespace also has the new volumes and they are empty.
+    actualNumBlocks = getNumBlocksReport(1);
+    assertEquals(4, actualNumBlocks.get(0).size());
+    assertEquals(numNewVolumes,
+        Collections.frequency(actualNumBlocks.get(0), 0));
+  }
+
+  @Test
+  public void testRemoveOneVolume()
+      throws ReconfigurationException, InterruptedException, TimeoutException,
+      IOException {
+    startDFSCluster(1, 1);
+    final short replFactor = 1;
+    Path testFile = new Path("/test");
+    createFile(testFile, 10, replFactor);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    Collection<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.iterator().next();  // Keep the first volume.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    dn.scheduleAllBlockReport(0);
+
+    try {
+      DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
+      fail("Expect to throw BlockMissingException.");
+    } catch (BlockMissingException e) {
+      GenericTestUtils.assertExceptionContains("Could not obtain block", e);
+    }
+
+    Path newFile = new Path("/newFile");
+    createFile(newFile, 6);
+
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(bpid);
+    assertEquals((int)replFactor, blockReports.size());
+
+    BlockListAsLongs blocksForVolume1 =
+        blockReports.get(0).values().iterator().next();
+    // The first volume holds half of testFile's blocks and all of newFile's blocks.
+    assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
+  }
+
+  @Test
+  public void testReplicatingAfterRemoveVolume()
+      throws InterruptedException, TimeoutException, IOException,
+      ReconfigurationException {
+    startDFSCluster(1, 2);
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    final short replFactor = 2;
+    Path testFile = new Path("/test");
+    createFile(testFile, 4, replFactor);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    Collection<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.iterator().next();  // Keep the first volume.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+
+    // Force DataNode to report missing blocks.
+    dn.scheduleAllBlockReport(0);
+    DataNodeTestUtils.triggerDeletionReport(dn);
+
+    // The 2nd block only has 1 replica due to the removed data volume.
+    waitReplication(fs, testFile, 1, 1);
+
+    // Wait for the NameNode to re-replicate the missing blocks.
+    DFSTestUtil.waitReplication(fs, testFile, replFactor);
+  }
+}
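
The tests above add and remove volumes in separate reconfigurations; since parseChangedVolumes() computes both sides of the diff, a single call can do both at once. A hypothetical combination in the same style as addVolumes() and testRemoveOneVolume() (dn is a running DataNode and the paths are placeholders):

    // Swap one data volume for another in a single reconfiguration.
    String newDirs = "/data/disk1,/data/disk3";  // drops /data/disk2, adds /data/disk3
    dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
    // refreshVolumes() formats and adds /data/disk3, deactivates /data/disk2, and
    // the next block report reflects the new volume set.
    dn.scheduleAllBlockReport(0);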

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -40,7 +40,10 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -117,6 +120,7 @@ public class TestFsDatasetImpl {
     final int numExistingVolumes = dataset.getVolumes().size();
     final int totalVolumes = numNewVolumes + numExistingVolumes;
     List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
+    Set<String> expectedVolumes = new HashSet<String>();
     for (int i = 0; i < numNewVolumes; i++) {
       String path = BASE_DIR + "/newData" + i;
       newLocations.add(StorageLocation.parse(path));
@@ -125,13 +129,15 @@ public class TestFsDatasetImpl {
     }
     when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
 
-    dataset.addVolumes(newLocations);
+    dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
     assertEquals(totalVolumes, dataset.getVolumes().size());
     assertEquals(totalVolumes, dataset.storageMap.size());
+
+    Set<String> actualVolumes = new HashSet<String>();
     for (int i = 0; i < numNewVolumes; i++) {
-      assertEquals(newLocations.get(i).getFile().getPath(),
-          dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
+      dataset.getVolumes().get(numExistingVolumes + i).getBasePath();
     }
+    assertEquals(actualVolumes, expectedVolumes);
   }
 
   @Test