|
@@ -43,20 +43,37 @@ import org.apache.hadoop.mapreduce.Counters;
|
|
*/
|
|
*/
|
|
class EventWriter {
|
|
class EventWriter {
|
|
static final String VERSION = "Avro-Json";
|
|
static final String VERSION = "Avro-Json";
|
|
|
|
+ static final String VERSION_BINARY = "Avro-Binary";
|
|
|
|
|
|
private FSDataOutputStream out;
|
|
private FSDataOutputStream out;
|
|
private DatumWriter<Event> writer =
|
|
private DatumWriter<Event> writer =
|
|
new SpecificDatumWriter<Event>(Event.class);
|
|
new SpecificDatumWriter<Event>(Event.class);
|
|
private Encoder encoder;
|
|
private Encoder encoder;
|
|
private static final Log LOG = LogFactory.getLog(EventWriter.class);
|
|
private static final Log LOG = LogFactory.getLog(EventWriter.class);
|
|
-
|
|
|
|
- EventWriter(FSDataOutputStream out) throws IOException {
|
|
|
|
|
|
+ public enum WriteMode { JSON, BINARY }
|
|
|
|
+ private final WriteMode writeMode;
|
|
|
|
+ private final boolean jsonOutput; // Cache value while we have 2 modes
|
|
|
|
+
|
|
|
|
+ EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException {
|
|
this.out = out;
|
|
this.out = out;
|
|
- out.writeBytes(VERSION);
|
|
|
|
|
|
+ this.writeMode = mode;
|
|
|
|
+ if (this.writeMode==WriteMode.JSON) {
|
|
|
|
+ this.jsonOutput = true;
|
|
|
|
+ out.writeBytes(VERSION);
|
|
|
|
+ } else if (this.writeMode==WriteMode.BINARY) {
|
|
|
|
+ this.jsonOutput = false;
|
|
|
|
+ out.writeBytes(VERSION_BINARY);
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException("Unknown mode: " + mode);
|
|
|
|
+ }
|
|
out.writeBytes("\n");
|
|
out.writeBytes("\n");
|
|
out.writeBytes(Event.SCHEMA$.toString());
|
|
out.writeBytes(Event.SCHEMA$.toString());
|
|
out.writeBytes("\n");
|
|
out.writeBytes("\n");
|
|
- this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out);
|
|
|
|
|
|
+ if (!this.jsonOutput) {
|
|
|
|
+ this.encoder = EncoderFactory.get().binaryEncoder(out, null);
|
|
|
|
+ } else {
|
|
|
|
+ this.encoder = EncoderFactory.get().jsonEncoder(Event.SCHEMA$, out);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
synchronized void write(HistoryEvent event) throws IOException {
|
|
synchronized void write(HistoryEvent event) throws IOException {
|
|
@@ -65,7 +82,9 @@ class EventWriter {
|
|
wrapper.setEvent(event.getDatum());
|
|
wrapper.setEvent(event.getDatum());
|
|
writer.write(wrapper, encoder);
|
|
writer.write(wrapper, encoder);
|
|
encoder.flush();
|
|
encoder.flush();
|
|
- out.writeBytes("\n");
|
|
|
|
|
|
+ if (this.jsonOutput) {
|
|
|
|
+ out.writeBytes("\n");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
void flush() throws IOException {
|
|
void flush() throws IOException {
|