|
@@ -36,32 +36,6 @@ public class TestFileTailingAdaptors extends TestCase {
|
|
|
chunks = new ChunkCatcherConnector();
|
|
|
chunks.start();
|
|
|
}
|
|
|
-
|
|
|
- public void testRawAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
|
|
|
-
|
|
|
- ChukwaAgent agent = new ChukwaAgent();
|
|
|
- // Remove any adaptor left over from previous run
|
|
|
- ChukwaConfiguration cc = new ChukwaConfiguration();
|
|
|
- int portno = cc.getInt("chukwaAgent.control.port", 9093);
|
|
|
- ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
|
|
|
- cli.removeAll();
|
|
|
- // sleep for some time to make sure we don't get chunk from existing streams
|
|
|
- Thread.sleep(5000);
|
|
|
- File testFile = makeTestFile("/tmp/chukwaRawTest",80);
|
|
|
- long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor" +
|
|
|
- " raw " + testFile + " 0");
|
|
|
- assertTrue(adaptorId != -1);
|
|
|
- Chunk c = chunks.waitForAChunk();
|
|
|
- while(!c.getDataType().equals("raw")) {
|
|
|
- c = chunks.waitForAChunk();
|
|
|
- }
|
|
|
- assertTrue(c.getDataType().equals("raw"));
|
|
|
- assertTrue(c.getRecordOffsets().length == 1);
|
|
|
- assertTrue(c.getSeqID() == testFile.length());
|
|
|
- agent.stopAdaptor(adaptorId, false);
|
|
|
- agent.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
public void testCrSepAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
|
|
|
ChukwaAgent agent = new ChukwaAgent();
|
|
@@ -83,7 +57,6 @@ public class TestFileTailingAdaptors extends TestCase {
|
|
|
c = chunks.waitForAChunk();
|
|
|
}
|
|
|
assertTrue(c.getSeqID() == testFile.length());
|
|
|
-System.out.println(c.getRecordOffsets().length);
|
|
|
assertTrue(c.getRecordOffsets().length == 80);
|
|
|
int recStart = 0;
|
|
|
for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
|
|
@@ -96,56 +69,7 @@ System.out.println(c.getRecordOffsets().length);
|
|
|
agent.stopAdaptor(adaptorId, false);
|
|
|
agent.shutdown();
|
|
|
}
|
|
|
-
|
|
|
- public void testLogRotate() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
|
|
|
- ChukwaAgent agent = new ChukwaAgent();
|
|
|
- // Remove any adaptor left over from previous run
|
|
|
- ChukwaConfiguration cc = new ChukwaConfiguration();
|
|
|
- int portno = cc.getInt("chukwaAgent.control.port", 9093);
|
|
|
- ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
|
|
|
- cli.removeAll();
|
|
|
- // sleep for some time to make sure we don't get chunk from existing streams
|
|
|
- Thread.sleep(5000);
|
|
|
- File testFile = makeTestFile("/tmp/chukwaLogRotateTest",80);
|
|
|
- long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" +
|
|
|
- " lines " + testFile + " 0");
|
|
|
- assertTrue(adaptorId != -1);
|
|
|
- System.out.println("getting a chunk...");
|
|
|
- Chunk c = chunks.waitForAChunk();
|
|
|
- System.out.println("got chunk");
|
|
|
- while(!c.getDataType().equals("lines")) {
|
|
|
- c = chunks.waitForAChunk();
|
|
|
- }
|
|
|
- assertTrue(c.getSeqID() == testFile.length());
|
|
|
- assertTrue(c.getRecordOffsets().length == 80);
|
|
|
- int recStart = 0;
|
|
|
- for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
|
|
|
- String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
|
|
|
- System.out.println("record "+ rec+ " was: " + record);
|
|
|
- assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
|
|
|
- recStart = c.getRecordOffsets()[rec] +1;
|
|
|
- }
|
|
|
- assertTrue(c.getDataType().equals("lines"));
|
|
|
- testFile = makeTestFile("/tmp/chukwaLogRotateTest",40);
|
|
|
- c = chunks.waitForAChunk();
|
|
|
- System.out.println("got chunk");
|
|
|
- while(!c.getDataType().equals("lines")) {
|
|
|
- c = chunks.waitForAChunk();
|
|
|
- }
|
|
|
- //assertTrue(c.getSeqID() == testFile.length());
|
|
|
- assertTrue(c.getRecordOffsets().length == 40);
|
|
|
- recStart = 0;
|
|
|
- for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
|
|
|
- String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
|
|
|
- System.out.println("record "+ rec+ " was: " + record);
|
|
|
- assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
|
|
|
- recStart = c.getRecordOffsets()[rec] +1;
|
|
|
- }
|
|
|
- assertTrue(c.getDataType().equals("lines"));
|
|
|
- agent.stopAdaptor(adaptorId, false);
|
|
|
- agent.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
private File makeTestFile(String name, int size) throws IOException {
|
|
|
File tmpOutput = new File(name);
|
|
|
FileOutputStream fos = new FileOutputStream(tmpOutput);
|