|
@@ -38,6 +38,8 @@ import static java.net.HttpURLConnection.HTTP_OK;
|
|
|
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
|
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
|
|
|
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
|
|
@@ -50,8 +52,8 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNEC
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
|
|
|
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE;
|
|
|
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
|
|
|
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
|
|
|
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
|
|
@@ -62,6 +64,9 @@ import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
public class TestAbfsRestOperationMockFailures {
|
|
|
+ // In these tests a request first fails with given exceptions and then succeed on retry.
|
|
|
+ // Client Side Throttling Metrics will be updated at least for retried request which succeeded.
|
|
|
+ // For original requests it will be updated only for EGR, IGR, OPR throttling.
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdForConnectTimeoutRetry() throws Exception {
|
|
@@ -131,37 +136,48 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdFor400Retry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400");
|
|
|
+ testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400", 1);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdFor500Retry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500");
|
|
|
+ testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500", 1);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdFor503INGRetry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
|
|
|
- INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
|
|
|
- INGRESS_LIMIT_BREACH_ABBREVIATION);
|
|
|
+ testClientRequestIdForStatusRetry(
|
|
|
+ HTTP_UNAVAILABLE,
|
|
|
+ INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
|
|
|
+ INGRESS_LIMIT_BREACH_ABBREVIATION,
|
|
|
+ 2);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testClientRequestIdFor503egrRetry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
|
|
|
- EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
|
|
|
- EGRESS_LIMIT_BREACH_ABBREVIATION);
|
|
|
+ public void testClientRequestIdFor503EGRRetry() throws Exception {
|
|
|
+ testClientRequestIdForStatusRetry(
|
|
|
+ HTTP_UNAVAILABLE,
|
|
|
+ EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
|
|
|
+ EGRESS_LIMIT_BREACH_ABBREVIATION,
|
|
|
+ 2);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdFor503OPRRetry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
|
|
|
- OPERATION_BREACH_MESSAGE, OPERATION_LIMIT_BREACH_ABBREVIATION);
|
|
|
+ testClientRequestIdForStatusRetry(
|
|
|
+ HTTP_UNAVAILABLE,
|
|
|
+ TPS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
|
|
|
+ TPS_LIMIT_BREACH_ABBREVIATION,
|
|
|
+ 2);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
public void testClientRequestIdFor503OtherRetry() throws Exception {
|
|
|
- testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503");
|
|
|
+ testClientRequestIdForStatusRetry(
|
|
|
+ HTTP_UNAVAILABLE,
|
|
|
+ OTHER_SERVER_THROTTLING.getErrorMessage(),
|
|
|
+ OTHER_SERVER_THROTTLING_ABBREVIATION,
|
|
|
+ 1);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -176,7 +192,6 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
* 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
-
|
|
|
@Test
|
|
|
public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
|
|
|
|
|
@@ -210,6 +225,7 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
|
|
|
Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
|
|
|
Mockito.doReturn("HEAD").when(httpOperation).getMethod();
|
|
|
+ Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage();
|
|
|
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
|
|
|
|
|
|
try {
|
|
@@ -217,20 +233,18 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
abfsRestOperation.execute(tracingContext);
|
|
|
} catch(AbfsRestOperationException ex) {
|
|
|
Assertions.assertThat(ex.getStatusCode())
|
|
|
- .describedAs("Status Code must be HTTP_UNAVAILABLE(409)")
|
|
|
+ .describedAs("Status Code must be HTTP_UNAVAILABLE(503)")
|
|
|
.isEqualTo(HTTP_UNAVAILABLE);
|
|
|
}
|
|
|
|
|
|
// Assert that httpOperation.processResponse was called 3 times.
|
|
|
// One for retry count 0
|
|
|
// One for retry count 1 after failing with CT
|
|
|
- // One for retry count 2 after failing with 50
|
|
|
+ // One for retry count 2 after failing with 503
|
|
|
Mockito.verify(httpOperation, times(3)).processResponse(
|
|
|
nullable(byte[].class), nullable(int.class), nullable(int.class));
|
|
|
|
|
|
- // Assert that Static Retry Policy was used after CT failure.
|
|
|
- // Iteration 1 failed with CT and shouldRetry was called with retry count 0
|
|
|
- // Before iteration 2 sleep will be computed using static retry policy and retry count 1
|
|
|
+ // Primary Request Failed with CT. Static Retry Policy should be used.
|
|
|
Mockito.verify(abfsClient, Mockito.times(1))
|
|
|
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
|
|
|
Mockito.verify(staticRetryPolicy, Mockito.times(1))
|
|
@@ -245,7 +259,7 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
// Before iteration 3 sleep will be computed using exponential retry policy and retry count 2
|
|
|
// Should retry with retry count 2 will return false and no further requests will be made.
|
|
|
Mockito.verify(abfsClient, Mockito.times(2))
|
|
|
- .getRetryPolicy("503");
|
|
|
+ .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION);
|
|
|
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
|
|
|
.shouldRetry(1, HTTP_UNAVAILABLE);
|
|
|
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
|
|
@@ -253,16 +267,17 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
|
|
|
.getRetryInterval(2);
|
|
|
Mockito.verify(tracingContext, Mockito.times(1))
|
|
|
- .constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
|
|
+ .constructHeader(httpOperation, EGRESS_LIMIT_BREACH_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
|
|
|
|
|
- // Assert that intercept.updateMetrics was called only once during second Iteration
|
|
|
+ // Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR.
|
|
|
Mockito.verify(intercept, Mockito.times(2))
|
|
|
.updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class));
|
|
|
}
|
|
|
|
|
|
private void testClientRequestIdForStatusRetry(int status,
|
|
|
- String serverErrorMessage,
|
|
|
- String keyExpected) throws Exception {
|
|
|
+ String serverErrorMessage,
|
|
|
+ String keyExpected,
|
|
|
+ int numOfTimesCSTMetricsUpdated) throws Exception {
|
|
|
|
|
|
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
|
|
|
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
|
|
@@ -322,11 +337,14 @@ public class TestAbfsRestOperationMockFailures {
|
|
|
abfsRestOperation.execute(tracingContext);
|
|
|
Assertions.assertThat(count[0]).isEqualTo(2);
|
|
|
|
|
|
+ Mockito.verify(intercept, Mockito.times(numOfTimesCSTMetricsUpdated)).updateMetrics(any(), any());
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
|
|
|
- String[] abbreviationsExpected,
|
|
|
- int len, int numOfCTExceptions) throws Exception {
|
|
|
+ String[] abbreviationsExpected,
|
|
|
+ int len,
|
|
|
+ int numOfCTExceptions) throws Exception {
|
|
|
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
|
|
|
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
|
|
|
ExponentialRetryPolicy.class);
|