|
@@ -26,10 +26,8 @@ import java.io.InputStream;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.io.OutputStream;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Date;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.apache.commons.httpclient.HttpClient;
|
|
|
import org.apache.commons.httpclient.HttpException;
|
|
@@ -41,7 +39,6 @@ import org.apache.commons.httpclient.methods.PostMethod;
|
|
|
import org.apache.commons.httpclient.methods.RequestEntity;
|
|
|
import org.apache.commons.httpclient.params.HttpMethodParams;
|
|
|
import org.apache.hadoop.chukwa.Chunk;
|
|
|
-import org.apache.hadoop.chukwa.datacollection.DataFactory;
|
|
|
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -50,8 +47,7 @@ import org.apache.log4j.Logger;
|
|
|
* Encapsulates all of the http setup and connection details needed for
|
|
|
* chunks to be delivered to a collector.
|
|
|
* <p>
|
|
|
- * On error, tries the list of available collectors, pauses for a minute,
|
|
|
- * and then repeats.
|
|
|
+ * On error, tries the list of available collectors, pauses for a minute, and then repeats.
|
|
|
* </p>
|
|
|
* <p> Will wait forever for collectors to come up. </p>
|
|
|
*/
|
|
@@ -60,13 +56,13 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
static final int SENDER_RETRIES = 14440;
|
|
|
static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000;
|
|
|
//FIXME: this should really correspond to the timer in RetryListOfCollectors
|
|
|
- static final int BLACK_LIST_TIME = 300 * 1000;
|
|
|
+
|
|
|
static Logger log = Logger.getLogger(ChukwaHttpSender.class);
|
|
|
static HttpClient client = null;
|
|
|
static MultiThreadedHttpConnectionManager connectionManager = null;
|
|
|
static String currCollector = null;
|
|
|
|
|
|
- protected static ConcurrentHashMap<Long, String> blackList = null;
|
|
|
+
|
|
|
protected Iterator<String> collectors;
|
|
|
|
|
|
static
|
|
@@ -119,11 +115,10 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
}
|
|
|
|
|
|
public ChukwaHttpSender(){
|
|
|
+ //setup default collector
|
|
|
ArrayList<String> tmp = new ArrayList<String>();
|
|
|
this.collectors = tmp.iterator();
|
|
|
- ConcurrentHashMap<Long, String> tmpHash = new ConcurrentHashMap<Long, String>();
|
|
|
- this.blackList = tmpHash;
|
|
|
- log.info("setting collectors to an empty iterator");
|
|
|
+ log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
|
|
|
|
|
|
}
|
|
|
|
|
@@ -139,22 +134,21 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
* @param collectors
|
|
|
*/
|
|
|
public void setCollectors(Iterator<String> collectors){
|
|
|
- this.collectors = collectors;
|
|
|
- this.blackList.clear();
|
|
|
- //setup a new destination from our list of collectors if one isn't set up
|
|
|
+ this.collectors = collectors;
|
|
|
+ //setup a new destination from our list of collectors if one hasn't been set up
|
|
|
if (currCollector == null){
|
|
|
if (collectors.hasNext()){
|
|
|
currCollector = collectors.next();
|
|
|
}
|
|
|
else
|
|
|
- log.error("No collectors to try in setCollectors()");
|
|
|
+ log.error("No collectors to try in send(), not even trying to do doPost()");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * grab all of the chunks currently in the chunkQueue, stores a copy of them
|
|
|
- * locally, calculates their size, sets them up
|
|
|
+ * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
|
|
|
+ * their size, sets them up
|
|
|
* @return array of chunk id's which were ACKed by collector
|
|
|
*/
|
|
|
public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
|
|
@@ -188,29 +182,14 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
//need to pick a destination here
|
|
|
PostMethod method = new PostMethod();
|
|
|
try {
|
|
|
- if(blackList.size()!=0) {
|
|
|
- for(long time: blackList.keySet()) {
|
|
|
- long now = new Date().getTime();
|
|
|
- if(now-time > BLACK_LIST_TIME) {
|
|
|
- log.info(blackList.get(time)+" release from black list.");
|
|
|
- blackList.remove(time);
|
|
|
- } else if(currCollector.intern()==blackList.get(time)) {
|
|
|
- currCollector = collectors.next();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
doPost(method, postData, currCollector);
|
|
|
- // rotate POST to collectors do not work. All agent and collectors end up spending time to create TCP connections
|
|
|
- // but unable to send any data.
|
|
|
- // currCollector = collectors.next();
|
|
|
+
|
|
|
retries = SENDER_RETRIES; //reset count on success
|
|
|
//if no exception was thrown from doPost, ACK that these chunks were sent
|
|
|
return commitResults;
|
|
|
} catch (Throwable e) {
|
|
|
log.error("Http post exception", e);
|
|
|
log.info("Checking list of collectors to see if another collector has been specified for rollover");
|
|
|
- blackList.put(new Date().getTime(), currCollector);
|
|
|
- log.info("Black list collector: "+currCollector);
|
|
|
if (collectors.hasNext()){
|
|
|
currCollector = collectors.next();
|
|
|
log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
|
|
@@ -220,9 +199,6 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
" ms (" + retries + "retries left)");
|
|
|
Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
|
|
|
retries --;
|
|
|
- // shuffle the list of collectors if all of them are not available.
|
|
|
- this.collectors = DataFactory.getInstance().getCollectorURLs();
|
|
|
- this.blackList.clear();
|
|
|
} else {
|
|
|
log.error("No more collectors to try rolling over to; aborting");
|
|
|
throw new IOException("no collectors");
|
|
@@ -234,11 +210,6 @@ public class ChukwaHttpSender implements ChukwaSender{
|
|
|
method.releaseConnection();
|
|
|
}
|
|
|
} //end retry loop
|
|
|
- if(currCollector==null) {
|
|
|
- // reset the collector list, if ran out of collector to try.
|
|
|
- this.collectors = DataFactory.getInstance().getCollectorURLs();
|
|
|
- this.blackList.clear();
|
|
|
- }
|
|
|
return new ArrayList<CommitListEntry>();
|
|
|
}
|
|
|
|