|
@@ -20,9 +20,16 @@ package org.apache.hadoop.mapreduce.jobhistory;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
import java.io.DataInputStream;
|
|
|
-import java.io.IOException;
|
|
|
import java.io.EOFException;
|
|
|
+import java.io.IOException;
|
|
|
|
|
|
+import org.apache.avro.AvroRuntimeException;
|
|
|
+import org.apache.avro.Schema;
|
|
|
+import org.apache.avro.io.DatumReader;
|
|
|
+import org.apache.avro.io.Decoder;
|
|
|
+import org.apache.avro.io.DecoderFactory;
|
|
|
+import org.apache.avro.specific.SpecificData;
|
|
|
+import org.apache.avro.specific.SpecificDatumReader;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -31,13 +38,6 @@ import org.apache.hadoop.mapreduce.CounterGroup;
|
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.util.StringInterner;
|
|
|
|
|
|
-import org.apache.avro.Schema;
|
|
|
-import org.apache.avro.io.Decoder;
|
|
|
-import org.apache.avro.io.DecoderFactory;
|
|
|
-import org.apache.avro.io.DatumReader;
|
|
|
-import org.apache.avro.specific.SpecificData;
|
|
|
-import org.apache.avro.specific.SpecificDatumReader;
|
|
|
-
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Unstable
|
|
|
public class EventReader implements Closeable {
|
|
@@ -72,9 +72,18 @@ public class EventReader implements Closeable {
|
|
|
}
|
|
|
|
|
|
Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
|
|
|
- this.schema = Schema.parse(in.readLine());
|
|
|
- this.reader = new SpecificDatumReader(schema, myschema);
|
|
|
- this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
|
|
|
+ String eventschema = in.readLine();
|
|
|
+ if (null != eventschema) {
|
|
|
+ try {
|
|
|
+ this.schema = Schema.parse(eventschema);
|
|
|
+ this.reader = new SpecificDatumReader(schema, myschema);
|
|
|
+ this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
|
|
|
+ } catch (AvroRuntimeException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IOException("Event schema string not parsed since its null");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|