Kaynağa Gözat

HADOOP-6772. svn merge -c 948237 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.21@948240 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Boudnik 15 yıl önce
ebeveyn
işleme
f7f0949cf2

+ 2 - 0
CHANGES.txt

@@ -294,6 +294,8 @@ Release 0.21.0 - Unreleased
 
   IMPROVEMENTS
 
+    HADOOP-6772. Utilities for system tests specific. (Vinay Thota via cos)
+
     HADOOP-6771. Herriot's artifact id for Maven deployment should be set to
     hadoop-core-instrumented (cos)
 

+ 184 - 0
src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java

@@ -19,10 +19,14 @@
 package org.apache.hadoop.test.system;
 
 import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Enumeration;
+import java.util.Hashtable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +45,10 @@ public abstract class AbstractDaemonCluster {
   protected ClusterProcessManager clusterManager;
   private Map<Enum<?>, List<AbstractDaemonClient>> daemons = 
     new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
+  private String newConfDir = null;  
+  private static final  String CONF_HADOOP_LOCAL_DIR =
+      "test.system.hdrc.hadoop.local.confdir"; 
+  private final static Object waitLock = new Object();
   
   /**
    * Constructor to create a cluster client.<br/>
@@ -288,5 +296,181 @@ public abstract class AbstractDaemonCluster {
       }
     }
   }
+  
+  /**
+   * It's a local folder where the config file stores temporarily
+   * while serializing the object.
+   * @return String temporary local folder path for configuration.
+   */
+  private String getHadoopLocalConfDir() {
+    String hadoopLocalConfDir = conf.get(CONF_HADOOP_LOCAL_DIR);
+    if (hadoopLocalConfDir == null || hadoopLocalConfDir.isEmpty()) {
+      LOG.error("No configuration "
+          + "for the CONF_HADOOP_LOCAL_DIR passed");
+      throw new IllegalArgumentException(
+          "No Configuration passed for hadoop conf local directory");
+    }
+    return hadoopLocalConfDir;
+  }
+
+  /**
+   * It uses to restart the cluster with new configuration at runtime.<br/>
+   * @param props attributes for new configuration.
+   * @param configFile configuration file.
+   * @throws IOException if an I/O error occurs.
+   */
+  public void restartClusterWithNewConfig(Hashtable<String,Long> props, 
+      String configFile) throws IOException {
+
+    String mapredConf = null;
+    String localDirPath = null;
+    File localFolderObj = null;
+    File xmlFileObj = null;
+    String confXMLFile = null;
+    Configuration initConf = new Configuration(getConf());
+    Enumeration<String> e = props.keys();
+    while (e.hasMoreElements()) {
+      String propKey = e.nextElement();
+      Long propValue = props.get(propKey);
+      initConf.setLong(propKey,propValue.longValue());
+    }
+
+    localDirPath = getHadoopLocalConfDir();
+    localFolderObj = new File(localDirPath);
+    if (!localFolderObj.exists()) {
+      localFolderObj.mkdir();
+    }
+    confXMLFile = localDirPath + File.separator + configFile;
+    xmlFileObj = new File(confXMLFile);
+    initConf.writeXml(new FileOutputStream(xmlFileObj));
+    newConfDir = clusterManager.pushConfig(localDirPath);
+    stop();
+    waitForClusterToStop();
+    clusterManager.start(newConfDir);
+    waitForClusterToStart();
+    localFolderObj.delete();
+  }
+  
+  /**
+   * It uses to restart the cluster with default configuration.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void restart() throws 
+      IOException {
+    stop();
+    waitForClusterToStop();
+    start();
+    waitForClusterToStart();
+  }
+  
+  /**
+   * It uses to wait until the cluster is stopped.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void waitForClusterToStop() throws 
+      IOException {
+    List<Thread> chkDaemonStop = new ArrayList<Thread>();
+    for (List<AbstractDaemonClient> set : daemons.values()) {	  
+      for (AbstractDaemonClient daemon : set) {
+        DaemonStopThread dmStop = new DaemonStopThread(daemon);
+        chkDaemonStop.add(dmStop);
+        dmStop.start();
+      }
+    }
+
+    for (Thread daemonThread : chkDaemonStop){
+      try {
+        daemonThread.join();
+      } catch(InterruptedException intExp) {
+         LOG.warn("Interrupted while thread is joining." + intExp.getMessage());
+      }
+    }
+  }
+ 
+  /**
+   * It uses to wait until the cluster is started.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void  waitForClusterToStart() throws 
+      IOException {
+    List<Thread> chkDaemonStart = new ArrayList<Thread>();
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        DaemonStartThread dmStart = new DaemonStartThread(daemon);
+        chkDaemonStart.add(dmStart);;
+        dmStart.start();
+      }
+    }
+
+    for (Thread daemonThread : chkDaemonStart){
+      try {
+        daemonThread.join();
+      } catch(InterruptedException intExp) {
+        LOG.warn("Interrupted while thread is joining" + intExp.getMessage());
+      }
+    }
+  }
+
+  /**
+   * It waits for specified amount of time.
+   * @param duration time in milliseconds.
+   * @throws InterruptedException if any thread interrupted the current
+   * thread while it is waiting for a notification.
+   */
+  public void waitFor(long duration) {
+    try {
+      synchronized (waitLock) {
+        waitLock.wait(duration);
+      }
+    } catch (InterruptedException intExp) {
+       LOG.warn("Interrrupeted while thread is waiting" + intExp.getMessage());
+    }
+  }
+  
+  class DaemonStartThread extends Thread {
+    private AbstractDaemonClient daemon;
+
+    public DaemonStartThread(AbstractDaemonClient daemon) {
+      this.daemon = daemon;
+    }
+
+    public void run(){
+      LOG.info("Waiting for Daemon " + daemon.getHostName() 
+          + " to come up.....");
+      while (true) { 
+        try {
+          daemon.ping();
+          LOG.info("Daemon is : " + daemon.getHostName() + " pinging...");
+          break;
+        } catch (Exception exp) {
+          LOG.debug(daemon.getHostName() + " is waiting to come up.");
+          waitFor(60000);
+        }
+      }
+    }
+  }
+  
+  class DaemonStopThread extends Thread {
+    private AbstractDaemonClient daemon;
+
+    public DaemonStopThread(AbstractDaemonClient daemon) {
+      this.daemon = daemon;
+    }
+
+    public void run() {
+      LOG.info("Waiting for Daemon " + daemon.getHostName() 
+          + " to stop.....");
+      while (true) {
+        try {
+          daemon.ping();
+          LOG.debug(daemon.getHostName() +" is waiting state to stop.");
+          waitFor(60000);
+        } catch (Exception exp) {
+          LOG.info("Daemon is : " + daemon.getHostName() + " stopped...");
+          break;
+        } 
+      }
+    }
+  }
 }