|
@@ -19,6 +19,8 @@
|
|
package org.apache.hadoop.io;
|
|
package org.apache.hadoop.io;
|
|
|
|
|
|
import java.lang.reflect.Array;
|
|
import java.lang.reflect.Array;
|
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
|
+import java.lang.reflect.Method;
|
|
|
|
|
|
import java.io.*;
|
|
import java.io.*;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
@@ -26,6 +28,9 @@ import java.util.*;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.conf.*;
|
|
import org.apache.hadoop.conf.*;
|
|
|
|
+import org.apache.hadoop.util.ProtoUtil;
|
|
|
|
+
|
|
|
|
+import com.google.protobuf.Message;
|
|
|
|
|
|
/** A polymorphic Writable that writes an instance with it's class name.
|
|
/** A polymorphic Writable that writes an instance with it's class name.
|
|
* Handles arrays, strings and primitive types without a Writable wrapper.
|
|
* Handles arrays, strings and primitive types without a Writable wrapper.
|
|
@@ -191,6 +196,9 @@ public class ObjectWritable implements Writable, Configurable {
|
|
UTF8.writeString(out, instance.getClass().getName());
|
|
UTF8.writeString(out, instance.getClass().getName());
|
|
((Writable)instance).write(out);
|
|
((Writable)instance).write(out);
|
|
|
|
|
|
|
|
+ } else if (Message.class.isAssignableFrom(declaredClass)) {
|
|
|
|
+ ((Message)instance).writeDelimitedTo(
|
|
|
|
+ DataOutputOutputStream.constructOutputStream(out));
|
|
} else {
|
|
} else {
|
|
throw new IOException("Can't write: "+instance+" as "+declaredClass);
|
|
throw new IOException("Can't write: "+instance+" as "+declaredClass);
|
|
}
|
|
}
|
|
@@ -261,6 +269,8 @@ public class ObjectWritable implements Writable, Configurable {
|
|
instance = UTF8.readString(in);
|
|
instance = UTF8.readString(in);
|
|
} else if (declaredClass.isEnum()) { // enum
|
|
} else if (declaredClass.isEnum()) { // enum
|
|
instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
|
|
instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
|
|
|
|
+ } else if (Message.class.isAssignableFrom(declaredClass)) {
|
|
|
|
+ instance = tryInstantiateProtobuf(declaredClass, in);
|
|
} else { // Writable
|
|
} else { // Writable
|
|
Class instanceClass = null;
|
|
Class instanceClass = null;
|
|
String str = UTF8.readString(in);
|
|
String str = UTF8.readString(in);
|
|
@@ -285,6 +295,67 @@ public class ObjectWritable implements Writable, Configurable {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Try to instantiate a protocol buffer of the given message class
|
|
|
|
+ * from the given input stream.
|
|
|
|
+ *
|
|
|
|
+ * @param protoClass the class of the generated protocol buffer
|
|
|
|
+ * @param dataIn the input stream to read from
|
|
|
|
+ * @return the instantiated Message instance
|
|
|
|
+ * @throws IOException if an IO problem occurs
|
|
|
|
+ */
|
|
|
|
+ private static Message tryInstantiateProtobuf(
|
|
|
|
+ Class<?> protoClass,
|
|
|
|
+ DataInput dataIn) throws IOException {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ if (dataIn instanceof InputStream) {
|
|
|
|
+ // We can use the built-in parseDelimitedFrom and not have to re-copy
|
|
|
|
+ // the data
|
|
|
|
+ Method parseMethod = getStaticProtobufMethod(protoClass,
|
|
|
|
+ "parseDelimitedFrom", InputStream.class);
|
|
|
|
+ return (Message)parseMethod.invoke(null, (InputStream)dataIn);
|
|
|
|
+ } else {
|
|
|
|
+ // Have to read it into a buffer first, since protobuf doesn't deal
|
|
|
|
+ // with the DataInput interface directly.
|
|
|
|
+
|
|
|
|
+ // Read the size delimiter that writeDelimitedTo writes
|
|
|
|
+ int size = ProtoUtil.readRawVarint32(dataIn);
|
|
|
|
+ if (size < 0) {
|
|
|
|
+ throw new IOException("Invalid size: " + size);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ byte[] data = new byte[size];
|
|
|
|
+ dataIn.readFully(data);
|
|
|
|
+ Method parseMethod = getStaticProtobufMethod(protoClass,
|
|
|
|
+ "parseFrom", byte[].class);
|
|
|
|
+ return (Message)parseMethod.invoke(null, data);
|
|
|
|
+ }
|
|
|
|
+ } catch (InvocationTargetException e) {
|
|
|
|
+
|
|
|
|
+ if (e.getCause() instanceof IOException) {
|
|
|
|
+ throw (IOException)e.getCause();
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException(e.getCause());
|
|
|
|
+ }
|
|
|
|
+ } catch (IllegalAccessException iae) {
|
|
|
|
+ throw new AssertionError("Could not access parse method in " +
|
|
|
|
+ protoClass);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
|
|
|
|
+ Class<?> ... args) {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ return declaredClass.getMethod(method, args);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // This is a bug in Hadoop - protobufs should all have this static method
|
|
|
|
+ throw new AssertionError("Protocol buffer class " + declaredClass +
|
|
|
|
+ " does not have an accessible parseFrom(InputStream) method!");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Find and load the class with given name <tt>className</tt> by first finding
|
|
* Find and load the class with given name <tt>className</tt> by first finding
|
|
* it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
|
|
* it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
|