|
@@ -32,6 +32,9 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
|
|
|
|
+
|
|
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
@@ -56,6 +59,11 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
String testNetworkStatsString = "http_send";
|
|
String testNetworkStatsString = "http_send";
|
|
long connectionsMade, requestsSent, bytesSent;
|
|
long connectionsMade, requestsSent, bytesSent;
|
|
|
|
|
|
|
|
+ metricMap = fs.getInstrumentationMap();
|
|
|
|
+ long connectionsMadeBeforeTest = metricMap
|
|
|
|
+ .get(CONNECTIONS_MADE.getStatName());
|
|
|
|
+ long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* Creating AbfsOutputStream will result in 1 connection made and 1 send
|
|
* Creating AbfsOutputStream will result in 1 connection made and 1 send
|
|
* request.
|
|
* request.
|
|
@@ -75,27 +83,26 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
/*
|
|
/*
|
|
* Testing the network stats with 1 write operation.
|
|
* Testing the network stats with 1 write operation.
|
|
*
|
|
*
|
|
- * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
|
|
|
|
|
|
+ * connections_made : (connections made above) + 2(flush).
|
|
*
|
|
*
|
|
- * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
|
|
|
|
|
|
+ * send_requests : (requests sent above) + 2(flush).
|
|
*
|
|
*
|
|
* bytes_sent : bytes wrote in AbfsOutputStream.
|
|
* bytes_sent : bytes wrote in AbfsOutputStream.
|
|
*/
|
|
*/
|
|
- if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
|
|
|
|
|
+ long extraCalls = 0;
|
|
|
|
+ if (!fs.getAbfsStore()
|
|
|
|
+ .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
|
// no network calls are made for hflush in case of appendblob
|
|
// no network calls are made for hflush in case of appendblob
|
|
- connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
|
|
|
- 5, metricMap);
|
|
|
|
- requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 3,
|
|
|
|
- metricMap);
|
|
|
|
- } else {
|
|
|
|
- connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
|
|
|
- 6, metricMap);
|
|
|
|
- requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
|
|
|
|
- metricMap);
|
|
|
|
|
|
+ extraCalls++;
|
|
}
|
|
}
|
|
|
|
+ long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
|
|
|
|
+ long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
|
|
|
|
+ connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
|
|
|
|
+ expectedConnectionsMade, metricMap);
|
|
|
|
+ requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
|
|
|
|
+ metricMap);
|
|
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
|
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
|
testNetworkStatsString.getBytes().length, metricMap);
|
|
testNetworkStatsString.getBytes().length, metricMap);
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// To close the AbfsOutputStream 1 connection is made and 1 request is sent.
|
|
// To close the AbfsOutputStream 1 connection is made and 1 request is sent.
|
|
@@ -135,14 +142,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
*/
|
|
*/
|
|
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
|
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
|
// no network calls are made for hflush in case of appendblob
|
|
// no network calls are made for hflush in case of appendblob
|
|
- assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
|
|
|
|
|
+ assertAbfsStatistics(CONNECTIONS_MADE,
|
|
connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
|
|
connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
|
|
- assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
|
|
|
|
|
|
+ assertAbfsStatistics(SEND_REQUESTS,
|
|
requestsSent + 1 + LARGE_OPERATIONS, metricMap);
|
|
requestsSent + 1 + LARGE_OPERATIONS, metricMap);
|
|
} else {
|
|
} else {
|
|
- assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
|
|
|
|
|
+ assertAbfsStatistics(CONNECTIONS_MADE,
|
|
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
|
|
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
|
|
- assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
|
|
|
|
|
|
+ assertAbfsStatistics(SEND_REQUESTS,
|
|
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
|
|
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
|
|
}
|
|
}
|
|
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
|
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
|
@@ -182,6 +189,10 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
out.write(testResponseString.getBytes());
|
|
out.write(testResponseString.getBytes());
|
|
out.hflush();
|
|
out.hflush();
|
|
|
|
|
|
|
|
+ metricMap = fs.getInstrumentationMap();
|
|
|
|
+ long getResponsesBeforeTest = metricMap
|
|
|
|
+ .get(CONNECTIONS_MADE.getStatName());
|
|
|
|
+
|
|
// open would require 1 get response.
|
|
// open would require 1 get response.
|
|
in = fs.open(getResponsePath);
|
|
in = fs.open(getResponsePath);
|
|
// read would require 1 get response and also get the bytes received.
|
|
// read would require 1 get response and also get the bytes received.
|
|
@@ -195,18 +206,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|
/*
|
|
/*
|
|
* Testing values of statistics after writing and reading a buffer.
|
|
* Testing values of statistics after writing and reading a buffer.
|
|
*
|
|
*
|
|
- * get_responses - 6(above operations) + 1(open()) + 1 (read()).
|
|
|
|
|
|
+ * get_responses - (above operations) + 1(open()) + 1 (read()).;
|
|
*
|
|
*
|
|
* bytes_received - This should be equal to bytes sent earlier.
|
|
* bytes_received - This should be equal to bytes sent earlier.
|
|
*/
|
|
*/
|
|
- if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
|
|
|
|
- //for appendBlob hflush is a no-op
|
|
|
|
- getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 7,
|
|
|
|
- metricMap);
|
|
|
|
- } else {
|
|
|
|
- getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
|
|
|
|
- metricMap);
|
|
|
|
|
|
+ long extraCalls = 0;
|
|
|
|
+ if (!fs.getAbfsStore()
|
|
|
|
+ .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
|
|
|
|
+ // no network calls are made for hflush in case of appendblob
|
|
|
|
+ extraCalls++;
|
|
}
|
|
}
|
|
|
|
+ long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
|
|
|
|
+ getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
|
|
|
+ expectedGetResponses, metricMap);
|
|
|
|
+
|
|
// Testing that bytes received is equal to bytes sent.
|
|
// Testing that bytes received is equal to bytes sent.
|
|
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
|
|
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
|
|
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|
|
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|