#!/usr/bin/python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Schedule FailMon execution for the nodes listed in conf/hosts.list,
# according to the properties file conf/global.config.

import time
import ConfigParser
import subprocess
import threading
import random

# Module-level defaults; overridden by getJobs() from conf/global.config.
jobs = []
username = "user"
connections = 10
failmonDir = ""
maxFiles = 100


# This class represents a thread that connects to a set of cluster
# nodes to locally execute monitoring jobs. These jobs are specified
# as a shell command in the constructor.
class sshThread(threading.Thread):

    def __init__(self, threadname, username, command, failmonDir):
        threading.Thread.__init__(self)
        self.name = threadname
        self.username = username
        self.command = command
        self.failmonDir = failmonDir
        self.hosts = []

    def addHost(self, host):
        self.hosts.append(host)

    def run(self):
        # Run the monitoring command on every host assigned to this thread,
        # one after the other, from inside the remote FailMon directory.
        for host in self.hosts:
            toRun = ["ssh", self.username + "@" + host,
                     "cd " + self.failmonDir + " ; " + self.command]
            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
            subprocess.check_call(toRun)
            print "Done!"


# This class represents a monitoring job. The param member is a string
# that can be passed in the '--only' list of jobs given to the Java
# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
# node.
class Job:

    def __init__(self, param, interval):
        self.param = param
        self.interval = interval
        self.counter = interval

    def reset(self):
        self.counter = self.interval
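
# getJobs() below expects conf/global.config to be an INI-style file readable
# by ConfigParser, with a single [Default] section holding the keys it queries.
# An illustrative sketch (the key names come from getJobs(); the values and
# paths shown here are assumptions, not part of this script):
#
#   [Default]
#   ssh.username = hadoop
#   max.connections = 10
#   failmon.dir = /home/hadoop/failmon
#   hdfs.files.max = 100
#   log.hadoop.interval = 3600
#   log.system.interval = 3600
#   nics.interval = 0
#   cpu.interval = 600
#   disks.interval = 600
#   sensors.interval = 0
#   upload.interval = 86400
#
# An interval of 0 disables the corresponding job; any other value is the
# period, in seconds, at which the job is scheduled (see main() below).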

# This function reads the configuration file to get the values of the
# configuration parameters.
def getJobs(filename):
    global username
    global connections
    global jobs
    global failmonDir
    global maxFiles

    conf = ConfigParser.SafeConfigParser()
    conf.read(filename)

    username = conf.get("Default", "ssh.username")
    connections = int(conf.get("Default", "max.connections"))
    failmonDir = conf.get("Default", "failmon.dir")
    maxFiles = int(conf.get("Default", "hdfs.files.max"))

    # Hadoop log
    interval = int(conf.get("Default", "log.hadoop.interval"))
    if interval != 0:
        jobs.append(Job("hadoopLog", interval))

    # System log
    interval = int(conf.get("Default", "log.system.interval"))
    if interval != 0:
        jobs.append(Job("systemLog", interval))

    # NICs
    interval = int(conf.get("Default", "nics.interval"))
    if interval != 0:
        jobs.append(Job("nics", interval))

    # CPU
    interval = int(conf.get("Default", "cpu.interval"))
    if interval != 0:
        jobs.append(Job("cpu", interval))

    # Disks
    interval = int(conf.get("Default", "disks.interval"))
    if interval != 0:
        jobs.append(Job("disks", interval))

    # Sensors
    interval = int(conf.get("Default", "sensors.interval"))
    if interval != 0:
        jobs.append(Job("sensors", interval))

    # Upload
    interval = int(conf.get("Default", "upload.interval"))
    if interval != 0:
        jobs.append(Job("upload", interval))


# Compute the GCD (Greatest Common Divisor) of two integers
def GCD(a, b):
    assert isinstance(a, int)
    assert isinstance(b, int)

    while a:
        a, b = b % a, a

    return b


# Compute the GCD (Greatest Common Divisor) of the intervals of a list of jobs
def listGCD(joblist):
    assert isinstance(joblist, list)

    if len(joblist) == 1:
        return joblist[0].interval

    g = GCD(joblist[0].interval, joblist[1].interval)
    for i in range(2, len(joblist)):
        g = GCD(g, joblist[i].interval)

    return g


# Merge all FailMon files created on the HDFS into a single file
def mergeFiles():
    global username
    global failmonDir

    hostList = []
    hosts = open('./conf/hosts.list', 'r')
    for host in hosts:
        hostList.append(host.strip())
    hosts.close()

    # Pick a random node to run the merge command on
    randomHost = random.sample(hostList, 1)
    mergeCommand = "bin/failmon.sh --mergeFiles"
    toRun = ["ssh", username + "@" + randomHost[0],
             "cd " + failmonDir + " ; " + mergeCommand]

    print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
    subprocess.check_call(toRun)
    print "Done!"


# The actual scheduling is done here
def main():
    getJobs("./conf/global.config")

    for job in jobs:
        print "Configuration: ", job.param, "every", job.interval, "seconds"

    # The scheduler wakes up every GCD of all job intervals, so that every
    # job interval is an exact multiple of the sleep period.
    globalInterval = listGCD(jobs)

    while True:
        time.sleep(globalInterval)
        params = []

        # collect the jobs whose interval has expired in this round
        for job in jobs:
            job.counter -= globalInterval
            if job.counter <= 0:
                params.append(job.param)
                job.reset()

        if len(params) == 0:
            continue

        onlyStr = "--only " + params[0]
        for i in range(1, len(params)):
            onlyStr += ',' + params[i]

        command = "bin/failmon.sh " + onlyStr

        # execute on all nodes
        hosts = open('./conf/hosts.list', 'r')
        threadList = []

        # create a thread for every connection
        for i in range(0, connections):
            threadList.append(sshThread(i, username, command, failmonDir))

        # assign hosts to the threads/connections in round-robin fashion
        cur = 0
        for host in hosts:
            threadList[cur].addHost(host.strip())
            cur += 1
            if cur == len(threadList):
                cur = 0
        hosts.close()

        for ready in threadList:
            ready.start()

        # wait for all ssh threads of this round to finish
        for ssht in threading.enumerate():
            if ssht != threading.currentThread():
                ssht.join()

        # if an upload has been done, then maybe we need to merge the
        # HDFS files
        if "upload" in params:
            mergeFiles()


if __name__ == '__main__':
    main()
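
# Usage sketch (illustrative, not part of the original script): run this
# scheduler from the FailMon installation directory, so that the relative
# paths ./conf/global.config and ./conf/hosts.list resolve, e.g.
#
#   $ cd /home/hadoop/failmon        # assumed install path
#   $ python scheduler.py            # assumed file name for this script
#
# conf/hosts.list is expected to contain one hostname per line. As a worked
# example of the GCD-based cadence: with log.hadoop.interval = 3600 and
# upload.interval = 86400, globalInterval = GCD(3600, 86400) = 3600, so the
# loop wakes every hour; "hadoopLog" fires on every wake-up, while "upload"
# fires on every 24th one, after which mergeFiles() is invoked.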