|
@@ -30,6 +30,8 @@ import org.apache.log4j.Logger;
|
|
|
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
import java.io.*;
|
|
|
|
|
|
/**
|
|
@@ -231,59 +233,45 @@ public class ChukwaAgent
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
+ // words should contain (space delimited):
|
|
|
+ // 0) command ("add")
|
|
|
+ // 1) AdaptorClassname
|
|
|
+ // 2) dataType (e.g. "hadoop_log")
|
|
|
+ // 3) params <optional>
|
|
|
+ // (e.g. for files, this is filename,
|
|
|
+ // but can be arbitrarily many space
|
|
|
+ // delimited agent specific params )
|
|
|
+ // 4) offset
|
|
|
+ Pattern addCmdPattern = Pattern.compile("add\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
|
|
|
// FIXME: should handle bad lines here
|
|
|
public long processCommand(String cmd)
|
|
|
{
|
|
|
- String[] words = cmd.split(" ");
|
|
|
- if (words[0].equalsIgnoreCase("add"))
|
|
|
- {
|
|
|
- // words should contain (space delimited):
|
|
|
- // 0) command ("add")
|
|
|
- // 1) AdaptorClassname
|
|
|
- // 2) dataType (e.g. "hadoop_log")
|
|
|
- // 3) params <optional>
|
|
|
- // (e.g. for files, this is filename,
|
|
|
- // but can be arbitrarily many space
|
|
|
- // delimited agent specific params )
|
|
|
- // 4) offset
|
|
|
-
|
|
|
- long offset;
|
|
|
- try
|
|
|
- {
|
|
|
- offset = Long.parseLong(words[words.length - 1]);
|
|
|
- } catch (NumberFormatException e)
|
|
|
- {
|
|
|
+ Matcher m = addCmdPattern.matcher(cmd);
|
|
|
+ if (m.matches()) {
|
|
|
+ long offset; //check for obvious errors first
|
|
|
+ try {
|
|
|
+ offset = Long.parseLong(m.group(4));
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
log.warn("malformed line " + cmd);
|
|
|
return -1L;
|
|
|
}
|
|
|
- String adaptorName = words[1];
|
|
|
+
|
|
|
+ String adaptorName = m.group(1);
|
|
|
+ String dataType = m.group(2);
|
|
|
+ String params = m.group(3);
|
|
|
+ if(params == null)
|
|
|
+ params = "";
|
|
|
|
|
|
Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
|
|
|
- if (adaptor == null)
|
|
|
- {
|
|
|
+ if (adaptor == null) {
|
|
|
log.warn("Error creating adaptor from adaptor name " + adaptorName);
|
|
|
return -1L;
|
|
|
}
|
|
|
|
|
|
- String dataType = words[2];
|
|
|
- String streamName = "";
|
|
|
- String params = "";
|
|
|
- if (words.length > 4)
|
|
|
- { // no argument
|
|
|
- int begParams = adaptorName.length() + dataType.length() + 6;
|
|
|
- // length("ADD x type ") = length(x) + 5, i.e. count letters & spaces
|
|
|
- params = cmd.substring(begParams, cmd.length()
|
|
|
- - words[words.length - 1].length() - 1);
|
|
|
- streamName = params.substring(params.indexOf(" ") + 1, params.length());
|
|
|
- }
|
|
|
long adaptorID;
|
|
|
- synchronized (adaptorsByNumber)
|
|
|
- {
|
|
|
- for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet())
|
|
|
- {
|
|
|
- if (streamName.intern() == a.getValue().getStreamName().intern())
|
|
|
- {
|
|
|
+ synchronized (adaptorsByNumber) {
|
|
|
+ for (Map.Entry<Long, Adaptor> a : adaptorsByNumber.entrySet()) {
|
|
|
+ if (params.intern() == a.getValue().getStreamName().intern()) {
|
|
|
log.warn(params + " already exist, skipping.");
|
|
|
return -1;
|
|
|
}
|