瀏覽代碼

HDFS-14063. Support noredirect param for CREATE/APPEND/OPEN/GETFILECHECKSUM in HttpFS. Contributed by Íñigo Goiri.

Weiwei Yang 6 年之前
父節點
當前提交
ad5256e44d

+ 23 - 4
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java

@@ -52,23 +52,25 @@ public class HttpFSParametersProvider extends ParametersProvider {
 
   static {
     PARAMS_DEF.put(Operation.OPEN,
-        new Class[]{OffsetParam.class, LenParam.class});
+        new Class[]{OffsetParam.class, LenParam.class, NoRedirectParam.class});
     PARAMS_DEF.put(Operation.GETFILESTATUS, new Class[]{});
     PARAMS_DEF.put(Operation.LISTSTATUS, new Class[]{FilterParam.class});
     PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
     PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
-    PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
+    PARAMS_DEF.put(Operation.GETFILECHECKSUM,
+        new Class[]{NoRedirectParam.class});
     PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
     PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
     PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
     PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
-    PARAMS_DEF.put(Operation.APPEND, new Class[]{DataParam.class});
+    PARAMS_DEF.put(Operation.APPEND,
+        new Class[]{DataParam.class, NoRedirectParam.class});
     PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
     PARAMS_DEF.put(Operation.TRUNCATE, new Class[]{NewLengthParam.class});
     PARAMS_DEF.put(Operation.CREATE,
         new Class[]{PermissionParam.class, OverwriteParam.class,
             ReplicationParam.class, BlockSizeParam.class, DataParam.class,
-            UnmaskedPermissionParam.class});
+            UnmaskedPermissionParam.class, NoRedirectParam.class});
     PARAMS_DEF.put(Operation.MKDIRS, new Class[]{PermissionParam.class,
         UnmaskedPermissionParam.class});
     PARAMS_DEF.put(Operation.RENAME, new Class[]{DestinationParam.class});
@@ -177,6 +179,23 @@ public class HttpFSParametersProvider extends ParametersProvider {
     }
   }
 
+  /**
+   * Class for noredirect parameter.
+   */
+  @InterfaceAudience.Private
+  public static class NoRedirectParam extends BooleanParam {
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = "noredirect";
+    /**
+     * Constructor.
+     */
+    public NoRedirectParam() {
+      super(NAME, false);
+    }
+  }
+
   /**
    * Class for operation parameter.
    */

