|
@@ -44,8 +44,10 @@ import java.net.SocketTimeoutException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
|
|
|
import com.google.common.collect.ImmutableList;
|
|
@@ -111,6 +113,9 @@ import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.mockito.internal.util.reflection.Whitebox;
|
|
|
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.databind.type.MapType;
|
|
|
+
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyInt;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
@@ -993,6 +998,150 @@ public class TestWebHDFS {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testWebHdfsGetBlockLocations() throws Exception{
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ final Configuration conf = WebHdfsTestUtil.createConf();
|
|
|
+ final int offset = 42;
|
|
|
+ final int length = 512;
|
|
|
+ final Path path = new Path("/foo");
|
|
|
+ byte[] contents = new byte[1024];
|
|
|
+ RANDOM.nextBytes(contents);
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
|
|
+ WebHdfsConstants.WEBHDFS_SCHEME);
|
|
|
+ try (OutputStream os = fs.create(path)) {
|
|
|
+ os.write(contents);
|
|
|
+ }
|
|
|
+ BlockLocation[] locations = fs.getFileBlockLocations(path, offset,
|
|
|
+ length);
|
|
|
+
|
|
|
+ // Query webhdfs REST API to get block locations
|
|
|
+ InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
|
|
|
+
|
|
|
+ // Case 1
|
|
|
+ // URL without length or offset parameters
|
|
|
+ URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
|
|
|
+
|
|
|
+ String response1 = getResponse(url1, "GET");
|
|
|
+ // Parse BlockLocation array from json output using object mapper
|
|
|
+ BlockLocation[] locationArray1 = toBlockLocationArray(response1);
|
|
|
+
|
|
|
+ // Verify the result from rest call is same as file system api
|
|
|
+ verifyEquals(locations, locationArray1);
|
|
|
+
|
|
|
+ // Case 2
|
|
|
+ // URL contains length and offset parameters
|
|
|
+ URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
+ + "&length=" + length + "&offset=" + offset);
|
|
|
+
|
|
|
+ String response2 = getResponse(url2, "GET");
|
|
|
+ BlockLocation[] locationArray2 = toBlockLocationArray(response2);
|
|
|
+
|
|
|
+ verifyEquals(locations, locationArray2);
|
|
|
+
|
|
|
+ // Case 3
|
|
|
+ // URL contains length parameter but without offset parameters
|
|
|
+ URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
+ + "&length=" + length);
|
|
|
+
|
|
|
+ String response3 = getResponse(url3, "GET");
|
|
|
+ BlockLocation[] locationArray3 = toBlockLocationArray(response3);
|
|
|
+
|
|
|
+ verifyEquals(locations, locationArray3);
|
|
|
+
|
|
|
+ // Case 4
|
|
|
+ // URL contains offset parameter but without length parameter
|
|
|
+ URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
+ + "&offset=" + offset);
|
|
|
+
|
|
|
+ String response4 = getResponse(url4, "GET");
|
|
|
+ BlockLocation[] locationArray4 = toBlockLocationArray(response4);
|
|
|
+
|
|
|
+ verifyEquals(locations, locationArray4);
|
|
|
+
|
|
|
+ // Case 5
|
|
|
+ // URL specifies offset exceeds the file length
|
|
|
+ URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
|
|
|
+ WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
|
|
|
+ + "&offset=1200");
|
|
|
+
|
|
|
+ String response5 = getResponse(url5, "GET");
|
|
|
+ BlockLocation[] locationArray5 = toBlockLocationArray(response5);
|
|
|
+
|
|
|
+ // Expected an empty array of BlockLocation
|
|
|
+ verifyEquals(new BlockLocation[] {}, locationArray5);
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private BlockLocation[] toBlockLocationArray(String json)
|
|
|
+ throws IOException {
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ MapType subType = mapper.getTypeFactory().constructMapType(
|
|
|
+ Map.class,
|
|
|
+ String.class,
|
|
|
+ BlockLocation[].class);
|
|
|
+ MapType rootType = mapper.getTypeFactory().constructMapType(
|
|
|
+ Map.class,
|
|
|
+ mapper.constructType(String.class),
|
|
|
+ mapper.constructType(subType));
|
|
|
+
|
|
|
+ Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
|
|
|
+ .readValue(json, rootType);
|
|
|
+ Map<String, BlockLocation[]> locationMap = jsonMap
|
|
|
+ .get("BlockLocations");
|
|
|
+ BlockLocation[] locationArray = locationMap.get(
|
|
|
+ BlockLocation.class.getSimpleName());
|
|
|
+ return locationArray;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyEquals(BlockLocation[] locations1,
|
|
|
+ BlockLocation[] locations2) throws IOException {
|
|
|
+ for(int i=0; i<locations1.length; i++) {
|
|
|
+ BlockLocation location1 = locations1[i];
|
|
|
+ BlockLocation location2 = locations2[i];
|
|
|
+ Assert.assertEquals(location1.getLength(),
|
|
|
+ location2.getLength());
|
|
|
+ Assert.assertEquals(location1.getOffset(),
|
|
|
+ location2.getOffset());
|
|
|
+ Assert.assertArrayEquals(location1.getCachedHosts(),
|
|
|
+ location2.getCachedHosts());
|
|
|
+ Assert.assertArrayEquals(location1.getHosts(),
|
|
|
+ location2.getHosts());
|
|
|
+ Assert.assertArrayEquals(location1.getNames(),
|
|
|
+ location2.getNames());
|
|
|
+ Assert.assertArrayEquals(location1.getTopologyPaths(),
|
|
|
+ location2.getTopologyPaths());
|
|
|
+ Assert.assertArrayEquals(location1.getStorageTypes(),
|
|
|
+ location2.getStorageTypes());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getResponse(URL url, String httpRequestType)
|
|
|
+ throws IOException {
|
|
|
+ HttpURLConnection conn = null;
|
|
|
+ try {
|
|
|
+ conn = (HttpURLConnection) url.openConnection();
|
|
|
+ conn.setRequestMethod(httpRequestType);
|
|
|
+ conn.setInstanceFollowRedirects(false);
|
|
|
+ return IOUtils.toString(conn.getInputStream(),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ } finally {
|
|
|
+ if(conn != null) {
|
|
|
+ conn.disconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
|
|
|
final URI uri, final String userName) throws Exception {
|
|
|
|