|
@@ -29,7 +29,6 @@ import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.OutputStream;
|
|
|
-import java.io.PrintWriter;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.SocketException;
|
|
@@ -39,16 +38,8 @@ import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
|
|
|
-import javax.servlet.ServletException;
|
|
|
-import javax.servlet.http.HttpServlet;
|
|
|
-import javax.servlet.http.HttpServletRequest;
|
|
|
-import javax.servlet.http.HttpServletResponse;
|
|
|
-import javax.ws.rs.core.MediaType;
|
|
|
-
|
|
|
import com.google.common.collect.ImmutableList;
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -80,12 +71,8 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries;
|
|
|
import org.apache.hadoop.hdfs.TestFileCreation;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|
|
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
|
|
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
|
@@ -96,8 +83,6 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
|
|
-import org.apache.hadoop.http.HttpServer2;
|
|
|
-import org.apache.hadoop.http.HttpServerFunctionalTest;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
|
@@ -114,12 +99,8 @@ import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.internal.util.reflection.Whitebox;
|
|
|
|
|
|
-import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
-import com.fasterxml.jackson.databind.type.MapType;
|
|
|
-
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyInt;
|
|
|
-import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.spy;
|
|
@@ -974,76 +955,6 @@ public class TestWebHDFS {
|
|
|
Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
|
|
|
storageTypes[0] == StorageType.DISK);
|
|
|
}
|
|
|
-
|
|
|
- // Query webhdfs REST API to get block locations
|
|
|
- InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
|
|
-
|
|
|
- // Case 1
|
|
|
- // URL without length or offset parameters
|
|
|
- URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
- WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
|
|
|
- LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
|
|
|
-
|
|
|
- String response1 = getResponse(url1, "GET");
|
|
|
- LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
|
|
|
- // Parse BlockLocation array from json output using object mapper
|
|
|
- BlockLocation[] locationArray1 = toBlockLocationArray(response1);
|
|
|
-
|
|
|
- // Verify the result from rest call is same as file system api
|
|
|
- verifyEquals(locations, locationArray1);
|
|
|
-
|
|
|
- // Case 2
|
|
|
- // URL contains length and offset parameters
|
|
|
- URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
- WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
- + "&length=" + LENGTH + "&offset=" + OFFSET);
|
|
|
- LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
|
|
|
-
|
|
|
- String response2 = getResponse(url2, "GET");
|
|
|
- LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
|
|
|
- BlockLocation[] locationArray2 = toBlockLocationArray(response2);
|
|
|
-
|
|
|
- verifyEquals(locations, locationArray2);
|
|
|
-
|
|
|
- // Case 3
|
|
|
- // URL contains length parameter but without offset parameters
|
|
|
- URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
- WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
- + "&length=" + LENGTH);
|
|
|
- LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
|
|
|
-
|
|
|
- String response3 = getResponse(url3, "GET");
|
|
|
- LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
|
|
|
- BlockLocation[] locationArray3 = toBlockLocationArray(response3);
|
|
|
-
|
|
|
- verifyEquals(locations, locationArray3);
|
|
|
-
|
|
|
- // Case 4
|
|
|
- // URL contains offset parameter but without length parameter
|
|
|
- URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
- WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
- + "&offset=" + OFFSET);
|
|
|
- LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
|
|
|
-
|
|
|
- String response4 = getResponse(url4, "GET");
|
|
|
- LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
|
|
|
- BlockLocation[] locationArray4 = toBlockLocationArray(response4);
|
|
|
-
|
|
|
- verifyEquals(locations, locationArray4);
|
|
|
-
|
|
|
- // Case 5
|
|
|
- // URL specifies offset exceeds the file length
|
|
|
- URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
- WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
- + "&offset=1200");
|
|
|
- LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
|
|
|
-
|
|
|
- String response5 = getResponse(url5, "GET");
|
|
|
- LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
|
|
|
- BlockLocation[] locationArray5 = toBlockLocationArray(response5);
|
|
|
-
|
|
|
- // Expected an empty array of BlockLocation
|
|
|
- verifyEquals(new BlockLocation[] {}, locationArray5);
|
|
|
} finally {
|
|
|
if (cluster != null) {
|
|
|
cluster.shutdown();
|
|
@@ -1051,66 +962,6 @@ public class TestWebHDFS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private BlockLocation[] toBlockLocationArray(String json)
|
|
|
- throws IOException {
|
|
|
- ObjectMapper mapper = new ObjectMapper();
|
|
|
- MapType subType = mapper.getTypeFactory().constructMapType(
|
|
|
- Map.class,
|
|
|
- String.class,
|
|
|
- BlockLocation[].class);
|
|
|
- MapType rootType = mapper.getTypeFactory().constructMapType(
|
|
|
- Map.class,
|
|
|
- mapper.constructType(String.class),
|
|
|
- mapper.constructType(subType));
|
|
|
-
|
|
|
- Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
|
|
|
- .readValue(json, rootType);
|
|
|
- Map<String, BlockLocation[]> locationMap = jsonMap
|
|
|
- .get("BlockLocations");
|
|
|
- BlockLocation[] locationArray = locationMap.get(
|
|
|
- BlockLocation.class.getSimpleName());
|
|
|
- return locationArray;
|
|
|
- }
|
|
|
-
|
|
|
- private void verifyEquals(BlockLocation[] locations1,
|
|
|
- BlockLocation[] locations2) throws IOException {
|
|
|
- for(int i=0; i<locations1.length; i++) {
|
|
|
- BlockLocation location1 = locations1[i];
|
|
|
- BlockLocation location2 = locations2[i];
|
|
|
- Assert.assertEquals(location1.getLength(),
|
|
|
- location2.getLength());
|
|
|
- Assert.assertEquals(location1.getOffset(),
|
|
|
- location2.getOffset());
|
|
|
- Assert.assertArrayEquals(location1.getCachedHosts(),
|
|
|
- location2.getCachedHosts());
|
|
|
- Assert.assertArrayEquals(location1.getHosts(),
|
|
|
- location2.getHosts());
|
|
|
- Assert.assertArrayEquals(location1.getNames(),
|
|
|
- location2.getNames());
|
|
|
- Assert.assertArrayEquals(location1.getStorageIds(),
|
|
|
- location2.getStorageIds());
|
|
|
- Assert.assertArrayEquals(location1.getTopologyPaths(),
|
|
|
- location2.getTopologyPaths());
|
|
|
- Assert.assertArrayEquals(location1.getStorageTypes(),
|
|
|
- location2.getStorageTypes());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static String getResponse(URL url, String httpRequestType)
|
|
|
- throws IOException {
|
|
|
- HttpURLConnection conn = null;
|
|
|
- try {
|
|
|
- conn = (HttpURLConnection) url.openConnection();
|
|
|
- conn.setRequestMethod(httpRequestType);
|
|
|
- conn.setInstanceFollowRedirects(false);
|
|
|
- return IOUtils.toString(conn.getInputStream());
|
|
|
- } finally {
|
|
|
- if(conn != null) {
|
|
|
- conn.disconnect();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
|
|
|
final URI uri, final String userName) throws Exception {
|
|
|
|
|
@@ -1467,131 +1318,4 @@ public class TestWebHDFS {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * A mock class to handle the {@link WebHdfsFileSystem} client
|
|
|
- * request. The format of the response depends on how many of
|
|
|
- * times it gets called (1 to 3 times).
|
|
|
- * <p>
|
|
|
- * First time call it return a wrapped json response with a
|
|
|
- * IllegalArgumentException
|
|
|
- * <p>
|
|
|
- * Second time call it return a valid GET_BLOCK_LOCATIONS
|
|
|
- * json response
|
|
|
- * <p>
|
|
|
- * Third time call it return a wrapped json response with
|
|
|
- * a random IOException
|
|
|
- *
|
|
|
- */
|
|
|
- public static class MockWebHdfsServlet extends HttpServlet {
|
|
|
-
|
|
|
- private static final long serialVersionUID = 1L;
|
|
|
- private static int respondTimes = 0;
|
|
|
- private static final String RANDOM_EXCEPTION_MSG =
|
|
|
- "This is a random exception";
|
|
|
-
|
|
|
- @Override
|
|
|
- public void doGet(HttpServletRequest request,
|
|
|
- HttpServletResponse response) throws ServletException, IOException {
|
|
|
- response.setHeader("Content-Type",
|
|
|
- MediaType.APPLICATION_JSON);
|
|
|
- String param = request.getParameter("op");
|
|
|
- if(respondTimes == 0) {
|
|
|
- Exception mockException = new IllegalArgumentException(
|
|
|
- "Invalid value for webhdfs parameter \"op\". "
|
|
|
- + "" + "No enum constant " + param);
|
|
|
- sendException(request, response, mockException);
|
|
|
- } else if (respondTimes == 1) {
|
|
|
- sendResponse(request, response);
|
|
|
- } else if (respondTimes == 2) {
|
|
|
- Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
|
|
|
- sendException(request, response, mockException);
|
|
|
- }
|
|
|
- respondTimes++;
|
|
|
- }
|
|
|
-
|
|
|
- private void sendResponse(HttpServletRequest request,
|
|
|
- HttpServletResponse response) throws IOException {
|
|
|
- response.setStatus(HttpServletResponse.SC_OK);
|
|
|
- // Construct a LocatedBlock for testing
|
|
|
- DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
|
|
|
- DatanodeInfo[] ds = new DatanodeInfo[1];
|
|
|
- ds[0] = d;
|
|
|
- ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
|
|
|
- LocatedBlock l1 = new LocatedBlock(b1, ds);
|
|
|
- l1.setStartOffset(0);
|
|
|
- l1.setCorrupt(false);
|
|
|
- List<LocatedBlock> ls = Arrays.asList(l1);
|
|
|
- LocatedBlocks locatedblocks =
|
|
|
- new LocatedBlocks(10, false, ls, l1,
|
|
|
- true, null, null);
|
|
|
-
|
|
|
- try (PrintWriter pw = response.getWriter()) {
|
|
|
- pw.write(JsonUtil.toJsonString(locatedblocks));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void sendException(HttpServletRequest request,
|
|
|
- HttpServletResponse response,
|
|
|
- Exception mockException) throws IOException {
|
|
|
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
|
- String errJs = JsonUtil.toJsonString(mockException);
|
|
|
- try (PrintWriter pw = response.getWriter()) {
|
|
|
- pw.write(errJs);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testGetFileBlockLocationsBackwardsCompatibility()
|
|
|
- throws Exception {
|
|
|
- final Configuration conf = WebHdfsTestUtil.createConf();
|
|
|
- final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
|
|
|
- HttpServer2 http = null;
|
|
|
- try {
|
|
|
- http = HttpServerFunctionalTest.createTestServer(conf);
|
|
|
- http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
|
|
|
- http.start();
|
|
|
-
|
|
|
- // Write the address back to configuration so
|
|
|
- // WebHdfsFileSystem could connect to the mock server
|
|
|
- conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
|
|
- "localhost:" + http.getConnectorAddress(0).getPort());
|
|
|
-
|
|
|
- final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
|
|
|
- conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
|
|
-
|
|
|
- WebHdfsFileSystem spyFs = spy(webFS);
|
|
|
- BlockLocation[] locations = spyFs
|
|
|
- .getFileBlockLocations(new Path("p"), 0, 100);
|
|
|
-
|
|
|
- // Verify result
|
|
|
- assertEquals(1, locations.length);
|
|
|
- assertEquals(121, locations[0].getLength());
|
|
|
-
|
|
|
- // Verify the fall back
|
|
|
- // The function should be called exactly 2 times
|
|
|
- // 1st time handles GETFILEBLOCKLOCATIONS and found it is not supported
|
|
|
- // 2nd time fall back to handle GET_FILE_BLOCK_LOCATIONS
|
|
|
- verify(spyFs, times(2)).getFileBlockLocations(any(),
|
|
|
- any(), anyLong(), anyLong());
|
|
|
-
|
|
|
- // Verify it doesn't erroneously fall back
|
|
|
- // When server returns a different error, it should directly
|
|
|
- // throw an exception.
|
|
|
- try {
|
|
|
- spyFs.getFileBlockLocations(new Path("p"), 0, 100);
|
|
|
- } catch (Exception e) {
|
|
|
- assertTrue(e instanceof IOException);
|
|
|
- assertEquals(e.getMessage(), MockWebHdfsServlet.RANDOM_EXCEPTION_MSG);
|
|
|
- // Totally this function has been called 3 times
|
|
|
- verify(spyFs, times(3)).getFileBlockLocations(any(),
|
|
|
- any(), anyLong(), anyLong());
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if(http != null) {
|
|
|
- http.stop();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|