Преглед изворни кода

HDFS-17543. [ARR] AsyncUtil makes asynchronous code more concise and easier. (#6868). Contributed by Jian Zhang.

Reviewed-by: hfutatzhanghb <hfutzhanghb@163.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Jian Zhang пре 10 месеци
родитељ
комит
a3e5258a4c
15 измењених фајлова са 2299 додато и 0 уклоњено
  1. 89 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java
  2. 115 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java
  3. 162 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java
  4. 83 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java
  5. 174 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java
  6. 185 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java
  7. 74 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java
  8. 380 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java
  9. 120 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java
  10. 96 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java
  11. 35 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java
  12. 249 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java
  13. 66 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java
  14. 194 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java
  15. 277 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java

+ 89 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/ApplyFunction.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * Represents a function that accepts a value of type T and produces a result of type R.
+ * This interface extends {@link Async} and provides methods to apply the function
+ * asynchronously using {@link CompletableFuture}.
+ *
+ * <p>ApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync(input);
+ *    // Can use ApplyFunction
+ *    R result = thenApply(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ */
+@FunctionalInterface
+public interface ApplyFunction<T, R> extends Async<R>{
+
+  /**
+   * Applies this function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   * @throws IOException if an I/O error occurs
+   */
+  R apply(T t) throws IOException;
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link CompletableFuture}.
+   * The function is executed on the same thread as the completion of the given future.
+   *
+   * @param in the input future
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in) {
+    return in.thenApply(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw warpCompletionException(e);
+      }
+    });
+  }
+
+  /**
+   * Applies this function asynchronously to the result of the given {@link CompletableFuture},
+   * using the specified executor for the asynchronous computation.
+   *
+   * @param in the input future
+   * @param executor the executor to use for the asynchronous computation
+   * @return a new future that holds the result of the function application
+   */
+  default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
+    return in.thenApplyAsync(t -> {
+      try {
+        return ApplyFunction.this.apply(t);
+      } catch (IOException e) {
+        throw warpCompletionException(e);
+      }
+    }, executor);
+  }
+}

+ 115 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/Async.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionException;
+
+/**
+ * An interface for asynchronous operations, providing utility methods
+ * and constants related to asynchronous computations.
+ *
+ * @param <R> The type of the result of the asynchronous operation
+ */
+public interface Async<R> {
+
+  /**
+   * A thread-local variable to store the {@link CompletableFuture} instance for the current thread.
+   * <p>
+   * <b>Note:</b> After executing an asynchronous method, the thread stores the CompletableFuture
+   * of the asynchronous method in the thread's local variable
+   */
+  ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE
+      = new ThreadLocal<>();
+
+  /**
+   * Sets the {@link CompletableFuture} instance for the current thread.
+   *
+   * @param completableFuture The {@link CompletableFuture} instance to be set
+   * @param <T> The type of the result in the CompletableFuture
+   */
+  default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) {
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
+  }
+
+  /**
+   * Gets the {@link CompletableFuture} instance for the current thread.
+   *
+   * @return The {@link CompletableFuture} instance for the current thread,
+   * or {@code null} if not set
+   */
+  default CompletableFuture<R> getCurCompletableFuture() {
+    return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
+  }
+
+  /**
+   * Blocks and retrieves the result of the {@link CompletableFuture} instance
+   * for the current thread.
+   *
+   * @return The result of the CompletableFuture, or {@code null} if the thread was interrupted
+   * @throws IOException If the completion exception to the CompletableFuture
+   * is an IOException or a subclass of it
+   */
+  default R result() throws IOException {
+    try {
+      CompletableFuture<R> completableFuture =
+          (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
+      assert completableFuture != null;
+      return completableFuture.get();
+    } catch (InterruptedException e) {
+      return null;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException)cause;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Extracts the real cause of an exception wrapped by CompletionException.
+   *
+   * @param e The incoming exception, which may be a CompletionException.
+   * @return Returns the real cause of the original exception,
+   * or the original exception if there is no cause.
+   */
+  static Throwable unWarpCompletionException(Throwable e) {
+    if (e instanceof CompletionException) {
+      if (e.getCause() != null) {
+        return e.getCause();
+      }
+    }
+    return e;
+  }
+
+  /**
+   * Wraps the incoming exception in a new CompletionException.
+   *
+   * @param e The incoming exception, which may be any type of Throwable.
+   * @return Returns a new CompletionException with the original exception as its cause.
+   */
+  static CompletionException warpCompletionException(Throwable e) {
+    if (e instanceof CompletionException) {
+      return (CompletionException) e;
+    }
+    return new CompletionException(e);
+  }
+}

+ 162 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncApplyFunction.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The AsyncApplyFunction interface represents a function that
+ * asynchronously accepts a value of type T and produces a result
+ * of type R. This interface extends {@link ApplyFunction} and is
+ * designed to be used with asynchronous computation frameworks,
+ * such as Java's {@link java.util.concurrent.CompletableFuture}.
+ *
+ * <p>An implementation of this interface is expected to perform an
+ * asynchronous operation and return a result, which is typically
+ * represented as a {@code CompletableFuture<R>}. This allows for
+ * non-blocking execution of tasks and is particularly useful for
+ * I/O operations or any operation that may take a significant amount
+ * of time to complete.</p>
+ *
+ * <p>AsyncApplyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    T res = doAsync1(input);
+ *    // Can use AsyncApplyFunction
+ *    R result = doAsync2(res);
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the input to the function
+ * @param <R> the type of the result of the function
+ * @see ApplyFunction
+ * @see java.util.concurrent.CompletableFuture
+ */
+@FunctionalInterface
+public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {
+
+  /**
+   * Asynchronously applies this function to the given argument.
+   *
+   * <p>This method is intended to initiate the function application
+   * without waiting for the result. It is typically used when the
+   * result of the operation is not required immediately or when the
+   * operation is part of a larger asynchronous workflow.</p>
+   *
+   * @param t the function argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  void applyAsync(T t) throws IOException;
+
+  /**
+   * Synchronously applies this function to the given argument and
+   * returns the result.
+   *
+   * <p>This method waits for the asynchronous operation to complete
+   * and returns its result. It is useful when the result is needed
+   * immediately and the calling code cannot proceed without it.</p>
+   *
+   * @param t the function argument
+   * @return the result of applying the function to the argument
+   * @throws IOException if an I/O error occurs during the application
+   *                     of the function
+   */
+  @Override
+  default R apply(T t) throws IOException {
+    applyAsync(t);
+    return result();
+  }
+
+  /**
+   * Initiates the asynchronous application of this function to the given result.
+   * <p>
+   * This method calls applyAsync to start the asynchronous operation and then retrieves
+   * the current thread's CompletableFuture using getCurCompletableFuture.
+   * It returns this CompletableFuture, which will be completed with the result of the
+   * asynchronous operation once it is finished.
+   * <p>
+   * This method is useful for chaining with other asynchronous operations, as it allows the
+   * current operation to be part of a larger asynchronous workflow.
+   *
+   * @param t the function argument
+   * @return a CompletableFuture that will be completed with the result of the
+   *         asynchronous operation
+   * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation
+   */
+  default CompletableFuture<R> async(T t) throws IOException {
+    applyAsync(t);
+    CompletableFuture<R> completableFuture = getCurCompletableFuture();
+    assert completableFuture != null;
+    return completableFuture;
+  }
+
+  /**
+   * Asynchronously applies this function to the result of the given
+   * CompletableFuture.
+   *
+   * <p>This method chains the function application to the completion
+   * of the input future. It returns a new CompletableFuture that
+   * completes with the function's result when the input future
+   * completes.</p>
+   *
+   * @param in the input future
+   * @return a new CompletableFuture that holds the result of the
+   *         function application
+   */
+  @Override
+  default CompletableFuture<R> apply(CompletableFuture<T> in) {
+    return in.thenCompose(t -> {
+      try {
+        return async(t);
+      } catch (IOException e) {
+        throw warpCompletionException(e);
+      }
+    });
+  }
+
+  /**
+   * Asynchronously applies this function to the result of the given
+   * CompletableFuture, using the specified executor for the
+   * asynchronous computation.
+   *
+   * <p>This method allows for more control over the execution
+   * context of the asynchronous operation, such as running the
+   * operation in a separate thread or thread pool.</p>
+   *
+   * @param in the input future
+   * @param executor the executor to use for the asynchronous
+   *                 computation
+   * @return a new CompletableFuture that holds the result of the
+   *         function application
+   */
+  @Override
+  default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
+    return in.thenComposeAsync(t -> {
+      try {
+        return async(t);
+      } catch (IOException e) {
+        throw warpCompletionException(e);
+      }
+    }, executor);
+  }
+}

