|
@@ -41,7 +41,6 @@ import java.io.*;
|
|
|
*/
|
|
|
public class ChukwaAgent
|
|
|
{
|
|
|
- boolean DO_CHECKPOINT_RESTORE = true;
|
|
|
//boolean WRITE_CHECKPOINTS = true;
|
|
|
|
|
|
static Logger log = Logger.getLogger(ChukwaAgent.class);
|
|
@@ -87,9 +86,7 @@ public class ChukwaAgent
|
|
|
|
|
|
private File checkpointDir; // lock this object to indicate checkpoint in
|
|
|
// progress
|
|
|
- private File initialAdaptors;
|
|
|
private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
|
|
|
- private int CHECKPOINT_INTERVAL_MS; // min interval at which to write
|
|
|
// checkpoints
|
|
|
private static String tags = "";
|
|
|
|
|
@@ -122,7 +119,8 @@ public class ChukwaAgent
|
|
|
"[default collector URL]");
|
|
|
System.exit(0);
|
|
|
}
|
|
|
- ChukwaAgent localAgent = new ChukwaAgent();
|
|
|
+ Configuration conf = readConfig();
|
|
|
+ ChukwaAgent localAgent = new ChukwaAgent(conf);
|
|
|
|
|
|
if (agent.anotherAgentIsRunning())
|
|
|
{
|
|
@@ -135,15 +133,9 @@ public class ChukwaAgent
|
|
|
int uriArgNumber = 0;
|
|
|
if (args.length > 0)
|
|
|
{
|
|
|
- if (args[0].equalsIgnoreCase("-noCheckPoint"))
|
|
|
- {
|
|
|
- agent.DO_CHECKPOINT_RESTORE = false;
|
|
|
- uriArgNumber = 1;
|
|
|
- }
|
|
|
if (args[uriArgNumber].equals("local"))
|
|
|
agent.connector = new ConsoleOutConnector(agent);
|
|
|
- else
|
|
|
- {
|
|
|
+ else {
|
|
|
if (!args[uriArgNumber].contains("://"))
|
|
|
args[uriArgNumber] = "http://" + args[uriArgNumber];
|
|
|
agent.connector = new HttpConnector(agent, args[uriArgNumber]);
|
|
@@ -182,33 +174,67 @@ public class ChukwaAgent
|
|
|
{
|
|
|
return adaptorsByNumber.size();
|
|
|
}
|
|
|
+
|
|
|
|
|
|
- public ChukwaAgent() throws AlreadyRunningException
|
|
|
- {
|
|
|
- ChukwaAgent.agent = this;
|
|
|
+ public ChukwaAgent() throws AlreadyRunningException {
|
|
|
+ this(new Configuration());
|
|
|
+ }
|
|
|
|
|
|
- readConfig();
|
|
|
+ public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
|
|
|
+ ChukwaAgent.agent = this;
|
|
|
+ this.conf = conf;
|
|
|
|
|
|
// almost always just reading this; so use a ConcurrentHM.
|
|
|
// since we wrapped the offset, it's not a structural mod.
|
|
|
adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
|
|
|
adaptorsByNumber = new HashMap<Long, Adaptor>();
|
|
|
checkpointNumber = 0;
|
|
|
- try
|
|
|
- {
|
|
|
- if (DO_CHECKPOINT_RESTORE)
|
|
|
+
|
|
|
+ boolean DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
|
|
|
+ true);
|
|
|
+ CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
|
|
|
+ "chukwa_checkpoint_");
|
|
|
+ final int CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
|
|
|
+ 5000);
|
|
|
+
|
|
|
+ if(conf.get("chukwaAgent.checkpoint.dir") != null)
|
|
|
+ checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
|
|
|
+ else
|
|
|
+ DO_CHECKPOINT_RESTORE = false;
|
|
|
+
|
|
|
+ if (checkpointDir!= null && !checkpointDir.exists()) {
|
|
|
+ checkpointDir.mkdirs();
|
|
|
+ }
|
|
|
+ tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
|
|
|
+
|
|
|
+ log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
|
|
|
+ log.info("Config - checkpointDir: [" + checkpointDir + "]");
|
|
|
+ log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
|
|
|
+ + "]");
|
|
|
+ log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
|
|
|
+ log.info("Config - tags: [" + tags + "]");
|
|
|
+
|
|
|
+ if (DO_CHECKPOINT_RESTORE) {
|
|
|
+ needNewCheckpoint = true;
|
|
|
+ log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
|
|
|
+ }
|
|
|
+
|
|
|
+ File initialAdaptors = null;
|
|
|
+ if(conf.get("chukwaAgent.initial_adaptors") != null)
|
|
|
+ initialAdaptors= new File( conf.get("chukwaAgent.initial_adaptors"));
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (DO_CHECKPOINT_RESTORE) {
|
|
|
restoreFromCheckpoint();
|
|
|
- } catch (IOException e)
|
|
|
- {
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
log.warn("failed to restart from checkpoint: ", e);
|
|
|
}
|
|
|
|
|
|
- try
|
|
|
- {
|
|
|
- if (initialAdaptors != null && initialAdaptors.exists())
|
|
|
- readAdaptorsFile(initialAdaptors);
|
|
|
- } catch (IOException e)
|
|
|
- {
|
|
|
+ try {
|
|
|
+ if (initialAdaptors != null && initialAdaptors.exists() && checkpointNumber ==0)
|
|
|
+ readAdaptorsFile(initialAdaptors); //don't read after checkpoint restore
|
|
|
+ } catch (IOException e) {
|
|
|
log.warn("couldn't read user-specified file "
|
|
|
+ initialAdaptors.getAbsolutePath());
|
|
|
}
|
|
@@ -221,8 +247,7 @@ public class ChukwaAgent
|
|
|
controlSock.start(); // this sets us up as a daemon
|
|
|
log.info("control socket started on port " + controlSock.portno);
|
|
|
|
|
|
- if (CHECKPOINT_INTERVAL_MS > 0)
|
|
|
- {
|
|
|
+ if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir!= null) {
|
|
|
checkpointer = new Timer();
|
|
|
checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
|
|
|
}
|
|
@@ -242,7 +267,7 @@ public class ChukwaAgent
|
|
|
// but can be arbitrarily many space
|
|
|
// delimited agent specific params )
|
|
|
// 4) offset
|
|
|
- Pattern addCmdPattern = Pattern.compile("add\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
|
|
|
+ Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
|
|
|
// FIXME: should handle bad lines here
|
|
|
public long processCommand(String cmd)
|
|
|
{
|
|
@@ -293,8 +318,10 @@ public class ChukwaAgent
|
|
|
}
|
|
|
}
|
|
|
} else
|
|
|
- log.warn("only 'add' command supported in config files");
|
|
|
-
|
|
|
+ if(cmd.length() > 0)
|
|
|
+ log.warn("only 'add' command supported in config files");
|
|
|
+ //no warning for blank line
|
|
|
+
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
@@ -351,7 +378,7 @@ public class ChukwaAgent
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- checkpointNumber = lowestIndex;
|
|
|
+ checkpointNumber = lowestIndex+1;
|
|
|
File checkpoint = new File(checkpointDir, lowestName);
|
|
|
readAdaptorsFile(checkpoint);
|
|
|
}
|
|
@@ -469,7 +496,10 @@ public class ChukwaAgent
|
|
|
synchronized (adaptorsByNumber) {
|
|
|
toStop = adaptorsByNumber.remove(number);
|
|
|
}
|
|
|
- if (toStop == null) {
|
|
|
+
|
|
|
+ if (toStop != null) {
|
|
|
+ adaptorPositions.remove(toStop);
|
|
|
+ } else {
|
|
|
log.warn("trying to stop adaptor " + number + " that isn't running");
|
|
|
return offset;
|
|
|
}
|
|
@@ -494,60 +524,31 @@ public class ChukwaAgent
|
|
|
return connector;
|
|
|
}
|
|
|
|
|
|
- protected void readConfig() {
|
|
|
- conf = new Configuration();
|
|
|
+ protected static Configuration readConfig() {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
|
|
- String chukwaHome = System.getenv("CHUKWA_HOME");
|
|
|
- if (chukwaHome == null) {
|
|
|
- chukwaHome = ".";
|
|
|
- }
|
|
|
-
|
|
|
- if (!chukwaHome.endsWith("/")) {
|
|
|
- chukwaHome = chukwaHome + File.separator;
|
|
|
- }
|
|
|
- log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
|
|
|
-
|
|
|
- String chukwaConf = System.getProperty("CHUKWA_CONF_DIR");
|
|
|
- if (chukwaConf == null) {
|
|
|
- chukwaConf = chukwaHome + "conf" + File.separator;
|
|
|
- }
|
|
|
- if (!chukwaHome.endsWith("/")) {
|
|
|
- chukwaHome = chukwaHome + File.separator;
|
|
|
+ String chukwaHomeName = System.getenv("CHUKWA_HOME");
|
|
|
+ if (chukwaHomeName == null) {
|
|
|
+ chukwaHomeName = "";
|
|
|
}
|
|
|
- if (!chukwaConf.endsWith("/")) {
|
|
|
- chukwaConf = chukwaConf + File.separator;
|
|
|
- }
|
|
|
- log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
|
|
|
-
|
|
|
- conf.addResource(new Path(chukwaConf + "chukwa-agent-conf.xml"));
|
|
|
- DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
|
|
|
- true);
|
|
|
- CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
|
|
|
- "chukwa_checkpoint_");
|
|
|
- checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome
|
|
|
- + "/var/"));
|
|
|
- CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
|
|
|
- 5000);
|
|
|
- if (!checkpointDir.exists())
|
|
|
- {
|
|
|
- checkpointDir.mkdirs();
|
|
|
- }
|
|
|
- tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
|
|
|
+ File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
|
|
|
|
|
|
- log.info("Config - chukwaHome: [" + chukwaHome + "]");
|
|
|
- log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
|
|
|
- log.info("Config - checkpointDir: [" + checkpointDir + "]");
|
|
|
- log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
|
|
|
- + "]");
|
|
|
- log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
|
|
|
- log.info("Config - tags: [" + tags + "]");
|
|
|
-
|
|
|
- if (DO_CHECKPOINT_RESTORE) {
|
|
|
- needNewCheckpoint = true;
|
|
|
- log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
|
|
|
- }
|
|
|
+ log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
|
|
|
|
|
|
- initialAdaptors = new File(chukwaConf + "initial_adaptors");
|
|
|
+ String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
|
|
|
+ File chukwaConf;
|
|
|
+ if (chukwaConfName != null)
|
|
|
+ chukwaConf = new File(chukwaConfName).getAbsoluteFile();
|
|
|
+ else
|
|
|
+ chukwaConf = new File(chukwaHome, "conf");
|
|
|
+
|
|
|
+ log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
|
|
|
+ File agentConf = new File(chukwaConf,"chukwa-agent-conf.xml");
|
|
|
+ conf.addResource(new Path(agentConf.getAbsolutePath()));
|
|
|
+ if(conf.get("chukwaAgent.checkpoint.dir") == null)
|
|
|
+ conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var").getAbsolutePath());
|
|
|
+ conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf, "initial_adaptors").getAbsolutePath());
|
|
|
+ return conf;
|
|
|
}
|
|
|
|
|
|
public void shutdown() {
|
|
@@ -559,15 +560,15 @@ public class ChukwaAgent
|
|
|
* explicitly. It probably should.
|
|
|
*/
|
|
|
public void shutdown(boolean exit) {
|
|
|
- if (checkpointer != null)
|
|
|
- checkpointer.cancel();
|
|
|
controlSock.shutdown(); // make sure we don't get new requests
|
|
|
- try {
|
|
|
- if (needNewCheckpoint)
|
|
|
+ if (checkpointer != null) {
|
|
|
+ checkpointer.cancel();
|
|
|
+ try {
|
|
|
writeCheckpoint(); // write a last checkpoint here, before stopping
|
|
|
- // adaptors
|
|
|
- } catch (IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
+ }
|
|
|
}
|
|
|
+ // adaptors
|
|
|
|
|
|
synchronized (adaptorsByNumber) {
|
|
|
// shut down each adaptor
|
|
@@ -580,6 +581,8 @@ public class ChukwaAgent
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ adaptorsByNumber.clear();
|
|
|
+ adaptorPositions.clear();
|
|
|
if (exit)
|
|
|
System.exit(0);
|
|
|
}
|