Explorar o código

HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa. Contributed by Eric Yang

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@728206 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas hai 16 anos
pai
achega
4e9f069a7f

+ 3 - 0
CHANGES.txt

@@ -481,6 +481,9 @@ Release 0.20.0 - Unreleased
     HADOOP-4849. Documentation for Service Level Authorization implemented in
     HADOOP-4348. (acmurthy)
 
+    HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa (Eric
+    Yang via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 11 - 12
src/contrib/chukwa/bin/dbAdmin.sh

@@ -37,18 +37,17 @@ while [ 1 ]
     cat ${CHUKWA_CONF_DIR}/jdbc.conf | \
     while read LINE; do
         CLUSTER=`echo ${LINE} | cut -f 1 -d'='`
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7 
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator 
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Consolidator 
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650 &
     done
     end=`date +%s`
     duration=$(( $end - $start ))

+ 86 - 10
src/contrib/chukwa/conf/aggregator.sql

@@ -1,12 +1,88 @@
-insert into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] from [system_metrics] where timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from [dfs_datanode] where timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [cluster_disk] (select a.timestamp,a.mount,a.used,a.available,a.used_percent from (select from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent) as used_percent from [disk] where timestamp between '[past_hour]' and '[now]' group by timestamp,mount) as a group by a.timestamp, a.mount);
-insert into [hod_job_digest] (select timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on (a.HodID = b.HodID) where endtime between '[past_hour]' and '[now]') as d,[system_metrics] where timestamp between d.starttime and d.endtime and host=d.machine group by hodid,timestamp);
-insert into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from [hadoop_rpc] where timestamp between '[past_hour]' and '[now]' group by timestamp);
-#insert into [cluster_hadoop_mapred] (select timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [user_util] (select timestamp, j.UserID as user, sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used, avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines) as disk_unused, sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines) as disk_used, avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused, sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) and Timestamp between '[past_hour]' and '[now]' group by j.UserID);
 #insert into [node_util] select starttime, avg(unused) as unused, avg(used) as used from (select DATE_FORMAT(m.LAUNCH_TIME,'%Y-%m-%d %H:%i:%s') as starttime,sum(AvgCPUBusy*j.NumOfMachines/(60*100)) as unused,sum((100-AvgCPUBusy)*j.NumOfMachines/(60*100)) as used from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where m.LAUNCH_TIME >= '2008-09-12 21:11' and m.LAUNCH_TIME <= '2008-09-12 22:11' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by m.MRJobID order by m.LAUNCH_TIME) as t group by t.starttime 
 #insert into [jobtype_util] select CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as m, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= '2008-09-12 21:11' and d.Timestamp <= '2008-09-12 22:11' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END
-#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
-#insert into [b] select m, sum(foo.nodehours) as nodehours from (select m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m, count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
-#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * j.NumOfMachines) / (sum(j.NumOfMachines) * 1)) as used from Digest d join HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
+#insert into [b] select m, sum(foo.nodehours) as nodehours from (select m.MRJobID, round(avg(if(AvgCPUBusy is null,0,AvgCPUBusy)),0) as m, count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
+#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END as interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by AvgCPUBusy,CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [cluster_hadoop_mapred] (select timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job] where timestamp between '[past_10_minutes]' and '[now]' group by timestamp);
+replace into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] from [system_metrics] where timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from [dfs_datanode] where timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [cluster_disk] (select a.timestamp,a.mount,a.used,a.available,a.used_percent from (select from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as timestamp,mount,avg(used) as used,avg(available) as available,avg(used_percent) as used_percent from [disk] where timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp,mount) as a group by a.timestamp, a.mount);
+replace delayed into [hod_job_digest] (select timestamp,d.hodid,d.userid,[avg(system_metrics)] from (select a.HodID,b.host as machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine] b on (a.HodID = b.HodID) where endtime between '[past_10_minutes]' and '[past_5_minutes]') as d,[system_metrics] where timestamp between d.starttime and d.endtime and host=d.machine group by hodid,timestamp);
+replace into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from [hadoop_rpc] where timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [user_util] (select timestamp, j.UserID as user, sum(j.NumOfMachines) as node_total, sum(cpu_idle_pcnt*j.NumOfMachines) as cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines) as cpu_used, avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines) as disk_unused, sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines) as disk_used, avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as disk_used_pcnt, sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, sum(eth0_busy_pcnt*j.NumOfMachines) as network_used, avg(eth0_busy_pcnt) as network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines) as memory_unused, sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) and Timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by j.UserID);
+#
+# Down sample metrics for charts
+replace into [system_metrics_month] (select timestamp,[group_avg(system_metrics)] from [system_metrics_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [system_metrics_quarter] (select timestamp,[group_avg(system_metrics)] from [system_metrics_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [system_metrics_year] (select timestamp,[group_avg(system_metrics)] from [system_metrics_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [system_metrics_decade] (select timestamp,[group_avg(system_metrics)] from [system_metrics_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_namenode_month] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_namenode_quarter] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_namenode_year] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_namenode_decade] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_datanode_month] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_datanode_quarter] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_datanode_year] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_datanode_decade] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_rpc_month] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_rpc_quarter] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_rpc_year] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_rpc_decade] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [cluster_hadoop_rpc_month] (select timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_hadoop_rpc_quarter] (select timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_hadoop_rpc_year] (select timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_hadoop_rpc_decade] (select timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hadoop_mapred_month] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_mapred_quarter] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_mapred_year] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_mapred_decade] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_jvm_month] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,process_name);
+replace into [hadoop_jvm_quarter] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,process_name);
+replace into [hadoop_jvm_year] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,process_name);
+replace into [hadoop_jvm_decade] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,process_name);
+#
+replace into [dfs_throughput_month] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [dfs_throughput_quarter] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [dfs_throughput_year] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [dfs_throughput_decade] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [node_activity_month] (select timestamp,[avg(node_activity)] from [node_activity_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [node_activity_quarter] (select timestamp,[avg(node_activity)] from [node_activity_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [node_activity_year] (select timestamp,[avg(node_activity)] from [node_activity_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [node_activity_decade] (select timestamp,[avg(node_activity)] from [node_activity_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [dfs_fsnamesystem_month] (select timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_fsnamesystem_quarter] (select timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_fsnamesystem_year] (select timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_fsnamesystem_decade] (select timestamp,[group_avg(dfs_fsnamesystem)] from [dfs_fsnamesystem_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [disk_month] (select timestamp,[group_avg(disk)] from [disk_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,mount);
+replace into [disk_quarter] (select timestamp,[group_avg(disk)] from [disk_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,mount);
+replace into [disk_year] (select timestamp,[group_avg(disk)] from [disk_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,mount);
+replace into [disk_decade] (select timestamp,[group_avg(disk)] from [disk_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,mount);
+#
+replace into [cluster_disk_month] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),mount);
+replace into [cluster_disk_quarter] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),mount);
+replace into [cluster_disk_year] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),mount);
+replace into [cluster_disk_decade] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),mount);
+#
+replace into [cluster_system_metrics_month] (select timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_system_metrics_quarter] (select timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_system_metrics_year] (select timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_system_metrics_decade] (select timestamp,[avg(cluster_system_metrics)] from [cluster_system_metrics_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hod_job_digest_month] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),HodID);
+replace into [hod_job_digest_quarter] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),HodID);
+replace into [hod_job_digest_year] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),HodID);
+replace into [hod_job_digest_decade] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),HodID);
+#
+replace into [user_util_month] (select timestamp,[group_avg(user_util)] from [user_util_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),user);
+replace into [user_util_quarter] (select timestamp,[group_avg(user_util)] from [user_util_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),user);
+replace into [user_util_year] (select timestamp,[group_avg(user_util)] from [user_util_quarter] where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),user);
+replace into [user_util_decade] (select timestamp,[group_avg(user_util)] from [user_util_year] where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),user);

+ 6 - 4
src/contrib/chukwa/conf/database_create_tables

@@ -160,7 +160,7 @@ create table if not exists cluster_system_metrics_template (
     sdc_busy_pcnt float,
     sdd_busy_pcnt float,
     swap_used_pcnt float,
-    primary key(host, timestamp),
+    primary key(timestamp),
     index (timestamp)
 );
 