+ 83 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncBiFunction.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The {@code AsyncBiFunction} interface represents a bi-function that
+ * asynchronously accepts two arguments and produces a result. This interface
+ * extends the {@link Async} interface and provides a method to apply the
+ * function asynchronously.
+ *
+ * <p>Implementations of this interface are expected to perform an
+ * asynchronous operation that takes two parameters of types T and P, and
+ * returns a result of type R. The asynchronous operation is typically
+ * represented as a {@link CompletableFuture} of the result type R.</p>
+ *
+ * <p>For example, an implementation of this interface might perform an
+ * asynchronous computation using the two input parameters and return a
+ * future representing the result of that computation.</p>
+ *
+ * @param <T> the type of the first argument to the function
+ * @param <P> the type of the second argument to the function
+ * @param <R> the type of the result of the function
+ * @see Async
+ */
+@FunctionalInterface
+public interface AsyncBiFunction<T, P, R> extends Async<R>{
+
+  /**
+   * Asynchronously applies this function to the given arguments.
+   *
+   * <p>This method is intended to initiate the function application
+   * without waiting for the result. It should be used when the
+   * operation can be performed in the background, and the result
+   * is not required immediately.</p>
+   *
+   * @param t the first argument to the function
+   * @param p the second argument to the function
+   * @throws IOException if an I/O error occurs during the application of the function
+   */
+  void applyAsync(T t, P p) throws IOException;
+
+  /**
+   * Initiates the asynchronous application of this function to the given result.
+   * <p>
+   * This method calls applyAsync to start the asynchronous operation and then retrieves
+   * the current thread's CompletableFuture using getCurCompletableFuture.
+   * It returns this CompletableFuture, which will be completed with the result of the
+   * asynchronous operation once it is finished.
+   * <p>
+   * This method is useful for chaining with other asynchronous operations, as it allows the
+   * current operation to be part of a larger asynchronous workflow.
+   *
+   * @param t the first argument to the function
+   * @param p the second argument to the function
+   * @return a CompletableFuture that will be completed with the result of the
+   *         asynchronous operation
+   * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation
+   */
+  default CompletableFuture<R> async(T t, P p) throws IOException {
+    applyAsync(t, p);
+    CompletableFuture<R> completableFuture = getCurCompletableFuture();
+    assert completableFuture != null;
+    return completableFuture;
+  }
+}

+ 174 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncCatchFunction.java

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The AsyncCatchFunction interface represents a function that handles exceptions
+ * occurring within an asynchronous operation. It extends the CatchFunction
+ * interface and adds the capability to perform asynchronous exception handling.
+ *
+ * <p>This interface is part of the asynchronous utilities provided by the Hadoop
+ * Distributed File System (HDFS) Federation router. It is used in conjunction
+ * with other asynchronous interfaces such as AsyncRun to build complex,
+ * non-blocking operations.</p>
+ *
+ * <p>An implementation of this interface should define how to handle a caught
+ * exception asynchronously. It takes two parameters: the result of the
+ * asynchronous operation (if any) and the caught exception. The function can then
+ * initiate an asynchronous process to handle the exception, which may involve
+ * logging the error, performing a recovery operation, or any other custom logic.</p>
+ *
+ * <p>For example, the applyAsync method is intended to be called when an exception
+ * is caught during an asynchronous operation. It should initiate the asynchronous
+ * process to handle the exception without blocking the main execution thread.</p>
+ *
+ * <p>The default method apply is provided to allow synchronous operation in case
+ * the asynchronous handling is not required. It simply calls applyAsync and waits
+ * for the result.</p>
+ *
+ * <p>The default method async is provided to allow chaining with other asynchronous
+ * operations. It calls applyAsync and returns a CompletableFuture that completes
+ * with the result of the operation.</p>
+ *
+ * <p>AsyncCatchFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    try{
+ *      R res = doAsync1(input);
+ *    } catch(E e) {
+ *      // Can use AsyncCatchFunction
+ *      R result = doAsync2(res, e);
+ *    }
+ * }
+ * </pre>
+ *
+ * @param <R> the type of the result of the asynchronous operation
+ * @param <E> the type of the exception to catch, extending Throwable
+ * @see CatchFunction
+ * @see AsyncRun
+ */
+@FunctionalInterface
+public interface AsyncCatchFunction<R, E extends Throwable>
+    extends CatchFunction<R, E> {
+
+  /**
+   * Asynchronously applies this function to the given exception.
+   *
+   * <p>This method is intended to be called when an exception is caught
+   * during an asynchronous operation. It should initiate the asynchronous process
+   * to handle the exception without blocking the main execution thread.
+   * The actual handling of the exception, such as logging the error, performing
+   * a recovery operation, or any other custom logic, should be implemented in this
+   * method.</p>
+   *
+   * @param r the result of the asynchronous operation, if any; may be null
+   * @param e the caught exception
+   * @throws IOException if an I/O error occurs during the application of the function
+   */
+  void applyAsync(R r, E e) throws IOException;
+
+  /**
+   * Synchronously applies this function to the given result and exception.
+   * <p>
+   * This method first calls {@code applyAsync} to initiate the asynchronous handling
+   * of the exception. Then, it waits for the asynchronous operation to complete
+   * by calling {@code result}, which retrieves the result of the current
+   * thread's {@link CompletableFuture}.
+   * <p>
+   *
+   * @param r the result of the asynchronous operation, if any; may be null
+   * @param e the caught exception
+   * @return the result after applying the function
+   * @throws IOException if an I/O error occurs during the application of the function
+   */
+  @Override
+  default R apply(R r, E e) throws IOException {
+    applyAsync(r, e);
+    return result();
+  }
+
+  /**
+   * Initiates the asynchronous application of this function to the given result and exception.
+   * <p>
+   * This method calls applyAsync to start the asynchronous operation and then retrieves
+   * the current thread's CompletableFuture using getCurCompletableFuture.
+   * It returns this CompletableFuture, which will be completed with the result of the
+   * asynchronous operation once it is finished.
+   * <p>
+   * This method is useful for chaining with other asynchronous operations, as it allows the
+   * current operation to be part of a larger asynchronous workflow.
+   *
+   * @param r the result of the asynchronous operation, if any; may be null
+   * @param e the caught exception
+   * @return a CompletableFuture that will be completed with the result of the
+   *         asynchronous operation
+   * @throws IOException if an I/O error occurs during the initiation of the asynchronous operation
+   */
+  default CompletableFuture<R> async(R r, E e) throws IOException {
+    applyAsync(r, e);
+    CompletableFuture<R> completableFuture = getCurCompletableFuture();
+    assert completableFuture != null;
+    return completableFuture;
+  }
+
+  /**
+   * Applies the catch function to a {@link CompletableFuture}, handling exceptions of a
+   * specified type.
+   * <p>
+   * This method is a default implementation that provides a way to integrate exception
+   * handling into a chain of asynchronous operations. It takes a {@code CompletableFuture}
+   * and a class object representing the exception type to catch. The method then completes
+   * the future with the result of applying this catch function to the input future and the
+   * specified exception type.
+   * <p>
+   * If the input future completes exceptionally with an instance of the specified exception
+   * type, the catch function is applied to the exception. Otherwise, if the future
+   * completes with a different type of exception or normally, the original result or
+   * exception is propagated.
+   *
+   * @param in the input {@code CompletableFuture} to which the catch function is applied
+   * @param eClazz the class object representing the exception type to catch
+   * @return a new {@code CompletableFuture} that completes with the result of applying
+   *         the catch function, or propagates the original exception if it does not match
+   *         the specified type
+   */
+  @Override
+  default CompletableFuture<R> apply(
+      CompletableFuture<R> in, Class<E> eClazz) {
+    return in.handle((r, e) -> {
+      if (e == null) {
+        return in;
+      }
+      Throwable readException = unWarpCompletionException(e);
+      if (eClazz.isInstance(readException)) {
+        try {
+          return async(r, (E) readException);
+        } catch (IOException ex) {
+          throw warpCompletionException(ex);
+        }
+      }
+      throw warpCompletionException(e);
+    }).thenCompose(result -> result);
+  }
+}

