Преглед на файлове

commit 2de4ecf322453200826f210e5a697277a6ba8d18
Author: Vinay Kumar Thota <vinayt@yahoo-inc.com>
Date: Wed May 26 05:16:18 2010 +0000

HADOOP:6772 from https://issues.apache.org/jira/secure/attachment/12445525/6772-ydist-security.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077480 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley преди 14 години
родител
ревизия
b7ad141808
променени са 1 файла, в които са добавени 185 реда и са изтрити 1 реда
  1. 185 1
      src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java

+ 185 - 1
src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java

@@ -19,10 +19,14 @@
 package org.apache.hadoop.test.system;
 
 import java.io.IOException;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Enumeration;
+import java.util.Hashtable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +45,10 @@ public abstract class AbstractDaemonCluster {
   protected ClusterProcessManager clusterManager;
   private Map<Enum<?>, List<AbstractDaemonClient>> daemons = 
     new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
+  private String newConfDir = null;  
+  private static final  String CONF_HADOOP_LOCAL_DIR =
+      "test.system.hdrc.hadoop.local.confdir"; 
+  private final static Object waitLock = new Object();
   
   /**
    * Constructor to create a cluster client.<br/>
@@ -84,7 +92,7 @@ public abstract class AbstractDaemonCluster {
   /**
    * Method to create the daemon client.<br/>
    * 
-   * @param remoteprocess
+   * @param process
    *          to manage the daemon.
    * @return instance of the daemon client
    * 
@@ -288,5 +296,181 @@ public abstract class AbstractDaemonCluster {
       }
     }
   }
+  
+  /**
+   * It's a local folder where the config file stores temporarily
+   * while serializing the object.
+   * @return String temporary local folder path for configuration.
+   */
+  private String getHadoopLocalConfDir() {
+    String hadoopLocalConfDir = conf.get(CONF_HADOOP_LOCAL_DIR);
+    if (hadoopLocalConfDir == null || hadoopLocalConfDir.isEmpty()) {
+      LOG.error("No configuration "
+          + "for the CONF_HADOOP_LOCAL_DIR passed");
+      throw new IllegalArgumentException(
+          "No Configuration passed for hadoop conf local directory");
+    }
+    return hadoopLocalConfDir;
+  }
+
+  /**
+   * It uses to restart the cluster with new configuration at runtime.<br/>
+   * @param props attributes for new configuration.
+   * @param configFile configuration file.
+   * @throws IOException if an I/O error occurs.
+   */
+  public void restartClusterWithNewConfig(Hashtable<String,Long> props, 
+      String configFile) throws IOException {
+
+    String mapredConf = null;
+    String localDirPath = null;
+    File localFolderObj = null;
+    File xmlFileObj = null;
+    String confXMLFile = null;
+    Configuration initConf = new Configuration(getConf());
+    Enumeration<String> e = props.keys();
+    while (e.hasMoreElements()) {
+      String propKey = e.nextElement();
+      Long propValue = props.get(propKey);
+      initConf.setLong(propKey,propValue.longValue());
+    }
+
+    localDirPath = getHadoopLocalConfDir();
+    localFolderObj = new File(localDirPath);
+    if (!localFolderObj.exists()) {
+      localFolderObj.mkdir();
+    }
+    confXMLFile = localDirPath + File.separator + configFile;
+    xmlFileObj = new File(confXMLFile);
+    initConf.writeXml(new FileOutputStream(xmlFileObj));
+    newConfDir = clusterManager.pushConfig(localDirPath);
+    stop();
+    waitForClusterToStop();
+    clusterManager.start(newConfDir);
+    waitForClusterToStart();
+    localFolderObj.delete();
+  }
+  
+  /**
+   * It uses to restart the cluster with default configuration.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void restart() throws 
+      IOException {
+    stop();
+    waitForClusterToStop();
+    start();
+    waitForClusterToStart();
+  }
+  
+  /**
+   * It uses to wait until the cluster is stopped.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void waitForClusterToStop() throws 
+      IOException {
+    List<Thread> chkDaemonStop = new ArrayList<Thread>();
+    for (List<AbstractDaemonClient> set : daemons.values()) {	  
+      for (AbstractDaemonClient daemon : set) {
+        DaemonStopThread dmStop = new DaemonStopThread(daemon);
+        chkDaemonStop.add(dmStop);
+        dmStop.start();
+      }
+    }
+
+    for (Thread daemonThread : chkDaemonStop){
+      try {
+        daemonThread.join();
+      } catch(InterruptedException intExp) {
+         LOG.warn("Interrupted while thread is joining." + intExp.getMessage());
+      }
+    }
+  }
+ 
+  /**
+   * It uses to wait until the cluster is started.<br/>
+   * @throws IOException if an I/O error occurs.
+   */
+  public void  waitForClusterToStart() throws 
+      IOException {
+    List<Thread> chkDaemonStart = new ArrayList<Thread>();
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        DaemonStartThread dmStart = new DaemonStartThread(daemon);
+        chkDaemonStart.add(dmStart);;
+        dmStart.start();
+      }
+    }
+
+    for (Thread daemonThread : chkDaemonStart){
+      try {
+        daemonThread.join();
+      } catch(InterruptedException intExp) {
+        LOG.warn("Interrupted while thread is joining" + intExp.getMessage());
+      }
+    }
+  }
+
+  /**
+   * It waits for specified amount of time.
+   * @param duration time in milliseconds.
+   * @throws InterruptedException if any thread interrupted the current
+   * thread while it is waiting for a notification.
+   */
+  public void waitFor(long duration) {
+    try {
+      synchronized (waitLock) {
+        waitLock.wait(duration);
+      }
+    } catch (InterruptedException intExp) {
+       LOG.warn("Interrrupeted while thread is waiting" + intExp.getMessage());
+    }
+  }
+  
+  class DaemonStartThread extends Thread {
+    private AbstractDaemonClient daemon;
+
+    public DaemonStartThread(AbstractDaemonClient daemon) {
+      this.daemon = daemon;
+    }
+
+    public void run(){
+      LOG.info("Waiting for Daemon " + daemon.getHostName() 
+          + " to come up.....");
+      while (true) { 
+        try {
+          daemon.ping();
+          LOG.info("Daemon is : " + daemon.getHostName() + " pinging...");
+          break;
+        } catch (Exception exp) {
+          LOG.debug(daemon.getHostName() + " is waiting to come up.");
+          waitFor(60000);
+        }
+      }
+    }
+  }
+  
+  class DaemonStopThread extends Thread {
+    private AbstractDaemonClient daemon;
+
+    public DaemonStopThread(AbstractDaemonClient daemon) {
+      this.daemon = daemon;
+    }
+
+    public void run() {
+      LOG.info("Waiting for Daemon " + daemon.getHostName() 
+          + " to stop.....");
+      while (true) {
+        try {
+          daemon.ping();
+          LOG.debug(daemon.getHostName() +" is waiting state to stop.");
+          waitFor(60000);
+        } catch (Exception exp) {
+          LOG.info("Daemon is : " + daemon.getHostName() + " stopped...");
+          break;
+        } 
+      }
+    }
+  }
 }