Переглянути джерело

HADOOP-14546. Azure: Concurrent I/O does not work when secure.mode is enabled. Contributed by Thomas

Mingliang Liu 8 роки тому
батько
коміт
7e031c2c18

+ 1 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -852,7 +852,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     rootDirectory = container.getDirectoryReference("");
 
     canCreateOrModifyContainer = true;
-    tolerateOobAppends = false;
   }
 
   /**
@@ -1911,8 +1910,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     // If reads concurrent to OOB writes are allowed, the interception will reset
     // the conditional header on all Azure blob storage read requests.
     if (bindConcurrentOOBIo) {
-      SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
-          operationContext, true);
+      SendRequestIntercept.bind(operationContext);
     }
 
     if (testHookOperationContext != null) {

+ 9 - 82
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java

@@ -35,7 +35,7 @@ import com.microsoft.azure.storage.StorageException;
 
 /**
  * Manages the lifetime of binding on the operation contexts to intercept send
- * request events to Azure storage.
+ * request events to Azure storage and allow concurrent OOB I/Os.
  */
 @InterfaceAudience.Private
 public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent> {
@@ -43,70 +43,22 @@ public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent
   public static final Log LOG = LogFactory.getLog(SendRequestIntercept.class);
 
   private static final String ALLOW_ALL_REQUEST_PRECONDITIONS = "*";
-  private final StorageCredentials storageCreds;
-  private final boolean allowConcurrentOOBIo;
-  private final OperationContext opContext;
 
   /**
-   * Getter returning the storage account credentials.
-   * 
-   * @return storageCreds - account storage credentials.
-   */
-  private StorageCredentials getCredentials() {
-    return storageCreds;
-  }
-
-  /**
-   * Query if out-of-band I/Os are allowed.
-   * 
-   * return allowConcurrentOOBIo - true if OOB I/O is allowed, and false
-   * otherwise.
+   * Hidden default constructor for SendRequestIntercept.
    */
-  private boolean isOutOfBandIoAllowed() {
-    return allowConcurrentOOBIo;
-  }
-
-  /**
-   * Getter returning the operation context.
-   * 
-   * @return storageCreds - account storage credentials.
-   */
-  private OperationContext getOperationContext() {
-    return opContext;
-  }
-
-  /**
-   * Constructor for SendRequestThrottle.
-   * 
-   * @param storageCreds
-   *          - storage account credentials for signing packets.
-   * 
-   */
-  private SendRequestIntercept(StorageCredentials storageCreds,
-      boolean allowConcurrentOOBIo, OperationContext opContext) {
-    // Capture the send delay callback interface.
-    this.storageCreds = storageCreds;
-    this.allowConcurrentOOBIo = allowConcurrentOOBIo;
-    this.opContext = opContext;
+  private SendRequestIntercept() {
   }
 
   /**
    * Binds a new lister to the operation context so the WASB file system can
-   * appropriately intercept sends. By allowing concurrent OOB I/Os, we bypass
-   * the blob immutability check when reading streams.
+   * appropriately intercept sends and allow concurrent OOB I/Os.  This
+   * by-passes the blob immutability check when reading streams.
    *
-   * @param storageCreds The credential of blob storage.
-   * @param opContext
-   *          The operation context to bind to listener.
-   * 
-   * @param allowConcurrentOOBIo
-   *          True if reads are allowed with concurrent OOB writes.
+   * @param opContext the operation context assocated with this request.
    */
-  public static void bind(StorageCredentials storageCreds,
-      OperationContext opContext, boolean allowConcurrentOOBIo) {
-    SendRequestIntercept sendListener = new SendRequestIntercept(storageCreds,
-        allowConcurrentOOBIo, opContext);
-    opContext.getSendingRequestEventHandler().addListener(sendListener);
+  public static void bind(OperationContext opContext) {
+    opContext.getSendingRequestEventHandler().addListener(new SendRequestIntercept());
   }
 
   /**
@@ -134,36 +86,11 @@ public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent
     // Determine whether this is a download request by checking that the request
     // method
     // is a "GET" operation.
-    if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")
-        && isOutOfBandIoAllowed()) {
+    if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")) {
       // If concurrent reads on OOB writes are allowed, reset the if-match
       // condition on the conditional header.
       urlConnection.setRequestProperty(HeaderConstants.IF_MATCH,
           ALLOW_ALL_REQUEST_PRECONDITIONS);
-
-      // In the Java AzureSDK the packet is signed before firing the
-      // SendRequest. Setting
-      // the conditional packet header property changes the contents of the
-      // packet, therefore the packet has to be re-signed.
-      try {
-        // Sign the request. GET's have no payload so the content length is
-        // zero.
-        StorageCredentialsHelper.signBlobQueueAndFileRequest(getCredentials(),
-          urlConnection, -1L, getOperationContext());
-      } catch (InvalidKeyException e) {
-        // Log invalid key exception to track signing error before the send
-        // fails.
-        String errString = String.format(
-            "Received invalid key exception when attempting sign packet."
-                + " Cause: %s", e.getCause().toString());
-        LOG.error(errString);
-      } catch (StorageException e) {
-        // Log storage exception to track signing error before the call fails.
-        String errString = String.format(
-            "Received storage exception when attempting to sign packet."
-                + " Cause: %s", e.getCause().toString());
-        LOG.error(errString);
-      }
     }
   }
 }

+ 7 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java

@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
 import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_LOCAL_SAS_KEY_MODE;
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
 
 /**
  * Helper class to create WASB file systems backed by either a mock in-memory
@@ -335,6 +336,11 @@ public final class AzureBlobStorageTestAccount {
 
   public static AzureBlobStorageTestAccount createOutOfBandStore(
       int uploadBlockSize, int downloadBlockSize) throws Exception {
+    return createOutOfBandStore(uploadBlockSize, downloadBlockSize, false);
+  }
+
+   public static AzureBlobStorageTestAccount createOutOfBandStore(
+      int uploadBlockSize, int downloadBlockSize, boolean enableSecureMode) throws Exception {
 
     saveMetricsConfigFile();
 
@@ -359,6 +365,7 @@ public final class AzureBlobStorageTestAccount {
     // out-of-band appends.
     conf.setBoolean(KEY_DISABLE_THROTTLING, true);
     conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true);
+    conf.setBoolean(KEY_USE_SECURE_MODE, enableSecureMode);
     configureSecureModeTestSettings(conf);
 
     // Set account URI and initialize Azure file system.

+ 2 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java

@@ -40,9 +40,9 @@ public class TestAzureConcurrentOutOfBandIo {
   static final int BLOB_SIZE = 32 * 1024 * 1024;
 
   // Number of blocks to be written before flush.
-  private static final int NUMBER_OF_BLOCKS = 2;
+  static final int NUMBER_OF_BLOCKS = 2;
 
-  private AzureBlobStorageTestAccount testAccount;
+  protected AzureBlobStorageTestAccount testAccount;
 
   // Overridden TestCase methods.
   @Before

+ 50 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIoWithSecureMode.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Extends TestAzureConcurrentOutOfBandIo in order to run testReadOOBWrites with secure mode
+ * (fs.azure.secure.mode) both enabled and disabled.
+ */
+public class TestAzureConcurrentOutOfBandIoWithSecureMode extends  TestAzureConcurrentOutOfBandIo {
+
+  // Overridden TestCase methods.
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    testAccount = AzureBlobStorageTestAccount.createOutOfBandStore(
+        UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE, true);
+    assumeNotNull(testAccount);
+  }
+}