+ 185 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncForEachRun.java

@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The AsyncForEachRun class is part of the asynchronous operation utilities
+ * within the Hadoop Distributed File System (HDFS) Federation router.
+ * It provides the functionality to perform asynchronous operations on each
+ * element of an Iterator, applying a given async function.
+ *
+ * <p>This class is designed to work with other asynchronous interfaces and
+ * utility classes to enable complex asynchronous workflows. It allows for
+ * non-blocking execution of tasks, which can improve the performance and
+ * responsiveness of HDFS operations.</p>
+ *
+ * <p>The class implements the AsyncRun interface, which means it can be used
+ * in asynchronous task chains. It maintains an Iterator of elements to
+ * process, an asyncDoOnce to apply to each element.</p>
+ *
+ * <p>The run method initiates the asynchronous operation, and the doOnce
+ * method recursively applies the asyncDoOnce to each element and handles
+ * the results. If the shouldBreak flag is set, the operation is completed
+ * with the current result.</p>
+ *
+ * <p>AsyncForEachRun is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ * for (I element : elements) {
+ *     R result = asyncDoOnce(element);
+ * }
+ * return result;
+ * }
+ * </pre>
+ *
+ * @param <I> the type of the elements being iterated over
+ * @param <R> the type of the final result after applying the thenApply function
+ * @see AsyncRun
+ * @see AsyncBiFunction
+ */
+public class AsyncForEachRun<I, R> implements AsyncRun<R> {
+
+  // Indicates whether the iteration should be broken immediately
+  // after the next asynchronous operation is completed.
+  private boolean shouldBreak = false;
+  // The Iterator over the elements to process asynchronously.
+  private Iterator<I> iterator;
+  // The async function to apply to each element from the iterator.
+  private AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDoOnce;
+
+  /**
+   * Initiates the asynchronous foreach operation by starting the iteration process
+   * over the elements provided by the iterator. This method sets up the initial
+   * call to doOnce(R) with a null result, which begins the recursive
+   * application of the async function to each element of the iterator.
+   *
+   * <p>This method is an implementation of the {@link AsyncRun} interface's
+   * {@code run} method, allowing it to be used in a chain of asynchronous
+   * operations. It is responsible for starting the asynchronous processing and
+   * handling the completion of the operation through the internal
+   * {@link CompletableFuture}.</p>
+   *
+   * <p>If an exception occurs during the first call to {@code doOnce}, the
+   * exception is caught and the internal CompletableFuture is completed
+   * exceptionally with a {@link CompletionException} wrapping the original
+   * IOException.</p>
+   *
+   * <p>After initiating the operation, the method sets the current thread's
+   * {@link Async} {@link CompletableFuture} by calling
+   * {@link #setCurCompletableFuture(CompletableFuture)} with the internal result
+   * CompletableFuture. This allows other parts of the asynchronous workflow to
+   * chain further operations or handle the final result once the foreach loop
+   * completes.</p>
+   *
+   * @see AsyncRun
+   * @see Async#setCurCompletableFuture(CompletableFuture)
+   */
+  @Override
+  public void run() {
+    if (iterator == null || !iterator.hasNext()) {
+      setCurCompletableFuture(CompletableFuture.completedFuture(null));
+      return;
+    }
+    CompletableFuture<R> result;
+    try {
+      result = doOnce(iterator.next());
+    } catch (IOException ioe) {
+      result = new CompletableFuture<>();
+      result.completeExceptionally(warpCompletionException(ioe));
+    }
+    setCurCompletableFuture(result);
+  }
+
+  /**
+   * Recursively applies the async function to the next element of the iterator
+   * and handles the result. This method is called for each iteration of the
+   * asynchronous foreach loop, applying the async function to each element
+   * and chaining the results.
+   *
+   * <p>If the iterator has no more elements, the CompletableFuture held by this
+   * class is completed with the last result. If an exception occurs during
+   * the application of the async function, it is propagated to the
+   * CompletableFuture, which completes exceptionally.</p>
+   *
+   * <p>This method is designed to be called by the {@link #run()} method and
+   * handles the iteration logic, including breaking the loop if the
+   * {@link #shouldBreak} flag is set to true.</p>
+   *
+   * @param element The current element from the async function application.
+   * @throws IOException if an I/O error occurs during the application of the async function.
+   */
+  private CompletableFuture<R> doOnce(I element) throws IOException {
+    CompletableFuture<R> completableFuture = asyncDoOnce.async(AsyncForEachRun.this, element);
+    return completableFuture.thenCompose(res -> {
+      if (shouldBreak || !iterator.hasNext()) {
+        return completableFuture;
+      }
+      try {
+        return doOnce(iterator.next());
+      } catch (IOException e) {
+        throw warpCompletionException(e);
+      }
+    });
+  }
+
+  /**
+   * Triggers the termination of the current asynchronous iteration.
+   *
+   * <p>This method is used to break out of the asynchronous for-each loop
+   * prematurely. It sets a flag that indicates the iteration should be
+   * terminated at the earliest opportunity. This is particularly useful when
+   * the processing logic determines that further iteration is unnecessary
+   * or when a specific condition has been met.</p>
+   *
+   * <p>Once this method is called, the next time the loop is about to process
+   * a new element, it will check the flag and cease operation, allowing the
+   * application to move on to the next step or complete the task.</p>
+   */
+  public void breakNow() {
+    shouldBreak = true;
+  }
+
+  /**
+   * Sets the Iterator for the elements to be processed in the asynchronous operation.
+   *
+   * @param forEach The Iterator over the elements.
+   * @return The current AsyncForEachRun instance for chaining.
+   */
+  public AsyncForEachRun<I, R> forEach(Iterator<I> forEach) {
+    this.iterator = forEach;
+    return this;
+  }
+
+  /**
+   * Sets the async function to apply to each element from the iterator.
+   *
+   * @param asyncDo The async function.
+   * @return The current AsyncForEachRun instance for chaining.
+   */
+  public AsyncForEachRun<I, R> asyncDo(AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) {
+    this.asyncDoOnce = asyncDo;
+    return this;
+  }
+}

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncRun.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The AsyncRun interface represents an asynchronous operation that can be
+ * executed in the context of the Hadoop Distributed File System (HDFS)
+ * Federation router. Implementations of this interface are responsible for
+ * performing a task and completing a {@link CompletableFuture} with the
+ * result of the operation.
+ *
+ * <p>
+ * The {@code run} method of this interface is intended to be used in an
+ * asynchronous workflow, where the operation may involve I/O tasks or
+ * other long-running processes. By implementing this interface, classes
+ * can define custom asynchronous behavior that can be chained with other
+ * asynchronous operations using utility methods provided by the
+ * {@link AsyncUtil} class.</p>
+ *
+ * <p>
+ * For example, an implementation of AsyncRun could perform a non-blocking
+ * read or write operation to HDFS, and upon completion, it could use
+ * AsyncUtil methods to handle the result or propagate any exceptions that
+ * occurred during the operation.</p>
+ *
+ * @param <R> the type of the result produced by the asynchronous operation
+ * @see AsyncUtil
+ */
+@FunctionalInterface
+public interface AsyncRun<R> extends Async<R> {
+
+  /**
+   * Executes the asynchronous operation represented by this AsyncRun instance.
+   * This method is expected to perform the operation and, upon completion,
+   * complete the current thread's {@link CompletableFuture} with the result.
+   *
+   * @throws IOException if an I/O error occurs during the execution of the operation
+   */
+  void run() throws IOException;
+
+  /**
+   * Provides an asynchronous version of the {@code run} method, which returns a
+   * {@link CompletableFuture} representing the result of the operation.
+   * This method is typically used in an asynchronous workflow to initiate the
+   * operation without waiting for its completion.
+   *
+   * @return a CompletableFuture that completes with the result of the operation
+   * @throws IOException if an I/O error occurs during the initiation of the operation
+   */
+  default CompletableFuture<R> async() throws IOException {
+    run();
+    CompletableFuture<R> completableFuture = getCurCompletableFuture();
+    assert completableFuture != null;
+    return completableFuture;
+  }
+}

