|
@@ -24,7 +24,6 @@ import java.io.OutputStream;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configurable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.io.compress.snappy.LoadSnappy;
|
|
|
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
|
|
|
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
@@ -34,11 +33,6 @@ import org.apache.hadoop.util.NativeCodeLoader;
|
|
|
* This class creates snappy compressors/decompressors.
|
|
|
*/
|
|
|
public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
-
|
|
|
- static {
|
|
|
- LoadSnappy.isLoaded();
|
|
|
- }
|
|
|
-
|
|
|
Configuration conf;
|
|
|
|
|
|
/**
|
|
@@ -63,11 +57,26 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
|
|
|
/**
|
|
|
* Are the native snappy libraries loaded & initialized?
|
|
|
- *
|
|
|
- * @return true if loaded & initialized, otherwise false
|
|
|
*/
|
|
|
+ public static void checkNativeCodeLoaded() {
|
|
|
+ if (!NativeCodeLoader.buildSupportsSnappy()) {
|
|
|
+ throw new RuntimeException("native snappy library not available: " +
|
|
|
+ "this version of libhadoop was built without " +
|
|
|
+ "snappy support.");
|
|
|
+ }
|
|
|
+ if (!SnappyCompressor.isNativeCodeLoaded()) {
|
|
|
+ throw new RuntimeException("native snappy library not available: " +
|
|
|
+ "SnappyCompressor has not been loaded.");
|
|
|
+ }
|
|
|
+ if (!SnappyDecompressor.isNativeCodeLoaded()) {
|
|
|
+ throw new RuntimeException("native snappy library not available: " +
|
|
|
+ "SnappyDecompressor has not been loaded.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static boolean isNativeCodeLoaded() {
|
|
|
- return LoadSnappy.isLoaded() && NativeCodeLoader.isNativeCodeLoaded();
|
|
|
+ return SnappyCompressor.isNativeCodeLoaded() &&
|
|
|
+ SnappyDecompressor.isNativeCodeLoaded();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -97,9 +106,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
public CompressionOutputStream createOutputStream(OutputStream out,
|
|
|
Compressor compressor)
|
|
|
throws IOException {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
+ checkNativeCodeLoaded();
|
|
|
int bufferSize = conf.getInt(
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
|
|
@@ -117,10 +124,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
*/
|
|
|
@Override
|
|
|
public Class<? extends Compressor> getCompressorType() {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
-
|
|
|
+ checkNativeCodeLoaded();
|
|
|
return SnappyCompressor.class;
|
|
|
}
|
|
|
|
|
@@ -131,9 +135,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
*/
|
|
|
@Override
|
|
|
public Compressor createCompressor() {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
+ checkNativeCodeLoaded();
|
|
|
int bufferSize = conf.getInt(
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
|
|
@@ -167,10 +169,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
public CompressionInputStream createInputStream(InputStream in,
|
|
|
Decompressor decompressor)
|
|
|
throws IOException {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
-
|
|
|
+ checkNativeCodeLoaded();
|
|
|
return new BlockDecompressorStream(in, decompressor, conf.getInt(
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
|
|
@@ -183,10 +182,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
*/
|
|
|
@Override
|
|
|
public Class<? extends Decompressor> getDecompressorType() {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
-
|
|
|
+ checkNativeCodeLoaded();
|
|
|
return SnappyDecompressor.class;
|
|
|
}
|
|
|
|
|
@@ -197,9 +193,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
|
|
|
*/
|
|
|
@Override
|
|
|
public Decompressor createDecompressor() {
|
|
|
- if (!isNativeCodeLoaded()) {
|
|
|
- throw new RuntimeException("native snappy library not available");
|
|
|
- }
|
|
|
+ checkNativeCodeLoaded();
|
|
|
int bufferSize = conf.getInt(
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
|
|
|
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
|