|
@@ -57,6 +57,7 @@ public class MetricDataLoader {
|
|
private static HashMap<String, String> dbTables = null;
|
|
private static HashMap<String, String> dbTables = null;
|
|
private HashMap<String, HashMap<String,Integer>> dbSchema = null;
|
|
private HashMap<String, HashMap<String,Integer>> dbSchema = null;
|
|
private static String newSpace="-";
|
|
private static String newSpace="-";
|
|
|
|
+ private static boolean batchMode = true;
|
|
|
|
|
|
/** Creates a new instance of DBWriter */
|
|
/** Creates a new instance of DBWriter */
|
|
public MetricDataLoader() {
|
|
public MetricDataLoader() {
|
|
@@ -146,31 +147,30 @@ public class MetricDataLoader {
|
|
|
|
|
|
public void process(Path source) throws IOException, URISyntaxException, SQLException {
|
|
public void process(Path source) throws IOException, URISyntaxException, SQLException {
|
|
|
|
|
|
- System.out.println("Input file:" + source.getName());
|
|
|
|
|
|
+ System.out.println("Input file:" + source.getName());
|
|
|
|
|
|
- ChukwaConfiguration conf = new ChukwaConfiguration();
|
|
|
|
- String fsName = conf.get("writer.hdfs.filesystem");
|
|
|
|
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
|
|
|
|
|
|
+ ChukwaConfiguration conf = new ChukwaConfiguration();
|
|
|
|
+ String fsName = conf.get("writer.hdfs.filesystem");
|
|
|
|
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
|
|
|
|
|
|
- SequenceFile.Reader r =
|
|
|
|
|
|
+ SequenceFile.Reader r =
|
|
new SequenceFile.Reader(fs,source, conf);
|
|
new SequenceFile.Reader(fs,source, conf);
|
|
|
|
|
|
stmt = conn.createStatement();
|
|
stmt = conn.createStatement();
|
|
conn.setAutoCommit(false);
|
|
conn.setAutoCommit(false);
|
|
|
|
|
|
- ChukwaRecordKey key = new ChukwaRecordKey();
|
|
|
|
- ChukwaRecord record = new ChukwaRecord();
|
|
|
|
- try
|
|
|
|
- {
|
|
|
|
- while (r.next(key, record))
|
|
|
|
- {
|
|
|
|
- boolean isSuccessful=true;
|
|
|
|
- String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
|
|
|
|
- log.debug("Timestamp: " + record.getTime());
|
|
|
|
- log.debug("DataType: " + key.getReduceType());
|
|
|
|
- log.debug("StreamName: " + source.getName());
|
|
|
|
|
|
+ ChukwaRecordKey key = new ChukwaRecordKey();
|
|
|
|
+ ChukwaRecord record = new ChukwaRecord();
|
|
|
|
+ try {
|
|
|
|
+ int batch=0;
|
|
|
|
+ while (r.next(key, record)) {
|
|
|
|
+ boolean isSuccessful=true;
|
|
|
|
+ String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
|
|
|
|
+ log.debug("Timestamp: " + record.getTime());
|
|
|
|
+ log.debug("DataType: " + key.getReduceType());
|
|
|
|
+ log.debug("StreamName: " + source.getName());
|
|
|
|
|
|
- String[] fields = record.getFields();
|
|
|
|
|
|
+ String[] fields = record.getFields();
|
|
String table = null;
|
|
String table = null;
|
|
String[] priKeys = null;
|
|
String[] priKeys = null;
|
|
HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
|
|
HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
|
|
@@ -232,6 +232,7 @@ public class MetricDataLoader {
|
|
}
|
|
}
|
|
Iterator<String> i = hashReport.keySet().iterator();
|
|
Iterator<String> i = hashReport.keySet().iterator();
|
|
while(i.hasNext()) {
|
|
while(i.hasNext()) {
|
|
|
|
+ long currentTimeMillis = System.currentTimeMillis();
|
|
Object iteratorNode = i.next();
|
|
Object iteratorNode = i.next();
|
|
HashMap<String, String> recordSet = hashReport.get(iteratorNode);
|
|
HashMap<String, String> recordSet = hashReport.get(iteratorNode);
|
|
Iterator<String> fi = recordSet.keySet().iterator();
|
|
Iterator<String> fi = recordSet.keySet().iterator();
|
|
@@ -298,19 +299,28 @@ public class MetricDataLoader {
|
|
" ON DUPLICATE KEY UPDATE " + sqlValues + ";";
|
|
" ON DUPLICATE KEY UPDATE " + sqlValues + ";";
|
|
}
|
|
}
|
|
log.debug(sql);
|
|
log.debug(sql);
|
|
- stmt.addBatch(sql);
|
|
|
|
|
|
+ if(batchMode) {
|
|
|
|
+ stmt.addBatch(sql);
|
|
|
|
+ batch++;
|
|
|
|
+ } else {
|
|
|
|
+ stmt.execute(sql);
|
|
|
|
+ }
|
|
String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
|
|
String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
|
|
- long currentTimeMillis = System.currentTimeMillis();
|
|
|
|
- long latencyMillis = currentTimeMillis - record.getTime();
|
|
|
|
|
|
+ long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
|
|
int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
|
|
int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
|
|
|
|
+ if(batchMode && batch>20000) {
|
|
|
|
+ int[] updateCounts = stmt.executeBatch();
|
|
|
|
+ batch=0;
|
|
|
|
+ }
|
|
log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
|
|
log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
|
|
"," + record.getTime() +
|
|
"," + record.getTime() +
|
|
") " + latencySeconds + " sec");
|
|
") " + latencySeconds + " sec");
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
- @SuppressWarnings("unused")
|
|
|
|
- int[] updateCounts = stmt.executeBatch();
|
|
|
|
|
|
+ if(batchMode) {
|
|
|
|
+ int[] updateCounts = stmt.executeBatch();
|
|
|
|
+ }
|
|
} catch (SQLException ex) {
|
|
} catch (SQLException ex) {
|
|
// handle any errors
|
|
// handle any errors
|
|
log.error(ex, ex);
|
|
log.error(ex, ex);
|
|
@@ -341,8 +351,8 @@ public class MetricDataLoader {
|
|
|
|
|
|
public static void main(String[] args) {
|
|
public static void main(String[] args) {
|
|
try {
|
|
try {
|
|
- MetricDataLoader mdl = new MetricDataLoader();
|
|
|
|
- mdl.process(new Path(args[0]));
|
|
|
|
|
|
+ MetricDataLoader mdl = new MetricDataLoader(args[0]);
|
|
|
|
+ mdl.process(new Path(args[1]));
|
|
} catch(Exception e) {
|
|
} catch(Exception e) {
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
}
|
|
}
|