|
@@ -20,18 +20,12 @@ package org.apache.hadoop.fs.azurebfs.services;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
-import java.io.OutputStream;
|
|
|
import java.net.HttpURLConnection;
|
|
|
-import java.net.ProtocolException;
|
|
|
import java.net.URL;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-
|
|
|
-import javax.net.ssl.HttpsURLConnection;
|
|
|
-import javax.net.ssl.SSLSocketFactory;
|
|
|
-
|
|
|
-import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
|
|
|
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonFactory;
|
|
|
import com.fasterxml.jackson.core.JsonParser;
|
|
@@ -40,37 +34,39 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
|
|
-
|
|
|
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
|
|
|
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
|
|
-import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
|
|
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
|
|
|
|
|
|
/**
|
|
|
- * Represents an HTTP operation.
|
|
|
+ * Base Http operation class for orchestrating server IO calls. Child classes would
|
|
|
+ * define the certain orchestration implementation on the basis of network library used.
|
|
|
+ * <p>
|
|
|
+ * For JDK netlib usage, the child class would be {@link AbfsJdkHttpOperation}. <br>
|
|
|
+ * For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}.
|
|
|
*/
|
|
|
-public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
- private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
|
|
|
+public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
+
|
|
|
+ private final Logger log;
|
|
|
|
|
|
private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
|
|
|
|
|
|
private static final int ONE_THOUSAND = 1000;
|
|
|
+
|
|
|
private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;
|
|
|
|
|
|
private final String method;
|
|
|
private final URL url;
|
|
|
private String maskedUrl;
|
|
|
private String maskedEncodedUrl;
|
|
|
-
|
|
|
- private HttpURLConnection connection;
|
|
|
private int statusCode;
|
|
|
private String statusDescription;
|
|
|
private String storageErrorCode = "";
|
|
|
- private String storageErrorMessage = "";
|
|
|
- private String requestId = "";
|
|
|
+ private String storageErrorMessage = "";
|
|
|
+ private String requestId = "";
|
|
|
private String expectedAppendPos = "";
|
|
|
private ListResultSchema listResultSchema = null;
|
|
|
|
|
@@ -85,6 +81,23 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
private boolean shouldMask = false;
|
|
|
private boolean connectionDisconnectedOnError = false;
|
|
|
|
|
|
+ /**Request headers to be sent in the request.*/
|
|
|
+ private final List<AbfsHttpHeader> requestHeaders;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Timeout that defines maximum allowed connection establishment time for a request.
|
|
|
+ * Timeout is in milliseconds. Not all requests need to establish a new connection,
|
|
|
+ * it depends on the connection pooling-heuristic of the networking library.
|
|
|
+ */
|
|
|
+ private final int connectionTimeout;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Timeout in milliseconds that defines maximum allowed time to read the response.
|
|
|
+ * This timeout starts once request is sent. It includes server reponse time,
|
|
|
+ * network latency, and time to read the response.
|
|
|
+ */
|
|
|
+ private final int readTimeout;
|
|
|
+
|
|
|
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
|
|
|
final URL url,
|
|
|
final String method,
|
|
@@ -94,6 +107,21 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
return httpOp;
|
|
|
}
|
|
|
|
|
|
+ public AbfsHttpOperation(
|
|
|
+ final Logger log,
|
|
|
+ final URL url,
|
|
|
+ final String method,
|
|
|
+ final List<AbfsHttpHeader> requestHeaders,
|
|
|
+ final Duration connectionTimeout,
|
|
|
+ final Duration readTimeout) {
|
|
|
+ this.log = log;
|
|
|
+ this.url = url;
|
|
|
+ this.method = method;
|
|
|
+ this.requestHeaders = requestHeaders;
|
|
|
+ this.connectionTimeout = (int) connectionTimeout.toMillis();
|
|
|
+ this.readTimeout = (int) readTimeout.toMillis();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Constructor for FixedResult instance, avoiding connection init.
|
|
|
* @param url request url
|
|
@@ -103,13 +131,25 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
protected AbfsHttpOperation(final URL url,
|
|
|
final String method,
|
|
|
final int httpStatus) {
|
|
|
+ this.log = LoggerFactory.getLogger(AbfsHttpOperation.class);
|
|
|
this.url = url;
|
|
|
this.method = method;
|
|
|
this.statusCode = httpStatus;
|
|
|
+ this.requestHeaders = new ArrayList<>();
|
|
|
+ this.connectionTimeout = 0;
|
|
|
+ this.readTimeout = 0;
|
|
|
}
|
|
|
|
|
|
- protected HttpURLConnection getConnection() {
|
|
|
- return connection;
|
|
|
+ int getConnectionTimeout() {
|
|
|
+ return connectionTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ int getReadTimeout() {
|
|
|
+ return readTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<AbfsHttpHeader> getRequestHeaders() {
|
|
|
+ return requestHeaders;
|
|
|
}
|
|
|
|
|
|
public String getMethod() {
|
|
@@ -137,8 +177,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
}
|
|
|
|
|
|
public String getClientRequestId() {
|
|
|
- return this.connection
|
|
|
- .getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID);
|
|
|
+ return getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID);
|
|
|
}
|
|
|
|
|
|
public String getExpectedAppendPos() {
|
|
@@ -165,13 +204,21 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
return bytesReceived;
|
|
|
}
|
|
|
|
|
|
+ public URL getUrl() {
|
|
|
+ return url;
|
|
|
+ }
|
|
|
+
|
|
|
public ListResultSchema getListResultSchema() {
|
|
|
return listResultSchema;
|
|
|
}
|
|
|
|
|
|
- public String getResponseHeader(String httpHeader) {
|
|
|
- return connection.getHeaderField(httpHeader);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Get response header value for the given headerKey.
|
|
|
+ *
|
|
|
+ * @param httpHeader header key.
|
|
|
+ * @return header value.
|
|
|
+ */
|
|
|
+ public abstract String getResponseHeader(String httpHeader);
|
|
|
|
|
|
// Returns a trace message for the request
|
|
|
@Override
|
|
@@ -235,6 +282,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
return sb.toString();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
public String getMaskedUrl() {
|
|
|
if (!shouldMask) {
|
|
|
return url.toString();
|
|
@@ -246,7 +294,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
return maskedUrl;
|
|
|
}
|
|
|
|
|
|
- public String getMaskedEncodedUrl() {
|
|
|
+ public final String getMaskedEncodedUrl() {
|
|
|
if (maskedEncodedUrl != null) {
|
|
|
return maskedEncodedUrl;
|
|
|
}
|
|
@@ -255,40 +303,6 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Initializes a new HTTP request and opens the connection.
|
|
|
- *
|
|
|
- * @param url The full URL including query string parameters.
|
|
|
- * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
|
|
|
- * @param requestHeaders The HTTP request headers.READ_TIMEOUT
|
|
|
- * @param connectionTimeout The Connection Timeout value to be used while establishing http connection
|
|
|
- * @param readTimeout The Read Timeout value to be used with http connection while making a request
|
|
|
- * @throws IOException if an error occurs.
|
|
|
- */
|
|
|
- public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders,
|
|
|
- final int connectionTimeout, final int readTimeout)
|
|
|
- throws IOException {
|
|
|
- this.url = url;
|
|
|
- this.method = method;
|
|
|
-
|
|
|
- this.connection = openConnection();
|
|
|
- if (this.connection instanceof HttpsURLConnection) {
|
|
|
- HttpsURLConnection secureConn = (HttpsURLConnection) this.connection;
|
|
|
- SSLSocketFactory sslSocketFactory = DelegatingSSLSocketFactory.getDefaultFactory();
|
|
|
- if (sslSocketFactory != null) {
|
|
|
- secureConn.setSSLSocketFactory(sslSocketFactory);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- this.connection.setConnectTimeout(connectionTimeout);
|
|
|
- this.connection.setReadTimeout(readTimeout);
|
|
|
- this.connection.setRequestMethod(method);
|
|
|
-
|
|
|
- for (AbfsHttpHeader header : requestHeaders) {
|
|
|
- setRequestProperty(header.getName(), header.getValue());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
* Sends the HTTP request. Note that HttpUrlConnection requires that an
|
|
|
* empty buffer be sent in order to set the "Content-Length: 0" header, which
|
|
|
* is required by our endpoint.
|
|
@@ -299,74 +313,9 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
*
|
|
|
* @throws IOException if an error occurs.
|
|
|
*/
|
|
|
- public void sendRequest(byte[] buffer, int offset, int length) throws IOException {
|
|
|
- this.connection.setDoOutput(true);
|
|
|
- this.connection.setFixedLengthStreamingMode(length);
|
|
|
- if (buffer == null) {
|
|
|
- // An empty buffer is sent to set the "Content-Length: 0" header, which
|
|
|
- // is required by our endpoint.
|
|
|
- buffer = new byte[]{};
|
|
|
- offset = 0;
|
|
|
- length = 0;
|
|
|
- }
|
|
|
-
|
|
|
- // send the request body
|
|
|
|
|
|
- long startTime = 0;
|
|
|
- startTime = System.nanoTime();
|
|
|
- OutputStream outputStream = null;
|
|
|
- // Updates the expected bytes to be sent based on length.
|
|
|
- this.expectedBytesToBeSent = length;
|
|
|
- try {
|
|
|
- try {
|
|
|
- /* Without expect header enabled, if getOutputStream() throws
|
|
|
- an exception, it gets caught by the restOperation. But with
|
|
|
- expect header enabled we return back without throwing an exception
|
|
|
- for the correct response code processing.
|
|
|
- */
|
|
|
- outputStream = getConnOutputStream();
|
|
|
- } catch (IOException e) {
|
|
|
- connectionDisconnectedOnError = true;
|
|
|
- /* If getOutputStream fails with an expect-100 exception , we return back
|
|
|
- without throwing an exception to the caller. Else, we throw back the exception.
|
|
|
- */
|
|
|
- String expectHeader = getConnProperty(EXPECT);
|
|
|
- if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
|
|
|
- && e instanceof ProtocolException
|
|
|
- && EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
|
|
|
- LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
|
|
|
- /*
|
|
|
- * In case expect-100 assertion has failed, headers and inputStream should not
|
|
|
- * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(),
|
|
|
- * conn.getInputStream() will lead to repeated server call.
|
|
|
- * ref: https://bugs.openjdk.org/browse/JDK-8314978.
|
|
|
- * Reading conn.responseCode() and conn.getResponseMessage() is safe in
|
|
|
- * case of Expect-100 error. Reason being, in JDK, it stores the responseCode
|
|
|
- * in the HttpUrlConnection object before throwing exception to the caller.
|
|
|
- */
|
|
|
- this.statusCode = getConnResponseCode();
|
|
|
- this.statusDescription = getConnResponseMessage();
|
|
|
- return;
|
|
|
- } else {
|
|
|
- LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- // update bytes sent for successful as well as failed attempts via the
|
|
|
- // accompanying statusCode.
|
|
|
- this.bytesSent = length;
|
|
|
-
|
|
|
- // If this fails with or without expect header enabled,
|
|
|
- // it throws an IOException.
|
|
|
- outputStream.write(buffer, offset, length);
|
|
|
- } finally {
|
|
|
- // Closing the opened output stream
|
|
|
- if (outputStream != null) {
|
|
|
- outputStream.close();
|
|
|
- }
|
|
|
- this.sendRequestTimeMs = elapsedTimeMs(startTime);
|
|
|
- }
|
|
|
- }
|
|
|
+ public abstract void sendPayload(byte[] buffer, int offset, int length) throws
|
|
|
+ IOException;
|
|
|
|
|
|
/**
|
|
|
* Gets and processes the HTTP response.
|
|
@@ -377,35 +326,31 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
*
|
|
|
* @throws IOException if an error occurs.
|
|
|
*/
|
|
|
- public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
|
|
|
- if (connectionDisconnectedOnError) {
|
|
|
- LOG.debug("This connection was not successful or has been disconnected, "
|
|
|
- + "hence not parsing headers and inputStream");
|
|
|
- return;
|
|
|
- }
|
|
|
- processConnHeadersAndInputStreams(buffer, offset, length);
|
|
|
- }
|
|
|
+ public abstract void processResponse(byte[] buffer,
|
|
|
+ int offset,
|
|
|
+ int length) throws IOException;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set request header.
|
|
|
+ *
|
|
|
+ * @param key header key.
|
|
|
+ * @param value header value.
|
|
|
+ */
|
|
|
+ public abstract void setRequestProperty(String key, String value);
|
|
|
|
|
|
- void processConnHeadersAndInputStreams(final byte[] buffer,
|
|
|
+ /**
|
|
|
+ * Parse response body from the connection.
|
|
|
+ *
|
|
|
+ * @param buffer byte array to store the response body.
|
|
|
+ * @param offset offset in the buffer.
|
|
|
+ * @param length length of the response body.
|
|
|
+ *
|
|
|
+ * @throws IOException if network error occurs while reading the response.
|
|
|
+ */
|
|
|
+ final void parseResponse(final byte[] buffer,
|
|
|
final int offset,
|
|
|
final int length) throws IOException {
|
|
|
- // get the response
|
|
|
- long startTime = 0;
|
|
|
- startTime = System.nanoTime();
|
|
|
-
|
|
|
- this.statusCode = getConnResponseCode();
|
|
|
- this.recvResponseTimeMs = elapsedTimeMs(startTime);
|
|
|
-
|
|
|
- this.statusDescription = getConnResponseMessage();
|
|
|
-
|
|
|
- this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
|
|
|
- if (this.requestId == null) {
|
|
|
- this.requestId = AbfsHttpConstants.EMPTY_STRING;
|
|
|
- }
|
|
|
- // dump the headers
|
|
|
- AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
|
|
|
- connection.getHeaderFields());
|
|
|
-
|
|
|
+ long startTime;
|
|
|
if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
|
|
|
// If it is HEAD, and it is ERROR
|
|
|
return;
|
|
@@ -416,12 +361,19 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
|
|
processStorageErrorResponse();
|
|
|
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
|
|
- this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0);
|
|
|
+ String contentLength = getResponseHeader(
|
|
|
+ HttpHeaderConfigurations.CONTENT_LENGTH);
|
|
|
+ if (contentLength != null) {
|
|
|
+ this.bytesReceived = Long.parseLong(contentLength);
|
|
|
+ } else {
|
|
|
+ this.bytesReceived = 0L;
|
|
|
+ }
|
|
|
+
|
|
|
} else {
|
|
|
// consume the input stream to release resources
|
|
|
int totalBytesRead = 0;
|
|
|
|
|
|
- try (InputStream stream = this.connection.getInputStream()) {
|
|
|
+ try (InputStream stream = getContentInputStream()) {
|
|
|
if (isNullInputStream(stream)) {
|
|
|
return;
|
|
|
}
|
|
@@ -429,12 +381,15 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
|
|
|
// this is a list operation and need to retrieve the data
|
|
|
// need a better solution
|
|
|
- if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) {
|
|
|
+ if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method)
|
|
|
+ && buffer == null) {
|
|
|
parseListFilesResponse(stream);
|
|
|
} else {
|
|
|
if (buffer != null) {
|
|
|
while (totalBytesRead < length) {
|
|
|
- int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead);
|
|
|
+ int bytesRead = stream.read(buffer, offset + totalBytesRead,
|
|
|
+ length
|
|
|
+ - totalBytesRead);
|
|
|
if (bytesRead == -1) {
|
|
|
endOfStream = true;
|
|
|
break;
|
|
@@ -452,9 +407,9 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.warn("IO/Network error: {} {}: {}",
|
|
|
+ log.warn("IO/Network error: {} {}: {}",
|
|
|
method, getMaskedUrl(), ex.getMessage());
|
|
|
- LOG.debug("IO Error: ", ex);
|
|
|
+ log.debug("IO Error: ", ex);
|
|
|
throw ex;
|
|
|
} finally {
|
|
|
this.recvResponseTimeMs += elapsedTimeMs(startTime);
|
|
@@ -463,23 +418,12 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void setRequestProperty(String key, String value) {
|
|
|
- this.connection.setRequestProperty(key, value);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
- * Open the HTTP connection.
|
|
|
- *
|
|
|
- * @throws IOException if an error occurs.
|
|
|
+ * Get the response stream from the connection.
|
|
|
+ * @return InputStream: response stream from the connection after network call.
|
|
|
+ * @throws IOException if the response stream could not be created from the connection.
|
|
|
*/
|
|
|
- private HttpURLConnection openConnection() throws IOException {
|
|
|
- long start = System.nanoTime();
|
|
|
- try {
|
|
|
- return (HttpURLConnection) url.openConnection();
|
|
|
- } finally {
|
|
|
- connectionTimeMs = elapsedTimeMs(start);
|
|
|
- }
|
|
|
- }
|
|
|
+ protected abstract InputStream getContentInputStream() throws IOException;
|
|
|
|
|
|
/**
|
|
|
* When the request fails, this function is used to parse the responseAbfsHttpClient.LOG.debug("ExpectedError: ", ex);
|
|
@@ -499,7 +443,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
*
|
|
|
*/
|
|
|
private void processStorageErrorResponse() {
|
|
|
- try (InputStream stream = connection.getErrorStream()) {
|
|
|
+ try (InputStream stream = getErrorStream()) {
|
|
|
if (stream == null) {
|
|
|
return;
|
|
|
}
|
|
@@ -536,24 +480,25 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
// Ignore errors that occur while attempting to parse the storage
|
|
|
// error, since the response may have been handled by the HTTP driver
|
|
|
// or for other reasons have an unexpected
|
|
|
- LOG.debug("ExpectedError: ", ex);
|
|
|
+ log.debug("ExpectedError: ", ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the elapsed time in milliseconds.
|
|
|
+ * Get the error stream from the connection.
|
|
|
+ * @return InputStream
|
|
|
+ * @throws IOException if the error stream could not be created from the response stream.
|
|
|
*/
|
|
|
- private long elapsedTimeMs(final long startTime) {
|
|
|
- return (System.nanoTime() - startTime) / ONE_MILLION;
|
|
|
- }
|
|
|
+ protected abstract InputStream getErrorStream() throws IOException;
|
|
|
|
|
|
/**
|
|
|
* Parse the list file response
|
|
|
*
|
|
|
* @param stream InputStream contains the list results.
|
|
|
- * @throws IOException
|
|
|
+ * @throws IOException if the response cannot be deserialized.
|
|
|
*/
|
|
|
- private void parseListFilesResponse(final InputStream stream) throws IOException {
|
|
|
+ private void parseListFilesResponse(final InputStream stream)
|
|
|
+ throws IOException {
|
|
|
if (stream == null) {
|
|
|
return;
|
|
|
}
|
|
@@ -565,13 +510,21 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
|
|
|
try {
|
|
|
final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
- this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class);
|
|
|
+ this.listResultSchema = objectMapper.readValue(stream,
|
|
|
+ ListResultSchema.class);
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.error("Unable to deserialize list results", ex);
|
|
|
+ log.error("Unable to deserialize list results", ex);
|
|
|
throw ex;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the elapsed time in milliseconds.
|
|
|
+ */
|
|
|
+ final long elapsedTimeMs(final long startTime) {
|
|
|
+ return (System.nanoTime() - startTime) / ONE_MILLION;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check null stream, this is to pass findbugs's redundant check for NULL
|
|
|
* @param stream InputStream
|
|
@@ -585,55 +538,148 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
* @param key The request property key.
|
|
|
* @return request peoperty value.
|
|
|
*/
|
|
|
- String getConnProperty(String key) {
|
|
|
- return connection.getRequestProperty(key);
|
|
|
- }
|
|
|
+ abstract String getConnProperty(String key);
|
|
|
|
|
|
/**
|
|
|
* Gets the connection url.
|
|
|
* @return url.
|
|
|
*/
|
|
|
- URL getConnUrl() {
|
|
|
- return connection.getURL();
|
|
|
+ abstract URL getConnUrl();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gets the connection response code.
|
|
|
+ * @return response code.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ abstract Integer getConnResponseCode() throws IOException;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gets the connection response message.
|
|
|
+ * @return response message.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ abstract String getConnResponseMessage() throws IOException;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get request headers.
|
|
|
+ *
|
|
|
+ * @return request headers.
|
|
|
+ */
|
|
|
+ abstract Map<String, List<String>> getRequestProperties();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get request header value for a header name.
|
|
|
+ *
|
|
|
+ * @param headerName header name.
|
|
|
+ * @return header value.
|
|
|
+ */
|
|
|
+ abstract String getRequestProperty(String headerName);
|
|
|
+
|
|
|
+ boolean getConnectionDisconnectedOnError() {
|
|
|
+ return connectionDisconnectedOnError;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the connection request method.
|
|
|
- * @return request method.
|
|
|
+ * Get the suffix to add to the tracing context that defines what http-client is
|
|
|
+ * used to make the network call
|
|
|
+ * @return the suffix to distinguish http client
|
|
|
*/
|
|
|
- String getConnRequestMethod() {
|
|
|
- return connection.getRequestMethod();
|
|
|
+ public abstract String getTracingContextSuffix();
|
|
|
+
|
|
|
+ public final long getSendLatency() {
|
|
|
+ return sendRequestTimeMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ public final long getRecvLatency() {
|
|
|
+ return recvResponseTimeMs;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the connection response code.
|
|
|
- * @return response code.
|
|
|
- * @throws IOException
|
|
|
+ * Set response status code for the server call.
|
|
|
+ *
|
|
|
+ * @param statusCode status code.
|
|
|
*/
|
|
|
- Integer getConnResponseCode() throws IOException {
|
|
|
- return connection.getResponseCode();
|
|
|
+ protected void setStatusCode(final int statusCode) {
|
|
|
+ this.statusCode = statusCode;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the connection output stream.
|
|
|
- * @return output stream.
|
|
|
- * @throws IOException
|
|
|
+ * Sets response status description for the server call.
|
|
|
+ *
|
|
|
+ * @param statusDescription status description.
|
|
|
*/
|
|
|
- OutputStream getConnOutputStream() throws IOException {
|
|
|
- return connection.getOutputStream();
|
|
|
+ protected void setStatusDescription(final String statusDescription) {
|
|
|
+ this.statusDescription = statusDescription;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Gets the connection response message.
|
|
|
- * @return response message.
|
|
|
- * @throws IOException
|
|
|
+ * Set x-ms-request-id value from the server call response header.
|
|
|
*/
|
|
|
- String getConnResponseMessage() throws IOException {
|
|
|
- return connection.getResponseMessage();
|
|
|
+ protected void setRequestId() {
|
|
|
+ requestId = getResponseHeader(
|
|
|
+ HttpHeaderConfigurations.X_MS_REQUEST_ID);
|
|
|
+ if (requestId == null) {
|
|
|
+ requestId = AbfsHttpConstants.EMPTY_STRING;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
- Boolean getConnectionDisconnectedOnError() {
|
|
|
+ /**
|
|
|
+ * Sets byteSent metric.
|
|
|
+ *
|
|
|
+ * @param bytesSent bytes sent.
|
|
|
+ */
|
|
|
+ protected void setBytesSent(final int bytesSent) {
|
|
|
+ this.bytesSent = bytesSent;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets expected bytes to be sent.
|
|
|
+ *
|
|
|
+ * @param expectedBytesToBeSent expected bytes to be sent.
|
|
|
+ */
|
|
|
+ protected void setExpectedBytesToBeSent(final int expectedBytesToBeSent) {
|
|
|
+ this.expectedBytesToBeSent = expectedBytesToBeSent;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets connection time in milliseconds taken to establish the connection.
|
|
|
+ *
|
|
|
+ * @param connectionTimeMs connection time in milliseconds.
|
|
|
+ */
|
|
|
+ protected void setConnectionTimeMs(final long connectionTimeMs) {
|
|
|
+ this.connectionTimeMs = connectionTimeMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets send request time in milliseconds.
|
|
|
+ *
|
|
|
+ * @param sendRequestTimeMs send request time in milliseconds.
|
|
|
+ */
|
|
|
+ protected void setSendRequestTimeMs(final long sendRequestTimeMs) {
|
|
|
+ this.sendRequestTimeMs = sendRequestTimeMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets receive response time in milliseconds.
|
|
|
+ *
|
|
|
+ * @param recvResponseTimeMs receive response time in milliseconds.
|
|
|
+ */
|
|
|
+ protected void setRecvResponseTimeMs(final long recvResponseTimeMs) {
|
|
|
+ this.recvResponseTimeMs = recvResponseTimeMs;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Marks network error and expect100 failures for send-payload phase.
|
|
|
+ */
|
|
|
+ protected void setConnectionDisconnectedOnError() {
|
|
|
+ this.connectionDisconnectedOnError = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return value of {@link #connectionDisconnectedOnError}
|
|
|
+ */
|
|
|
+ protected boolean isConnectionDisconnectedOnError() {
|
|
|
return connectionDisconnectedOnError;
|
|
|
}
|
|
|
|
|
@@ -652,9 +698,75 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|
|
super(url, method, httpStatus);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void processResponse(final byte[] buffer,
|
|
|
+ final int offset,
|
|
|
+ final int length)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setRequestProperty(final String key, final String value) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected InputStream getContentInputStream() throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected InputStream getErrorStream() throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ String getConnProperty(final String key) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ URL getConnUrl() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ Integer getConnResponseCode() throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ String getConnResponseMessage() throws IOException {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ Map<String, List<String>> getRequestProperties() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ String getRequestProperty(final String headerName) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getTracingContextSuffix() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String getResponseHeader(final String httpHeader) {
|
|
|
return "";
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void sendPayload(final byte[] buffer,
|
|
|
+ final int offset,
|
|
|
+ final int length)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ }
|
|
|
}
|
|
|
}
|