|
@@ -21,10 +21,7 @@ package org.apache.hadoop.chukwa.datacollection.collector.servlet;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintStream;
|
|
|
-import java.util.LinkedList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Timer;
|
|
|
-import java.util.TimerTask;
|
|
|
+import java.util.*;
|
|
|
|
|
|
import javax.servlet.ServletConfig;
|
|
|
import javax.servlet.ServletException;
|
|
@@ -33,10 +30,11 @@ import javax.servlet.http.HttpServlet;
|
|
|
import javax.servlet.http.HttpServletRequest;
|
|
|
import javax.servlet.http.HttpServletResponse;
|
|
|
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.chukwa.Chunk;
|
|
|
import org.apache.hadoop.chukwa.ChunkImpl;
|
|
|
-import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
|
|
|
-import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
|
|
|
+import org.apache.hadoop.chukwa.datacollection.writer.*;
|
|
|
import org.apache.log4j.Logger;
|
|
|
|
|
|
public class ServletCollector extends HttpServlet
|
|
@@ -51,12 +49,18 @@ public class ServletCollector extends HttpServlet
|
|
|
public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws WriterException
|
|
|
{
|
|
|
writer = w;
|
|
|
- w.init();
|
|
|
}
|
|
|
static long statTime = 0L;
|
|
|
static int numberHTTPConnection = 0;
|
|
|
static int numberchunks = 0;
|
|
|
|
|
|
+ Configuration conf;
|
|
|
+
|
|
|
+ public ServletCollector(Configuration c) {
|
|
|
+ conf =c;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
public void init(ServletConfig servletConf) throws ServletException
|
|
|
{
|
|
|
|
|
@@ -66,7 +70,6 @@ public class ServletCollector extends HttpServlet
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
Timer statTimer = new Timer();
|
|
|
statTimer.schedule(new TimerTask()
|
|
|
{
|
|
@@ -80,31 +83,26 @@ public class ServletCollector extends HttpServlet
|
|
|
}
|
|
|
}, (1000), (60*1000));
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- // read the application->pipeline settings from a config file in the format:
|
|
|
- // appliation_name: PipelinedWriter1, PipelinedWriter2, Writer
|
|
|
- // use reflection to set up the pipeline after reading in the list of writers from the config file
|
|
|
-
|
|
|
- /*
|
|
|
- String strPipelines = "HadoopLogs:HdfsWriter\nApplication2:SameerWriter:HdfsWriter";
|
|
|
- String[] pipelines = strPipelines.split("\n");
|
|
|
- // split into pipes
|
|
|
- for (String pipe : pipelines){
|
|
|
- String[] tmp = pipe.split(":");
|
|
|
- String app = tmp[0];
|
|
|
- String[] stages = tmp[1].split(",");
|
|
|
-
|
|
|
- //loop through pipes, creating linked list of stages per pipe, one at a time
|
|
|
- for (String stage : stages){
|
|
|
- Class curr = ClassLoader.loadClass(stage);
|
|
|
- }
|
|
|
- }
|
|
|
- */
|
|
|
- //FIXME: seems weird to initialize a static object here
|
|
|
- if (writer == null)
|
|
|
- writer = new SeqFileWriter();
|
|
|
-
|
|
|
+ if(writer != null) {
|
|
|
+ log.info("writer set up statically, no need for Collector.init() to do it");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ String writerClassName = conf.get("chukwaCollector.writerClass",
|
|
|
+ SeqFileWriter.class.getCanonicalName());
|
|
|
+ Class<?> writerClass = Class.forName(writerClassName);
|
|
|
+ if(writerClass != null &&ChukwaWriter.class.isAssignableFrom(writerClass))
|
|
|
+ writer = (ChukwaWriter) writerClass.newInstance();
|
|
|
+ } catch(Exception e) {
|
|
|
+ log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ //We default to here if the pipeline construction failed or didn't happen.
|
|
|
+ try{
|
|
|
+ if(writer == null)
|
|
|
+ writer = new SeqFileWriter();//default to SeqFileWriter
|
|
|
+ writer.init(conf);
|
|
|
} catch (WriterException e) {
|
|
|
throw new ServletException("Problem init-ing servlet", e);
|
|
|
}
|