فهرست منبع

HDFS-17178: BootstrapStandby needs to handle RollingUpgrade (#6018)

dannytbecker 1 سال پیش
والد
کامیت
4652d22b91

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -219,6 +219,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>lz4-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -348,10 +348,12 @@ public class PBHelper {
 
   public static NamespaceInfo convert(NamespaceInfoProto info) {
     StorageInfoProto storage = info.getStorageInfo();
-    return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
+    NamespaceInfo nsInfo = new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
         info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
         info.getSoftwareVersion(), info.getCapabilities(),
         convert(info.getState()));
+    nsInfo.setStorageInfo(convert(storage, NodeType.NAME_NODE));
+    return nsInfo;
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java

@@ -61,6 +61,12 @@ public interface HdfsServerConstants {
    */
   int NAMENODE_LAYOUT_VERSION
       = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
+  /**
+  * Current minimum compatible version for NameNode
+  * Please see {@link NameNodeLayoutVersion.Feature} on adding new layout version.
+  */
+  int MINIMUM_COMPATIBLE_NAMENODE_LAYOUT_VERSION
+      = NameNodeLayoutVersion.MINIMUM_COMPATIBLE_LAYOUT_VERSION;
   /**
    * Path components that are reserved in HDFS.
    * <p>

+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -1114,11 +1114,7 @@ public abstract class Storage extends StorageInfo {
   }
 
   public NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(
-        getNamespaceID(),
-        getClusterID(),
-        null,
-        getCTime());
+    return new NamespaceInfo(this);
   }
 
   /**

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -258,7 +258,7 @@ public class FSImage implements Closeable {
     if (startOpt == StartupOption.METADATAVERSION) {
       System.out.println("HDFS Image Version: " + layoutVersion);
       System.out.println("Software format version: " +
-        HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
+          storage.getServiceLayoutVersion());
       return false;
     }
 
@@ -269,11 +269,11 @@ public class FSImage implements Closeable {
         && startOpt != StartupOption.UPGRADEONLY
         && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
         && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
-        && layoutVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
+        && layoutVersion != storage.getServiceLayoutVersion()) {
       throw new IOException(
           "\nFile system image contains an old layout version " 
           + storage.getLayoutVersion() + ".\nAn upgrade to version "
-          + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+          + storage.getServiceLayoutVersion() + " is required.\n"
           + "Please restart NameNode with the \""
           + RollingUpgradeStartupOption.STARTED.getOptionString()
           + "\" option if a rolling upgrade is already started;"
@@ -462,7 +462,7 @@ public class FSImage implements Closeable {
     long oldCTime = storage.getCTime();
     storage.cTime = now();  // generate new cTime for the state
     int oldLV = storage.getLayoutVersion();
-    storage.layoutVersion = HdfsServerConstants.NAMENODE_LAYOUT_VERSION;
+    storage.layoutVersion = storage.getServiceLayoutVersion();
     
     List<StorageDirectory> errorSDs =
       Collections.synchronizedList(new ArrayList<StorageDirectory>());
@@ -523,11 +523,11 @@ public class FSImage implements Closeable {
     boolean canRollback = false;
     FSImage prevState = new FSImage(conf);
     try {
-      prevState.getStorage().layoutVersion = HdfsServerConstants.NAMENODE_LAYOUT_VERSION;
+      prevState.getStorage().layoutVersion = storage.getServiceLayoutVersion();
       for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
         StorageDirectory sd = it.next();
         if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
-            HdfsServerConstants.NAMENODE_LAYOUT_VERSION)) {
+            storage.getServiceLayoutVersion())) {
           continue;
         }
         LOG.info("Can perform rollback for " + sd);
@@ -538,7 +538,7 @@ public class FSImage implements Closeable {
         // If HA is enabled, check if the shared log can be rolled back as well.
         editLog.initJournalsForWrite();
         boolean canRollBackSharedEditLog = editLog.canRollBackSharedLog(
-            prevState.getStorage(), HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
+            prevState.getStorage(), storage.getServiceLayoutVersion());
         if (canRollBackSharedEditLog) {
           LOG.info("Can perform rollback for shared edit log.");
           canRollback = true;

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1898,9 +1898,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Version of @see #getNamespaceInfo() that is not protected by a lock.
    */
   NamespaceInfo unprotectedGetNamespaceInfo() {
-    return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
-        getClusterId(), getBlockPoolId(),
-        getFSImage().getStorage().getCTime(), getState());
+    return new NamespaceInfo(getFSImage().getStorage(), getState());
   }
 
   /**

+ 14 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -600,10 +600,17 @@ public class NNStorage extends Storage implements Closeable,
    * Format all available storage directories.
    */
   public void format(NamespaceInfo nsInfo) throws IOException {
+    format(nsInfo, false);
+  }
+
+  /**
+   * Format all available storage directories.
+   */
+  public void format(NamespaceInfo nsInfo, boolean isRollingUpgrade)
+      throws IOException {
     Preconditions.checkArgument(nsInfo.getLayoutVersion() == 0 ||
-        nsInfo.getLayoutVersion() ==
-            HdfsServerConstants.NAMENODE_LAYOUT_VERSION,
-        "Bad layout version: %s", nsInfo.getLayoutVersion());
+        nsInfo.getLayoutVersion() == getServiceLayoutVersion() ||
+        isRollingUpgrade, "Bad layout version: %s", nsInfo.getLayoutVersion());
     
     this.setStorageInfo(nsInfo);
     this.blockpoolID = nsInfo.getBlockPoolID();
@@ -621,7 +628,7 @@ public class NNStorage extends Storage implements Closeable,
   }
   
   public void format() throws IOException {
-    this.layoutVersion = HdfsServerConstants.NAMENODE_LAYOUT_VERSION;
+    this.layoutVersion = getServiceLayoutVersion();
     for (Iterator<StorageDirectory> it =
                            dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -683,7 +690,7 @@ public class NNStorage extends Storage implements Closeable,
             "storage directory " + sd.getRoot().getAbsolutePath());
       }
       props.setProperty("layoutVersion",
-          Integer.toString(HdfsServerConstants.NAMENODE_LAYOUT_VERSION));
+          Integer.toString(getServiceLayoutVersion()));
     }
     setFieldsFromProperties(props, sd);
   }