+ 380 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncUtil.java

@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.CUR_COMPLETABLE_FUTURE;
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The AsyncUtil class provides a collection of utility methods to simplify
+ * the implementation of asynchronous operations using Java's CompletableFuture.
+ * It encapsulates common patterns such as applying functions, handling exceptions,
+ * and executing tasks in a non-blocking manner. This class is designed to work
+ * with Hadoop's asynchronous router operations in HDFS Federation.
+ *
+ * <p>The utility methods support a fluent-style API, allowing for the chaining of
+ * asynchronous operations. For example, after an asynchronous operation completes,
+ * a function can be applied to its result, and the process can continue with
+ * the new result. This is particularly useful for complex workflows that require
+ * multiple steps, where each step depends on the completion of the previous one.</p>
+ *
+ * <p>The class also provides methods to handle exceptions that may occur during a
+ * synchronous operation. This ensures that error handling is integrated smoothly
+ * into the workflow, allowing for robust and fault-tolerant applications.</p>
+ *
+ * @see CompletableFuture
+ * @see ApplyFunction
+ * @see AsyncApplyFunction
+ * @see AsyncRun
+ * @see AsyncForEachRun
+ * @see CatchFunction
+ * @see AsyncCatchFunction
+ * @see FinallyFunction
+ * @see AsyncBiFunction
+ */
+public final class AsyncUtil {
+  private static final Boolean BOOLEAN_RESULT = false;
+  private static final Long LONG_RESULT = -1L;
+  private static final Object NULL_RESULT = null;
+
+  private AsyncUtil(){}
+
+  /**
+   * Provides a default value based on the type specified.
+   *
+   * @param clazz The {@link Class} object representing the type of the value
+   *              to be returned.
+   * @param <R>   The type of the value to be returned.
+   * @return An object with a value determined by the type:
+   *         <ul>
+   *           <li>{@code false} if {@code clazz} is {@link Boolean},
+   *           <li>-1 if {@code clazz} is {@link Long},
+   *           <li>{@code null} for any other type.
+   *         </ul>
+   */
+  public static <R> R asyncReturn(Class<R> clazz) {
+    if (clazz == null) {
+      return null;
+    }
+    if (clazz.equals(Boolean.class)) {
+      return (R) BOOLEAN_RESULT;
+    } else if (clazz.equals(Long.class)) {
+      return (R) LONG_RESULT;
+    }
+    return (R) NULL_RESULT;
+  }
+
+  /**
+   * Synchronously returns the result of the current asynchronous operation.
+   * This method is designed to be used in scenarios where the result of an
+   * asynchronous operation is needed synchronously, and it is known that
+   * the operation has completed.
+   *
+   * <p>The method retrieves the current thread's {@link CompletableFuture} and
+   * attempts to get the result. If the future is not yet complete, this
+   * method will block until the result is available. If the future completed
+   * exceptionally, the cause of the exception is thrown as a runtime
+   * exception wrapped in an {@link ExecutionException}.</p>
+   *
+   * <p>This method is typically used after an asynchronous operation has been
+   * initiated and the caller needs to obtain the result in a synchronous
+   * manner, for example, when bridging between asynchronous and synchronous
+   * code paths.</p>
+   *
+   * @param <R> the type of the result to be returned
+   * @param clazz the {@link Class} object representing the type of the value
+   *              to be returned, used to cast the result to the correct type
+   * @return the result of the asynchronous operation as an object of the
+   *         specified class
+   * @throws Exception if an error occurs during the synchronous retrieval of
+   *                    the result, including the original exception if the
+   *                    future completed exceptionally
+   */
+  public static <R> R syncReturn(Class<R> clazz)
+      throws Exception {
+    CompletableFuture<Object> completableFuture = CUR_COMPLETABLE_FUTURE.get();
+    assert completableFuture != null;
+    try {
+      return (R) completableFuture.get();
+    } catch (ExecutionException e) {
+      throw (Exception)e.getCause();
+    }
+  }
+
+  /**
+   * Completes the current asynchronous operation with the specified value.
+   * This method sets the result of the current thread's {@link CompletableFuture}
+   * to the provided value, effectively completing the asynchronous operation.
+   *
+   * @param value The value to complete the future with.
+   * @param <R>    The type of the value to be completed.
+   */
+  public static <R> void asyncComplete(R value) {
+    CUR_COMPLETABLE_FUTURE.set(
+        CompletableFuture.completedFuture(value));
+  }
+
+  /**
+   * Completes the current asynchronous operation with an exception.
+   * This method sets the result of the current thread's {@link CompletableFuture}
+   * to an exceptional completion, using the provided {@link Throwable} as the cause.
+   * This is typically used to handle errors in asynchronous operations.
+   *
+   * @param e The exception to complete the future exceptionally with.
+   */
+  public static void asyncThrowException(Throwable e) {
+    CompletableFuture<Object> result = new CompletableFuture<>();
+    result.completeExceptionally(warpCompletionException(e));
+    CUR_COMPLETABLE_FUTURE.set(result);
+  }
+
+  /**
+   * Applies an asynchronous function to the current {@link CompletableFuture}.
+   * This method retrieves the current thread's {@link CompletableFuture} and applies
+   * the provided {@link ApplyFunction} to it. It is used to chain asynchronous
+   * operations, where the result of one operation is used as the input for the next.
+   *
+   * @param function The asynchronous function to apply, which takes a type T and
+   *                 produces a type R.
+   * @param <T>       The type of the input to the function.
+   * @param <R>       The type of the result of the function.
+   * @see CompletableFuture
+   * @see ApplyFunction
+   */
+  public static <T, R> void asyncApply(ApplyFunction<T, R> function) {
+    CompletableFuture<T> completableFuture =
+        (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get();
+    assert completableFuture != null;
+    CompletableFuture<R> result = function.apply(completableFuture);
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+  }
+
+  /**
+   * Applies an asynchronous function to the current {@link CompletableFuture}
+   * using the specified executor. This method retrieves the current thread's
+   * {@link CompletableFuture} and applies the provided{@link ApplyFunction} to
+   * it with the given executor service. It allows for more control over the
+   * execution context, such as running the operation in a separate thread or
+   * thread pool.
+   *
+   * <p>This is particularly useful when you need to perform blocking I/O operations
+   * or other long-running tasks without blocking the main thread or
+   * when you want to manage the thread resources more efficiently.</p>
+   *
+   * @param function The asynchronous function to apply, which takes a type T and
+   *                 produces a type R.
+   * @param executor The executor service used to run the asynchronous function.
+   * @param <T> The type of the input to the function.
+   * @param <R> The type of the result of the function.
+   * @see CompletableFuture
+   * @see ApplyFunction
+   */
+  public static <T, R> void asyncApplyUseExecutor(
+      ApplyFunction<T, R> function, Executor executor) {
+    CompletableFuture<T> completableFuture =
+        (CompletableFuture<T>) CUR_COMPLETABLE_FUTURE.get();
+    assert completableFuture != null;
+    CompletableFuture<R> result = function.apply(completableFuture, executor);
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+  }
+
+  /**
+   * Attempts to execute an asynchronous task defined by the provided
+   * {@link AsyncRun} and associates it with the current thread's
+   * {@link CompletableFuture}. This method is useful for trying operations
+   * that may throw exceptions and handling them asynchronously.
+   *
+   * <p>The provided {@code asyncRun} is a functional interface that
+   * encapsulates the logic to be executed asynchronously. It is executed in
+   * the context of the current CompletableFuture, allowing for chaining further
+   * asynchronous operations based on the result or exception of this try.</p>
+   *
+   * <p>If the operation completes successfully, the result is propagated to the
+   * next operation in the chain. If an exception occurs, it can be caught and
+   * handled using the {@link #asyncCatch(CatchFunction, Class)} method,
+   * allowing for error recovery or alternative processing.</p>
+   *
+   * @param asyncRun The asynchronous task to be executed, defined by
+   *                  an {@link AsyncRun} instance.
+   * @param <R>       The type of the result produced by the asynchronous task.
+   * @see AsyncRun
+   * @see #asyncCatch(CatchFunction, Class)
+   */
+  public static <R> void asyncTry(AsyncRun<R> asyncRun) {
+    try {
+      CompletableFuture<R> result = asyncRun.async();
+      CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+    } catch (Throwable e) {
+      asyncThrowException(e);
+    }
+  }
+
+  /**
+   * Handles exceptions to a specified type that may occur during
+   * an asynchronous operation. This method is used to catch and deal
+   * with exceptions in a non-blocking manner, allowing the application
+   * to continue processing even when errors occur.
+   *
+   * <p>The provided {@code function} is a {@link CatchFunction} that
+   * defines how to handle the caught exception. It takes the result of
+   * the asynchronous operation (if any) and the caught exception, and
+   * returns a new result or modified result to continue the asynchronous
+   * processing.</p>
+   *
+   * <p>The {@code eClass} parameter specifies the type of exceptions to
+   * catch. Only exceptions that are instances of this type (or its
+   * subclasses) will be caught and handled by the provided function.</p>
+   *
+   * @param function The {@link CatchFunction} that defines how to
+   *                  handle the caught exception.
+   * @param eClass   The class of the exception type to catch.
+   * @param <R>       The type of the result of the asynchronous operation.
+   * @param <E>       The type of the exception to catch.
+   * @see CatchFunction
+   */
+  public static <R, E extends Throwable> void asyncCatch(
+      CatchFunction<R, E> function, Class<E> eClass) {
+    CompletableFuture<R> completableFuture =
+        (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
+    assert completableFuture != null;
+    CompletableFuture<R> result = function.apply(completableFuture, eClass);
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+  }
+
+  /**
+   * Executes a final action after an asynchronous operation
+   * completes, regardless of whether the operation was successful
+   * or resulted in an exception. This method provides a way to
+   * perform cleanup or finalization tasks in an asynchronous
+   * workflow.
+   *
+   * <p>The provided {@code function} is a {@link FinallyFunction}
+   * that encapsulates the logic to be executed after the
+   * asynchronous operation. It takes the result of the operation
+   * and returns a new result, which can be used to continue the
+   * asynchronous processing or to handle the final output of
+   * the workflow.</p>
+   *
+   * <p>This method is particularly useful for releasing resources,
+   * closing connections, or performing other cleanup actions that
+   * need to occur after all other operations have completed.</p>
+   *
+   * @param function The {@link FinallyFunction} that defines
+   *                  the final action to be executed.
+   * @param <R>       The type of the result of the asynchronous
+   *                   operation.
+   * @see FinallyFunction
+   */
+  public static <R> void asyncFinally(FinallyFunction<R> function) {
+    CompletableFuture<R> completableFuture =
+        (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
+    assert completableFuture != null;
+    CompletableFuture<R> result = function.apply(completableFuture);
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+  }
+
+  /**
+   * Executes an asynchronous operation for each element in an Iterator, applying
+   * a given async function to each element. This method is part of the asynchronous
+   * utilities provided to facilitate non-blocking operations on collections of elements.
+   *
+   * <p>The provided {@code asyncDo} is an {@link AsyncBiFunction} that encapsulates
+   * the logic to be executed asynchronously for each element. It is executed in
+   * the context of the current CompletableFuture, allowing for chaining further
+   * asynchronous operations based on the result or exception of each iteration.</p>
+   *
+   * <p>The method is particularly useful for performing asynchronous iterations
+   * over collections where the processing of each element is independent.</p>
+   *
+   * @param forEach the Iterator over which to iterate and apply the async function
+   * @param asyncDo the asynchronous function to apply to each element of the Iterator,
+   *                 implemented as an {@link AsyncBiFunction}
+   * @param <I> the type of the elements being iterated over
+   * @param <R> the type of the result produced by the asynchronous task applied to each element
+   * @see AsyncBiFunction
+   * @see AsyncForEachRun
+   */
+  public static <I, R> void asyncForEach(
+      Iterator<I> forEach, AsyncBiFunction<AsyncForEachRun<I, R>, I, R> asyncDo) {
+    AsyncForEachRun<I, R> asyncForEachRun = new AsyncForEachRun<>();
+    asyncForEachRun.forEach(forEach).asyncDo(asyncDo).run();
+  }
+
+  /**
+   * Applies an asynchronous operation to each element of a collection
+   * and aggregates the results. This method is designed to process a
+   * collection of elements concurrently using asynchronous tasks, and
+   * then combine the results into a single aggregated result.
+   *
+   * <p>The operation defined by {@code asyncDo} is applied to each
+   * element of the collection. This operation is expected to return a
+   * {@link CompletableFuture} representing the asynchronous task.
+   * Once all tasks have been started, the method (async) waits for all of
+   * them to complete and then uses the {@code then} function to
+   * process and aggregate the results.</p>
+   *
+   * <p>The {@code then} function takes an array of {@link CompletableFuture}
+   * instances, each representing the future result of an individual
+   * asynchronous operation. It should return a new aggregated result
+   * based on these futures. This allows for various forms of result
+   * aggregation, such as collecting all results into a list,
+   * reducing them to a single value, or performing any other custom
+   * aggregation logic.</p>
+   *
+   * @param collection the collection of elements to process.
+   * @param asyncDo    the asynchronous operation to apply to each
+   *                   element. It must return a {@link CompletableFuture}
+   *                   representing the operation.
+   * @param then        a function that takes an array of futures
+   *                   representing the results of the asynchronous
+   *                   operations and returns an aggregated result.
+   * @param <I>        the type of the elements in the collection.
+   * @param <R>        the type of the intermediate result from the
+   *                   asynchronous operations.
+   * @param <P>        the type of the final aggregated result.
+   * @see CompletableFuture
+   */
+  public static <I, R, P> void asyncCurrent(
+      Collection<I> collection, AsyncApplyFunction<I, R> asyncDo,
+      Function<CompletableFuture<R>[], P> then) {
+    CompletableFuture<R>[] completableFutures =
+        new CompletableFuture[collection.size()];
+    int i = 0;
+    for(I entry : collection) {
+      CompletableFuture<R> future = null;
+      try {
+        future = asyncDo.async(entry);
+      } catch (IOException e) {
+        future = new CompletableFuture<>();
+        future.completeExceptionally(warpCompletionException(e));
+      }
+      completableFutures[i++] = future;
+    }
+    CompletableFuture<P> result = CompletableFuture.allOf(completableFutures)
+        .handle((unused, throwable) -> then.apply(completableFutures));
+    CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) result);
+  }
+}

+ 120 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/CatchFunction.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.unWarpCompletionException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The {@code CatchFunction} interface represents a function that handles exceptions
+ * occurring within an asynchronous operation. It provides a mechanism to catch and
+ * process exceptions to a specific type, allowing for error recovery or alternative
+ * processing paths within an asynchronous workflow.
+ *
+ * <p>This interface is part of the asynchronous utilities provided by the Hadoop
+ * Distributed File System (HDFS) Federation router. It is used in conjunction with
+ * other asynchronous interfaces such as {@link AsyncRun} and
+ * {@link FinallyFunction} to build complex, non-blocking operations.</p>
+ *
+ * <p>An implementation of this interface should define how to handle a caught
+ * exception. It takes two parameters: the result of the asynchronous operation (if
+ * any) and the caught exception. The function can then return a new result or modify
+ * the existing result to continue the asynchronous processing.</p>
+ *
+ * <p>CatchFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    try{
+ *      R res = doAsync(input);
+ *    } catch(E e) {
+ *      // Can use CatchFunction
+ *      R result = thenApply(res, e);
+ *    }
+ * }
+ * </pre>
+ *
+ * @param <R> the type of the result of the asynchronous operation
+ * @param <E> the type of the exception to catch, extending {@link Throwable}
+ * @see AsyncRun
+ * @see FinallyFunction
+ */
+@FunctionalInterface
+public interface CatchFunction<R, E extends Throwable>
+    extends Async<R>{
+
+  /**
+   * Applies this catch function to the given result and exception.
+   * <p>
+   * This method is called to process an exception that occurred during an asynchronous
+   * operation. The implementation of this method should define how to handle the
+   * caught exception. It may involve logging the error, performing a recovery operation,
+   * or any other custom error handling logic.
+   * <p>
+   * The method takes two parameters: the result of the asynchronous operation (if any),
+   * and the caught exception. Depending on the implementation, the method may return a
+   * new result, modify the existing result, or throw a new exception.
+   *
+   * @param r the result of the asynchronous operation, which may be null if the operation
+   *           did not complete successfully
+   * @param e the caught exception, which the function should handle
+   * @return the result after applying the catch function, which may be a new result or a
+   *         modified version of the input result
+   * @throws IOException if an I/O error occurs during the application of the catch function
+   */
+  R apply(R r, E e) throws IOException;
+
+  /**
+   * Applies the catch function to a {@code CompletableFuture}, handling exceptions of a
+   * specified type.
+   * <p>
+   * This default method provides a way to integrate exception handling into a chain of
+   * asynchronous operations. It takes a {@code CompletableFuture} and a class object
+   * representing the type of exception to catch. The method uses the handle method of the
+   * {@code CompletableFuture} to apply the catch function.
+   * <p>
+   * If the input future completes exceptionally with an instance of the specified exception
+   * type, the catch function is applied to the exception. If the future completes with a
+   * different type of exception or normally, the original result or exception is propagated.
+   *
+   * @param in the input {@code CompletableFuture} to which the catch function is applied
+   * @param eClazz the class object representing the exception type to catch
+   * @return a new {@code CompletableFuture} that completes with the result of applying
+   *         the catch function, or propagates the original exception if it does not match
+   *         the specified type
+   */
+  default CompletableFuture<R> apply(
+      CompletableFuture<R> in, Class<E> eClazz) {
+    return in.handle((r, e) -> {
+      if (e == null) {
+        return r;
+      }
+      Throwable readException = unWarpCompletionException(e);
+      if (eClazz.isInstance(readException)) {
+        try {
+          return CatchFunction.this.apply(r, (E) readException);
+        } catch (IOException ioe) {
+          throw warpCompletionException(ioe);
+        }
+      }
+      throw warpCompletionException(e);
+    });
+  }
+}

