|
@@ -27,10 +27,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
|
|
|
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
|
* <p>
|
|
@@ -50,14 +50,15 @@ public class SerializationFactory extends Configured {
|
|
|
* <p>
|
|
|
* Serializations are found by reading the <code>io.serializations</code>
|
|
|
* property from <code>conf</code>, which is a comma-delimited list of
|
|
|
- * classnames.
|
|
|
+ * classnames.
|
|
|
* </p>
|
|
|
*/
|
|
|
public SerializationFactory(Configuration conf) {
|
|
|
super(conf);
|
|
|
- for (String serializerName : conf.getStrings("io.serializations",
|
|
|
- new String[]{WritableSerialization.class.getName(),
|
|
|
- AvroSpecificSerialization.class.getName(),
|
|
|
+ for (String serializerName : conf.getStrings(
|
|
|
+ CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
|
|
|
+ new String[]{WritableSerialization.class.getName(),
|
|
|
+ AvroSpecificSerialization.class.getName(),
|
|
|
AvroReflectSerialization.class.getName()})) {
|
|
|
add(conf, serializerName);
|
|
|
}
|
|
@@ -67,27 +68,35 @@ public class SerializationFactory extends Configured {
|
|
|
private void add(Configuration conf, String serializationName) {
|
|
|
try {
|
|
|
Class<? extends Serialization> serializionClass =
|
|
|
- (Class<? extends Serialization>) conf.getClassByName(serializationName);
|
|
|
+ (Class<? extends Serialization>) conf.getClassByName(serializationName);
|
|
|
serializations.add((Serialization)
|
|
|
- ReflectionUtils.newInstance(serializionClass, getConf()));
|
|
|
+ ReflectionUtils.newInstance(serializionClass, getConf()));
|
|
|
} catch (ClassNotFoundException e) {
|
|
|
LOG.warn("Serialization class not found: ", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public <T> Serializer<T> getSerializer(Class<T> c) {
|
|
|
- return getSerialization(c).getSerializer(c);
|
|
|
+ Serialization<T> serializer = getSerialization(c);
|
|
|
+ if (serializer != null) {
|
|
|
+ return serializer.getSerializer(c);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
public <T> Deserializer<T> getDeserializer(Class<T> c) {
|
|
|
- return getSerialization(c).getDeserializer(c);
|
|
|
+ Serialization<T> serializer = getSerialization(c);
|
|
|
+ if (serializer != null) {
|
|
|
+ return serializer.getDeserializer(c);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public <T> Serialization<T> getSerialization(Class<T> c) {
|
|
|
for (Serialization serialization : serializations) {
|
|
|
if (serialization.accept(c)) {
|
|
|
- return (Serialization<T>) serialization;
|
|
|
+ return (Serialization<T>) serialization;
|
|
|
}
|
|
|
}
|
|
|
return null;
|