|
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
/**
|
|
|
* Abstract native raw decoder for all native coders to extend with.
|
|
@@ -34,36 +35,46 @@ abstract class AbstractNativeRawDecoder extends RawErasureDecoder {
|
|
|
public static Logger LOG =
|
|
|
LoggerFactory.getLogger(AbstractNativeRawDecoder.class);
|
|
|
|
|
|
+ // Protect ISA-L coder data structure in native layer from being accessed and
|
|
|
+ // updated concurrently by the init, release and decode functions.
|
|
|
+ protected final ReentrantReadWriteLock decoderLock =
|
|
|
+ new ReentrantReadWriteLock();
|
|
|
+
|
|
|
public AbstractNativeRawDecoder(ErasureCoderOptions coderOptions) {
|
|
|
super(coderOptions);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected synchronized void doDecode(ByteBufferDecodingState decodingState)
|
|
|
+ protected void doDecode(ByteBufferDecodingState decodingState)
|
|
|
throws IOException {
|
|
|
- if (nativeCoder == 0) {
|
|
|
- throw new IOException(String.format("%s closed",
|
|
|
- getClass().getSimpleName()));
|
|
|
- }
|
|
|
- int[] inputOffsets = new int[decodingState.inputs.length];
|
|
|
- int[] outputOffsets = new int[decodingState.outputs.length];
|
|
|
+ decoderLock.readLock().lock();
|
|
|
+ try {
|
|
|
+ if (nativeCoder == 0) {
|
|
|
+ throw new IOException(String.format("%s closed",
|
|
|
+ getClass().getSimpleName()));
|
|
|
+ }
|
|
|
+ int[] inputOffsets = new int[decodingState.inputs.length];
|
|
|
+ int[] outputOffsets = new int[decodingState.outputs.length];
|
|
|
|
|
|
- ByteBuffer buffer;
|
|
|
- for (int i = 0; i < decodingState.inputs.length; ++i) {
|
|
|
- buffer = decodingState.inputs[i];
|
|
|
- if (buffer != null) {
|
|
|
- inputOffsets[i] = buffer.position();
|
|
|
+ ByteBuffer buffer;
|
|
|
+ for (int i = 0; i < decodingState.inputs.length; ++i) {
|
|
|
+ buffer = decodingState.inputs[i];
|
|
|
+ if (buffer != null) {
|
|
|
+ inputOffsets[i] = buffer.position();
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- for (int i = 0; i < decodingState.outputs.length; ++i) {
|
|
|
- buffer = decodingState.outputs[i];
|
|
|
- outputOffsets[i] = buffer.position();
|
|
|
- }
|
|
|
+ for (int i = 0; i < decodingState.outputs.length; ++i) {
|
|
|
+ buffer = decodingState.outputs[i];
|
|
|
+ outputOffsets[i] = buffer.position();
|
|
|
+ }
|
|
|
|
|
|
- performDecodeImpl(decodingState.inputs, inputOffsets,
|
|
|
- decodingState.decodeLength, decodingState.erasedIndexes,
|
|
|
- decodingState.outputs, outputOffsets);
|
|
|
+ performDecodeImpl(decodingState.inputs, inputOffsets,
|
|
|
+ decodingState.decodeLength, decodingState.erasedIndexes,
|
|
|
+ decodingState.outputs, outputOffsets);
|
|
|
+ } finally {
|
|
|
+ decoderLock.readLock().unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected abstract void performDecodeImpl(ByteBuffer[] inputs,
|