+ 96 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/FinallyFunction.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
+
+/**
+ * The {@code FinallyFunction} interface represents a function that is used to perform
+ * final actions after an asynchronous operation completes, regardless of whether the
+ * operation was successful or resulted in an exception. This interface is part of
+ * the asynchronous utilities provided by the Hadoop Distributed File System (HDFS)
+ * Federation router.
+ *
+ * <p>A {@code FinallyFunction} is typically used for cleanup or finalization tasks,
+ * such as releasing resources, closing connections, or performing other actions that
+ * need to occur after all other operations have completed.</p>
+ *
+ * <p>An implementation of this interface should define what the final action is. It
+ * takes the result of the asynchronous operation as an argument and returns a new
+ * result, which can be the same as the input result or a modified version of it.</p>
+ *
+ * <p>FinallyFunction is used to implement the following semantics:</p>
+ * <pre>
+ * {@code
+ *    try{
+ *      R res = doAsync(input);
+ *    } catch(...) {
+ *      ...
+ *    } finally {
+ *      // Can use FinallyFunction
+ *      R result = thenApply(res);
+ *    }
+ * }
+ * </pre>
+ *
+ * @param <R> the type of the result of the asynchronous operation
+ */
+@FunctionalInterface
+public interface FinallyFunction<R> {
+
+  /**
+   * Applies this final action function to the result of an asynchronous operation.
+   *
+   * @param r the result of the asynchronous operation, which may be null if the
+   *           operation did not complete successfully
+   * @return the result after applying the final action, which may be a new result or a
+   *         modified version of the input result
+   * @throws IOException if an I/O error occurs during the application of the final action
+   */
+  R apply(R r) throws IOException;
+
+  /**
+   * Applies this final action function to a {@code CompletableFuture}, which is expected
+   * to be the result of an asynchronous operation.
+   * <p>
+   * This method is a convenience that simplifies the use of {@code FinallyFunction}
+   * with asynchronous operations. It handles the completion of the future and applies
+   * the {@code FinallyFunction} to the result.
+   *
+   * @param in the {@code CompletableFuture} representing the asynchronous operation
+   * @return a new {@code CompletableFuture} that completes with the result of applying
+   *         the final action function
+   */
+  default CompletableFuture<R> apply(CompletableFuture<R> in) {
+    return in.handle((r, e) -> {
+      try {
+        R ret = apply(r);
+        if (e != null) {
+          throw warpCompletionException(e);
+        } else {
+          return ret;
+        }
+      } catch (IOException ioe) {
+        throw warpCompletionException(ioe);
+      }
+    });
+  }
+}

