|
@@ -20,11 +20,8 @@ package org.apache.hadoop.fs.aliyun.oss;
|
|
|
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
-import java.util.ArrayDeque;
|
|
|
-import java.util.Queue;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
+import java.io.InputStream;
|
|
|
|
|
|
-import com.google.common.util.concurrent.MoreExecutors;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -46,33 +43,20 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
private final String key;
|
|
|
private Statistics statistics;
|
|
|
private boolean closed;
|
|
|
+ private InputStream wrappedStream = null;
|
|
|
private long contentLength;
|
|
|
private long position;
|
|
|
private long partRemaining;
|
|
|
- private byte[] buffer;
|
|
|
- private int maxReadAheadPartNumber;
|
|
|
- private long expectNextPos;
|
|
|
- private long lastByteStart;
|
|
|
-
|
|
|
- private ExecutorService readAheadExecutorService;
|
|
|
- private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
|
|
|
|
|
|
public AliyunOSSInputStream(Configuration conf,
|
|
|
- ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
|
|
|
AliyunOSSFileSystemStore store, String key, Long contentLength,
|
|
|
Statistics statistics) throws IOException {
|
|
|
- this.readAheadExecutorService =
|
|
|
- MoreExecutors.listeningDecorator(readAheadExecutorService);
|
|
|
this.store = store;
|
|
|
this.key = key;
|
|
|
this.statistics = statistics;
|
|
|
this.contentLength = contentLength;
|
|
|
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
|
|
|
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
|
|
- this.maxReadAheadPartNumber = maxReadAheadPartNumber;
|
|
|
-
|
|
|
- this.expectNextPos = 0;
|
|
|
- this.lastByteStart = -1;
|
|
|
reopen(0);
|
|
|
closed = false;
|
|
|
}
|
|
@@ -98,81 +82,15 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
partSize = downloadPartSize;
|
|
|
}
|
|
|
|
|
|
- if (this.buffer != null) {
|
|
|
+ if (wrappedStream != null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Aborting old stream to open at pos " + pos);
|
|
|
}
|
|
|
- this.buffer = null;
|
|
|
- }
|
|
|
-
|
|
|
- boolean isRandomIO = true;
|
|
|
- if (pos == this.expectNextPos) {
|
|
|
- isRandomIO = false;
|
|
|
- } else {
|
|
|
- //new seek, remove cache buffers if its byteStart is not equal to pos
|
|
|
- while (readBufferQueue.size() != 0) {
|
|
|
- if (readBufferQueue.element().getByteStart() != pos) {
|
|
|
- readBufferQueue.poll();
|
|
|
- } else {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- this.expectNextPos = pos + partSize;
|
|
|
-
|
|
|
- int currentSize = readBufferQueue.size();
|
|
|
- if (currentSize == 0) {
|
|
|
- //init lastByteStart to pos - partSize, used by for loop below
|
|
|
- lastByteStart = pos - partSize;
|
|
|
- } else {
|
|
|
- ReadBuffer[] readBuffers = readBufferQueue.toArray(
|
|
|
- new ReadBuffer[currentSize]);
|
|
|
- lastByteStart = readBuffers[currentSize - 1].getByteStart();
|
|
|
+ wrappedStream.close();
|
|
|
}
|
|
|
|
|
|
- int maxLen = this.maxReadAheadPartNumber - currentSize;
|
|
|
- for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
|
|
|
- if (lastByteStart + partSize * (i + 1) > contentLength) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- long byteStart = lastByteStart + partSize * (i + 1);
|
|
|
- long byteEnd = byteStart + partSize -1;
|
|
|
- if (byteEnd >= contentLength) {
|
|
|
- byteEnd = contentLength - 1;
|
|
|
- }
|
|
|
-
|
|
|
- ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
|
|
|
- if (readBuffer.getBuffer().length == 0) {
|
|
|
- //EOF
|
|
|
- readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
|
|
|
- } else {
|
|
|
- this.readAheadExecutorService.execute(
|
|
|
- new AliyunOSSFileReaderTask(key, store, readBuffer));
|
|
|
- }
|
|
|
- readBufferQueue.add(readBuffer);
|
|
|
- if (isRandomIO) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- ReadBuffer readBuffer = readBufferQueue.poll();
|
|
|
- readBuffer.lock();
|
|
|
- try {
|
|
|
- readBuffer.await(ReadBuffer.STATUS.INIT);
|
|
|
- if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
|
|
|
- this.buffer = null;
|
|
|
- } else {
|
|
|
- this.buffer = readBuffer.getBuffer();
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- LOG.warn("interrupted when wait a read buffer");
|
|
|
- } finally {
|
|
|
- readBuffer.unlock();
|
|
|
- }
|
|
|
-
|
|
|
- if (this.buffer == null) {
|
|
|
+ wrappedStream = store.retrieve(key, pos, pos + partSize -1);
|
|
|
+ if (wrappedStream == null) {
|
|
|
throw new IOException("Null IO stream");
|
|
|
}
|
|
|
position = pos;
|
|
@@ -187,10 +105,18 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
reopen(position);
|
|
|
}
|
|
|
|
|
|
+ int tries = MAX_RETRIES;
|
|
|
+ boolean retry;
|
|
|
int byteRead = -1;
|
|
|
- if (partRemaining != 0) {
|
|
|
- byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
|
|
|
- }
|
|
|
+ do {
|
|
|
+ retry = false;
|
|
|
+ try {
|
|
|
+ byteRead = wrappedStream.read();
|
|
|
+ } catch (Exception e) {
|
|
|
+ handleReadException(e, --tries);
|
|
|
+ retry = true;
|
|
|
+ }
|
|
|
+ } while (retry);
|
|
|
if (byteRead >= 0) {
|
|
|
position++;
|
|
|
partRemaining--;
|
|
@@ -235,18 +161,21 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
reopen(position);
|
|
|
}
|
|
|
|
|
|
- int bytes = 0;
|
|
|
- for (int i = this.buffer.length - (int)partRemaining;
|
|
|
- i < this.buffer.length; i++) {
|
|
|
- buf[off + bytesRead] = this.buffer[i];
|
|
|
- bytes++;
|
|
|
- bytesRead++;
|
|
|
- if (off + bytesRead >= len) {
|
|
|
- break;
|
|
|
+ int tries = MAX_RETRIES;
|
|
|
+ boolean retry;
|
|
|
+ int bytes = -1;
|
|
|
+ do {
|
|
|
+ retry = false;
|
|
|
+ try {
|
|
|
+ bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
|
|
|
+ } catch (Exception e) {
|
|
|
+ handleReadException(e, --tries);
|
|
|
+ retry = true;
|
|
|
}
|
|
|
- }
|
|
|
+ } while (retry);
|
|
|
|
|
|
if (bytes > 0) {
|
|
|
+ bytesRead += bytes;
|
|
|
position += bytes;
|
|
|
partRemaining -= bytes;
|
|
|
} else if (partRemaining != 0) {
|
|
@@ -273,7 +202,9 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return;
|
|
|
}
|
|
|
closed = true;
|
|
|
- this.buffer = null;
|
|
|
+ if (wrappedStream != null) {
|
|
|
+ wrappedStream.close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -294,6 +225,7 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return;
|
|
|
} else if (pos > position && pos < position + partRemaining) {
|
|
|
long len = pos - position;
|
|
|
+ AliyunOSSUtils.skipFully(wrappedStream, len);
|
|
|
position = pos;
|
|
|
partRemaining -= len;
|
|
|
} else {
|
|
@@ -313,7 +245,18 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- public long getExpectNextPos() {
|
|
|
- return this.expectNextPos;
|
|
|
+ private void handleReadException(Exception e, int tries) throws IOException{
|
|
|
+ if (tries == 0) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
|
|
|
+ " connection at position '" + position + "', " + e.getMessage());
|
|
|
+ try {
|
|
|
+ Thread.sleep(100);
|
|
|
+ } catch (InterruptedException e2) {
|
|
|
+ LOG.warn(e2.getMessage());
|
|
|
+ }
|
|
|
+ reopen(position);
|
|
|
}
|
|
|
}
|