|
@@ -20,8 +20,11 @@ package org.apache.hadoop.fs.aliyun.oss;
|
|
|
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
-import java.io.InputStream;
|
|
|
+import java.util.ArrayDeque;
|
|
|
+import java.util.Queue;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
|
|
+import com.google.common.util.concurrent.MoreExecutors;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
private final String key;
|
|
|
private Statistics statistics;
|
|
|
private boolean closed;
|
|
|
- private InputStream wrappedStream = null;
|
|
|
private long contentLength;
|
|
|
private long position;
|
|
|
private long partRemaining;
|
|
|
+ private byte[] buffer;
|
|
|
+ private int maxReadAheadPartNumber;
|
|
|
+ private long expectNextPos;
|
|
|
+ private long lastByteStart;
|
|
|
+
|
|
|
+ private ExecutorService readAheadExecutorService;
|
|
|
+ private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
|
|
|
|
|
|
public AliyunOSSInputStream(Configuration conf,
|
|
|
+ ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
|
|
|
AliyunOSSFileSystemStore store, String key, Long contentLength,
|
|
|
Statistics statistics) throws IOException {
|
|
|
+ this.readAheadExecutorService =
|
|
|
+ MoreExecutors.listeningDecorator(readAheadExecutorService);
|
|
|
this.store = store;
|
|
|
this.key = key;
|
|
|
this.statistics = statistics;
|
|
|
this.contentLength = contentLength;
|
|
|
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
|
|
|
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
|
|
|
+ this.maxReadAheadPartNumber = maxReadAheadPartNumber;
|
|
|
+
|
|
|
+ this.expectNextPos = 0;
|
|
|
+ this.lastByteStart = -1;
|
|
|
reopen(0);
|
|
|
closed = false;
|
|
|
}
|
|
@@ -82,15 +98,81 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
partSize = downloadPartSize;
|
|
|
}
|
|
|
|
|
|
- if (wrappedStream != null) {
|
|
|
+ if (this.buffer != null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Aborting old stream to open at pos " + pos);
|
|
|
}
|
|
|
- wrappedStream.close();
|
|
|
+ this.buffer = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isRandomIO = true;
|
|
|
+ if (pos == this.expectNextPos) {
|
|
|
+ isRandomIO = false;
|
|
|
+ } else {
|
|
|
+ //new seek, remove cache buffers if its byteStart is not equal to pos
|
|
|
+ while (readBufferQueue.size() != 0) {
|
|
|
+ if (readBufferQueue.element().getByteStart() != pos) {
|
|
|
+ readBufferQueue.poll();
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ this.expectNextPos = pos + partSize;
|
|
|
+
|
|
|
+ int currentSize = readBufferQueue.size();
|
|
|
+ if (currentSize == 0) {
|
|
|
+ //init lastByteStart to pos - partSize, used by for loop below
|
|
|
+ lastByteStart = pos - partSize;
|
|
|
+ } else {
|
|
|
+ ReadBuffer[] readBuffers = readBufferQueue.toArray(
|
|
|
+ new ReadBuffer[currentSize]);
|
|
|
+ lastByteStart = readBuffers[currentSize - 1].getByteStart();
|
|
|
}
|
|
|
|
|
|
- wrappedStream = store.retrieve(key, pos, pos + partSize -1);
|
|
|
- if (wrappedStream == null) {
|
|
|
+ int maxLen = this.maxReadAheadPartNumber - currentSize;
|
|
|
+ for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
|
|
|
+ if (lastByteStart + partSize * (i + 1) > contentLength) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ long byteStart = lastByteStart + partSize * (i + 1);
|
|
|
+ long byteEnd = byteStart + partSize -1;
|
|
|
+ if (byteEnd >= contentLength) {
|
|
|
+ byteEnd = contentLength - 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
|
|
|
+ if (readBuffer.getBuffer().length == 0) {
|
|
|
+ //EOF
|
|
|
+ readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
|
|
|
+ } else {
|
|
|
+ this.readAheadExecutorService.execute(
|
|
|
+ new AliyunOSSFileReaderTask(key, store, readBuffer));
|
|
|
+ }
|
|
|
+ readBufferQueue.add(readBuffer);
|
|
|
+ if (isRandomIO) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ReadBuffer readBuffer = readBufferQueue.poll();
|
|
|
+ readBuffer.lock();
|
|
|
+ try {
|
|
|
+ readBuffer.await(ReadBuffer.STATUS.INIT);
|
|
|
+ if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
|
|
|
+ this.buffer = null;
|
|
|
+ } else {
|
|
|
+ this.buffer = readBuffer.getBuffer();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.warn("interrupted when wait a read buffer");
|
|
|
+ } finally {
|
|
|
+ readBuffer.unlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.buffer == null) {
|
|
|
throw new IOException("Null IO stream");
|
|
|
}
|
|
|
position = pos;
|
|
@@ -105,18 +187,10 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
reopen(position);
|
|
|
}
|
|
|
|
|
|
- int tries = MAX_RETRIES;
|
|
|
- boolean retry;
|
|
|
int byteRead = -1;
|
|
|
- do {
|
|
|
- retry = false;
|
|
|
- try {
|
|
|
- byteRead = wrappedStream.read();
|
|
|
- } catch (Exception e) {
|
|
|
- handleReadException(e, --tries);
|
|
|
- retry = true;
|
|
|
- }
|
|
|
- } while (retry);
|
|
|
+ if (partRemaining != 0) {
|
|
|
+ byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
|
|
|
+ }
|
|
|
if (byteRead >= 0) {
|
|
|
position++;
|
|
|
partRemaining--;
|
|
@@ -161,21 +235,18 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
reopen(position);
|
|
|
}
|
|
|
|
|
|
- int tries = MAX_RETRIES;
|
|
|
- boolean retry;
|
|
|
- int bytes = -1;
|
|
|
- do {
|
|
|
- retry = false;
|
|
|
- try {
|
|
|
- bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
|
|
|
- } catch (Exception e) {
|
|
|
- handleReadException(e, --tries);
|
|
|
- retry = true;
|
|
|
+ int bytes = 0;
|
|
|
+ for (int i = this.buffer.length - (int)partRemaining;
|
|
|
+ i < this.buffer.length; i++) {
|
|
|
+ buf[off + bytesRead] = this.buffer[i];
|
|
|
+ bytes++;
|
|
|
+ bytesRead++;
|
|
|
+ if (off + bytesRead >= len) {
|
|
|
+ break;
|
|
|
}
|
|
|
- } while (retry);
|
|
|
+ }
|
|
|
|
|
|
if (bytes > 0) {
|
|
|
- bytesRead += bytes;
|
|
|
position += bytes;
|
|
|
partRemaining -= bytes;
|
|
|
} else if (partRemaining != 0) {
|
|
@@ -202,9 +273,7 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return;
|
|
|
}
|
|
|
closed = true;
|
|
|
- if (wrappedStream != null) {
|
|
|
- wrappedStream.close();
|
|
|
- }
|
|
|
+ this.buffer = null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -225,7 +294,6 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return;
|
|
|
} else if (pos > position && pos < position + partRemaining) {
|
|
|
long len = pos - position;
|
|
|
- AliyunOSSUtils.skipFully(wrappedStream, len);
|
|
|
position = pos;
|
|
|
partRemaining -= len;
|
|
|
} else {
|
|
@@ -245,18 +313,7 @@ public class AliyunOSSInputStream extends FSInputStream {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- private void handleReadException(Exception e, int tries) throws IOException{
|
|
|
- if (tries == 0) {
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
-
|
|
|
- LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
|
|
|
- " connection at position '" + position + "', " + e.getMessage());
|
|
|
- try {
|
|
|
- Thread.sleep(100);
|
|
|
- } catch (InterruptedException e2) {
|
|
|
- LOG.warn(e2.getMessage());
|
|
|
- }
|
|
|
- reopen(position);
|
|
|
+ public long getExpectNextPos() {
|
|
|
+ return this.expectNextPos;
|
|
|
}
|
|
|
}
|