+ 88 - 37
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.NewLengthParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.NoRedirectParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OldSnapshotNameParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
@@ -52,6 +53,7 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrEncodingPa
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
+import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.http.JettyUtils;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.service.FileSystemAccessException;
@@ -161,6 +163,7 @@ public class HttpFSServer {
   /**
    * Special binding for '/' as it is not handled by the wildcard binding.
    *
+   * @param uriInfo uri info of the request.
    * @param op the HttpFS operation of the request.
    * @param params the HttpFS parameters of the request.
    *
@@ -174,11 +177,12 @@ public class HttpFSServer {
    */
   @GET
   @Produces(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8)
-  public Response getRoot(@QueryParam(OperationParam.NAME) OperationParam op,
+  public Response getRoot(@Context UriInfo uriInfo,
+                          @QueryParam(OperationParam.NAME) OperationParam op,
                           @Context Parameters params,
                           @Context HttpServletRequest request)
     throws IOException, FileSystemAccessException {
-    return get("", op, params, request);
+    return get("", uriInfo, op, params, request);
   }
 
   private String makeAbsolute(String path) {
@@ -189,6 +193,7 @@ public class HttpFSServer {
    * Binding to handle GET requests, supported operations are
    *
    * @param path the path for operation.
+   * @param uriInfo uri info of the request.
    * @param op the HttpFS operation of the request.
    * @param params the HttpFS parameters of the request.
    *
@@ -205,6 +210,7 @@ public class HttpFSServer {
   @Produces({MediaType.APPLICATION_OCTET_STREAM + "; " + JettyUtils.UTF_8,
       MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8})
   public Response get(@PathParam("path") String path,
+                      @Context UriInfo uriInfo,
                       @QueryParam(OperationParam.NAME) OperationParam op,
                       @Context Parameters params,
                       @Context HttpServletRequest request)
@@ -216,32 +222,40 @@ public class HttpFSServer {
     MDC.put("hostname", request.getRemoteAddr());
     switch (op.value()) {
     case OPEN: {
-      //Invoking the command directly using an unmanaged FileSystem that is
-      // released by the FileSystemReleaseFilter
-      final FSOperations.FSOpen command = new FSOperations.FSOpen(path);
-      final FileSystem fs = createFileSystem(user);
-      InputStream is = null;
-      UserGroupInformation ugi = UserGroupInformation
-          .createProxyUser(user.getShortUserName(),
-              UserGroupInformation.getLoginUser());
-      try {
-        is = ugi.doAs(new PrivilegedExceptionAction<InputStream>() {
-          @Override
-          public InputStream run() throws Exception {
-            return command.execute(fs);
-          }
-        });
-      } catch (InterruptedException ie) {
-        LOG.info("Open interrupted.", ie);
-        Thread.currentThread().interrupt();
-      }
-      Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
-      Long len = params.get(LenParam.NAME, LenParam.class);
-      AUDIT_LOG.info("[{}] offset [{}] len [{}]",
-          new Object[] { path, offset, len });
-      InputStreamEntity entity = new InputStreamEntity(is, offset, len);
-      response =
-          Response.ok(entity).type(MediaType.APPLICATION_OCTET_STREAM).build();
+      Boolean noRedirect = params.get(
+          NoRedirectParam.NAME, NoRedirectParam.class);
+      if (noRedirect) {
+        URI redirectURL = createOpenRedirectionURL(uriInfo);
+        final String js = JsonUtil.toJsonString("Location", redirectURL);
+        response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+      } else {
+        //Invoking the command directly using an unmanaged FileSystem that is
+        // released by the FileSystemReleaseFilter
+        final FSOperations.FSOpen command = new FSOperations.FSOpen(path);
+        final FileSystem fs = createFileSystem(user);
+        InputStream is = null;
+        UserGroupInformation ugi = UserGroupInformation
+            .createProxyUser(user.getShortUserName(),
+                UserGroupInformation.getLoginUser());
+        try {
+          is = ugi.doAs(new PrivilegedExceptionAction<InputStream>() {
+            @Override
+            public InputStream run() throws Exception {
+              return command.execute(fs);
+            }
+          });
+        } catch (InterruptedException ie) {
+          LOG.info("Open interrupted.", ie);
+          Thread.currentThread().interrupt();
+        }
+        Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
+        Long len = params.get(LenParam.NAME, LenParam.class);
+        AUDIT_LOG.info("[{}] offset [{}] len [{}]",
+            new Object[] { path, offset, len });
+        InputStreamEntity entity = new InputStreamEntity(is, offset, len);
+        response = Response.ok(entity).type(MediaType.APPLICATION_OCTET_STREAM)
+            .build();
+      }
       break;
     }
     case GETFILESTATUS: {
@@ -293,9 +307,18 @@ public class HttpFSServer {
     case GETFILECHECKSUM: {
       FSOperations.FSFileChecksum command =
           new FSOperations.FSFileChecksum(path);
-      Map json = fsExecute(user, command);
+
+      Boolean noRedirect = params.get(
+          NoRedirectParam.NAME, NoRedirectParam.class);
       AUDIT_LOG.info("[{}]", path);
-      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      if (noRedirect) {
+        URI redirectURL = createOpenRedirectionURL(uriInfo);
+        final String js = JsonUtil.toJsonString("Location", redirectURL);
+        response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+      } else {
+        Map json = fsExecute(user, command);
+        response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      }
       break;
     }
     case GETFILEBLOCKLOCATIONS: {
@@ -395,6 +418,17 @@ public class HttpFSServer {
     return response;
   }
 
+  /**
+   * Create an open redirection URL from a request. It points to the same
+   * HttpFS endpoint but removes the "redirect" parameter.
+   * @param uriInfo uri info of the request.
+   * @return URL for the redirected location.
+   */
+  private URI createOpenRedirectionURL(UriInfo uriInfo) {
+    UriBuilder uriBuilder = uriInfo.getRequestUriBuilder();
+    uriBuilder.replaceQueryParam(NoRedirectParam.NAME, (Object[])null);
+    return uriBuilder.build((Object[])null);
+  }
 
   /**
    * Binding to handle DELETE requests.
@@ -491,9 +525,16 @@ public class HttpFSServer {
       case APPEND: {
         Boolean hasData = params.get(DataParam.NAME, DataParam.class);
         if (!hasData) {
-          response = Response.temporaryRedirect(
-            createUploadRedirectionURL(uriInfo,
-              HttpFSFileSystem.Operation.APPEND)).build();
+          URI redirectURL = createUploadRedirectionURL(
+              uriInfo, HttpFSFileSystem.Operation.APPEND);
+          Boolean noRedirect = params.get(
+              NoRedirectParam.NAME, NoRedirectParam.class);
+          if (noRedirect) {
+            final String js = JsonUtil.toJsonString("Location", redirectURL);
+            response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+          } else {
+            response = Response.temporaryRedirect(redirectURL).build();
+          }
         } else {
           FSOperations.FSAppend command =
             new FSOperations.FSAppend(is, path);
@@ -594,9 +635,16 @@ public class HttpFSServer {
       case CREATE: {
         Boolean hasData = params.get(DataParam.NAME, DataParam.class);
         if (!hasData) {
-          response = Response.temporaryRedirect(
-            createUploadRedirectionURL(uriInfo,
-              HttpFSFileSystem.Operation.CREATE)).build();
+          URI redirectURL = createUploadRedirectionURL(
+              uriInfo, HttpFSFileSystem.Operation.CREATE);
+          Boolean noRedirect = params.get(
+              NoRedirectParam.NAME, NoRedirectParam.class);
+          if (noRedirect) {
+            final String js = JsonUtil.toJsonString("Location", redirectURL);
+            response = Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+          } else {
+            response = Response.temporaryRedirect(redirectURL).build();
+          }
         } else {
           Short permission = params.get(PermissionParam.NAME,
                                          PermissionParam.class);
@@ -617,7 +665,10 @@ public class HttpFSServer {
               "replication [{}] blockSize [{}] unmaskedpermission [{}]",
               new Object[]{path, permission,  override, replication, blockSize,
                   unmaskedPermission});
-          response = Response.status(Response.Status.CREATED).build();
+          final String js = JsonUtil.toJsonString(
+              "Location", uriInfo.getAbsolutePath());
+          response = Response.created(uriInfo.getAbsolutePath())
+              .type(MediaType.APPLICATION_JSON).entity(js).build();
         }
         break;
       }

+ 122 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java

@@ -43,6 +43,7 @@ import java.io.OutputStream;
 import java.io.Writer;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -50,10 +51,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
+import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.NoRedirectParam;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -84,6 +88,10 @@ import org.eclipse.jetty.webapp.WebAppContext;
 import com.google.common.collect.Maps;
 import java.util.Properties;
 import java.util.regex.Pattern;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 
 /**
@@ -1514,4 +1522,118 @@ public class TestHttpFSServer extends HFSTestCase {
     dfs.delete(path1, true);
     verifyGetSnapshottableDirectoryList(dfs);
   }
+
+  @Test
+  @TestDir
+  @TestJetty
+  @TestHdfs
+  public void testNoRedirect() throws Exception {
+    createHttpFSServer(false, false);
+
+    final String testContent = "Test content";
+    final String path = "/testfile.txt";
+    final String username = HadoopUsersConfTestHelper.getHadoopUsers()[0];
+
+
+    // Trigger the creation of the file which shouldn't redirect
+    URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
+        "/webhdfs/v1{0}?user.name={1}&op=CREATE&noredirect=true",
+        path, username));
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.PUT);
+    conn.connect();
+    // Verify that it returned the final write location
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    JSONObject json = (JSONObject)new JSONParser().parse(
+        new InputStreamReader(conn.getInputStream()));
+    String location = (String)json.get("Location");
+    Assert.assertTrue(location.contains(DataParam.NAME));
+    Assert.assertTrue(location.contains(NoRedirectParam.NAME));
+    Assert.assertTrue(location.contains("CREATE"));
+    Assert.assertTrue("Wrong location: " + location,
+        location.startsWith(TestJettyHelper.getJettyURL().toString()));
+
+    // Use the location to actually write the file
+    url = new URL(location);
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.PUT);
+    conn.setRequestProperty(
+        "Content-Type", MediaType.APPLICATION_OCTET_STREAM);
+    conn.setDoOutput(true);
+    conn.connect();
+    OutputStream os = conn.getOutputStream();
+    os.write(testContent.getBytes());
+    os.close();
+    // Verify that it created the file and returned the location
+    Assert.assertEquals(
+        HttpURLConnection.HTTP_CREATED, conn.getResponseCode());
+    json = (JSONObject)new JSONParser().parse(
+        new InputStreamReader(conn.getInputStream()));
+    location = (String)json.get("Location");
+    Assert.assertEquals(
+        TestJettyHelper.getJettyURL() + "/webhdfs/v1" + path, location);
+
+
+    // Read the file which shouldn't redirect
+    url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
+        "/webhdfs/v1{0}?user.name={1}&op=OPEN&noredirect=true",
+        path, username));
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.GET);
+    conn.connect();
+    // Verify that we got the final location to read from
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    json = (JSONObject)new JSONParser().parse(
+        new InputStreamReader(conn.getInputStream()));
+    location = (String)json.get("Location");
+    Assert.assertTrue(!location.contains(NoRedirectParam.NAME));
+    Assert.assertTrue(location.contains("OPEN"));
+    Assert.assertTrue("Wrong location: " + location,
+        location.startsWith(TestJettyHelper.getJettyURL().toString()));
+
+    // Use the location to actually read
+    url = new URL(location);
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.GET);
+    conn.connect();
+    // Verify that we read what we wrote
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    String content = IOUtils.toString(
+        conn.getInputStream(), Charset.defaultCharset());
+    Assert.assertEquals(testContent, content);
+
+
+    // Get the checksum of the file which shouldn't redirect
+    url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
+        "/webhdfs/v1{0}?user.name={1}&op=GETFILECHECKSUM&noredirect=true",
+        path, username));
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.GET);
+    conn.connect();
+    // Verify that we got the final location to write to
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    json = (JSONObject)new JSONParser().parse(
+        new InputStreamReader(conn.getInputStream()));
+    location = (String)json.get("Location");
+    Assert.assertTrue(!location.contains(NoRedirectParam.NAME));
+    Assert.assertTrue(location.contains("GETFILECHECKSUM"));
+    Assert.assertTrue("Wrong location: " + location,
+        location.startsWith(TestJettyHelper.getJettyURL().toString()));
+
+    // Use the location to actually get the checksum
+    url = new URL(location);
+    conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod(HttpMethod.GET);
+    conn.connect();
+    // Verify that we read what we wrote
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    json = (JSONObject)new JSONParser().parse(
+        new InputStreamReader(conn.getInputStream()));
+    JSONObject checksum = (JSONObject)json.get("FileChecksum");
+    Assert.assertEquals(
+        "0000020000000000000000001b9c0a445fed3c0bf1e1aa7438d96b1500000000",
+        checksum.get("bytes"));
+    Assert.assertEquals(28L, checksum.get("length"));
+    Assert.assertEquals("MD5-of-0MD5-of-512CRC32C", checksum.get("algorithm"));
+  }
 }