@@ -29,6 +29,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketException;
@@ -38,8 +39,16 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Random;

+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,7 +75,11 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -77,6 +90,8 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@@ -93,8 +108,12 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -857,6 +876,76 @@ public class TestWebHDFS {
        Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
            storageTypes[0] == StorageType.DISK);
      }
+
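+      // The five cases below issue GETFILEBLOCKLOCATIONS over raw HTTP with
+      // different offset/length combinations and compare the parsed result
+      // against the locations returned by the FileSystem API above.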
+      // Query the webhdfs REST API to get block locations
+      InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
+
+      // Case 1
+      // URL without length or offset parameters
+      URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
+
+      String response1 = getResponse(url1, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
+      // Parse the BlockLocation array from the JSON output
+      BlockLocation[] locationArray1 = toBlockLocationArray(response1);
+
+      // Verify that the result from the REST call matches the FileSystem API
+      verifyEquals(locations, locationArray1);
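+      // A typical response body looks roughly like (abbreviated):
+      //   {"BlockLocations":{"BlockLocation":[{"offset":0,"length":1024,
+      //     "hosts":[...],"storageTypes":["DISK"],...}]}}
+      // toBlockLocationArray unwraps the two outer levels.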
+
+      // Case 2
+      // URL contains both length and offset parameters
+      URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+          + "&length=" + LENGTH + "&offset=" + OFFSET);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
+
+      String response2 = getResponse(url2, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
+      BlockLocation[] locationArray2 = toBlockLocationArray(response2);
+
+      verifyEquals(locations, locationArray2);
+
+      // Case 3
+      // URL contains a length parameter but no offset parameter
+      URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+          + "&length=" + LENGTH);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
+
+      String response3 = getResponse(url3, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
+      BlockLocation[] locationArray3 = toBlockLocationArray(response3);
+
+      verifyEquals(locations, locationArray3);
+
+      // Case 4
+      // URL contains an offset parameter but no length parameter
+      URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+          + "&offset=" + OFFSET);
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
+
+      String response4 = getResponse(url4, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
+      BlockLocation[] locationArray4 = toBlockLocationArray(response4);
+
+      verifyEquals(locations, locationArray4);
+
+      // Case 5
+      // URL specifies an offset that exceeds the file length
+      URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
+          WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+          + "&offset=1200");
+      LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
+
+      String response5 = getResponse(url5, "GET");
+      LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
+      BlockLocation[] locationArray5 = toBlockLocationArray(response5);
+
+      // Expect an empty array of BlockLocation
+      verifyEquals(new BlockLocation[] {}, locationArray5);
    } finally {
      if (cluster != null) {
        cluster.shutdown();
@@ -864,6 +953,66 @@ public class TestWebHDFS {
    }
  }

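+  /**
+   * Parse the JSON body of a GETFILEBLOCKLOCATIONS response into a
+   * {@link BlockLocation} array by unwrapping the outer "BlockLocations"
+   * and inner "BlockLocation" objects.
+   */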
+  private BlockLocation[] toBlockLocationArray(String json)
+      throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    MapType subType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        String.class,
+        BlockLocation[].class);
+    MapType rootType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        mapper.constructType(String.class),
+        mapper.constructType(subType));
+
+    Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
+        .readValue(json, rootType);
+    Map<String, BlockLocation[]> locationMap = jsonMap
+        .get("BlockLocations");
+    BlockLocation[] locationArray = locationMap.get(
+        BlockLocation.class.getSimpleName());
+    return locationArray;
+  }
+
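+  /**
+   * Assert that two BlockLocation arrays describe the same block
+   * locations, comparing each field of every element.
+   */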
+  private void verifyEquals(BlockLocation[] locations1,
+      BlockLocation[] locations2) throws IOException {
+    // The arrays must be the same size; without this check an empty
+    // expected array would make the loop below a no-op.
+    Assert.assertEquals(locations1.length, locations2.length);
+    for (int i = 0; i < locations1.length; i++) {
+      BlockLocation location1 = locations1[i];
+      BlockLocation location2 = locations2[i];
+      Assert.assertEquals(location1.getLength(),
+          location2.getLength());
+      Assert.assertEquals(location1.getOffset(),
+          location2.getOffset());
+      Assert.assertArrayEquals(location1.getCachedHosts(),
+          location2.getCachedHosts());
+      Assert.assertArrayEquals(location1.getHosts(),
+          location2.getHosts());
+      Assert.assertArrayEquals(location1.getNames(),
+          location2.getNames());
+      Assert.assertArrayEquals(location1.getStorageIds(),
+          location2.getStorageIds());
+      Assert.assertArrayEquals(location1.getTopologyPaths(),
+          location2.getTopologyPaths());
+      Assert.assertArrayEquals(location1.getStorageTypes(),
+          location2.getStorageTypes());
+    }
+  }
+
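+  /**
+   * Issue a plain HTTP request to the given URL and return the response
+   * body as a string. Redirects are not followed, so the body written by
+   * the contacted server is returned as-is.
+   */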
+  private static String getResponse(URL url, String httpRequestType)
+      throws IOException {
+    HttpURLConnection conn = null;
+    try {
+      conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestMethod(httpRequestType);
+      conn.setInstanceFollowRedirects(false);
+      return IOUtils.toString(conn.getInputStream());
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+    }
+  }
+
  private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
      final URI uri, final String userName) throws Exception {

@@ -1220,4 +1369,131 @@
      }
    }
  }
+
+  /**
+   * A mock servlet that handles {@link WebHdfsFileSystem} client
+   * requests. The format of the response depends on how many times
+   * it has been called (1 to 3 times).
+   * <p>
+   * The first call returns a wrapped JSON response containing an
+   * IllegalArgumentException.
+   * <p>
+   * The second call returns a valid GET_BLOCK_LOCATIONS JSON response.
+   * <p>
+   * The third call returns a wrapped JSON response containing a
+   * random IOException.
+   */
+  public static class MockWebHdfsServlet extends HttpServlet {
+
+    private static final long serialVersionUID = 1L;
+    // Shared across requests so the servlet can vary its response per call
+    private static int respondTimes = 0;
+    private static final String RANDOM_EXCEPTION_MSG =
+        "This is a random exception";
+
+    @Override
+    public void doGet(HttpServletRequest request,
+        HttpServletResponse response) throws ServletException, IOException {
+      response.setHeader("Content-Type",
+          MediaType.APPLICATION_JSON);
+      String param = request.getParameter("op");
+      if (respondTimes == 0) {
+        Exception mockException = new IllegalArgumentException(
+            "Invalid value for webhdfs parameter \"op\". "
+                + "No enum constant " + param);
+        sendException(request, response, mockException);
+      } else if (respondTimes == 1) {
+        sendResponse(request, response);
+      } else if (respondTimes == 2) {
+        Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
+        sendException(request, response, mockException);
+      }
+      respondTimes++;
+    }
+
+    private void sendResponse(HttpServletRequest request,
+        HttpServletResponse response) throws IOException {
+      response.setStatus(HttpServletResponse.SC_OK);
+      // Construct a LocatedBlock for testing;
+      // the block length of 121 is asserted by the caller
+      DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
+      DatanodeInfo[] ds = new DatanodeInfo[1];
+      ds[0] = d;
+      ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
+      LocatedBlock l1 = new LocatedBlock(b1, ds);
+      l1.setStartOffset(0);
+      l1.setCorrupt(false);
+      List<LocatedBlock> ls = Arrays.asList(l1);
+      LocatedBlocks locatedblocks =
+          new LocatedBlocks(10, false, ls, l1,
+              true, null, null);
+
+      try (PrintWriter pw = response.getWriter()) {
+        pw.write(JsonUtil.toJsonString(locatedblocks));
+      }
+    }
+
+    private void sendException(HttpServletRequest request,
+        HttpServletResponse response,
+        Exception mockException) throws IOException {
+      response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+      String errJs = JsonUtil.toJsonString(mockException);
+      try (PrintWriter pw = response.getWriter()) {
+        pw.write(errJs);
+      }
+    }
+  }
+
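+  /**
+   * Verify that when the server does not recognize the
+   * GETFILEBLOCKLOCATIONS operation, {@link WebHdfsFileSystem} falls back
+   * to GET_FILE_BLOCK_LOCATIONS, and that other server-side errors are
+   * still propagated to the caller.
+   */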
+  @Test
+  public void testGetFileBlockLocationsBackwardsCompatibility()
+      throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
+    HttpServer2 http = null;
+    try {
+      http = HttpServerFunctionalTest.createTestServer(conf);
+      http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
+      http.start();
+
+      // Write the address back to the configuration so that
+      // WebHdfsFileSystem can connect to the mock server
+      conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
+          "localhost:" + http.getConnectorAddress(0).getPort());
+
+      final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
+          conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+      WebHdfsFileSystem spyFs = spy(webFS);
+      BlockLocation[] locations = spyFs
+          .getFileBlockLocations(new Path("p"), 0, 100);
+
+      // Verify the result
+      assertEquals(1, locations.length);
+      assertEquals(121, locations[0].getLength());
+
+      // Verify the fallback: the method should be called exactly 2 times.
+      // The 1st call tries GETFILEBLOCKLOCATIONS and finds it unsupported;
+      // the 2nd call falls back to GET_FILE_BLOCK_LOCATIONS.
+      verify(spyFs, times(2)).getFileBlockLocations(any(),
+          any(), anyLong(), anyLong());
+
+      // Verify it doesn't erroneously fall back: when the server returns a
+      // different error, it should throw an exception directly.
+      try {
+        spyFs.getFileBlockLocations(new Path("p"), 0, 100);
+        Assert.fail("Expected an IOException from the mock server");
+      } catch (Exception e) {
+        assertTrue(e instanceof IOException);
+        assertEquals(MockWebHdfsServlet.RANDOM_EXCEPTION_MSG, e.getMessage());
+        // In total the method has now been called 3 times
+        verify(spyFs, times(3)).getFileBlockLocations(any(),
+            any(), anyLong(), anyLong());
+      }
+    } finally {
+      if (http != null) {
+        http.stop();
+      }
+    }
+  }
}