@@ -571,7 +571,7 @@ create table if not exists hod_job_digest_template (
     sdd_busy_pcnt float,
     swap_used_pcnt float,
     primary key(HodId, timestamp),
-    index(timeStamp)
+    index(timestamp)
 ); 
 
 create table if not exists user_util_template (
@@ -589,7 +589,9 @@ create table if not exists user_util_template (
     network_used_pcnt float,
     memory_unused double,
     memory_used double,
-    memory_used_pcnt float
+    memory_used_pcnt float,
+    primary key(user, timestamp),
+    index(timestamp)
 );
 
 create table if not exists QueueInfo(
@@ -598,5 +600,5 @@ create table if not exists QueueInfo(
     Queue VARCHAR(20),
     NumOfMachine smallint unsigned,
     status varchar(1),
-    index(TimeStamp)
+    index(Timestamp)
 );

+ 0 - 1212
src/contrib/chukwa/conf/mysql_create_tables

@@ -1,1212 +0,0 @@
-create table if not exists NodeActivity 
-(
-  Id int(11) NOT NULL auto_increment,
-  Timestamp  timestamp default CURRENT_TIMESTAMP,
-  Used int(11) default NULL,
-  Free int(11) default NULL,
-  Down int(11) default NULL,
-  PRIMARY KEY  (Id),
-  index (Timestamp)
-);
-
-create table if not exists NodeActivity_5 
-(
-  Id int(11) NOT NULL auto_increment,
-  Timestamp  timestamp default CURRENT_TIMESTAMP,
-  Used int(11) default NULL,
-  Free int(11) default NULL,
-  Down int(11) default NULL,
-  PRIMARY KEY  (Id),
-  index (Timestamp)
-);
-
-create table if not exists NodeActivity_30 
-(
-  Id int(11) NOT NULL auto_increment,
-  Timestamp  timestamp default CURRENT_TIMESTAMP,
-  Used int(11) default NULL,
-  Free int(11) default NULL,
-  Down int(11) default NULL,
-  PRIMARY KEY  (Id),
-  index (Timestamp)
-);
-
-create table if not exists NodeActivity_120 
-(
-  Id int(11) NOT NULL auto_increment,
-  Timestamp  timestamp default CURRENT_TIMESTAMP,
-  Used int(11) default NULL,
-  Free int(11) default NULL,
-  Down int(11) default NULL,
-  PRIMARY KEY  (Id),
-  index (Timestamp)
-);
-
-create table if not exists SimonData (
-Machine varchar(40),
-Timestamp  timestamp default CURRENT_TIMESTAMP,
-CPUBusy tinyint unsigned,
-FreeMemPercentage tinyint unsigned,
-CPUIdle tinyint unsigned,
-CPUNice tinyint unsigned,
-CPUSystem tinyint unsigned,
-CPUUser tinyint unsigned,
-CPUIOWait tinyint unsigned,
-CPUIrq tinyint unsigned,
-CPUSoftIrq tinyint unsigned,
-LoadFifteen  tinyint unsigned,
-LoadFive tinyint unsigned,
-LoadOne tinyint unsigned,
-MemBuffersPercentage tinyint unsigned,
-MemCachedPercentage tinyint unsigned,
-MemUserPercentage tinyint unsigned,
-MemSharedPercentage tinyint unsigned,
-MaxDiskBusyPercentage tinyint unsigned,
-NetworkInKBps mediumint  unsigned,
-NetworkOutKBps mediumint unsigned,
-DiskAReadKBps mediumint unsigned,
-DiskAWriteKBps mediumint unsigned,
-DiskBReadKBps mediumint unsigned,
-DiskBWriteKBps mediumint unsigned,
-DiskCReadKBps mediumint unsigned,
-DiskCWriteKBps mediumint unsigned,
-DiskDReadKBps mediumint unsigned,
-DiskDWriteKBps mediumint unsigned,
-DiskTotalGB smallint unsigned,
-DiskUsedPercentage tinyint unsigned,
-PacketsIn mediumint unsigned,
-PacketsOut mediumint unsigned,
-SwapInKBps mediumint unsigned,
-SwapOutKBps mediumint unsigned,
-primary key( Machine, Timestamp),
-index (Timestamp)
-);
-
-create table if not exists SimonData_5 (
-Machine varchar(40),
-Timestamp  timestamp default CURRENT_TIMESTAMP,
-CPUBusy tinyint unsigned,
-FreeMemPercentage tinyint unsigned,
-CPUIdle tinyint unsigned,
-CPUNice tinyint unsigned,
-CPUSystem tinyint unsigned,
-CPUUser tinyint unsigned,
-CPUIOWait tinyint unsigned,
-CPUIrq tinyint unsigned,
-CPUSoftIrq tinyint unsigned,
-LoadFifteen  tinyint unsigned,
-LoadFive tinyint unsigned,
-LoadOne tinyint unsigned,
-MemBuffersPercentage tinyint unsigned,
-MemCachedPercentage tinyint unsigned,
-MemUserPercentage tinyint unsigned,
-MemSharedPercentage tinyint unsigned,
-MaxDiskBusyPercentage tinyint unsigned,
-NetworkInKBps mediumint  unsigned,
-NetworkOutKBps mediumint unsigned,
-DiskAReadKBps mediumint unsigned,
-DiskAWriteKBps mediumint unsigned,
-DiskBReadKBps mediumint unsigned,
-DiskBWriteKBps mediumint unsigned,
-DiskCReadKBps mediumint unsigned,
-DiskCWriteKBps mediumint unsigned,
-DiskDReadKBps mediumint unsigned,
-DiskDWriteKBps mediumint unsigned,
-DiskTotalGB smallint unsigned,
-DiskUsedPercentage tinyint unsigned,
-PacketsIn mediumint unsigned,
-PacketsOut mediumint unsigned,
-SwapInKBps mediumint unsigned,
-SwapOutKBps mediumint unsigned,
-primary key( Machine, Timestamp),
-index (Timestamp)
-);
-
-create table if not exists SimonData_30 (
-Machine varchar(40),
-Timestamp  timestamp default CURRENT_TIMESTAMP,
-CPUBusy tinyint unsigned,
-FreeMemPercentage tinyint unsigned,
-CPUIdle tinyint unsigned,
-CPUNice tinyint unsigned,
-CPUSystem tinyint unsigned,
-CPUUser tinyint unsigned,
-CPUIOWait tinyint unsigned,
-CPUIrq tinyint unsigned,
-CPUSoftIrq tinyint unsigned,
-LoadFifteen  tinyint unsigned,
-LoadFive tinyint unsigned,
-LoadOne tinyint unsigned,
-MemBuffersPercentage tinyint unsigned,
-MemCachedPercentage tinyint unsigned,
-MemUserPercentage tinyint unsigned,
-MemSharedPercentage tinyint unsigned,
-MaxDiskBusyPercentage tinyint unsigned,
-NetworkInKBps mediumint  unsigned,
-NetworkOutKBps mediumint unsigned,
-DiskAReadKBps mediumint unsigned,
-DiskAWriteKBps mediumint unsigned,
-DiskBReadKBps mediumint unsigned,
-DiskBWriteKBps mediumint unsigned,
-DiskCReadKBps mediumint unsigned,
-DiskCWriteKBps mediumint unsigned,
-DiskDReadKBps mediumint unsigned,
-DiskDWriteKBps mediumint unsigned,
-DiskTotalGB smallint unsigned,
-DiskUsedPercentage tinyint unsigned,
-PacketsIn mediumint unsigned,
-PacketsOut mediumint unsigned,
-SwapInKBps mediumint unsigned,
-SwapOutKBps mediumint unsigned,
-primary key( Machine, Timestamp),
-index (Timestamp)
-);
-
-create table if not exists SimonData_120 (
-Machine varchar(40),
-Timestamp  timestamp default CURRENT_TIMESTAMP,
-CPUBusy tinyint unsigned,
-FreeMemPercentage tinyint unsigned,
-CPUIdle tinyint unsigned,
-CPUNice tinyint unsigned,
-CPUSystem tinyint unsigned,
-CPUUser tinyint unsigned,
-CPUIOWait tinyint unsigned,
-CPUIrq tinyint unsigned,
-CPUSoftIrq tinyint unsigned,
-LoadFifteen  tinyint unsigned,
-LoadFive tinyint unsigned,
-LoadOne tinyint unsigned,
-MemBuffersPercentage tinyint unsigned,
-MemCachedPercentage tinyint unsigned,
-MemUserPercentage tinyint unsigned,
-MemSharedPercentage tinyint unsigned,
-MaxDiskBusyPercentage tinyint unsigned,
-NetworkInKBps mediumint  unsigned,
-NetworkOutKBps mediumint unsigned,
-DiskAReadKBps mediumint unsigned,
-DiskAWriteKBps mediumint unsigned,
-DiskBReadKBps mediumint unsigned,
-DiskBWriteKBps mediumint unsigned,
-DiskCReadKBps mediumint unsigned,
-DiskCWriteKBps mediumint unsigned,
-DiskDReadKBps mediumint unsigned,
-DiskDWriteKBps mediumint unsigned,
-DiskTotalGB smallint unsigned,
-DiskUsedPercentage tinyint unsigned,
-PacketsIn mediumint unsigned,
-PacketsOut mediumint unsigned,
-SwapInKBps mediumint unsigned,
-SwapOutKBps mediumint unsigned,
-primary key( Machine, Timestamp),
-index (Timestamp)
-);
-
-create table if not exists HodMachine (
-HodID varchar(20) not null, 
-Machine varchar(40) not null,
-index(HodId)
-);
-
-create table if not exists HodJob (
-HodID varchar(20) primary key, 
-UserID varchar(20), 
-Status  smallint,
-JobTracker varchar(40), 
-TimeQueued mediumint unsigned,
-StartTime timestamp default CURRENT_TIMESTAMP, 
-EndTime timestamp default 0,  
-NumOfMachines smallint unsigned,  
-SlotLimitPerTracker smallint unsigned,
-LogProcessStatus varchar(20),
-index(StartTime)
-);
-
-
-create table if not exists MachineInfo (
-Name varchar(40) , 
-GoodUntil timestamp default 0,
-NumOfCores smallint unsigned, 
-NumOfCPUs smallint unsigned, 
-MemoryMB smallint unsigned, 
-NumOfDisks smallint unsigned, 
-DiskCapacityInGB smallint unsigned, 
-ClockFrequency varchar(20), 
-RackID varchar(20), 
-primary key(Name,GoodUntil)
-);
-
-create table if not exists MRJob(
-HodID varchar(20),
-MRJobID varchar(80),
-MRJobName varchar(100),
-STATUS varchar(10),
-SUBMIT_TIME timestamp default 0,
-LAUNCH_TIME timestamp default 0,
-FINISH_TIME timestamp default 0,
-MAPPER_PHASE_END_TIME timestamp default 0,
-TOTAL_MAPS int unsigned,
-TOTAL_REDUCES int unsigned,
-FINISHED_MAPS  int unsigned,
-FINISHED_REDUCES  int unsigned,      
-NumOfLocalSplits int unsigned,
-NumOfRackLocalSplits int unsigned,
-NUM_OF_MAPPER_ATTEMPTS int unsigned,
-NUM_OF_REDUCER_ATTEMPTS int unsigned,
-MAPPER_PHASE_EXECUTION_TIME int,
-AvgMapperExecutionTime int unsigned,
-AvgLocalMapperExecutionTime int unsigned,
-AvgRackLocalMapperExecutionTime int unsigned,
-AvgRemoteMapperExecutionTime int unsigned,
-
-AvgReducerExecutionTime int unsigned,
-AvgShuffleExecutionTime int unsigned,
-AvgSortExecutionTime int unsigned,
-
-MapperClass varchar(80),
-ReducerClass varchar(80),
-PartitionerClass varchar(80),
-CombinerClass varchar(80),
-InputFormatClass varchar(80),
-InputKeyClass varchar(80),
-InputValueClass varchar(80),
-OutputFormatClass varchar(80),
-OutputCompressed tinyint,
-OutputCompressionType  varchar(20),
-OutputCompressionCodec varchar(20),
-OutputKeyClass varchar(80),
-OutputValueClass varchar(80),
-MapOutputKeyClass varchar(80),
-MapOutputValueClass varchar(80),
-MapOutputCompressed tinyint,
-MapOutputCompressionType  varchar(20),
-MapOutputCompressionCodec varchar(20),
-InputDataSizeInMB int unsigned,
-MapOutputSizeInMB int unsigned,
-OutputDataSizeInMB int unsigned,
-MR_JOBCONF  text, 
-InputDir  text, 
-primary key( HodID, MRJobID)
-);
-
-create table if not exists MRJobTSData (
-HodID varchar(20),
-MRJobID varchar(80),
-Timestamp timestamp default 0,      
-MAPPER_COUNT mediumint unsigned,        
-REDUCER_COUNT mediumint unsigned,       
-SORT_COUNT mediumint unsigned,  
-SHUFFLE_COUNT mediumint unsigned,        
-REDUCER_EXECUTION_COUNT mediumint unsigned, 
-FINISHED_MAPPER_COUNT   mediumint unsigned, 
-FINISHED_REDUCER_COUNT mediumint unsigned,
-primary key (HodID, MRJobID, Timestamp),
-index(TimeStamp)
-);
-
-
-create table if not exists HodJobDigest(
-    HodID VARCHAR(20),
-    Timestamp timestamp default 0,
-    AvgCPUBusy  tinyint unsigned,
-    MinCPUBusy  tinyint unsigned,
-    MaxCPUBusy  tinyint unsigned,
-    AvgFreeMemPercentage tinyint unsigned,
-    MinFreeMemPercentage tinyint unsigned,
-    MaxFreeMemPercentage tinyint unsigned,
-    AvgMaxDiskBusyPercentage  TINYINT UNSIGNED,
-    MinMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    MaxMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    AvgNetworkInKBps mediumint unsigned,
-    MinNetworkInKBps mediumint unsigned,
-    MaxNetworkInKBps mediumint unsigned,
-    AvgNetworkOutKBps mediumint unsigned,
-    MinNetworkOutKBps mediumint unsigned,
-    MaxNetworkOutKBps mediumint unsigned,
-    AvgDiskAReadKBps mediumint unsigned,
-    MinDiskAReadKBps  mediumint unsigned,
-    MaxDiskAReadKBps mediumint unsigned,
-    AvgDiskBReadKBps mediumint unsigned,
-    MinDiskBReadKBps mediumint unsigned,
-    MaxDiskBReadKBps mediumint unsigned,
-    AvgDiskCReadKBps mediumint unsigned,
-    MinDiskCReadKBps mediumint unsigned,
-    MaxDiskCReadKBps mediumint unsigned,
-    AvgDiskDReadKBps mediumint unsigned,
-    MinDiskDReadKBps mediumint unsigned,
-    MaxDiskDReadKBps mediumint unsigned,
-    AvgDiskAWriteKBps mediumint unsigned,
-    MinDiskAWriteKBps mediumint UNSIGNED,
-    MaxDiskAWriteKBps mediumint UNSIGNED,
-    AvgDiskBWriteKBps mediumint UNSIGNED,
-    MinDiskBWriteKBps mediumint UNSIGNED,
-    MaxDiskBWriteKBps mediumint UNSIGNED,
-    AvgDiskCWriteKBps mediumint UNSIGNED,
-    MinDiskCWriteKBps mediumint UNSIGNED,
-    MaxDiskCWriteKBps mediumint UNSIGNED,
-    AvgDiskDWriteKBps mediumint UNSIGNED,
-    MinDiskDWriteKBps mediumint UNSIGNED,
-    MaxDiskDWriteKBps mediumint UNSIGNED,
-    AvgDiskUsedPercentage  TINYINT UNSIGNED,
-    MinDiskUsedPercentage    TINYINT UNSIGNED,
-    MaxDiskUsedPercentage    TINYINT UNSIGNED,
-    AvgOverallUsage  TINYINT UNSIGNED,
-    MinOverallUsage TINYINT UNSIGNED,
-    MaxOverallUsage  TINYINT UNSIGNED,
-    primary key(HodId, Timestamp),
-    index(TimeStamp)
-); 
-
-create table if not exists HodJobDigest_5 (
-    HodID VARCHAR(20),
-    Timestamp timestamp default 0,
-    AvgCPUBusy  tinyint unsigned,
-    MinCPUBusy  tinyint unsigned,
-    MaxCPUBusy  tinyint unsigned,
-    AvgFreeMemPercentage tinyint unsigned,
-    MinFreeMemPercentage tinyint unsigned,
-    MaxFreeMemPercentage tinyint unsigned,
-    AvgMaxDiskBusyPercentage  TINYINT UNSIGNED,
-    MinMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    MaxMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    AvgNetworkInKBps mediumint unsigned,
-    MinNetworkInKBps mediumint unsigned,
-    MaxNetworkInKBps mediumint unsigned,
-    AvgNetworkOutKBps mediumint unsigned,
-    MinNetworkOutKBps mediumint unsigned,
-    MaxNetworkOutKBps mediumint unsigned,
-    AvgDiskAReadKBps mediumint unsigned,
-    MinDiskAReadKBps  mediumint unsigned,
-    MaxDiskAReadKBps mediumint unsigned,
-    AvgDiskBReadKBps mediumint unsigned,
-    MinDiskBReadKBps mediumint unsigned,
-    MaxDiskBReadKBps mediumint unsigned,
-    AvgDiskCReadKBps mediumint unsigned,
-    MinDiskCReadKBps mediumint unsigned,
-    MaxDiskCReadKBps mediumint unsigned,
-    AvgDiskDReadKBps mediumint unsigned,
-    MinDiskDReadKBps mediumint unsigned,
-    MaxDiskDReadKBps mediumint unsigned,
-    AvgDiskAWriteKBps mediumint unsigned,
-    MinDiskAWriteKBps mediumint UNSIGNED,
-    MaxDiskAWriteKBps mediumint UNSIGNED,
-    AvgDiskBWriteKBps mediumint UNSIGNED,
-    MinDiskBWriteKBps mediumint UNSIGNED,
-    MaxDiskBWriteKBps mediumint UNSIGNED,
-    AvgDiskCWriteKBps mediumint UNSIGNED,
-    MinDiskCWriteKBps mediumint UNSIGNED,
-    MaxDiskCWriteKBps mediumint UNSIGNED,
-    AvgDiskDWriteKBps mediumint UNSIGNED,
-    MinDiskDWriteKBps mediumint UNSIGNED,
-    MaxDiskDWriteKBps mediumint UNSIGNED,
-    AvgDiskUsedPercentage  TINYINT UNSIGNED,
-    MinDiskUsedPercentage    TINYINT UNSIGNED,
-    MaxDiskUsedPercentage    TINYINT UNSIGNED,
-    AvgOverallUsage  TINYINT UNSIGNED,
-    MinOverallUsage TINYINT UNSIGNED,
-    MaxOverallUsage  TINYINT UNSIGNED,
-    primary key(HodId, Timestamp),
-    index(TimeStamp)
-);
- 
-create table if not exists HodJobDigest_30 (
-    HodID VARCHAR(20),
-    Timestamp timestamp default 0,
-    AvgCPUBusy  tinyint unsigned,
-    MinCPUBusy  tinyint unsigned,
-    MaxCPUBusy  tinyint unsigned,
-    AvgFreeMemPercentage tinyint unsigned,
-    MinFreeMemPercentage tinyint unsigned,
-    MaxFreeMemPercentage tinyint unsigned,
-    AvgMaxDiskBusyPercentage  TINYINT UNSIGNED,
-    MinMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    MaxMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    AvgNetworkInKBps mediumint unsigned,
-    MinNetworkInKBps mediumint unsigned,
-    MaxNetworkInKBps mediumint unsigned,
-    AvgNetworkOutKBps mediumint unsigned,
-    MinNetworkOutKBps mediumint unsigned,
-    MaxNetworkOutKBps mediumint unsigned,
-    AvgDiskAReadKBps mediumint unsigned,
-    MinDiskAReadKBps  mediumint unsigned,
-    MaxDiskAReadKBps mediumint unsigned,
-    AvgDiskBReadKBps mediumint unsigned,
-    MinDiskBReadKBps mediumint unsigned,
-    MaxDiskBReadKBps mediumint unsigned,
-    AvgDiskCReadKBps mediumint unsigned,
-    MinDiskCReadKBps mediumint unsigned,
-    MaxDiskCReadKBps mediumint unsigned,
-    AvgDiskDReadKBps mediumint unsigned,
-    MinDiskDReadKBps mediumint unsigned,
-    MaxDiskDReadKBps mediumint unsigned,
-    AvgDiskAWriteKBps mediumint unsigned,
-    MinDiskAWriteKBps mediumint UNSIGNED,
-    MaxDiskAWriteKBps mediumint UNSIGNED,
-    AvgDiskBWriteKBps mediumint UNSIGNED,
-    MinDiskBWriteKBps mediumint UNSIGNED,
-    MaxDiskBWriteKBps mediumint UNSIGNED,
-    AvgDiskCWriteKBps mediumint UNSIGNED,
-    MinDiskCWriteKBps mediumint UNSIGNED,
-    MaxDiskCWriteKBps mediumint UNSIGNED,
-    AvgDiskDWriteKBps mediumint UNSIGNED,
-    MinDiskDWriteKBps mediumint UNSIGNED,
-    MaxDiskDWriteKBps mediumint UNSIGNED,
-    AvgDiskUsedPercentage  TINYINT UNSIGNED,
-    MinDiskUsedPercentage    TINYINT UNSIGNED,
-    MaxDiskUsedPercentage    TINYINT UNSIGNED,
-    AvgOverallUsage  TINYINT UNSIGNED,
-    MinOverallUsage TINYINT UNSIGNED,
-    MaxOverallUsage  TINYINT UNSIGNED,
-    primary key(HodId, Timestamp),
-    index(TimeStamp)
-); 
-
-create table if not exists HodJobDigest_120 (
-    HodID VARCHAR(20),
-    Timestamp timestamp default 0,
-    AvgCPUBusy  tinyint unsigned,
-    MinCPUBusy  tinyint unsigned,
-    MaxCPUBusy  tinyint unsigned,
-    AvgFreeMemPercentage tinyint unsigned,
-    MinFreeMemPercentage tinyint unsigned,
-    MaxFreeMemPercentage tinyint unsigned,
-    AvgMaxDiskBusyPercentage  TINYINT UNSIGNED,
-    MinMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    MaxMaxDiskBusyPercentage    TINYINT UNSIGNED,
-    AvgNetworkInKBps mediumint unsigned,
-    MinNetworkInKBps mediumint unsigned,
-    MaxNetworkInKBps mediumint unsigned,
-    AvgNetworkOutKBps mediumint unsigned,
-    MinNetworkOutKBps mediumint unsigned,
-    MaxNetworkOutKBps mediumint unsigned,
-    AvgDiskAReadKBps mediumint unsigned,
-    MinDiskAReadKBps  mediumint unsigned,
-    MaxDiskAReadKBps mediumint unsigned,
-    AvgDiskBReadKBps mediumint unsigned,
-    MinDiskBReadKBps mediumint unsigned,
-    MaxDiskBReadKBps mediumint unsigned,
-    AvgDiskCReadKBps mediumint unsigned,
-    MinDiskCReadKBps mediumint unsigned,
-    MaxDiskCReadKBps mediumint unsigned,
-    AvgDiskDReadKBps mediumint unsigned,
-    MinDiskDReadKBps mediumint unsigned,
-    MaxDiskDReadKBps mediumint unsigned,
-    AvgDiskAWriteKBps mediumint unsigned,
-    MinDiskAWriteKBps mediumint UNSIGNED,
-    MaxDiskAWriteKBps mediumint UNSIGNED,
-    AvgDiskBWriteKBps mediumint UNSIGNED,
-    MinDiskBWriteKBps mediumint UNSIGNED,
-    MaxDiskBWriteKBps mediumint UNSIGNED,
-    AvgDiskCWriteKBps mediumint UNSIGNED,
-    MinDiskCWriteKBps mediumint UNSIGNED,
-    MaxDiskCWriteKBps mediumint UNSIGNED,
-    AvgDiskDWriteKBps mediumint UNSIGNED,
-    MinDiskDWriteKBps mediumint UNSIGNED,
-    MaxDiskDWriteKBps mediumint UNSIGNED,
-    AvgDiskUsedPercentage  TINYINT UNSIGNED,
-    MinDiskUsedPercentage    TINYINT UNSIGNED,
-    MaxDiskUsedPercentage    TINYINT UNSIGNED,
-    AvgOverallUsage  TINYINT UNSIGNED,
-    MinOverallUsage TINYINT UNSIGNED,
-    MaxOverallUsage  TINYINT UNSIGNED,
-    primary key(HodId, Timestamp),
-    index(TimeStamp)
-); 
-create table if not exists HodJobUnprocessed (
-HodID varchar(20),
-primary key(HodId)
-);
-
-create table if not exists QueueInfo(
-Timestamp timestamp default 0,
-HodID VARCHAR(20),
-Queue VARCHAR(20),
-NumOfMachine smallint unsigned,
-status varchar(1),
-index(TimeStamp)
-);
-
-
-create table if not exists cluster_systemstate(
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED, 
-    pids mediumint UNSIGNED,
-    avgBlurbs double,
-    avgBlurbRate double,
-    avgCpuIdle double,
-    avgCpuNice double,
-    avgCpuNicePercent double,
-    avgCpuSystem double,
-    avgCpuUser double,
-    avgCpuIowait double,
-    avgCpuIrq double,
-    avgCpuSoftirq double,
-    avgCpuBusy double,
-    avgLoadOne double,
-    avgLoadFive double,
-    avgLoadFifteen double,
-    avgMemBuffers double,
-    avgMemCached double,
-    avgMemCachedPercent double,
-    avgMemFree double,
-    avgMemFreePercent double,
-    avgMemUser double,
-    avgMemUserPercent double,
-    avgMemShared double,
-    avgMemSharedPercent double,
-    avgMemTotal double,
-    avgSwapTotal double,
-    avgSwapInKbps double,
-    avgSwapOutKbps double,
-    avgBytesIn double,
-    avgBytesOut double,
-    avgPktsIn double,
-    avgPktsOut double,
-    avgDiskFree double,
-    avgDiskTotal double,
-    avgDiskUsed double,
-    avgPartMaxUsed double,
-    avgDiskMaxBusy double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists cluster_systemstate_5(
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED, 
-    pids mediumint UNSIGNED,
-    avgBlurbs double,
-    avgBlurbRate double,
-    avgCpuIdle double,
-    avgCpuNice double,
-    avgCpuNicePercent double,
-    avgCpuSystem double,
-    avgCpuUser double,
-    avgCpuIowait double,
-    avgCpuIrq double,
-    avgCpuSoftirq double,
-    avgCpuBusy double,
-    avgLoadOne double,
-    avgLoadFive double,
-    avgLoadFifteen double,
-    avgMemBuffers double,
-    avgMemCached double,
-    avgMemCachedPercent double,
-    avgMemFree double,
-    avgMemFreePercent double,
-    avgMemUser double,
-    avgMemUserPercent double,
-    avgMemShared double,
-    avgMemSharedPercent double,
-    avgMemTotal double,
-    avgSwapTotal double,
-    avgSwapInKbps double,
-    avgSwapOutKbps double,
-    avgBytesIn double,
-    avgBytesOut double,
-    avgPktsIn double,
-    avgPktsOut double,
-    avgDiskFree double,
-    avgDiskTotal double,
-    avgDiskUsed double,
-    avgPartMaxUsed double,
-    avgDiskMaxBusy double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists cluster_systemstate_30(
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED, 
-    pids mediumint UNSIGNED,
-    avgBlurbs double,
-    avgBlurbRate double,
-    avgCpuIdle double,
-    avgCpuNice double,
-    avgCpuNicePercent double,
-    avgCpuSystem double,
-    avgCpuUser double,
-    avgCpuIowait double,
-    avgCpuIrq double,
-    avgCpuSoftirq double,
-    avgCpuBusy double,
-    avgLoadOne double,
-    avgLoadFive double,
-    avgLoadFifteen double,
-    avgMemBuffers double,
-    avgMemCached double,
-    avgMemCachedPercent double,
-    avgMemFree double,
-    avgMemFreePercent double,
-    avgMemUser double,
-    avgMemUserPercent double,
-    avgMemShared double,
-    avgMemSharedPercent double,
-    avgMemTotal double,
-    avgSwapTotal double,
-    avgSwapInKbps double,
-    avgSwapOutKbps double,
-    avgBytesIn double,
-    avgBytesOut double,
-    avgPktsIn double,
-    avgPktsOut double,
-    avgDiskFree double,
-    avgDiskTotal double,
-    avgDiskUsed double,
-    avgPartMaxUsed double,
-    avgDiskMaxBusy double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists cluster_systemstate_120(
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED, 
-    pids mediumint UNSIGNED,
-    avgBlurbs double,
-    avgBlurbRate double,
-    avgCpuIdle double,
-    avgCpuNice double,
-    avgCpuNicePercent double,
-    avgCpuSystem double,
-    avgCpuUser double,
-    avgCpuIowait double,
-    avgCpuIrq double,
-    avgCpuSoftirq double,
-    avgCpuBusy double,
-    avgLoadOne double,
-    avgLoadFive double,
-    avgLoadFifteen double,
-    avgMemBuffers double,
-    avgMemCached double,
-    avgMemCachedPercent double,
-    avgMemFree double,
-    avgMemFreePercent double,
-    avgMemUser double,
-    avgMemUserPercent double,
-    avgMemShared double,
-    avgMemSharedPercent double,
-    avgMemTotal double,
-    avgSwapTotal double,
-    avgSwapInKbps double,
-    avgSwapOutKbps double,
-    avgBytesIn double,
-    avgBytesOut double,
-    avgPktsIn double,
-    avgPktsOut double,
-    avgDiskFree double,
-    avgDiskTotal double,
-    avgDiskUsed double,
-    avgPartMaxUsed double,
-    avgDiskMaxBusy double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_throughput (
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED,
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    bytesReadPerSec double,
-    blocksWrittenPerSec double,
-    blocksReadPerSec double,
-    blocksReplicatedPerSec double,
-    blocksRemovedPerSec double,
-    blocksVerifiedPerSec double,
-    blockVerificationFailuresPerSec double,
-    readsFromLocalClientPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    writesFromLocalRemotePerSec double,
-    readBlockOperationsPerSec double,
-    numberReadBlockOperations double,
-    writeBlockOperationsPerSec double,
-    numberWriteBlockOperations double,
-    readMetadataOperationsPerSec double,
-    numberReadMetadataOperations double,
-    copyBlockOperationsPerSec double,
-    numberCopyBlockOperations double,
-    replaceBlockOperationsPerSec double,
-    numberReplaceBlockOperations double,
-    heartBeatsPerSec double,
-    numberHeartBeats double,
-    blockReportsPerSec double,
-    numberBlockReports double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_throughput_5 (
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED,
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    bytesReadPerSec double,
-    blocksWrittenPerSec double,
-    blocksReadPerSec double,
-    blocksReplicatedPerSec double,
-    blocksRemovedPerSec double,
-    blocksVerifiedPerSec double,
-    blockVerificationFailuresPerSec double,
-    readsFromLocalClientPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    writesFromLocalRemotePerSec double,
-    readBlockOperationsPerSec double,
-    numberReadBlockOperations double,
-    writeBlockOperationsPerSec double,
-    numberWriteBlockOperations double,
-    readMetadataOperationsPerSec double,
-    numberReadMetadataOperations double,
-    copyBlockOperationsPerSec double,
-    numberCopyBlockOperations double,
-    replaceBlockOperationsPerSec double,
-    numberReplaceBlockOperations double,
-    heartBeatsPerSec double,
-    numberHeartBeats double,
-    blockReportsPerSec double,
-    numberBlockReports double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_throughput_30 (
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED,
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    bytesReadPerSec double,
-    blocksWrittenPerSec double,
-    blocksReadPerSec double,
-    blocksReplicatedPerSec double,
-    blocksRemovedPerSec double,
-    blocksVerifiedPerSec double,
-    blockVerificationFailuresPerSec double,
-    readsFromLocalClientPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    writesFromLocalRemotePerSec double,
-    readBlockOperationsPerSec double,
-    numberReadBlockOperations double,
-    writeBlockOperationsPerSec double,
-    numberWriteBlockOperations double,
-    readMetadataOperationsPerSec double,
-    numberReadMetadataOperations double,
-    copyBlockOperationsPerSec double,
-    numberCopyBlockOperations double,
-    replaceBlockOperationsPerSec double,
-    numberReplaceBlockOperations double,
-    heartBeatsPerSec double,
-    numberHeartBeats double,
-    blockReportsPerSec double,
-    numberBlockReports double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_throughput_120 (
-    Timestamp timestamp default 0,
-    nodes mediumint UNSIGNED,
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    bytesReadPerSec double,
-    blocksWrittenPerSec double,
-    blocksReadPerSec double,
-    blocksReplicatedPerSec double,
-    blocksRemovedPerSec double,
-    blocksVerifiedPerSec double,
-    blockVerificationFailuresPerSec double,
-    readsFromLocalClientPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    writesFromLocalRemotePerSec double,
-    readBlockOperationsPerSec double,
-    numberReadBlockOperations double,
-    writeBlockOperationsPerSec double,
-    numberWriteBlockOperations double,
-    readMetadataOperationsPerSec double,
-    numberReadMetadataOperations double,
-    copyBlockOperationsPerSec double,
-    numberCopyBlockOperations double,
-    replaceBlockOperationsPerSec double,
-    numberReplaceBlockOperations double,
-    heartBeatsPerSec double,
-    numberHeartBeats double,
-    blockReportsPerSec double,
-    numberBlockReports double,
-    primary key(Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_individual (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    blocksRemovedPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    blocksVerifiedPerSec double,
-    blocksWrittenPerSec double,
-    blockVerificationFailuresPerSec double,
-    restarts double,
-    blocksReplicatedPerSec double,
-    bytesReadPerSec double,
-    writesFromLocalRemotePerSec double,
-    readsFromLocalClientPerSec double,
-    blocksReadPerSec double,
-    primary key(node, TimeStamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_individual_5 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    blocksRemovedPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    blocksVerifiedPerSec double,
-    blocksWrittenPerSec double,
-    blockVerificationFailuresPerSec double,
-    restarts double,
-    blocksReplicatedPerSec double,
-    bytesReadPerSec double,
-    writesFromLocalRemotePerSec double,
-    readsFromLocalClientPerSec double,
-    blocksReadPerSec double,
-    primary key(node, TimeStamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_individual_30 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    blocksRemovedPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    blocksVerifiedPerSec double,
-    blocksWrittenPerSec double,
-    blockVerificationFailuresPerSec double,
-    restarts double,
-    blocksReplicatedPerSec double,
-    bytesReadPerSec double,
-    writesFromLocalRemotePerSec double,
-    readsFromLocalClientPerSec double,
-    blocksReadPerSec double,
-    primary key(node, TimeStamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_individual_120 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    bytesWrittenPerSec double,
-    blocksRemovedPerSec double,
-    readsFromLocalRemotePerSec double,
-    writesFromLocalClientPerSec double,
-    blocksVerifiedPerSec double,
-    blocksWrittenPerSec double,
-    blockVerificationFailuresPerSec double,
-    restarts double,
-    blocksReplicatedPerSec double,
-    bytesReadPerSec double,
-    writesFromLocalRemotePerSec double,
-    readsFromLocalClientPerSec double,
-    blocksReadPerSec double,
-    primary key(node, TimeStamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_namenode (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    filesDeletedPerSec double,
-    filesCreatedPerSec double,
-    filesOpenedPerSec double,
-    filesRenamedPerSec double,
-    filesListedPerSec double,
-    numberOfTransactionsPerSec double, 
-    AverageTransactions double,
-    timeInSafeMode double,
-    numberOfSyncsPerSec double,
-    AverageSyncTime double,
-    fsImageLoadTime double,
-    numberOfBlockReportPerSec double,
-    AverageBlockReportTime double,
-    BlocksCorrupted double,
-    primary key(node, Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_namenode_5 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    filesDeletedPerSec double,
-    filesCreatedPerSec double,
-    filesOpenedPerSec double,
-    filesRenamedPerSec double,
-    filesListedPerSec double,
-    numberOfTransactionsPerSec double, 
-    AverageTransactions double,
-    timeInSafeMode double,
-    numberOfSyncsPerSec double,
-    AverageSyncTime double,
-    fsImageLoadTime double,
-    numberOfBlockReportPerSec double,
-    AverageBlockReportTime double,
-    BlocksCorrupted double,
-    primary key(node, Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_namenode_30 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    filesDeletedPerSec double,
-    filesCreatedPerSec double,
-    filesOpenedPerSec double,
-    filesRenamedPerSec double,
-    filesListedPerSec double,
-    numberOfTransactionsPerSec double, 
-    AverageTransactions double,
-    timeInSafeMode double,
-    numberOfSyncsPerSec double,
-    AverageSyncTime double,
-    fsImageLoadTime double,
-    numberOfBlockReportPerSec double,
-    AverageBlockReportTime double,
-    BlocksCorrupted double,
-    primary key(node, Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists dfs_namenode_120 (
-    Timestamp timestamp default 0,
-    node varchar(80),
-    pids mediumint UNSIGNED,
-    restarts mediumint UNSIGNED,
-    filesDeletedPerSec double,
-    filesCreatedPerSec double,
-    filesOpenedPerSec double,
-    filesRenamedPerSec double,
-    filesListedPerSec double,
-    numberOfTransactionsPerSec double, 
-    AverageTransactions double,
-    timeInSafeMode double,
-    numberOfSyncsPerSec double,
-    AverageSyncTime double,
-    fsImageLoadTime double,
-    numberOfBlockReportPerSec double,
-    AverageBlockReportTime double,
-    BlocksCorrupted double,
-    primary key(node, Timestamp),
-    index(TimeStamp)
-);
-
-create table if not exists MRJobCounters (
-    HodId double,
-    JobId double,
-    MAP_REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS double,
-    FILE_SYSTEMS_LOCAL_BYTES_WRITTEN double,
-    MAP_REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES double,
-    JOB_COUNTERS__LAUNCHED_MAP_TASKS double,
-    MAP_REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS double,
-    MAP_REDUCE_FRAMEWORK_MAP_INPUT_RECORDS double,
-    MAP_REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS double,
-    FILE_SYSTEMS_HDFS_BYTES_READ double,
-    JOB_COUNTERS__RACK_LOCAL_MAP_TASKS double,
-    MAP_REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS double,
-    MAP_REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS double,
-    JOB_COUNTERS__DATA_LOCAL_MAP_TASKS double,
-    FILE_SYSTEMS_HDFS_BYTES_WRITTEN double,
-    FILE_SYSTEMS_LOCAL_BYTES_READ double, 
-    MAP_REDUCE_FRAMEWORK_MAP_INPUT_BYTES double, 
-    JOB_COUNTERS__LAUNCHED_REDUCE_TASKS double,
-    MAP_REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS double,
-    primary key(HodId, JobId),
-    index(HodId)
-);
-
-create table if not exists rpc_metrics (
-    Timestamp timestamp default 0,
-    nodes double,
-    AverageRpcQueueTime_num_ops double,
-    AverageRpcQueueTime_avg_time double,
-    AverageRpcProcessingTime_num_ops double,
-    AverageRpcProcessingTime_avg_time double,
-    RpcDiscarded_num_ops double,
-    RpcDiscarded_avg_time double,
-    register_num_ops double,
-    register_avg_time double,
-    getProtocolVersion_num_ops double,
-    getProtocolVersion_avg_time double,
-    sendHeartbeat_num_ops double,
-    sendHeartbeat_avg_time double,
-    blockReport_num_ops double,
-    blockReport_avg_time double,
-    getBlockLocations_num_ops double,
-    heartbeat_num_ops double,
-    versionRequest_num_ops double,
-    setPermission_num_ops double,
-    rollFsImage_num_ops double,
-    primary key(timestamp),
-    index(timestamp)
-);
-
-create table if not exists rpc_metrics_5 (
-    Timestamp timestamp default 0,
-    nodes double,
-    AverageRpcQueueTime_num_ops double,
-    AverageRpcQueueTime_avg_time double,
-    AverageRpcProcessingTime_num_ops double,
-    AverageRpcProcessingTime_avg_time double,
-    RpcDiscarded_num_ops double,
-    RpcDiscarded_avg_time double,
-    register_num_ops double,
-    register_avg_time double,
-    getProtocolVersion_num_ops double,
-    getProtocolVersion_avg_time double,
-    sendHeartbeat_num_ops double,
-    sendHeartbeat_avg_time double,
-    blockReport_num_ops double,
-    blockReport_avg_time double,
-    getBlockLocations_num_ops double,
-    heartbeat_num_ops double,
-    versionRequest_num_ops double,
-    setPermission_num_ops double,
-    rollFsImage_num_ops double,
-    primary key(timestamp),
-    index(timestamp)
-);
-
-create table if not exists rpc_metrics_30 (
-    Timestamp timestamp default 0,
-    nodes double,
-    AverageRpcQueueTime_num_ops double,
-    AverageRpcQueueTime_avg_time double,
-    AverageRpcProcessingTime_num_ops double,
-    AverageRpcProcessingTime_avg_time double,
-    RpcDiscarded_num_ops double,
-    RpcDiscarded_avg_time double,
-    register_num_ops double,
-    register_avg_time double,
-    getProtocolVersion_num_ops double,
-    getProtocolVersion_avg_time double,
-    sendHeartbeat_num_ops double,
-    sendHeartbeat_avg_time double,
-    blockReport_num_ops double,
-    blockReport_avg_time double,
-    getBlockLocations_num_ops double,
-    heartbeat_num_ops double,
-    versionRequest_num_ops double,
-    setPermission_num_ops double,
-    rollFsImage_num_ops double,
-    primary key(timestamp),
-    index(timestamp)
-);
-
-create table if not exists rpc_metrics_120 (
-    Timestamp timestamp default 0,
-    nodes double,
-    AverageRpcQueueTime_num_ops double,
-    AverageRpcQueueTime_avg_time double,
-    AverageRpcProcessingTime_num_ops double,
-    AverageRpcProcessingTime_avg_time double,
-    RpcDiscarded_num_ops double,
-    RpcDiscarded_avg_time double,
-    register_num_ops double,
-    register_avg_time double,
-    getProtocolVersion_num_ops double,
-    getProtocolVersion_avg_time double,
-    sendHeartbeat_num_ops double,
-    sendHeartbeat_avg_time double,
-    blockReport_num_ops double,
-    blockReport_avg_time double,
-    getBlockLocations_num_ops double,
-    heartbeat_num_ops double,
-    versionRequest_num_ops double,
-    setPermission_num_ops double,
-    rollFsImage_num_ops double,
-    primary key(timestamp),
-    index(timestamp)
-);
-
-create table if not exists dfs_fsnamesystem_status (
-    Timestamp timestamp default 0,
-    node VARCHAR(80),
-    FilesTotal double,
-    BlocksTotal double,
-    CapacityTotalGB double,
-    CapacityUsedGB double,
-    CapacityRemainingGB double,
-    TotalLoad double,
-    PendingReplicationBlocks double,
-    UnderReplicatedBlocks double,
-    ScheduledReplicationBlocks double,
-    primary key(node, timestamp),
-    index(timestamp)
-);
-
-create table if not exists dfs_fsnamesystem_status_5 (
-    Timestamp timestamp default 0,
-    node VARCHAR(80),
-    FilesTotal double,
-    BlocksTotal double,
-    CapacityTotalGB double,
-    CapacityUsedGB double,
-    CapacityRemainingGB double,
-    TotalLoad double,
-    PendingReplicationBlocks double,
-    UnderReplicatedBlocks double,
-    ScheduledReplicationBlocks double,
-    primary key(node, timestamp),
-    index(timestamp)
-);
-
-create table if not exists dfs_fsnamesystem_status_30 (
-    Timestamp timestamp default 0,
-    node VARCHAR(80),
-    FilesTotal double,
-    BlocksTotal double,
-    CapacityTotalGB double,
-    CapacityUsedGB double,
-    CapacityRemainingGB double,
-    TotalLoad double,
-    PendingReplicationBlocks double,
-    UnderReplicatedBlocks double,
-    ScheduledReplicationBlocks double,
-    primary key(node, timestamp),
-    index(timestamp)
-);
-
-create table if not exists dfs_fsnamesystem_status_120 (
-    Timestamp timestamp default 0,
-    node VARCHAR(80),
-    FilesTotal double,
-    BlocksTotal double,
-    CapacityTotalGB double,
-    CapacityUsedGB double,
-    CapacityRemainingGB double,
-    TotalLoad double,
-    PendingReplicationBlocks double,
-    UnderReplicatedBlocks double,
-    ScheduledReplicationBlocks double,
-    primary key(node, timestamp),
-    index(timestamp)
-);

+ 0 - 4
src/contrib/chukwa/conf/mysql_upgrade_tables

@@ -1,4 +0,0 @@
-alter table dfs_throughput change readsFromLocalRemotePerSec readsFromRemoteClientPerSec double;
-alter table dfs_throughput change writesFromLocalRemotePerSec writesFromRemoteClientPerSec double;
-alter table dfs_individual change readsFromLocalRemotePerSec readsFromRemoteClientPerSec double;
-alter table dfs_individual change writesFromLocalRemotePerSec writesFromRemoteClientPerSec double;

+ 151 - 78
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java

@@ -25,30 +25,37 @@ import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.DatabaseMetaData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.PidFile;
 
 public class Aggregator {
 	private static DatabaseConfig dbc = null;
 
-	private static Log log = LogFactory.getLog(Consolidator.class);
+	private static Log log = LogFactory.getLog(Aggregator.class);
+	private String table = null;
+	private String jdbc = null;
+	private int[] intervals;
 	private long current = 0;
-    private static PidFile loader=null;
-
-	public Aggregator() {
-
+    private static DatabaseWriter db = null;
+    public Aggregator() {
 		dbc = new DatabaseConfig();
 		Calendar now = Calendar.getInstance();
 		current = now.getTimeInMillis();
 	}
 
-	public HashMap<String,String> findMacros(String query) {
+	public HashMap<String,String> findMacros(String query) throws SQLException {
 		boolean add=false;
 		HashMap<String,String> macroList = new HashMap<String,String>();
 		String macro="";
@@ -71,54 +78,131 @@ public class Aggregator {
 	    return macroList;
 	}
 
-	public String computeMacro(String macro) {
-		if(macro.indexOf("avg(")==0) {
+	public String computeMacro(String macro) throws SQLException {
+		Pattern p = Pattern.compile("past_(.*)_minutes");
+		Matcher matcher = p.matcher(macro);
+		if(macro.indexOf("avg(")==0 || macro.indexOf("group_avg(")==0) {
 			String meta="";
-			String[] table = dbc.findTableName(macro.substring(4,macro.indexOf(")")), current, current);
+			String[] table = dbc.findTableName(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")), current, current);
 			try {
 				String cluster = System.getProperty("CLUSTER");
 				if(cluster==null) {
 					cluster="unknown";
 				}
-				DatabaseWriter db = new DatabaseWriter(cluster);
-
-			    String query = "select * from "+table[0]+" order by timestamp desc limit 1";
-	            log.debug("Query: "+query);
-	            ResultSet rs = db.query(query);
-	            if(rs==null) {
-	          	    throw new SQLException("Table is undefined.");
+                DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
+	            ResultSet rs = dbMetaData.getColumns ( null,null,table[0], null);
+	            boolean first=true;
+	            while(rs.next()) {
+	            	if(!first) {
+	            		meta = meta+",";
+	            	}
+	            	String name = rs.getString(4);
+	            	int type = rs.getInt(5);
+	            	if(type==java.sql.Types.VARCHAR) {
+	            		if(macro.indexOf("group_avg(")<0) {
+	            			meta=meta+"count("+name+") as "+name;
+	            		} else {
+	            			meta=meta+name;
+	            		}
+		            	first=false;
+	            	} else if(type==java.sql.Types.DOUBLE ||
+	            			  type==java.sql.Types.FLOAT ||
+	            			  type==java.sql.Types.INTEGER) {
+	            		meta=meta+"avg("+name+")";
+		            	first=false;
+	            	} else if(type==java.sql.Types.TIMESTAMP) {
+	            		// Skip the column
+	            	} else {
+	            		meta=meta+"AVG("+name+")";
+		            	first=false;
+	            	}
 	            }
-	            ResultSetMetaData rmeta = rs.getMetaData();
-	            if(rs.next()) {
-	            	boolean first=true;
-	                for(int i=1;i<=rmeta.getColumnCount();i++) {
-	                	if(!first) {
-	                		meta=meta+",";
-	                	}
-		                if(rmeta.getColumnType(i)==java.sql.Types.VARCHAR) {
-		                	meta=meta+"count("+rmeta.getColumnName(i)+") as "+rmeta.getColumnName(i);
-		                	first=false;
-		                } else if(rmeta.getColumnType(i)==java.sql.Types.DOUBLE || 
-		                		  rmeta.getColumnType(i)==java.sql.Types.INTEGER || 
-		                		  rmeta.getColumnType(i)==java.sql.Types.FLOAT) {
-		                	meta=meta+"avg("+rmeta.getColumnName(i)+")";
-		                	first=false;
-		                } else if(rmeta.getColumnType(i)==java.sql.Types.TIMESTAMP) {
-		                	// Skip the column
-		                } else {
-		                	meta=meta+"avg("+rmeta.getColumnName(i)+")";
-		                	first=false;		                	
-		                }
-		            }
+	            if(first) {
+	          	    throw new SQLException("Table is undefined.");
 	            }
 			} catch(SQLException ex) {
-				log.error(ex);
+				throw new SQLException("Table does not exist:"+ table[0]);
 			}
 			return meta;
 		} else if(macro.indexOf("now")==0) {
+			SimpleDateFormat sdf = new SimpleDateFormat();
 			return DatabaseWriter.formatTimeStamp(current);
+		} else if(matcher.find()) {
+			int period = Integer.parseInt(matcher.group(1));
+			long timestamp = current - (current % (period*60*1000L)) - (period*60*1000L);
+			return DatabaseWriter.formatTimeStamp(timestamp);
 		} else if(macro.indexOf("past_hour")==0) {
 			return DatabaseWriter.formatTimeStamp(current-3600*1000L);
+		} else if(macro.endsWith("_week")) {
+			long partition = current / DatabaseConfig.WEEK;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_week");
+			return tableName.toString();
+		} else if(macro.endsWith("_month")) {
+			long partition = current / DatabaseConfig.MONTH;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_month");
+			return tableName.toString();
+		} else if(macro.endsWith("_quarter")) {
+			long partition = current / DatabaseConfig.QUARTER;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_quarter");
+			return tableName.toString();
+		} else if(macro.endsWith("_year")) {
+			long partition = current / DatabaseConfig.YEAR;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_year");
+			return tableName.toString();
+		} else if(macro.endsWith("_decade")) {
+			long partition = current / DatabaseConfig.DECADE;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_decade");
+			return tableName.toString();
 		}
 		String[] tableList = dbc.findTableName(macro,current,current);
 		return tableList[0];
@@ -143,62 +227,51 @@ public class Aggregator {
         return contents.toString();
     }
 
-	public void process(String table, String query) {
+	public void process(String query) {
 		ResultSet rs = null;
+		String[] columns;
+		int[] columnsType;
+        String groupBy = "";
 	    long start = current;
 	    long end = current;
         
-		String cluster = System.getProperty("CLUSTER");
-		if(cluster==null) {
-			cluster="unknown";
+
+		try {
+            HashMap<String, String> macroList = findMacros(query);
+            Iterator<String> macroKeys = macroList.keySet().iterator();
+            while(macroKeys.hasNext()) {
+        	    String mkey = macroKeys.next();
+        	    log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+	    	    query = query.replace("["+mkey+"]", macroList.get(mkey));
+            }
+            db.execute(query);
+		} catch(SQLException e) {
+		    log.error(query);
+			log.error(e.getMessage());
 		}
-	    DatabaseWriter db = new DatabaseWriter(cluster);
-			    // Find the last aggregated value from table
-			    String[] tmpList = dbc.findTableName(table,start,end);
-			    String timeTest = "select timestamp from "+tmpList[0]+" order by timestamp desc limit 1";
-			    try {
-					rs = db.query(timeTest);
-				    while(rs.next()) {
-				    	start=rs.getTimestamp(1).getTime();
-				    	end=start;
-				    }
-			    } catch (SQLException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-			    // Transform table names
-                HashMap<String, String> macroList = findMacros(query);
-                Iterator<String> macroKeys = macroList.keySet().iterator();
-                while(macroKeys.hasNext()) {
-                	String mkey = macroKeys.next();
-                	log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
-			    	query = query.replace("["+mkey+"]", macroList.get(mkey));
-                }
-				log.info(query);
-                db.execute(query);
-            db.close();
 	}
 
     public static void main(String[] args) {
-        loader=new PidFile(System.getProperty("CLUSTER")+"Aggregator");
-    	dbc = new DatabaseConfig();    	
+        log.info("Aggregator started.");
+    	dbc = new DatabaseConfig();
+		String cluster = System.getProperty("CLUSTER");
+		if(cluster==null) {
+			cluster="unknown";
+		}
+    	db = new DatabaseWriter(cluster);
     	String queries = Aggregator.getContents(new File(System.getenv("CHUKWA_CONF_DIR")+File.separator+"aggregator.sql"));
     	String[] query = queries.split("\n");
     	for(int i=0;i<query.length;i++) {
-    		    int startOffset = query[i].indexOf("[")+1;
-    		    int endOffset = query[i].indexOf("]");
     		    if(query[i].equals("")) {
-    		    } else if(startOffset==-1 || endOffset==-1) {
-    		    	log.error("Unable to extract table name from query:"+query[i]);
     		    } else if(query[i].indexOf("#")==0) {
     		    	log.debug("skipping: "+query[i]);
     		    } else {
-    		    	String table = query[i].substring(startOffset, endOffset);
     		    	Aggregator dba = new Aggregator();
-    		    	dba.process(table, query[i]);
+    		    	dba.process(query[i]);
     		    }
         }
-        loader.clean();
+        db.close();
+    	log.info("Aggregator finished.");
     }
 
 }

+ 36 - 10
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java

@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 
 public class DatabaseWriter {
     private static Log log = LogFactory.getLog(DatabaseWriter.class);
@@ -35,26 +36,25 @@ public class DatabaseWriter {
     private ResultSet rs = null;
 
     public DatabaseWriter(String host, String user, String password) {
-    	String jdbc_url = System.getenv("JDBC_URL_PREFIX")+host+"/";
-    	
-		if(user!=null) {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+host+"/";
+        if(user!=null) {
             jdbc_url = jdbc_url + "?user=" + user;
             if(password!=null) {
                 jdbc_url = jdbc_url + "&password=" + password;
             }
-		}
+        }
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-            String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
@@ -66,20 +66,43 @@ public class DatabaseWriter {
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-        	String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
     }
     
+    public DatabaseWriter() {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
+        if(mdlConfig.get("jdbc.user")!=null) {
+            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+            if(mdlConfig.get("jdbc.password")!=null) {
+                jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+            }
+        }
+        try {
+            // The newInstance() call is a work around for some
+            // broken Java implementations
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        } catch (Exception ex) {
+            // handle the error
+            log.error(ex,ex);
+        }
+        try {
+            conn = DriverManager.getConnection(jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
+        } catch (SQLException ex) {
+            log.error(ex,ex);
+        }
+    }
     public void execute(String query) {
         try {
             stmt = conn.createStatement(); 
@@ -102,6 +125,9 @@ public class DatabaseWriter {
             }
         }
     }
+    public Connection getConnection() {
+    	return conn;
+    }
     public ResultSet query(String query) throws SQLException {
         try {
             stmt = conn.createStatement();