@@ -1,1116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.web.resources.ADLFlush;
-import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo;
-import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LeaseParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.VersionInfo;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.SocketException;
-import java.net.URI;
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extended {@link SWebHdfsFileSystem} API. This class contains Azure Data
- * Lake specific stability, reliability and performance improvements.
- * <p>
- * The motivation behind PrivateAzureDataLakeFileSystem is to encapsulate the
- * dependency on the org.apache.hadoop.hdfs.web package for configuring query
- * parameters, HTTP request configuration sent to the backend, etc. This
- * class should be refactored and moved under the package
- * org.apache.hadoop.fs.adl once the required dependent changes are made into
- * ASF code.
- */
-public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
-
-  public static final String SCHEME = "adl";
-
-  // Feature configuration
-  private boolean featureGetBlockLocationLocallyBundled = true;
-  private boolean featureConcurrentReadWithReadAhead = true;
-  private boolean featureRedirectOff = true;
-  private boolean featureFlushWhenEOF = true;
-  private boolean overrideOwner = false;
-  private int maxConcurrentConnection;
-  private int maxBufferSize;
-  private String userName;
-
-  /**
-   * Constructor.
-   */
-  public PrivateAzureDataLakeFileSystem() {
-    try {
-      userName = UserGroupInformation.getCurrentUser().getShortUserName();
-    } catch (IOException e) {
-      userName = "hadoop";
-    }
-  }
-
-  @Override
-  public synchronized void initialize(URI uri, Configuration conf)
-      throws IOException {
-    if (!conf.getBoolean(DFS_WEBHDFS_OAUTH_ENABLED_KEY,
-        DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT)) {
-      // clone configuration, enable OAuth2
-      conf = new Configuration(conf);
-      conf.setBoolean(DFS_WEBHDFS_OAUTH_ENABLED_KEY, true);
-    }
-    super.initialize(uri, conf);
-    overrideOwner = getConf()
-        .getBoolean(ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
-            ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
-
-    featureRedirectOff = getConf()
-        .getBoolean(ADLConfKeys.ADL_FEATURE_REDIRECT_OFF,
-            ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT);
-
-    featureGetBlockLocationLocallyBundled = getConf()
-        .getBoolean(ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED,
-            ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT);
-
-    featureConcurrentReadWithReadAhead = getConf()
-        .getBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
-            ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT);
-
-    maxBufferSize = getConf().getInt(
-        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE,
-        ADLConfKeys
-            .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT);
-
-    maxConcurrentConnection = getConf().getInt(
-        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN,
-        ADLConfKeys
-            .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT);
-  }
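
Note on configuration: the feature flags read in initialize() above can be toggled from the client-side Configuration. A minimal sketch, assuming the ADLConfKeys keys referenced above are accessible from client code (the sketch lives in the same package to be safe) and using a placeholder account URI:

```java
package org.apache.hadoop.hdfs.web;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class AdlFeatureFlagsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Turn the read-ahead feature off and fall back to plain streaming reads.
    conf.setBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
        false);
    // Shrink the read-ahead connection pool where read-ahead stays on.
    conf.setInt(
        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN, 2);
    // "example" is a placeholder account name.
    FileSystem fs = FileSystem.get(
        URI.create("adl://example.azuredatalakestore.net"), conf);
    System.out.println("scheme = " + fs.getScheme()); // prints "adl"
  }
}
```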
-
-  @VisibleForTesting
-  protected boolean isFeatureGetBlockLocationLocallyBundled() {
-    return featureGetBlockLocationLocallyBundled;
-  }
-
-  @VisibleForTesting
-  protected boolean isFeatureConcurrentReadWithReadAhead() {
-    return featureConcurrentReadWithReadAhead;
-  }
-
-  @VisibleForTesting
-  protected boolean isFeatureRedirectOff() {
-    return featureRedirectOff;
-  }
-
-  @VisibleForTesting
-  protected boolean isOverrideOwnerFeatureOn() {
-    return overrideOwner;
-  }
-
-  @VisibleForTesting
-  protected int getMaxBufferSize() {
-    return maxBufferSize;
-  }
-
-  @VisibleForTesting
-  protected int getMaxConcurrentConnection() {
-    return maxConcurrentConnection;
-  }
-
-  @Override
-  public String getScheme() {
-    return SCHEME;
-  }
-
-  /**
-   * Constructing the home directory locally is fine as long as the mapping
-   * between the Hadoop local user name and the ADL user name is not fully
-   * defined.
-   *
-   * @return Hadoop local user home directory.
-   */
-  @Override
-  public final Path getHomeDirectory() {
-    try {
-      return makeQualified(new Path(
-          "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
-    } catch (IOException e) {
-      // Fall back to the user name captured at construction time.
-    }
-
-    return new Path("/user/" + userName);
-  }
-
-  /**
-   * Azure Data Lake does not support user configuration of data replication,
-   * hence the system is not left to query Azure Data Lake for it.
-   *
-   * Stub implementation.
-   *
-   * @param p Not honoured
-   * @param replication Not honoured
-   * @return True, hard coded, since the ADL file system does not support
-   * replication configuration
-   * @throws IOException Never thrown in this case; declared to align with
-   * the parent API definition.
-   */
-  @Override
-  public final boolean setReplication(final Path p, final short replication)
-      throws IOException {
-    return true;
-  }
-
-  /**
-   * @param f File/folder path
-   * @return FileStatus instance containing metadata information of f
-   * @throws IOException For any system error
-   */
-  @Override
-  public FileStatus getFileStatus(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    FileStatus status = super.getFileStatus(f);
-
-    if (overrideOwner) {
-      FileStatus proxiedStatus = new FileStatus(status.getLen(),
-          status.isDirectory(), status.getReplication(), status.getBlockSize(),
-          status.getModificationTime(), status.getAccessTime(),
-          status.getPermission(), userName, "hdfs", status.getPath());
-      return proxiedStatus;
-    } else {
-      return status;
-    }
-  }
-
-  /**
-   * The create call is handled differently for ADL. Create semantics are
-   * translated to Create/Append semantics:
-   * 1. There is no dedicated connection to the server.
-   * 2. Buffering is done locally. Once the buffer is full, or flush is
-   * invoked by the caller, all pending data is pushed to ADL as an APPEND
-   * operation.
-   * 3. On close, an additional call is sent to the server to close the
-   * stream and release the lock on it.
-   *
-   * Create/Append semantics are necessary because:
-   * 1. The ADL backend server does not allow a connection to stay idle for
-   * long. With slow writers, connection timeouts and connection resets were
-   * observed, causing occasional job failures.
-   * 2. Slow-writer jobs get a performance boost, since network latency is
-   * avoided.
-   * 3. ADL performs equally well when appends are sent as multiples of 4 MB
-   * chunks.
-   *
-   * @param f File path
-   * @param permission Access permission for the newly created file
-   * @param overwrite Remove the existing file and recreate a new one if true,
-   * otherwise throw an error if the file exists
-   * @param bufferSize Buffer size, not honoured by the ADL backend
-   * @param replication Replication count, not honoured by the ADL backend
-   * @param blockSize Block size, not honoured by the ADL backend
-   * @param progress Progress indicator
-   * @return FSDataOutputStream OutputStream on which the application can push
-   * a stream of bytes
-   * @throws IOException On a system error, internal server error or user
-   * error
-   */
-  @Override
-  public FSDataOutputStream create(final Path f, final FsPermission permission,
-      final boolean overwrite, final int bufferSize, final short replication,
-      final long blockSize, final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
-        new PermissionParam(applyUMask(permission)),
-        new OverwriteParam(overwrite), new BufferSizeParam(bufferSize),
-        new ReplicationParam(replication), new BlockSizeParam(blockSize),
-        new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
-    };
-  }
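
To make the Create/Append batching concrete, here is a minimal, hypothetical write-path sketch; it is an illustration, not part of the patch, and the account URI and path are placeholders. Small write() calls land in the local buffer and are shipped as APPEND operations when the buffer fills, on flush(), and on close():

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AdlWriteSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(
        URI.create("adl://example.azuredatalakestore.net"), new Configuration());
    // create() above returns a BatchAppendOutputStream wrapped in an
    // FSDataOutputStream, so each write() below lands in the local buffer.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/sample.txt"))) {
      for (int i = 0; i < 1000; i++) {
        out.write("line\n".getBytes(StandardCharsets.UTF_8)); // no round trip
      }
      out.flush(); // pending buffer is shipped as one APPEND call
    }              // close() sends the final call that releases the stream lock
  }
}
```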
-
-  @Override
-  public FSDataOutputStream createNonRecursive(final Path f,
-      final FsPermission permission, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    String leaseId = java.util.UUID.randomUUID().toString();
-    return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
-        new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag),
-        new CreateParentParam(false), new BufferSizeParam(bufferSize),
-        new ReplicationParam(replication), new LeaseParam(leaseId),
-        new BlockSizeParam(blockSize),
-        new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
-    };
-  }
-
-  /**
-   * Redefined here to pass through to the create API implementation, since
-   * the parent class defines it as private.
-   *
-   * @param permission Requested permission.
-   * @return FsPermission with the configured umask applied.
-   */
-  private FsPermission applyUMask(FsPermission permission) {
-    FsPermission fsPermission = permission;
-    if (fsPermission == null) {
-      fsPermission = FsPermission.getDefault();
-    }
-    return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
-  }
-
-  /**
-   * The open call is handled differently for ADL. Instead of returning a
-   * network stream to the user, an overridden FSInputStream is returned.
-   *
-   * 1. There is no dedicated connection to the server.
-   * 2. Process-level concurrent read-ahead buffering is done, which makes
-   * data available to the caller quickly.
-   * 3. The number of bytes to read ahead is configurable.
-   *
-   * The advantages of process-level concurrent read-ahead buffering are:
-   * 1. The ADL backend server does not allow a connection to stay idle for
-   * long. With slow readers, connection timeouts and connection resets were
-   * observed, causing occasional job failures.
-   * 2. Slow-reader jobs get a performance boost, since network latency is
-   * avoided.
-   * 3. Compressed formats like ORC, and large data files, gain the most
-   * from this implementation.
-   *
-   * The read-ahead feature is configurable.
-   *
-   * @param f File path
-   * @param buffersize Buffer size
-   * @return FSDataInputStream InputStream from which the application can read
-   * a stream of bytes
-   * @throws IOException On a system error, internal server error or user
-   * error
-   */
-  @Override
-  public FSDataInputStream open(final Path f, final int buffersize)
-      throws IOException {
-    statistics.incrementReadOps(1);
-
-    final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    // use a runner so the open can recover from an invalid token
-    FsPathConnectionRunner runner = null;
-
-    if (featureConcurrentReadWithReadAhead) {
-      URL url = this.toUrl(op, f, new BufferSizeParam(buffersize),
-          new ReadADLNoRedirectParam(true),
-          new ADLVersionInfo(VersionInfo.getVersion()));
-
-      BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f,
-          maxBufferSize, maxConcurrentConnection);
-
-      FSDataInputStream fin = new FSDataInputStream(bb);
-      return fin;
-    } else {
-      if (featureRedirectOff) {
-        runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
-            new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
-            new ADLVersionInfo(VersionInfo.getVersion()));
-      } else {
-        runner = new FsPathConnectionRunner(op, f,
-            new BufferSizeParam(buffersize));
-      }
-
-      return new FSDataInputStream(
-          new OffsetUrlInputStream(new UnresolvedUrlOpener(runner),
-              new OffsetUrlOpener(null)));
-    }
-  }
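
A matching read-path sketch, again hypothetical with placeholder URI and path: with the read-ahead feature on, open() above returns a BatchByteArrayInputStream, so sequential reads are served from the local buffer rather than one connection per call.

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AdlReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(
        URI.create("adl://example.azuredatalakestore.net"), new Configuration());
    byte[] chunk = new byte[64 * 1024];
    try (FSDataInputStream in = fs.open(new Path("/tmp/sample.txt"))) {
      int n;
      // Sequential reads are served from the read-ahead buffer.
      while ((n = in.read(chunk)) > 0) {
        // process n bytes ...
      }
      // Positional read: seeks, reads, and restores the previous position.
      int m = in.read(0L, chunk, 0, chunk.length);
      System.out.println("re-read " + m + " bytes from offset 0");
    }
  }
}
```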
-
-  /**
-   * @param f File/folder path
-   * @return Array of FileStatus instances for the children of f
-   * @throws IOException For a system error
-   */
-  @Override
-  public FileStatus[] listStatus(final Path f) throws IOException {
-    FileStatus[] fileStatuses = super.listStatus(f);
-    for (int i = 0; i < fileStatuses.length; i++) {
-      if (overrideOwner) {
-        fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(),
-            fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(),
-            fileStatuses[i].getBlockSize(),
-            fileStatuses[i].getModificationTime(),
-            fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(),
-            userName, "hdfs", fileStatuses[i].getPath());
-      }
-    }
-    return fileStatuses;
-  }
-
-  @Override
-  public BlockLocation[] getFileBlockLocations(final FileStatus status,
-      final long offset, final long length) throws IOException {
-    if (status == null) {
-      return null;
-    }
-
-    if (featureGetBlockLocationLocallyBundled) {
-      if ((offset < 0) || (length < 0)) {
-        throw new IllegalArgumentException("Invalid start or len parameter");
-      }
-
-      if (status.getLen() < offset) {
-        return new BlockLocation[0];
-      }
-
-      final String[] name = {"localhost"};
-      final String[] host = {"localhost"};
-      // Block size must be non-zero.
-      long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE;
-      int numberOfLocations =
-          (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
-      BlockLocation[] locations = new BlockLocation[numberOfLocations];
-      for (int i = 0; i < locations.length; i++) {
-        long currentOffset = offset + (i * blockSize);
-        long currentLength = Math
-            .min(blockSize, offset + length - currentOffset);
-        locations[i] = new BlockLocation(name, host, currentOffset,
-            currentLength);
-      }
-
-      return locations;
-    } else {
-      return getFileBlockLocations(status.getPath(), offset, length);
-    }
-  }
-
-  @Override
-  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
-      final long length) throws IOException {
-    statistics.incrementReadOps(1);
-
-    if (featureGetBlockLocationLocallyBundled) {
-      FileStatus fileStatus = getFileStatus(p);
-      return getFileBlockLocations(fileStatus, offset, length);
-    } else {
-      return super.getFileBlockLocations(p, offset, length);
-    }
-  }
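
A worked illustration of the locally bundled block-location math above. The 256 MB extent size below is an assumption for the example only; the real value comes from ADLConfKeys.DEFAULT_EXTENT_SIZE, which is not shown in this file.

```java
// Standalone restatement of the loop in getFileBlockLocations() above.
public class BlockLocationMath {
  public static void main(String[] args) {
    long blockSize = 256L * 1024 * 1024;  // assumed extent size
    long offset = 0;
    long length = 600L * 1024 * 1024;     // a 600 MB requested range
    int numberOfLocations =
        (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
    for (int i = 0; i < numberOfLocations; i++) {
      long currentOffset = offset + (i * blockSize);
      long currentLength = Math.min(blockSize, offset + length - currentOffset);
      System.out.printf("block %d: offset=%d length=%d hosts=[localhost]%n",
          i, currentOffset, currentLength);
    }
    // Prints three synthetic blocks: 256 MB, 256 MB, and the 88 MB tail.
  }
}
```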
-
-  enum StreamState {
-    Initial,
-    DataCachedInLocalBuffer,
-    StreamEnd
-  }
-
-  class BatchAppendOutputStream extends OutputStream {
-    private Path fsPath;
-    private Param<?, ?>[] parameters;
-    private byte[] data = null;
-    private int offset = 0;
-    private long length = 0;
-    private boolean eof = false;
-    private boolean hadError = false;
-    private byte[] dataBuffers = null;
-    private int bufSize = 0;
-    private boolean streamClosed = false;
-
-    public BatchAppendOutputStream(Path path, int bufferSize,
-        Param<?, ?>... param) throws IOException {
-      if (bufferSize < ADLConfKeys.DEFAULT_BLOCK_SIZE) {
-        bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
-      } else {
-        bufSize = bufferSize;
-      }
-
-      this.fsPath = path;
-      this.parameters = param;
-      this.data = getBuffer();
-      FSDataOutputStream createStream = null;
-      try {
-        if (featureRedirectOff) {
-          CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
-              true);
-          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
-              new Param<?, ?>[param.length + 2] :
-              new Param<?, ?>[param.length + 1];
-          System.arraycopy(param, 0, tmpParam, 0, param.length);
-          tmpParam[param.length] = skipRedirect;
-          if (featureFlushWhenEOF) {
-            tmpParam[param.length + 1] = new ADLFlush(false);
-          }
-          createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
-              fsPath, 1, tmpParam).run();
-        } else {
-          createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
-              fsPath, 1, param).run();
-        }
-      } finally {
-        if (createStream != null) {
-          createStream.close();
-        }
-      }
-    }
-
-    @Override
-    public final synchronized void write(int b) throws IOException {
-      if (streamClosed) {
-        throw new IOException(fsPath + " stream object is closed.");
-      }
-
-      if (offset == data.length) {
-        flush();
-      }
-
-      data[offset] = (byte) b;
-      offset++;
-
-      // Statistics will get incremented again as part of the batch updates,
-      // decrement here to avoid a double value
-      if (statistics != null) {
-        statistics.incrementBytesWritten(-1);
-      }
-    }
-
-    @Override
-    public final synchronized void write(byte[] buf, int off, int len)
-        throws IOException {
-      if (streamClosed) {
-        throw new IOException(fsPath + " stream object is closed.");
-      }
-
-      int bytesToWrite = len;
-      int localOff = off;
-      int localLen = len;
-      if (localLen >= data.length) {
-        // Flush data that is already in our internal buffer
-        flush();
-
-        // Keep committing data until we have less than our internal buffer's
-        // length left
-        do {
-          try {
-            commit(buf, localOff, data.length, eof);
-          } catch (IOException e) {
-            hadError = true;
-            throw e;
-          }
-          localOff += data.length;
-          localLen -= data.length;
-        } while (localLen >= data.length);
-      }
-
-      // At this point, we have less than data.length left to copy from the
-      // user's buffer
-      if (offset + localLen >= data.length) {
-        // The user's buffer has enough data left to fill our internal buffer
-        int bytesToCopy = data.length - offset;
-        System.arraycopy(buf, localOff, data, offset, bytesToCopy);
-        offset += bytesToCopy;
-
-        // Flush our internal buffer
-        flush();
-        localOff += bytesToCopy;
-        localLen -= bytesToCopy;
-      }
-
-      if (localLen > 0) {
-        // Simply copy the remainder from the user's buffer into our internal
-        // buffer
-        System.arraycopy(buf, localOff, data, offset, localLen);
-        offset += localLen;
-      }
-
-      // Statistics will get incremented again as part of the batch updates,
-      // decrement here to avoid a double value
-      if (statistics != null) {
-        statistics.incrementBytesWritten(-bytesToWrite);
-      }
-    }
-
-    @Override
-    public final synchronized void flush() throws IOException {
-      if (streamClosed) {
-        throw new IOException(fsPath + " stream object is closed.");
-      }
-
-      if (offset > 0) {
-        try {
-          commit(data, 0, offset, eof);
-        } catch (IOException e) {
-          hadError = true;
-          throw e;
-        }
-      }
-
-      offset = 0;
-    }
-
-    @Override
-    public final synchronized void close() throws IOException {
-      // Stream was closed earlier; return quietly.
-      if (streamClosed) {
-        return;
-      }
-
-      if (featureRedirectOff) {
-        eof = true;
-      }
-
-      boolean flushedSomething = false;
-      if (hadError) {
-        // No point proceeding further since the error has occurred and the
-        // data would need to be uploaded again.
-        streamClosed = true;
-        return;
-      } else {
-        flushedSomething = offset > 0;
-        try {
-          flush();
-        } finally {
-          streamClosed = true;
-        }
-      }
-
-      if (featureRedirectOff) {
-        // If we didn't flush anything from our internal buffer, we have to
-        // call the service again with an empty payload and flush=true in the
-        // url
-        if (!flushedSomething) {
-          try {
-            commit(null, 0, ADLConfKeys.KB, true);
-          } finally {
-            streamClosed = true;
-          }
-        }
-      }
-    }
-
-    private void commit(byte[] buffer, int off, int len, boolean endOfFile)
-        throws IOException {
-      OutputStream out = null;
-      try {
-        if (featureRedirectOff) {
-          AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
-              true);
-          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
-              new Param<?, ?>[parameters.length + 3] :
-              new Param<?, ?>[parameters.length + 1];
-          System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
-          tmpParam[parameters.length] = skipRedirect;
-          if (featureFlushWhenEOF) {
-            tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
-            tmpParam[parameters.length + 2] = new OffsetParam(length);
-          }
-
-          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
-              len, tmpParam).run();
-        } else {
-          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
-              len, parameters).run();
-        }
-
-        if (buffer != null) {
-          out.write(buffer, off, len);
-          length += len;
-        }
-      } finally {
-        if (out != null) {
-          out.close();
-        }
-      }
-    }
-
-    private byte[] getBuffer() {
-      // Allocate a fresh buffer for the next batch of data.
-      dataBuffers = new byte[bufSize];
-      return dataBuffers;
-    }
-  }
-
-
-  /**
-   * Reads data from the backend in chunks instead of over a persistent
-   * connection. This avoids socket timeouts caused by slow readers.
-   */
-  protected class BatchByteArrayInputStream extends FSInputStream {
-
-    private static final int SIZE4MB = 4 * 1024 * 1024;
-    private final URL runner;
-    private byte[] data = null;
-    private long validDataHoldingSize = 0;
-    private int bufferOffset = 0;
-    private long currentFileOffset = 0;
-    private long nextFileOffset = 0;
-    private long fileSize = 0;
-    private StreamState state = StreamState.Initial;
-    private int maxBufferSize;
-    private int maxConcurrentConnection;
-    private Path fsPath;
-    private boolean streamIsClosed;
-    private Future[] subtasks = null;
-
-    BatchByteArrayInputStream(URL url, Path p, int bufferSize,
-        int concurrentConnection) throws IOException {
-      this.runner = url;
-      fsPath = p;
-      FileStatus fStatus = getFileStatus(fsPath);
-      if (!fStatus.isFile()) {
-        throw new IOException("Cannot open the directory " + p + " for " +
-            "reading");
-      }
-      fileSize = fStatus.getLen();
-      this.maxBufferSize = bufferSize;
-      this.maxConcurrentConnection = concurrentConnection;
-      this.streamIsClosed = false;
-    }
-
-    @Override
-    public synchronized final int read(long position, byte[] buffer,
-        int offset, int length) throws IOException {
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-      long oldPos = this.getPos();
-
-      int nread1;
-      try {
-        this.seek(position);
-        nread1 = this.read(buffer, offset, length);
-      } finally {
-        this.seek(oldPos);
-      }
-
-      return nread1;
-    }
-
-    @Override
-    public synchronized final int read() throws IOException {
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-      int status = doBufferAvailabilityCheck();
-      if (status == -1) {
-        return status;
-      }
-      int ch = data[bufferOffset++] & (0xff);
-      if (statistics != null) {
-        statistics.incrementBytesRead(1);
-      }
-      return ch;
-    }
-
-    @Override
-    public synchronized final void readFully(long position, byte[] buffer,
-        int offset, int length) throws IOException {
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-
-      super.readFully(position, buffer, offset, length);
-      if (statistics != null) {
-        statistics.incrementBytesRead(length);
-      }
-    }
-
-    @Override
-    public synchronized final int read(byte[] b, int off, int len)
-        throws IOException {
-      if (b == null) {
-        throw new IllegalArgumentException();
-      } else if (off < 0 || len < 0 || len > b.length - off) {
-        throw new IndexOutOfBoundsException();
-      } else if (len == 0) {
-        return 0;
-      }
-
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-      int status = doBufferAvailabilityCheck();
-      if (status == -1) {
-        return status;
-      }
-
-      int byteRead = 0;
-      long availableBytes = validDataHoldingSize - off;
-      long requestedBytes = bufferOffset + len - off;
-      if (requestedBytes <= availableBytes) {
-        System.arraycopy(data, bufferOffset, b, off, len);
-        bufferOffset += len;
-        byteRead = len;
-      } else {
-        byteRead = super.read(b, off, len);
-      }
-
-      if (statistics != null) {
-        statistics.incrementBytesRead(byteRead);
-      }
-
-      return byteRead;
-    }
-
-    private int doBufferAvailabilityCheck() throws IOException {
-      if (state == StreamState.Initial) {
-        validDataHoldingSize = fill(nextFileOffset);
-      }
-
-      long dataReloadSize = 0;
-      switch ((int) validDataHoldingSize) {
-      case -1:
-        state = StreamState.StreamEnd;
-        return -1;
-      case 0:
-        dataReloadSize = fill(nextFileOffset);
-        if (dataReloadSize <= 0) {
-          state = StreamState.StreamEnd;
-          return (int) dataReloadSize;
-        } else {
-          validDataHoldingSize = dataReloadSize;
-        }
-        break;
-      default:
-        break;
-      }
-
-      if (bufferOffset >= validDataHoldingSize) {
-        dataReloadSize = fill(nextFileOffset);
-      }
-
-      if (bufferOffset >= ((dataReloadSize == 0) ?
-          validDataHoldingSize : dataReloadSize)) {
-        state = StreamState.StreamEnd;
-        return -1;
-      }
-
-      validDataHoldingSize = ((dataReloadSize == 0) ?
-          validDataHoldingSize : dataReloadSize);
-      state = StreamState.DataCachedInLocalBuffer;
-      return 0;
-    }
-
-    private long fill(final long off) throws IOException {
-      if (state == StreamState.StreamEnd) {
-        return -1;
-      }
-
-      if (fileSize <= off) {
-        state = StreamState.StreamEnd;
-        return -1;
-      }
-      int len = maxBufferSize;
-      long fileOffset = 0;
-      boolean isEntireFileCached = true;
-      if (fileSize <= maxBufferSize) {
-        len = (int) fileSize;
-        currentFileOffset = 0;
-        nextFileOffset = 0;
-      } else {
-        if (len > (fileSize - off)) {
-          len = (int) (fileSize - off);
-        }
-
-        synchronized (BufferManager.getLock()) {
-          if (BufferManager.getInstance()
-              .hasValidDataForOffset(fsPath.toString(), off)) {
-            len = (int) (
-                BufferManager.getInstance().getBufferOffset() + BufferManager
-                    .getInstance().getBufferSize() - (int) off);
-          }
-        }
-
-        if (len <= 0) {
-          len = maxBufferSize;
-        }
-        fileOffset = off;
-        isEntireFileCached = false;
-      }
-
-      data = null;
-      BufferManager bm = BufferManager.getInstance();
-      data = bm.getEmpty(len);
-      boolean fetchDataOverNetwork = false;
-      synchronized (BufferManager.getLock()) {
-        if (bm.hasData(fsPath.toString(), fileOffset, len)) {
-          try {
-            bm.get(data, fileOffset);
-            validDataHoldingSize = data.length;
-            currentFileOffset = fileOffset;
-          } catch (ArrayIndexOutOfBoundsException e) {
-            fetchDataOverNetwork = true;
-          }
-        } else {
-          fetchDataOverNetwork = true;
-        }
-      }
-
-      if (fetchDataOverNetwork) {
-        int splitSize = getSplitSize(len);
-        try {
-          validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
-              splitSize);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted filling buffer", e);
-        }
-
-        synchronized (BufferManager.getLock()) {
-          bm.add(data, fsPath.toString(), fileOffset);
-        }
-        currentFileOffset = nextFileOffset;
-      }
-
-      nextFileOffset += validDataHoldingSize;
-      state = StreamState.DataCachedInLocalBuffer;
-      bufferOffset = isEntireFileCached ? (int) off : 0;
-      return validDataHoldingSize;
-    }
-
-    int getSplitSize(int size) {
-      if (size <= SIZE4MB) {
-        return 1;
-      }
-
-      // Not practical
-      if (size > maxBufferSize) {
-        size = maxBufferSize;
-      }
-
-      int equalBufferSplit = Math.max(size / SIZE4MB, 1);
-      int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
-      return splitSize;
-    }
-
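
getSplitSize() above carves a request into roughly one connection per 4 MB, capped by maxConcurrentConnection. A standalone restatement with example values (the two limits below are assumptions for illustration):

```java
// Mirrors the getSplitSize() logic above outside the class, for clarity.
public class SplitSizeMath {
  static final int SIZE4MB = 4 * 1024 * 1024;

  static int getSplitSize(int size, int maxBufferSize, int maxConcurrentConnection) {
    if (size <= SIZE4MB) {
      return 1;                       // small reads use a single connection
    }
    if (size > maxBufferSize) {
      size = maxBufferSize;           // never split more than one buffer's worth
    }
    int equalBufferSplit = Math.max(size / SIZE4MB, 1);
    return Math.min(equalBufferSplit, maxConcurrentConnection);
  }

  public static void main(String[] args) {
    int maxBuf = 16 * SIZE4MB;        // assumed read-ahead buffer: 64 MB
    int maxConn = 8;                  // assumed connection cap
    System.out.println(getSplitSize(SIZE4MB, maxBuf, maxConn));      // 1
    System.out.println(getSplitSize(6 * SIZE4MB, maxBuf, maxConn));  // 6
    System.out.println(getSplitSize(64 * SIZE4MB, maxBuf, maxConn)); // 8 (capped)
  }
}
```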
-    @Override
-    public synchronized final void seek(long pos) throws IOException {
-      if (pos < 0) {
-        throw new IOException("Bad offset, cannot seek to " + pos);
-      }
-
-      BufferManager bm = BufferManager.getInstance();
-      synchronized (BufferManager.getLock()) {
-        if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
-          state = StreamState.DataCachedInLocalBuffer;
-        } else {
-          state = StreamState.Initial;
-        }
-      }
-
-      long availableBytes = (currentFileOffset + validDataHoldingSize);
-
-      // Check if this position falls within the buffered data
-      if (pos < currentFileOffset || availableBytes <= 0) {
-        validDataHoldingSize = 0;
-        currentFileOffset = pos;
-        nextFileOffset = pos;
-        bufferOffset = 0;
-        return;
-      }
-
-      if (pos < availableBytes && pos >= currentFileOffset) {
-        state = StreamState.DataCachedInLocalBuffer;
-        bufferOffset = (int) (pos - currentFileOffset);
-      } else {
-        validDataHoldingSize = 0;
-        currentFileOffset = pos;
-        nextFileOffset = pos;
-        bufferOffset = 0;
-      }
-    }
-
-
-    @Override
-    public synchronized final long getPos() throws IOException {
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-      return bufferOffset + currentFileOffset;
-    }
-
-    @Override
-    public synchronized final int available() throws IOException {
-      if (streamIsClosed) {
-        throw new IOException("Stream already closed");
-      }
-      return Integer.MAX_VALUE;
-    }
-
-    @Override
-    public final boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    private int fillDataConcurrently(byte[] byteArray, int length,
-        long globalOffset, int splitSize)
-        throws IOException, InterruptedException {
-      ExecutorService executor = Executors.newFixedThreadPool(splitSize);
-      subtasks = new Future[splitSize];
-      for (int i = 0; i < splitSize; i++) {
-        int offset = i * (length / splitSize);
-        int splitLength = (splitSize == (i + 1)) ?
-            (length / splitSize) + (length % splitSize) :
-            (length / splitSize);
-        subtasks[i] = executor.submit(
-            new BackgroundReadThread(byteArray, offset, splitLength,
-                globalOffset + offset));
-      }
-
-      executor.shutdown();
-      // Wait until all tasks are finished.
-      executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
-          TimeUnit.SECONDS);
-
-      int totalBytePainted = 0;
-      for (int i = 0; i < splitSize; ++i) {
-        try {
-          totalBytePainted += (Integer) subtasks[i].get();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while filling the buffer", e);
-        } catch (ExecutionException e) {
-          throw new IOException(e.getCause());
-        }
-      }
-
-      if (totalBytePainted != length) {
-        throw new IOException("Expected " + length + " bytes, Got " +
-            totalBytePainted + " bytes");
-      }
-
-      return totalBytePainted;
-    }
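
fillDataConcurrently() above is a fork/join-over-ranges pattern: split the byte range into slices, fill each slice on its own connection, then sum the results and verify the total. A self-contained sketch of the same idea, with a dummy in-memory range reader standing in for fillUpData():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFillSketch {
  // Stand-in for fillUpData(): fills buffer[offset..offset+length) as if read
  // from globalOffset, and reports the number of bytes produced.
  static int readRange(byte[] buffer, int offset, int length, long globalOffset) {
    for (int i = 0; i < length; i++) {
      buffer[offset + i] = (byte) ((globalOffset + i) & 0xff);
    }
    return length;
  }

  public static void main(String[] args) throws Exception {
    byte[] data = new byte[10_000_000];
    int splits = 4;
    ExecutorService executor = Executors.newFixedThreadPool(splits);
    List<Future<Integer>> tasks = new ArrayList<>();
    int base = data.length / splits;
    for (int i = 0; i < splits; i++) {
      final int off = i * base;
      // The last split also takes the remainder, mirroring the code above.
      final int len = (i == splits - 1) ? base + data.length % splits : base;
      tasks.add(executor.submit(() -> readRange(data, off, len, off)));
    }
    executor.shutdown();
    int total = 0;
    for (Future<Integer> t : tasks) {
      total += t.get(); // propagates slice failures as ExecutionException
    }
    System.out.println("filled " + total + " bytes");
  }
}
```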
-
|
|
|
- @Override
|
|
|
- public synchronized final void close() throws IOException {
|
|
|
- synchronized (BufferManager.getLock()) {
|
|
|
- BufferManager.getInstance().clear();
|
|
|
- }
|
|
|
- //need to cleanup the above code the stream and connection close doesn't
|
|
|
- // happen here
|
|
|
- //flag set to mark close happened, cannot use the stream once closed
|
|
|
- streamIsClosed = true;
|
|
|
- }
|
|
|
-
|
|
|
-    /**
-     * Reads data from the ADL backend starting at the specified global
-     * offset for the given length. Data read from the ADL backend is copied
-     * into the buffer array starting at the specified offset.
-     *
-     * @param buffer Buffer in which data read from the ADL backend is
-     * stored.
-     * @param offset Offset in the buffer at which the read data is stored.
-     * @param length Size of the data to read from the ADL backend.
-     * @param globalOffset File offset to read the data from.
-     * @return Number of bytes read from the ADL backend
-     * @throws IOException For any intermittent server issues or internal
-     * failures.
-     */
-    private int fillUpData(byte[] buffer, int offset, int length,
-        long globalOffset) throws IOException {
-      int totalBytesRead = 0;
-      final URL offsetUrl = new URL(
-          runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
-              + new LengthParam(String.valueOf(length)));
-      HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
-          true).run();
-      InputStream in = conn.getInputStream();
-      try {
-        int bytesRead = 0;
-        while ((bytesRead = in.read(buffer, offset + totalBytesRead,
-            length - totalBytesRead)) > 0) {
-          totalBytesRead += bytesRead;
-        }
-
-        // The InputStream must be fully consumed to enable http keep-alive,
-        // so look for the EOF marker: one more byte needs to be read.
-        if (bytesRead == 0) {
-          if (in.read() != -1) {
-            throw new SocketException(
-                "Server returned more than requested data.");
-          }
-        }
-      } finally {
-        in.close();
-        conn.disconnect();
-      }
-
-      return totalBytesRead;
-    }
-
-    private class BackgroundReadThread implements Callable<Integer> {
-
-      private final byte[] data;
-      private int offset;
-      private int length;
-      private long globalOffset;
-
-      BackgroundReadThread(byte[] buffer, int off, int size, long position) {
-        this.data = buffer;
-        this.offset = off;
-        this.length = size;
-        this.globalOffset = position;
-      }
-
-      public Integer call() throws IOException {
-        return fillUpData(data, offset, length, globalOffset);
-      }
-    }
-  }
-}