@@ -706,7 +713,7 @@ public class NNStorage extends Storage implements Closeable,
    * This should only be used during upgrades.
    */
   String getDeprecatedProperty(String prop) {
-    assert getLayoutVersion() > HdfsServerConstants.NAMENODE_LAYOUT_VERSION :
+    assert getLayoutVersion() > getServiceLayoutVersion() :
       "getDeprecatedProperty should only be done when loading " +
       "storage from past versions during upgrade.";
     return deprecatedProperties.get(prop);
@@ -1133,11 +1140,7 @@ public class NNStorage extends Storage implements Closeable,
 
   @Override
   public NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(
-        getNamespaceID(),
-        getClusterID(),
-        getBlockPoolID(),
-        getCTime());
+    return new NamespaceInfo(this);
   }
 
   public String getNNDirectorySize() {

+ 37 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java

@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -170,6 +171,7 @@ public class BootstrapStandby implements Tool, Configurable {
     NamenodeProtocol proxy = null;
     NamespaceInfo nsInfo = null;
     boolean isUpgradeFinalized = false;
+    boolean isRollingUpgrade = false;
     RemoteNameNodeInfo proxyInfo = null;
     for (int i = 0; i < remoteNNs.size(); i++) {
       proxyInfo = remoteNNs.get(i);
@@ -180,8 +182,9 @@ public class BootstrapStandby implements Tool, Configurable {
         // bootstrapping the other NNs from that layout, it will only contact the single NN.
         // However, if there cluster is already running and you are adding a NN later (e.g.
         // replacing a failed NN), then this will bootstrap from any node in the cluster.
-        nsInfo = proxy.versionRequest();
+        nsInfo = getProxyNamespaceInfo(proxy);
         isUpgradeFinalized = proxy.isUpgradeFinalized();
+        isRollingUpgrade = proxy.isRollingUpgrade();
         break;
       } catch (IOException ioe) {
         LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
@@ -199,10 +202,17 @@ public class BootstrapStandby implements Tool, Configurable {
       return ERR_CODE_FAILED_CONNECT;
     }
 
-    if (!checkLayoutVersion(nsInfo)) {
-      LOG.error("Layout version on remote node (" + nsInfo.getLayoutVersion()
-          + ") does not match " + "this node's layout version ("
-          + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + ")");
+    if (!checkLayoutVersion(nsInfo, isRollingUpgrade)) {
+      if(isRollingUpgrade) {
+        LOG.error("Layout version on remote node in rolling upgrade ({}, {})"
+            + " is not compatible based on minimum compatible version ({})",
+            nsInfo.getLayoutVersion(), proxyInfo.getIpcAddress(),
+            HdfsServerConstants.MINIMUM_COMPATIBLE_NAMENODE_LAYOUT_VERSION);
+      } else {
+        LOG.error("Layout version on remote node ({}) does not match this "
+            + "node's service layout version ({})", nsInfo.getLayoutVersion(),
+            nsInfo.getServiceLayoutVersion());
+      }
       return ERR_CODE_INVALID_VERSION;
     }
 
@@ -217,9 +227,11 @@ public class BootstrapStandby implements Tool, Configurable {
         "            Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
         "               Cluster ID: " + nsInfo.getClusterID() + "\n" +
         "           Layout version: " + nsInfo.getLayoutVersion() + "\n" +
+        "   Service Layout version: " + nsInfo.getServiceLayoutVersion() + "\n" +
         "       isUpgradeFinalized: " + isUpgradeFinalized + "\n" +
+        "         isRollingUpgrade: " + isRollingUpgrade + "\n" +
         "=====================================================");
-    
+
     NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
 
     if (!isUpgradeFinalized) {
@@ -231,7 +243,7 @@ public class BootstrapStandby implements Tool, Configurable {
       if (!doPreUpgrade(storage, nsInfo)) {
         return ERR_CODE_ALREADY_FORMATTED;
       }
-    } else if (!format(storage, nsInfo)) { // prompt the user to format storage
+    } else if (!format(storage, nsInfo, isRollingUpgrade)) { // prompt the user to format storage
       return ERR_CODE_ALREADY_FORMATTED;
     }
 
@@ -254,20 +266,26 @@ public class BootstrapStandby implements Tool, Configurable {
     return 0;
   }
 
+  @VisibleForTesting
+  public NamespaceInfo getProxyNamespaceInfo(NamenodeProtocol proxy)
+      throws IOException {
+    return proxy.versionRequest();
+  }
+
   /**
    * Iterate over all the storage directories, checking if it should be
    * formatted. Format the storage if necessary and allowed by the user.
    * @return True if formatting is processed
    */
-  private boolean format(NNStorage storage, NamespaceInfo nsInfo)
-      throws IOException {
+  private boolean format(NNStorage storage, NamespaceInfo nsInfo,
+      boolean isRollingUpgrade) throws IOException {
     // Check with the user before blowing away data.
     if (!Storage.confirmFormat(storage.dirIterable(null), force, interactive)) {
       storage.close();
       return false;
     } else {
       // Format the storage (writes VERSION file)
-      storage.format(nsInfo);
+      storage.format(nsInfo, isRollingUpgrade);
       return true;
     }
   }
@@ -302,7 +320,7 @@ public class BootstrapStandby implements Tool, Configurable {
     // format the storage. Although this format is done through the new
     // software, since in HA setup the SBN is rolled back through
     // "-bootstrapStandby", we should still be fine.
-    if (!isFormatted && !format(storage, nsInfo)) {
+    if (!isFormatted && !format(storage, nsInfo, false)) {
       return false;
     }
 
@@ -405,8 +423,14 @@ public class BootstrapStandby implements Tool, Configurable {
     }
   }
 
-  private boolean checkLayoutVersion(NamespaceInfo nsInfo) throws IOException {
-    return (nsInfo.getLayoutVersion() == HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
+  private boolean checkLayoutVersion(NamespaceInfo nsInfo, boolean isRollingUpgrade) {
+    if (isRollingUpgrade) {
+      // During a rolling upgrade the service layout versions may be different,
+      // but we should check that the layout version being sent is compatible
+      return nsInfo.getLayoutVersion() <=
+          HdfsServerConstants.MINIMUM_COMPATIBLE_NAMENODE_LAYOUT_VERSION;
+    }
+    return nsInfo.getLayoutVersion() == nsInfo.getServiceLayoutVersion();
   }
 
   private void parseConfAndFindOtherNN() throws IOException {

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java

@@ -110,6 +110,29 @@ public class NamespaceInfo extends StorageInfo {
     this.capabilities = capabilities;
   }
 
+  public NamespaceInfo(StorageInfo storage) {
+    super(storage);
+    if (storage instanceof NamespaceInfo) {
+      this.capabilities = ((NamespaceInfo)storage).capabilities;
+      this.blockPoolID = ((NamespaceInfo)storage).blockPoolID;
+    } else {
+      this.capabilities = CAPABILITIES_SUPPORTED;
+    }
+    this.buildVersion = Storage.getBuildVersion();
+    this.softwareVersion = VersionInfo.getVersion();
+    if (storage instanceof NNStorage) {
+      this.blockPoolID = ((NNStorage)storage).getBlockPoolID();
+    } else {
+      this.blockPoolID = null;
+    }
+
+  }
+
+  public NamespaceInfo(StorageInfo storage, HAServiceState st) {
+    this(storage);
+    this.state = st;
+  }
+
   public NamespaceInfo(int nsID, String clusterID, String bpID, 
       long cT) {
     this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

@@ -609,9 +609,10 @@ public abstract class FSImageTestUtil {
   
   public static void assertNNFilesMatch(MiniDFSCluster cluster) throws Exception {
     List<File> curDirs = Lists.newArrayList();
-    curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
-    curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
-    
+    for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+      curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, i));
+    }
+
     // Ignore seen_txid file, since the newly bootstrapped standby
     // will have a higher seen_txid than the one it bootstrapped from.
     Set<String> ignoredFiles = ImmutableSet.of("seen_txid");

+ 111 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java

@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby.ERR_CODE_INVALID_VERSION;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,6 +34,12 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import java.util.function.Supplier;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -172,6 +184,103 @@ public class TestBootstrapStandby {
     restartNameNodesFromIndex(1);
   }
 
+  /**
+   * Test for downloading a checkpoint while the cluster is in rolling upgrade.
+   */
+  @Test
+  public void testRollingUpgradeBootstrapStandby() throws Exception {
+    removeStandbyNameDirs();
+
+    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+
+    DistributedFileSystem fs = cluster.getFileSystem(0);
+    NameNodeAdapter.enterSafeMode(nn0, false);
+    NameNodeAdapter.saveNamespace(nn0);
+    NameNodeAdapter.leaveSafeMode(nn0);
+
+    // Setup BootstrapStandby to think it is a future NameNode version
+    BootstrapStandby bs = spy(new BootstrapStandby());
+    doAnswer(nsInfo ->  {
+      NamespaceInfo nsInfoSpy = (NamespaceInfo) spy(nsInfo.callRealMethod());
+      doReturn(futureVersion).when(nsInfoSpy).getServiceLayoutVersion();
+      return nsInfoSpy;
+    }).when(bs).getProxyNamespaceInfo(any());
+
+    // BootstrapStandby should fail if the node has a future version
+    // and the cluster isn't in rolling upgrade
+    bs.setConf(cluster.getConfiguration(1));
+    assertEquals("BootstrapStandby should return ERR_CODE_INVALID_VERSION",
+        ERR_CODE_INVALID_VERSION, bs.run(new String[]{"-force"}));
+
+    // Start rolling upgrade
+    fs.rollingUpgrade(RollingUpgradeAction.PREPARE);
+    nn0 = spy(nn0);
+
+    // Make nn0 think it is a future version
+    doAnswer(fsImage -> {
+      FSImage fsImageSpy = (FSImage) spy(fsImage.callRealMethod());
+      doAnswer(storage -> {
+        NNStorage storageSpy = (NNStorage) spy(storage.callRealMethod());
+        doReturn(futureVersion).when(storageSpy).getServiceLayoutVersion();
+        return storageSpy;
+      }).when(fsImageSpy).getStorage();
+      return fsImageSpy;
+    }).when(nn0).getFSImage();
+
+    // Roll edit logs a few times to inflate txid
+    nn0.getRpcServer().rollEditLog();
+    nn0.getRpcServer().rollEditLog();
+    // Make checkpoint
+    NameNodeAdapter.enterSafeMode(nn0, false);
+    NameNodeAdapter.saveNamespace(nn0);
+    NameNodeAdapter.leaveSafeMode(nn0);
+
+    long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
+        .getFSImage().getMostRecentCheckpointTxId();
+    assertEquals(11, expectedCheckpointTxId);
+
+    for (int i = 1; i < maxNNCount; i++) {
+      // BootstrapStandby on Standby NameNode
+      bs.setConf(cluster.getConfiguration(i));
+      bs.run(new String[]{"-force"});
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
+          ImmutableList.of((int) expectedCheckpointTxId));
+    }
+
+    // Make sure the bootstrap was successful
+    FSImageTestUtil.assertNNFilesMatch(cluster);
+
+    // We should now be able to start the standby successfully
+    restartNameNodesFromIndex(1, "-rollingUpgrade", "started");
+
+    // Cleanup standby dirs
+    for (int i = 1; i < maxNNCount; i++) {
+      cluster.shutdownNameNode(i);
+    }
+    removeStandbyNameDirs();
+
+    // BootstrapStandby should fail if it thinks it's version is future version
+    // before rolling upgrade is finalized;
+    doAnswer(nsInfo -> {
+      NamespaceInfo nsInfoSpy = (NamespaceInfo) spy(nsInfo.callRealMethod());
+      nsInfoSpy.layoutVersion = futureVersion;
+      doReturn(futureVersion).when(nsInfoSpy).getServiceLayoutVersion();
+      return nsInfoSpy;
+    }).when(bs).getProxyNamespaceInfo(any());
+
+    for (int i = 1; i < maxNNCount; i++) {
+      bs.setConf(cluster.getConfiguration(i));
+      assertThrows("BootstrapStandby should fail the image transfer request",
+          HttpGetFailedException.class, () -> {
+            try {
+              bs.run(new String[]{"-force"});
+            } catch (RuntimeException e) {
+              throw e.getCause();
+            }
+          });
+    }
+  }
+
   /**
    * Test for the case where the shared edits dir doesn't have
    * all of the recent edit logs.
@@ -336,10 +445,10 @@ public class TestBootstrapStandby {
     }
   }
 
-  private void restartNameNodesFromIndex(int start) throws IOException {
+  private void restartNameNodesFromIndex(int start, String... args) throws IOException {
     for (int i = start; i < maxNNCount; i++) {
       // We should now be able to start the standby successfully.
-      cluster.restartNameNode(i, false);
+      cluster.restartNameNode(i, false, args);
     }
 
     cluster.waitClusterUp();