|
@@ -19,6 +19,7 @@ package org.apache.hadoop.chukwa.datacollection.writer;
|
|
|
|
|
|
import java.util.ArrayDeque;
|
|
import java.util.ArrayDeque;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
|
|
|
|
@@ -100,17 +101,15 @@ public class Dedup implements PipelineableWriter {
|
|
this.next = next;
|
|
this.next = next;
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public void add(Chunk data) throws WriterException {
|
|
|
|
- if(! cache.addAndCheck(new DedupKey(data.getStreamName(), data.getSeqID())))
|
|
|
|
- next.add(data);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public void add(List<Chunk> chunks) throws WriterException {
|
|
public void add(List<Chunk> chunks) throws WriterException {
|
|
|
|
+ ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
|
|
for(Chunk c: chunks)
|
|
for(Chunk c: chunks)
|
|
- add(c);
|
|
|
|
-
|
|
|
|
|
|
+ if(! cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
|
|
|
|
+ passedThrough.add(c);
|
|
|
|
+
|
|
|
|
+ if(!passedThrough.isEmpty())
|
|
|
|
+ next.add(passedThrough);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|