
HADOOP-2992. Distributed Upgrade framework works correctly with
more than one upgrade object. (Konstantin Shvachko via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@637307 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur, 17 years ago
Commit 9c5fab1273
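In short, the fix lets the distributed-upgrade framework carry several registered upgrade objects at once and apply them one version at a time; the updated test below exercises this by registering three datanode/namenode pairs. A minimal sketch of that registration pattern, mirroring TestDistributedUpgrade further down (UpgradeObjectCollection and the UO_* classes are Hadoop-internal test types shown in the diff, not a public API):

    // Sketch only -- lifted from TestDistributedUpgrade#testDistributedUpgrade below.
    UpgradeObjectCollection.initialize();                         // ignore all previously registered objects
    UpgradeObjectCollection.registerUpgrade(new UO_Datanode1());  // LAYOUT_VERSION+1
    UpgradeObjectCollection.registerUpgrade(new UO_Namenode1());
    UpgradeObjectCollection.registerUpgrade(new UO_Datanode2());  // LAYOUT_VERSION+2
    UpgradeObjectCollection.registerUpgrade(new UO_Namenode2());
    UpgradeObjectCollection.registerUpgrade(new UO_Datanode3());  // LAYOUT_VERSION+3
    UpgradeObjectCollection.registerUpgrade(new UO_Namenode3());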

+ 3 - 0
CHANGES.txt

@@ -222,6 +222,9 @@ Trunk (unreleased changes)
     minidfscluster sometimes creates datanodes with ports that are
     different from their original instance. (dhruba)
 
+    HADOOP-2992. Distributed Upgrade framework works correctly with
+    more than one upgrade object.  (Konstantin Shvachko via dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 26 - 21
src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java

@@ -47,8 +47,9 @@ class UpgradeManagerDatanode extends UpgradeManager {
   void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
     if( ! super.initializeUpgrade())
       return; // distr upgrade is not needed
-    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
-        + getUpgradeVersion() + " to current LV " 
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
+        + dataNode.dnRegistration.getName() 
+        + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is initialized.");
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
     curUO.setDatanode(dataNode);
@@ -72,15 +73,21 @@ class UpgradeManagerDatanode extends UpgradeManager {
       return true;
     }
     if(broadcastCommand != null) {
-      // the upgrade has been finished by this data-node,
-      // but the cluster is still running it, 
-      // reply with the broadcast command
-      assert currentUpgrades == null : 
-        "UpgradeManagerDatanode.currentUpgrades is not null.";
-      assert upgradeDaemon == null : 
-        "UpgradeManagerDatanode.upgradeDaemon is not null.";
-      dataNode.namenode.processUpgradeCommand(broadcastCommand);
-      return true;
+      if(broadcastCommand.getVersion() > this.getUpgradeVersion()) {
+        // stop broadcasting, the cluster moved on
+        // start upgrade for the next version
+        broadcastCommand = null;
+      } else {
+        // the upgrade has been finished by this data-node,
+        // but the cluster is still running it, 
+        // reply with the broadcast command
+        assert currentUpgrades == null : 
+          "UpgradeManagerDatanode.currentUpgrades is not null.";
+        assert upgradeDaemon == null : 
+          "UpgradeManagerDatanode.upgradeDaemon is not null.";
+        dataNode.namenode.processUpgradeCommand(broadcastCommand);
+        return true;
+      }
     }
     if(currentUpgrades == null)
       currentUpgrades = getDistributedUpgrades();
@@ -91,18 +98,15 @@ class UpgradeManagerDatanode extends UpgradeManager {
           + "The upgrade object is not defined.");
       return false;
     }
-    if(currentUpgrades.size() > 1)
-      throw new IOException(
-          "More than one distributed upgrade objects registered for version " 
-          + getUpgradeVersion());
     upgradeState = true;
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
     curUO.setDatanode(dataNode);
     curUO.startUpgrade();
     upgradeDaemon = new Daemon(curUO);
     upgradeDaemon.start();
-    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
-        + getUpgradeVersion() + " to current LV " 
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
+        + dataNode.dnRegistration.getName() 
+        + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is started.");
     return true;
   }
@@ -116,8 +120,8 @@ class UpgradeManagerDatanode extends UpgradeManager {
     if(startUpgrade()) // upgrade started
       return;
     throw new IOException(
-        "Distributed upgrade for DataNode version " 
-        + getUpgradeVersion() + " to current LV " 
+        "Distributed upgrade for DataNode " + dataNode.dnRegistration.getName() 
+        + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " cannot be started. "
         + "The upgrade object is not defined.");
   }
@@ -130,8 +134,9 @@ class UpgradeManagerDatanode extends UpgradeManager {
     upgradeState = false;
     currentUpgrades = null;
     upgradeDaemon = null;
-    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
-        + getUpgradeVersion() + " to current LV " 
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
+        + dataNode.dnRegistration.getName() 
+        + " version " + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is complete.");
   }
 
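The key change in UpgradeManagerDatanode.startUpgrade() above is twofold: the old restriction to a single registered upgrade object is gone, and a broadcast command left over from a finished upgrade is dropped once its version is greater than the upgrade version currently required, so the datanode can fall through and start the next registered object. A simplified, self-contained sketch of that guard (the class and field names below are stand-ins for illustration, not the actual Hadoop types):

    // Simplified illustration of the new broadcast-command guard in startUpgrade().
    class BroadcastGuardSketch {
      Integer broadcastVersion; // stand-in for broadcastCommand.getVersion(); null = no broadcast pending
      int upgradeVersion;       // stand-in for getUpgradeVersion()

      // true  -> just re-send the broadcast command, nothing new to start;
      // false -> fall through and start the next registered upgrade object.
      boolean replyWithBroadcast() {
        if (broadcastVersion == null)
          return false;                  // no finished upgrade to report
        if (broadcastVersion > upgradeVersion) {
          broadcastVersion = null;       // "stop broadcasting, the cluster moved on"
          return false;                  // "start upgrade for the next version"
        }
        return true;                     // cluster still running this upgrade: keep replying
      }
    }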

+ 3 - 3
src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java

@@ -81,6 +81,9 @@ class UpgradeManagerNamenode extends UpgradeManager {
     }
     // current upgrade is done
     curUO.completeUpgrade();
+    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
+        + curUO.getVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is complete.");
     // proceede with the next one
     currentUpgrades.remove(curUO);
     if(currentUpgrades.isEmpty()) { // all upgrades are done
@@ -93,9 +96,6 @@ class UpgradeManagerNamenode extends UpgradeManager {
   }
 
   synchronized void completeUpgrade() throws IOException {
-    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
-        + getUpgradeVersion() + " to current LV " 
-        + FSConstants.LAYOUT_VERSION + " is complete.");
     // set and write new upgrade state into disk
     setUpgradeState(false, FSConstants.LAYOUT_VERSION);
     FSNamesystem.getFSNamesystem().getFSImage().writeAll();

+ 1 - 1
src/java/org/apache/hadoop/dfs/UpgradeObject.java

@@ -45,7 +45,7 @@ abstract class UpgradeObject implements Upgradeable {
 
   public int compareTo(Upgradeable o) {
     if(this.getVersion() != o.getVersion())
-      return (getVersion() < o.getVersion() ? -1 : 1);
+      return (getVersion() > o.getVersion() ? -1 : 1);
     int res = this.getType().toString().compareTo(o.getType().toString());
     if(res != 0)
       return res;
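This one-character change reverses the ordering of upgrade objects: the object with the numerically largest version now sorts first. Since Hadoop layout versions are negative and grow more negative over time, and currentUpgrades is a sorted set consumed via first() (see UpgradeManagerDatanode above), the effect is presumably that the upgrade for the oldest layout runs before the newer ones. A self-contained sketch of the new ordering (MiniUpgrade and the chosen LAYOUT_VERSION value are illustrative stand-ins, not the Hadoop classes):

    import java.util.SortedSet;
    import java.util.TreeSet;

    // Stand-in that copies only the patched comparison: higher version sorts first.
    class MiniUpgrade implements Comparable<MiniUpgrade> {
      final int version;
      MiniUpgrade(int version) { this.version = version; }
      public int compareTo(MiniUpgrade o) {
        if (this.version != o.version)
          return (version > o.version ? -1 : 1);  // mirrors the new line above
        return 0;
      }
      public String toString() { return "v" + version; }
    }

    public class CompareToSketch {
      public static void main(String[] args) {
        final int LAYOUT_VERSION = -11;  // assumed value; negative, as in FSConstants
        SortedSet<MiniUpgrade> upgrades = new TreeSet<MiniUpgrade>();
        upgrades.add(new MiniUpgrade(LAYOUT_VERSION + 1));  // -10
        upgrades.add(new MiniUpgrade(LAYOUT_VERSION + 2));  // -9
        upgrades.add(new MiniUpgrade(LAYOUT_VERSION + 3));  // -8
        // Prints [v-8, v-9, v-10]: first() now yields LAYOUT_VERSION+3
        // instead of LAYOUT_VERSION+1.
        System.out.println(upgrades);
      }
    }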

+ 8 - 6
src/test/org/apache/hadoop/dfs/TestDFSUpgradeFromImage.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +60,10 @@ public class TestDFSUpgradeFromImage extends TestCase {
   boolean printChecksum = false;
   
   protected void setUp() throws IOException {
+    unpackStorage();
+  }
+
+  void unpackStorage() throws IOException {
     String tarFile = System.getProperty("test.cache.data") + 
                      "/hadoop-12-dfs-dir.tgz";
     String dataDir = System.getProperty("test.build.data");
@@ -178,13 +181,12 @@ public class TestDFSUpgradeFromImage extends TestCase {
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(0, conf, numDataNodes, false,
-                                                  true, StartupOption.UPGRADE,
-                                                  null);
+      conf.setInt("dfs.datanode.scan.period.hours", -1); // block scanning off
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, false, true,
+                                   StartupOption.UPGRADE, null);
       cluster.waitActive();
       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
-                                                    cluster.getNameNodePort()),
-                                          conf);
+                                           cluster.getNameNodePort()), conf);
       //Safemode will be off only after upgrade is complete. Wait for it.
       while ( dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET) ) {
         LOG.info("Waiting for SafeMode to be OFF.");
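Both tests now share the same cluster-startup pattern: the datanode block scanner is disabled (the inline comment above marks dfs.datanode.scan.period.hours = -1 as "block scanning off") and the cluster is brought up with StartupOption.UPGRADE, after which the client polls safe mode until the distributed upgrade finishes. Condensed from the test code above, using only calls that appear in the diffs:

    Configuration conf = new Configuration();
    conf.setInt("dfs.datanode.scan.period.hours", -1); // block scanning off
    MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDataNodes, false, true,
                                                StartupOption.UPGRADE, null);
    cluster.waitActive();
    DFSClient dfsClient = new DFSClient(
        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
    // Safemode will be off only after upgrade is complete. Wait for it.
    while (dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET)) {
      LOG.info("Waiting for SafeMode to be OFF.");
      // ... remainder of the wait loop as in the test (not shown in the hunk above)
    }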

+ 77 - 34
src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java

@@ -17,15 +17,12 @@
  */
 package org.apache.hadoop.dfs;
 
-import java.io.File;
 import java.io.IOException;
 import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
-import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
-import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
 import static org.apache.hadoop.dfs.FSConstants.LAYOUT_VERSION;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 
@@ -79,33 +76,30 @@ public class TestDistributedUpgrade extends TestCase {
   /**
    */
   public void testDistributedUpgrade() throws Exception {
-    File[] baseDirs;
     int numDirs = 1;
-    UpgradeUtilities.initialize();
+    TestDFSUpgradeFromImage testImg = new TestDFSUpgradeFromImage();
+    testImg.unpackStorage();
+    int numDNs = testImg.numDataNodes;
 
     // register new upgrade objects (ignore all existing)
     UpgradeObjectCollection.initialize();
-    UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Datanode());
-    UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Namenode());
+    UpgradeObjectCollection.registerUpgrade(new UO_Datanode1());
+    UpgradeObjectCollection.registerUpgrade(new UO_Namenode1());
+    UpgradeObjectCollection.registerUpgrade(new UO_Datanode2());
+    UpgradeObjectCollection.registerUpgrade(new UO_Namenode2());
+    UpgradeObjectCollection.registerUpgrade(new UO_Datanode3());
+    UpgradeObjectCollection.registerUpgrade(new UO_Namenode3());
 
-    conf = UpgradeUtilities.initializeStorageStateConf(numDirs, 
-                                                       new Configuration());
-    String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
-    String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
-    DFSAdmin dfsAdmin = new DFSAdmin();
-    dfsAdmin.setConf(conf);
-    String[] pars = {"-safemode", "wait"};
+    conf = new Configuration();
+    conf.setInt("dfs.datanode.scan.period.hours", -1); // block scanning off
 
     log("NameNode start in regular mode when dustributed upgrade is required", numDirs);
-    baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-    UpgradeUtilities.createVersionFile(NAME_NODE, baseDirs,
-        new StorageInfo(LAYOUT_VERSION+2,
-                        UpgradeUtilities.getCurrentNamespaceID(cluster),
-                        UpgradeUtilities.getCurrentFsscTime(cluster)));
     startNameNodeShouldFail(StartupOption.REGULAR);
 
     log("Start NameNode only distributed upgrade", numDirs);
-    cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE);
+    // cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE);
+    cluster = new MiniDFSCluster(0, conf, 0, false, true,
+                                  StartupOption.UPGRADE, null);
     cluster.shutdown();
 
     log("NameNode start in regular mode when dustributed upgrade has been started", numDirs);
@@ -115,19 +109,19 @@ public class TestDistributedUpgrade extends TestCase {
     startNameNodeShouldFail(StartupOption.ROLLBACK);
 
     log("Normal distributed upgrade for the cluster", numDirs);
-    cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE);
-    UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-    cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
-    dfsAdmin.run(pars);
+    cluster = new MiniDFSCluster(0, conf, numDNs, false, true,
+                                  StartupOption.UPGRADE, null);
+    DFSAdmin dfsAdmin = new DFSAdmin();
+    dfsAdmin.setConf(conf);
+    dfsAdmin.run(new String[] {"-safemode", "wait"});
     cluster.shutdown();
 
     // it should be ok to start in regular mode
     log("NameCluster regular startup after the upgrade", numDirs);
-    cluster = new MiniDFSCluster(conf, 0, StartupOption.REGULAR);
-    cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+    cluster = new MiniDFSCluster(0, conf, numDNs, false, true,
+                                  StartupOption.REGULAR, null);
+    cluster.waitActive();
     cluster.shutdown();
-    UpgradeUtilities.createEmptyDirs(nameNodeDirs);
-    UpgradeUtilities.createEmptyDirs(dataNodeDirs);
   }
 
   public static void main(String[] args) throws Exception {
@@ -139,9 +133,16 @@ public class TestDistributedUpgrade extends TestCase {
 /**
  * Upgrade object for data-node
  */
-class UpgradeObject_Test_Datanode extends UpgradeObjectDatanode {
+class UO_Datanode extends UpgradeObjectDatanode {
+  int version;
+
+  UO_Datanode(int v) {
+    this.status = (short)0;
+    version = v;
+  }
+
   public int getVersion() {
-    return LAYOUT_VERSION+1;
+    return version;
   }
 
   public void doUpgrade() throws IOException {
@@ -152,7 +153,6 @@ class UpgradeObject_Test_Datanode extends UpgradeObjectDatanode {
   }
 
   public UpgradeCommand startUpgrade() throws IOException {
-    this.status = (short)0;
     return null;
   }
 }
@@ -160,16 +160,23 @@ class UpgradeObject_Test_Datanode extends UpgradeObjectDatanode {
 /**
  * Upgrade object for name-node
  */
-class UpgradeObject_Test_Namenode extends UpgradeObjectNamenode {
+class UO_Namenode extends UpgradeObjectNamenode {
+  int version;
+
+  UO_Namenode(int v) {
+    status = (short)0;
+    version = v;
+  }
+
   public int getVersion() {
-    return LAYOUT_VERSION+1;
+    return version;
   }
 
   synchronized public UpgradeCommand processUpgradeCommand(
                                   UpgradeCommand command) throws IOException {
     switch(command.action) {
       case UpgradeCommand.UC_ACTION_REPORT_STATUS:
-        this.status += command.getCurrentStatus()/2;  // 2 reports needed
+        this.status += command.getCurrentStatus()/8;  // 4 reports needed
         break;
       default:
         this.status++;
@@ -181,3 +188,39 @@ class UpgradeObject_Test_Namenode extends UpgradeObjectNamenode {
     return null;
   }
 }
+
+class UO_Datanode1 extends UO_Datanode {
+  UO_Datanode1() {
+    super(LAYOUT_VERSION+1);
+  }
+}
+
+class UO_Namenode1 extends UO_Namenode {
+  UO_Namenode1() {
+    super(LAYOUT_VERSION+1);
+  }
+}
+
+class UO_Datanode2 extends UO_Datanode {
+  UO_Datanode2() {
+    super(LAYOUT_VERSION+2);
+  }
+}
+
+class UO_Namenode2 extends UO_Namenode {
+  UO_Namenode2() {
+    super(LAYOUT_VERSION+2);
+  }
+}
+
+class UO_Datanode3 extends UO_Datanode {
+  UO_Datanode3() {
+    super(LAYOUT_VERSION+3);
+  }
+}
+
+class UO_Namenode3 extends UO_Namenode {
+  UO_Namenode3() {
+    super(LAYOUT_VERSION+3);
+  }
+}
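The UO_Datanode1..3 / UO_Namenode1..3 classes above are thin wrappers whose only job is to pin a layout version onto the shared UO_Datanode / UO_Namenode bases. A further upgrade step would presumably follow the same pattern; the pair below is a hypothetical illustration only, not part of this commit:

    // Hypothetical example -- not in the commit; follows the pattern above.
    class UO_Datanode4 extends UO_Datanode {
      UO_Datanode4() {
        super(LAYOUT_VERSION+4);
      }
    }

    class UO_Namenode4 extends UO_Namenode {
      UO_Namenode4() {
        super(LAYOUT_VERSION+4);
      }
    }

Such a pair would also have to be registered in testDistributedUpgrade(), e.g. UpgradeObjectCollection.registerUpgrade(new UO_Datanode4()) and registerUpgrade(new UO_Namenode4()).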