+ 35 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/package-info.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes that facilitate asynchronous operations within the Hadoop
+ * Distributed File System (HDFS) Federation router. These classes are designed to work with
+ * the Hadoop ecosystem, providing utilities and interfaces to perform non-blocking tasks that
+ * can improve the performance and responsiveness of HDFS operations.
+ *
+ * <p>These classes work together to enable complex asynchronous workflows, making it easier to
+ * write code that can handle long-running tasks without blocking, thus improving the overall
+ * efficiency and scalability of HDFS operations.</p>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 249 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/AsyncClass.java

@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCurrent;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncFinally;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncThrowException;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
+
+/**
+ * AsyncClass demonstrates the conversion of synchronous methods
+ * from SyncClass into asynchronous operations using AsyncUtil.
+ * This class overrides methods with asynchronous logic, enhancing
+ * the performance by allowing non-blocking task execution.
+ *
+ * <p>
+ * By utilizing AsyncUtil's utility methods, such as asyncApply,
+ * asyncForEach, and others, each method in AsyncClass can perform
+ * time-consuming tasks on a separate thread, thus not blocking
+ * the main execution thread.
+ * </p>
+ *
+ * <p>
+ * For example, the applyMethod in AsyncClass is an async version of
+ * the same method in SyncClass. It uses asyncApply to schedule
+ * the timeConsumingMethod to run asynchronously and returns a
+ * CompletableFuture that will be completed with the result of
+ * the operation.
+ * </p>
+ *
+ * <p>
+ * This class serves as an example of how to transform synchronous
+ * operations into asynchronous ones using the AsyncUtil tools,
+ * which can be applied to other parts of the HDFS Federation
+ * router or similar systems to improve concurrency and
+ * performance.
+ * </p>
+ *
+ * @see SyncClass
+ * @see AsyncUtil
+ * @see CompletableFuture
+ */
+public class AsyncClass extends SyncClass{
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AsyncClass.class);
+  private ExecutorService executorService;
+  private final static String ASYNC_WORKER = "Async Worker";
+
+  public AsyncClass(long timeConsuming) {
+    super(timeConsuming);
+    executorService = Executors.newFixedThreadPool(1, r -> {
+      Thread asyncWork = new Thread(r);
+      asyncWork.setDaemon(true);
+      asyncWork.setName(ASYNC_WORKER);
+      return asyncWork;
+    });
+  }
+
+  @Override
+  public String applyMethod(int input) {
+    timeConsumingMethod(input);
+    asyncApply(res -> {
+      return "applyMethod" + res;
+    });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String applyMethod(int input, boolean canException) {
+    timeConsumingMethod(input);
+    asyncApply(res -> {
+      if (canException) {
+        if (res.equals("[2]")) {
+          throw new IOException("input 2 exception");
+        } else if (res.equals("[3]")) {
+          throw new RuntimeException("input 3 exception");
+        }
+      }
+      return res;
+    });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String exceptionMethod(int input) {
+    if (input == 2) {
+      asyncThrowException(new IOException("input 2 exception"));
+      return null;
+    } else if (input == 3) {
+      asyncThrowException(new RuntimeException("input 3 exception"));
+      return null;
+    }
+    return applyMethod(input);
+  }
+
+  @Override
+  public String forEachMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    asyncForEach(list.iterator(),
+        (forEach, input) -> {
+          timeConsumingMethod(input);
+          asyncApply(res -> {
+            result.append("forEach" + res + ",");
+            return result.toString();
+          });
+        });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String forEachBreakMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    asyncForEach(list.iterator(),
+        (forEach, input) -> {
+          timeConsumingMethod(input);
+          asyncApply(res -> {
+            if (res.equals("[2]")) {
+              forEach.breakNow();
+            } else {
+              result.append("forEach" + res + ",");
+            }
+            return result.toString();
+          });
+        });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String forEachBreakByExceptionMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    asyncForEach(list.iterator(),
+        (forEach, input) -> {
+          asyncTry(() -> {
+            applyMethod(input, true);
+            asyncApply(res -> {
+              result.append("forEach" + res + ",");
+              return result.toString();
+            });
+          });
+          asyncCatch((res, e) -> {
+            if (e instanceof IOException) {
+              result.append(e + ",");
+            } else if (e instanceof RuntimeException) {
+              forEach.breakNow();
+            }
+            return result.toString();
+          }, Exception.class);
+        });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String applyThenApplyMethod(int input) {
+    timeConsumingMethod(input);
+    asyncApply((AsyncApplyFunction<String, String>) res -> {
+      if (res.equals("[1]")) {
+        timeConsumingMethod(2);
+      } else {
+        asyncComplete(res);
+      }
+    });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String applyCatchThenApplyMethod(int input) {
+    asyncTry(() -> applyMethod(input, true));
+    asyncCatch((AsyncCatchFunction<String, IOException>) (res, ioe) -> {
+      applyMethod(1);
+    }, IOException.class);
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String applyCatchFinallyMethod(
+      int input, List<String> resource) {
+    asyncTry(() -> applyMethod(input, true));
+    asyncCatch((res, e) -> {
+      throw new IOException("Catch " + e.getMessage());
+    }, IOException.class);
+    asyncFinally((FinallyFunction<String>) res -> {
+      resource.clear();
+      return res;
+    });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String currentMethod(List<Integer> list) {
+    asyncCurrent(list,
+        input -> applyMethod(input, true),
+        (Function<CompletableFuture<String>[], String>) futures -> {
+          StringBuilder result = new StringBuilder();
+          for (Future<String> future : futures) {
+            try {
+              String res = future.get();
+              result.append(res + ",");
+            } catch (Exception e) {
+              result.append(e.getMessage() + ",");
+            }
+          }
+          return result.toString();
+        });
+    return asyncReturn(String.class);
+  }
+
+  @Override
+  public String timeConsumingMethod(int input) {
+    CompletableFuture<Object> result = CompletableFuture
+        .supplyAsync(() -> {
+          LOG.info("[{} thread] invoke consumingMethod for parameter: {}",
+              Thread.currentThread().getName(), input);
+          return AsyncClass.super.timeConsumingMethod(input);
+        }, executorService);
+    Async.CUR_COMPLETABLE_FUTURE.set(result);
+    return null;
+  }
+}

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/BaseClass.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * It defines a set of methods that can be executed either synchronously
+ * or asynchronously, depending on the implementation.
+ *
+ * <p>
+ * This interface is designed to abstract the common operations that need
+ * to be performed in a time-consuming manner, such as processing a list
+ * of items or applying a method that involves I/O operations. By defining
+ * these methods in an interface, it allows for both synchronous and
+ * asynchronous implementations, providing flexibility and the ability to
+ * improve performance without changing the external API.
+ * </p>
+ *
+ * <p>
+ * Implementations of this interface are expected to provide concrete
+ * implementations of the defined methods, either by performing the
+ * operations synchronously in a blocking manner or by performing them
+ * asynchronously in a non-blocking manner.
+ * </p>
+ *
+ * @see SyncClass
+ * @see AsyncClass
+ */
+public interface BaseClass {
+  String applyMethod(int input);
+
+  String applyMethod(int input, boolean canException) throws IOException;
+
+  String exceptionMethod(int input) throws IOException;
+
+  String forEachMethod(List<Integer> list);
+
+  String forEachBreakMethod(List<Integer> list);
+
+  String forEachBreakByExceptionMethod(List<Integer> list);
+
+  String applyThenApplyMethod(int input);
+
+  String applyCatchThenApplyMethod(int input);
+
+  String applyCatchFinallyMethod(int input, List<String> resource) throws IOException;
+
+  String currentMethod(List<Integer> list);
+}

+ 194 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/SyncClass.java

@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * SyncClass implements BaseClass, providing a synchronous
+ * version of the methods. All operations are performed in a
+ * blocking manner, waiting for completion before proceeding.
+ *
+ * This class is the foundation for the AsyncClass, which
+ * provides asynchronous implementations.
+ *
+ * @see BaseClass
+ * @see AsyncClass
+ */
+public class SyncClass implements BaseClass{
+  private long timeConsuming;
+
+  public SyncClass(long timeConsuming) {
+    this.timeConsuming = timeConsuming;
+  }
+
+  @Override
+  public String applyMethod(int input) {
+    String res = timeConsumingMethod(input);
+    return "applyMethod" + res;
+  }
+
+  @Override
+  public String applyMethod(int input, boolean canException) throws IOException {
+    String res = timeConsumingMethod(input);
+    if (canException) {
+      if (res.equals("[2]")) {
+        throw new IOException("input 2 exception");
+      } else if (res.equals("[3]")) {
+        throw new RuntimeException("input 3 exception");
+      }
+    }
+    return res;
+  }
+
+  @Override
+  public String exceptionMethod(int input) throws IOException {
+    if (input == 2) {
+      throw new IOException("input 2 exception");
+    } else if (input == 3) {
+      throw new RuntimeException("input 3 exception");
+    }
+    return applyMethod(input);
+  }
+
+  @Override
+  public String forEachMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    for (int input : list) {
+      String res = timeConsumingMethod(input);
+      result.append("forEach" + res + ",");
+    }
+    return result.toString();
+  }
+
+  @Override
+  public String forEachBreakMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    for (int input : list) {
+      String res = timeConsumingMethod(input);
+      if (res.equals("[2]")) {
+        break;
+      }
+      result.append("forEach" + res + ",");
+    }
+    return result.toString();
+  }
+
+  @Override
+  public String forEachBreakByExceptionMethod(List<Integer> list) {
+    StringBuilder result = new StringBuilder();
+    for (int input : list) {
+      try {
+        String res = applyMethod(input, true);
+        result.append("forEach" + res + ",");
+      } catch (IOException e) {
+        result.append(e + ",");
+      } catch (RuntimeException e) {
+        break;
+      }
+    }
+    return result.toString();
+  }
+
+  @Override
+  public String applyThenApplyMethod(int input) {
+    String res = timeConsumingMethod(input);
+    if (res.equals("[1]")) {
+      res = timeConsumingMethod(2);
+    }
+    return res;
+  }
+
+  @Override
+  public String applyCatchThenApplyMethod(int input) {
+    String res = null;
+    try {
+      res = applyMethod(input, true);
+    } catch (IOException e) {
+      res = applyMethod(1);
+    }
+    return res;
+  }
+
+  @Override
+  public String applyCatchFinallyMethod(
+      int input, List<String> resource) throws IOException {
+    String res = null;
+    try {
+      res = applyMethod(input, true);
+    } catch (IOException e) {
+      throw new IOException("Catch " + e.getMessage());
+    } finally {
+      resource.clear();
+    }
+    return res;
+  }
+
+  @Override
+  public String currentMethod(List<Integer> list) {
+    ExecutorService executor = getExecutorService();
+    List<Future<String>> futures = new ArrayList<>();
+    for (int input : list) {
+      Future<String> future = executor.submit(
+          () -> applyMethod(input, true));
+      futures.add(future);
+    }
+
+    StringBuilder result = new StringBuilder();
+    for (Future<String> future : futures) {
+      try {
+        String res = future.get();
+        result.append(res + ",");
+      } catch (Exception e) {
+        result.append(e.getMessage() + ",");
+      }
+    }
+    return result.toString();
+  }
+
+
+  /**
+   * Simulates a synchronous method that performs
+   * a time-consuming task and returns a result.
+   *
+   * @param input The input parameter for the method.
+   * @return A string that represents the result of the method.
+   */
+  public String timeConsumingMethod(int input) {
+    try {
+      Thread.sleep(timeConsuming);
+      return "[" + input + "]";
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return "Error:" + e.getMessage();
+    }
+  }
+
+  private ExecutorService getExecutorService() {
+    return Executors.newFixedThreadPool(2, r -> {
+      Thread t = new Thread(r);
+      t.setDaemon(true);
+      return t;
+    });
+  }
+}

+ 277 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestAsyncUtil.java

@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The TestAsyncUtil class provides a suite of test cases for the
+ * asynchronous utility class AsyncUtil. It utilizes the JUnit testing
+ * framework to verify that asynchronous operations are performed as
+ * expected.
+ *
+ * <p>
+ * This class contains multiple test methods designed to test various
+ * asynchronous operation scenarios, including:
+ * <ul>
+ *     <li>testApply - Tests the asynchronous application of a method.</li>
+ *     <li>testApplyException - Tests exception handling in
+ *     asynchronous methods.</li>
+ *     <li>testApplyThenApplyMethod - Tests the chaining of
+ *     asynchronous method calls.</li>
+ *     <li>testCatchThenApplyMethod - Tests the invocation of
+ *     asynchronous methods after exception catching.</li>
+ *     <li>testForEach - Tests asynchronous iteration operations.</li>
+ *     <li>testForEachBreak - Tests asynchronous iteration with break
+ *     conditions.</li>
+ *     <li>testForEachBreakByException - Tests the interruption of
+ *     asynchronous iteration due to exceptions.</li>
+ * </ul>
+ * </p>
+ *
+ * The tests cover both synchronous (Sync) and asynchronous (Async)
+ * configurations to ensure consistent behavior under different
+ * execution modes.
+ *
+ * @see AsyncUtil
+ * @see BaseClass
+ * @see SyncClass
+ * @see AsyncClass
+ */
+public class TestAsyncUtil {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestAsyncUtil.class);
+  private static final long TIME_CONSUMING = 100;
+  private BaseClass baseClass;
+  private boolean enableAsync;
+
+  public enum ExecutionMode {
+    SYNC,
+    ASYNC
+  }
+
+  @Before
+  public void setUp(ExecutionMode mode) {
+    if (mode.equals(ExecutionMode.ASYNC)) {
+      baseClass = new AsyncClass(TIME_CONSUMING);
+      enableAsync = true;
+    } else {
+      baseClass = new SyncClass(TIME_CONSUMING);
+    }
+  }
+
+  @After
+  public void after() {
+    baseClass = null;
+    enableAsync = false;
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testApply(ExecutionMode mode)
+      throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.applyMethod(1);
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("applyMethod[1]", result, TIME_CONSUMING, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testApplyException(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    checkException(
+        () -> baseClass.applyMethod(2, true),
+        IOException.class, "input 2 exception");
+
+    checkException(
+        () -> baseClass.applyMethod(3, true),
+        RuntimeException.class, "input 3 exception");
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testExceptionMethod(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    checkException(
+        () -> baseClass.exceptionMethod(2),
+        IOException.class, "input 2 exception");
+
+    checkException(
+        () -> baseClass.exceptionMethod(3),
+        RuntimeException.class, "input 3 exception");
+
+    long start = Time.monotonicNow();
+    String result = baseClass.exceptionMethod(1);
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("applyMethod[1]", result, TIME_CONSUMING, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testApplyThenApplyMethod(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.applyThenApplyMethod(1);
+    long cost = Time.monotonicNow() - start;
+    checkResult("[2]", result, TIME_CONSUMING, cost);
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+
+    start = Time.monotonicNow();
+    result = baseClass.applyThenApplyMethod(3);
+    cost = Time.monotonicNow() - start;
+    checkResult("[3]", result, TIME_CONSUMING, cost);
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testCatchThenApplyMethod(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.applyCatchThenApplyMethod(2);
+    long cost = Time.monotonicNow() - start;
+    checkResult("applyMethod[1]", result, TIME_CONSUMING, cost);
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+
+    start = Time.monotonicNow();
+    result = baseClass.applyCatchThenApplyMethod(0);
+    cost = Time.monotonicNow() - start;
+    checkResult("[0]", result, TIME_CONSUMING, cost);
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testCatchFinallyMethod(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    List<String> resource = new ArrayList<>();
+    resource.add("resource1");
+    checkException(
+        () -> baseClass.applyCatchFinallyMethod(2, resource),
+        IOException.class, "input 2 exception");
+    assertTrue(resource.size() == 0);
+
+    long start = Time.monotonicNow();
+    String result = baseClass.applyCatchFinallyMethod(0, resource);
+    long cost = Time.monotonicNow() - start;
+    checkResult("[0]", result, TIME_CONSUMING, cost);
+    assertTrue(resource.size() == 0);
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testForEach(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.forEachMethod(Arrays.asList(1, 2, 3));
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("forEach[1],forEach[2],forEach[3],", result,
+        TIME_CONSUMING, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testForEachBreak(ExecutionMode mode) throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.forEachBreakMethod(Arrays.asList(1, 2, 3));
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("forEach[1],", result, TIME_CONSUMING, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testForEachBreakByException(ExecutionMode mode)
+      throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.forEachBreakByExceptionMethod(Arrays.asList(1, 2, 3));
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("forEach[1],java.io.IOException: input 2 exception,",
+        result, TIME_CONSUMING, cost);
+  }
+
+  @EnumSource(ExecutionMode.class)
+  @ParameterizedTest
+  public void testCurrentMethod(ExecutionMode mode)
+      throws Exception {
+    setUp(mode);
+    long start = Time.monotonicNow();
+    String result = baseClass.currentMethod(Arrays.asList(1, 2, 3));
+    long cost = Time.monotonicNow() - start;
+    LOG.info("[{}] main thread cost: {} ms", mode, cost);
+    checkResult("[1],java.io.IOException: input 2 exception," +
+            "java.lang.RuntimeException: input 3 exception,",
+        result, TIME_CONSUMING, cost);
+  }
+
+  private void checkResult(
+      String result, String actualResult, long cost, long actualCost)
+      throws Exception {
+    if (enableAsync) {
+      Assertions.assertNull(actualResult);
+      actualResult = AsyncUtil.syncReturn(String.class);
+      assertNotNull(actualResult);
+      assertTrue(actualCost < cost);
+    } else {
+      assertFalse(actualCost < cost);
+    }
+    assertEquals(result, actualResult);
+  }
+
+  private < E extends Throwable> void checkException(
+      Callable<String> eval, Class<E> clazz, String contained) throws Exception {
+    if (enableAsync) {
+      LambdaTestUtils.intercept(clazz, contained,
+          () -> {
+            eval.call();
+            return AsyncUtil.syncReturn(String.class);
+          });
+    } else {
+      LambdaTestUtils.intercept(clazz, contained, () -> {
+        String res = eval.call();
+        return res;
+      });
+    }
+  }
+}