|
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.s3.S3Exception;
|
|
import org.apache.hadoop.fs.s3.S3Exception;
|
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
@@ -119,7 +120,7 @@ public class NativeS3FileSystem extends FileSystem {
|
|
key);
|
|
key);
|
|
LOG.debug("{}", e, e);
|
|
LOG.debug("{}", e, e);
|
|
try {
|
|
try {
|
|
- seek(pos);
|
|
|
|
|
|
+ reopen(pos);
|
|
result = in.read();
|
|
result = in.read();
|
|
} catch (EOFException eof) {
|
|
} catch (EOFException eof) {
|
|
LOG.debug("EOF on input stream read: {}", eof, eof);
|
|
LOG.debug("EOF on input stream read: {}", eof, eof);
|
|
@@ -148,7 +149,7 @@ public class NativeS3FileSystem extends FileSystem {
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.info( "Received IOException while reading '{}'," +
|
|
LOG.info( "Received IOException while reading '{}'," +
|
|
" attempting to reopen.", key);
|
|
" attempting to reopen.", key);
|
|
- seek(pos);
|
|
|
|
|
|
+ reopen(pos);
|
|
result = in.read(b, off, len);
|
|
result = in.read(b, off, len);
|
|
}
|
|
}
|
|
if (result > 0) {
|
|
if (result > 0) {
|
|
@@ -168,16 +169,21 @@ public class NativeS3FileSystem extends FileSystem {
|
|
/**
|
|
/**
|
|
* Close the inner stream if not null. Even if an exception
|
|
* Close the inner stream if not null. Even if an exception
|
|
* is raised during the close, the field is set to null
|
|
* is raised during the close, the field is set to null
|
|
- * @throws IOException if raised by the close() operation.
|
|
|
|
*/
|
|
*/
|
|
- private void closeInnerStream() throws IOException {
|
|
|
|
- if (in != null) {
|
|
|
|
- try {
|
|
|
|
- in.close();
|
|
|
|
- } finally {
|
|
|
|
- in = null;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ private void closeInnerStream() {
|
|
|
|
+ IOUtils.closeStream(in);
|
|
|
|
+ in = null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Reopen a new input stream with the specified position
|
|
|
|
+ * @param pos the position to reopen a new stream
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private synchronized void reopen(long pos) throws IOException {
|
|
|
|
+ LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
|
|
|
|
+ InputStream newStream = store.retrieve(key, pos);
|
|
|
|
+ updateInnerStream(newStream, pos);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -202,9 +208,7 @@ public class NativeS3FileSystem extends FileSystem {
|
|
}
|
|
}
|
|
if (pos != newpos) {
|
|
if (pos != newpos) {
|
|
// the seek is attempting to move the current position
|
|
// the seek is attempting to move the current position
|
|
- LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
|
|
|
|
- InputStream newStream = store.retrieve(key, newpos);
|
|
|
|
- updateInnerStream(newStream, newpos);
|
|
|
|
|
|
+ reopen(newpos);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|