浏览代码

HDDS-692. Use the ProgressBar class in the RandomKeyGenerator freon test. Contributed by Zsolt Horvath.

Márton Elek 6 年之前
父节点
当前提交
4ecdcc9620

+ 82 - 145
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java

@@ -14,197 +14,134 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
+
 package org.apache.hadoop.ozone.freon;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * Run an arbitrary code and print progress on the provided stream. The
- * progressbar stops when: - the provided currentvalue is less the the maxvalue
- * - exception thrown
+ * Creates and runs a ProgressBar in new Thread which gets printed on
+ * the provided PrintStream.
  */
 public class ProgressBar {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ProgressBar.class);
   private static final long REFRESH_INTERVAL = 1000L;
 
-  private PrintStream stream;
-  private AtomicLong currentValue;
-  private long maxValue;
-  private Thread progressBar;
-  private volatile boolean exception = false;
-  private long startTime;
+  private final long maxValue;
+  private final Supplier<Long> currentValue;
+  private final Thread progressBar;
+
+  private volatile boolean running;
+
+  private volatile long startTime;
 
   /**
-   * @param stream Used to display the progress
+   * Creates a new ProgressBar instance which prints the progress on the given
+   * PrintStream when started.
+   *
+   * @param stream to display the progress
    * @param maxValue Maximum value of the progress
+   * @param currentValue Supplier that provides the current value
    */
-  ProgressBar(PrintStream stream, long maxValue) {
-    this.stream = stream;
+  public ProgressBar(final PrintStream stream, final Long maxValue,
+                     final Supplier<Long> currentValue) {
     this.maxValue = maxValue;
-    this.currentValue = new AtomicLong(0);
-    this.progressBar = new Thread(new ProgressBarThread());
+    this.currentValue = currentValue;
+    this.progressBar = new Thread(getProgressBar(stream));
+    this.running = false;
   }
 
   /**
-   * Start a task with a progessbar without any in/out parameters Runnable used
-   * just a task wrapper.
-   *
-   * @param task Runnable
+   * Starts the ProgressBar in a new Thread.
+   * This is a non blocking call.
    */
-  public void start(Runnable task) {
-
-    startTime = System.nanoTime();
-
-    try {
-
+  public synchronized void start() {
+    if (!running) {
+      running = true;
+      startTime = System.nanoTime();
       progressBar.start();
-      task.run();
-
-    } catch (Exception e) {
-      exception = true;
-    } finally {
-
-      try {
-        progressBar.join();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
     }
   }
 
   /**
-   * Start a task with only out parameters.
-   *
-   * @param task Supplier that represents the task
-   * @param <T> Generic return type
-   * @return Whatever the supllier produces
+   * Graceful shutdown, waits for the progress bar to complete.
+   * This is a blocking call.
    */
-  public <T> T start(Supplier<T> task) {
-
-    startTime = System.nanoTime();
-    T result = null;
-
-    try {
-
-      progressBar.start();
-      result = task.get();
-
-    } catch (Exception e) {
-      exception = true;
-    } finally {
-
+  public synchronized void shutdown() {
+    if (running) {
       try {
         progressBar.join();
+        running = false;
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOG.warn("Got interrupted while waiting for the progress bar to " +
+                "complete.");
       }
-
-      return result;
     }
   }
 
   /**
-   * Start a task with in/out parameters.
-   *
-   * @param input Input of the function
-   * @param task A Function that does the task
-   * @param <T> type of the input
-   * @param <R> return type
-   * @return Whatever the Function returns
+   * Terminates the progress bar. This doesn't wait for the progress bar
+   * to complete.
    */
-  public <T, R> R start(T input, Function<T, R> task) {
-
-    startTime = System.nanoTime();
-    R result = null;
-
-    try {
-
-      progressBar.start();
-      result = task.apply(input);
-
-    } catch (Exception e) {
-      exception = true;
-      throw e;
-    } finally {
-
+  public synchronized void terminate() {
+    if (running) {
       try {
+        running = false;
         progressBar.join();
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOG.warn("Got interrupted while waiting for the progress bar to " +
+                "complete.");
       }
-
-      return result;
     }
   }
 
-  /**
-   * Increment the progress with one step.
-   */
-  public void incrementProgress() {
-    currentValue.incrementAndGet();
-  }
-
-  private class ProgressBarThread implements Runnable {
-
-    @Override
-    public void run() {
-      try {
-
-        stream.println();
-        long value;
-
-        while ((value = currentValue.get()) < maxValue) {
-          print(value);
-
-          if (exception) {
-            break;
-          }
+  private Runnable getProgressBar(final PrintStream stream) {
+    return () -> {
+      stream.println();
+      while (running && currentValue.get() < maxValue) {
+        print(stream, currentValue.get());
+        try {
           Thread.sleep(REFRESH_INTERVAL);
+        } catch (InterruptedException e) {
+          LOG.warn("ProgressBar was interrupted.");
         }
-
-        if (exception) {
-          stream.println();
-          stream.println("Incomplete termination, " + "check log for " +
-              "exception.");
-        } else {
-          print(maxValue);
-        }
-        stream.println();
-      } catch (InterruptedException e) {
-        stream.println(e);
       }
-    }
-
-    /**
-     * Given current value prints the progress bar.
-     *
-     * @param value current progress position
-     */
-    private void print(long value) {
-      stream.print('\r');
-      double percent = 100.0 * value / maxValue;
-      StringBuilder sb = new StringBuilder();
-      sb.append(" " + String.format("%.2f", percent) + "% |");
+      print(stream, maxValue);
+      stream.println();
+      running = false;
+    };
+  }
 
-      for (int i = 0; i <= percent; i++) {
-        sb.append('█');
-      }
-      for (int j = 0; j < 100 - percent; j++) {
-        sb.append(' ');
-      }
-      sb.append("|  ");
-      sb.append(value + "/" + maxValue);
-      long timeInSec = TimeUnit.SECONDS.convert(
-          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
-          (timeInSec % 3600) / 60, timeInSec % 60);
-      sb.append(" Time: " + timeToPrint);
-      stream.print(sb.toString());
+  /**
+   * Given current value prints the progress bar.
+   *
+   * @param value current progress position
+   */
+  private void print(final PrintStream stream, final long value) {
+    stream.print('\r');
+    double percent = 100.0 * value / maxValue;
+    StringBuilder sb = new StringBuilder();
+    sb.append(" " + String.format("%.2f", percent) + "% |");
+
+    for (int i = 0; i <= percent; i++) {
+      sb.append('█');
+    }
+    for (int j = 0; j < 100 - percent; j++) {
+      sb.append(' ');
     }
+    sb.append("|  ");
+    sb.append(value + "/" + maxValue);
+    long timeInSec = TimeUnit.SECONDS.convert(
+            System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+    String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
+            (timeInSec % 3600) / 60, timeInSec % 60);
+    sb.append(" Time: " + timeToPrint);
+    stream.print(sb.toString());
   }
-
-}
+}

+ 17 - 86
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

@@ -185,6 +185,7 @@ public final class RandomKeyGenerator implements Callable<Void> {
   private ArrayList<Histogram> histograms = new ArrayList<>();
 
   private OzoneConfiguration ozoneConfiguration;
+  private ProgressBar progressbar;
 
   RandomKeyGenerator() {
   }
@@ -251,13 +252,26 @@ public final class RandomKeyGenerator implements Callable<Void> {
       validator.start();
       LOG.info("Data validation is enabled.");
     }
-    Thread progressbar = getProgressBarThread();
+
+    Supplier<Long> currentValue;
+    long maxValue;
+
+    currentValue = () -> numberOfKeysAdded.get();
+    maxValue = numOfVolumes *
+            numOfBuckets *
+            numOfKeys;
+
+    progressbar = new ProgressBar(System.out, maxValue, currentValue);
+
     LOG.info("Starting progress bar Thread.");
+
     progressbar.start();
+
     processor.shutdown();
     processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
-    completed = true;
-    progressbar.join();
+
+    progressbar.shutdown();
+
     if (validateWrites) {
       validator.join();
     }
@@ -280,22 +294,6 @@ public final class RandomKeyGenerator implements Callable<Void> {
     Runtime.getRuntime().addShutdownHook(
         new Thread(() -> printStats(System.out)));
   }
-
-  private Thread getProgressBarThread() {
-    Supplier<Long> currentValue;
-    long maxValue;
-
-    currentValue = () -> numberOfKeysAdded.get();
-    maxValue = (long) numOfVolumes *
-        numOfBuckets *
-        numOfKeys;
-
-    Thread progressBarThread = new Thread(
-        new ProgressBar(System.out, currentValue, maxValue));
-    progressBarThread.setName("ProgressBar");
-    return progressBarThread;
-  }
-
   /**
    * Prints stats of {@link Freon} run to the PrintStream.
    *
@@ -896,73 +894,6 @@ public final class RandomKeyGenerator implements Callable<Void> {
     }
   }
 
-  private class ProgressBar implements Runnable {
-
-    private static final long REFRESH_INTERVAL = 1000L;
-
-    private PrintStream stream;
-    private Supplier<Long> currentValue;
-    private long maxValue;
-
-    ProgressBar(PrintStream stream, Supplier<Long> currentValue,
-        long maxValue) {
-      this.stream = stream;
-      this.currentValue = currentValue;
-      this.maxValue = maxValue;
-    }
-
-    @Override
-    public void run() {
-      try {
-        stream.println();
-        long value;
-        while ((value = currentValue.get()) < maxValue) {
-          print(value);
-          if (completed) {
-            break;
-          }
-          Thread.sleep(REFRESH_INTERVAL);
-        }
-        if (exception) {
-          stream.println();
-          stream.println("Incomplete termination, " +
-              "check log for exception.");
-        } else {
-          print(maxValue);
-        }
-        stream.println();
-      } catch (InterruptedException e) {
-      }
-    }
-
-    /**
-     * Given current value prints the progress bar.
-     *
-     * @param value
-     */
-    private void print(long value) {
-      stream.print('\r');
-      double percent = 100.0 * value / maxValue;
-      StringBuilder sb = new StringBuilder();
-      sb.append(" " + String.format("%.2f", percent) + "% |");
-
-      for (int i = 0; i <= percent; i++) {
-        sb.append('█');
-      }
-      for (int j = 0; j < 100 - percent; j++) {
-        sb.append(' ');
-      }
-      sb.append("|  ");
-      sb.append(value + "/" + maxValue);
-      long timeInSec = TimeUnit.SECONDS.convert(
-          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
-          (timeInSec % 3600) / 60, timeInSec % 60);
-      sb.append(" Time: " + timeToPrint);
-      stream.print(sb);
-    }
-  }
-
   /**
    * Validates the write done in ozone cluster.
    */

+ 16 - 55
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java

@@ -22,12 +22,15 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.PrintStream;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
-import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import static org.mockito.Mockito.*;
 
+/**
+ * Using Mockito runner.
+ */
 @RunWith(MockitoJUnitRunner.class)
 /**
  * Tests for the Progressbar class for Freon.
@@ -35,78 +38,36 @@ import static org.mockito.Mockito.*;
 public class TestProgressBar {
 
   private PrintStream stream;
+  private AtomicLong numberOfKeysAdded;
+  private Supplier<Long> currentValue;
 
   @Before
   public void setupMock() {
+    numberOfKeysAdded = new AtomicLong(0L);
+    currentValue = () -> numberOfKeysAdded.get();
     stream = mock(PrintStream.class);
   }
 
   @Test
   public void testWithRunnable() {
 
-    int maxValue = 10;
+    Long maxValue = 10L;
 
-    ProgressBar progressbar = new ProgressBar(stream, maxValue);
+    ProgressBar progressbar = new ProgressBar(stream, maxValue, currentValue);
 
     Runnable task = () -> {
-      IntStream.range(0, maxValue).forEach(
+      LongStream.range(0, maxValue).forEach(
           counter -> {
-            progressbar.incrementProgress();
+            numberOfKeysAdded.getAndIncrement();
           }
       );
     };
 
-    progressbar.start(task);
-
-    verify(stream, atLeastOnce()).print(anyChar());
-    verify(stream, atLeastOnce()).print(anyString());
-  }
-
-  @Test
-  public void testWithSupplier() {
-
-    int maxValue = 10;
-
-    ProgressBar progressbar = new ProgressBar(stream, maxValue);
-
-    Supplier<Long> tasks = () -> {
-      IntStream.range(0, maxValue).forEach(
-          counter -> {
-            progressbar.incrementProgress();
-          }
-      );
-
-      return 1L; //return the result of the dummy task
-    };
-
-    progressbar.start(tasks);
+    progressbar.start();
+    task.run();
+    progressbar.shutdown();
 
     verify(stream, atLeastOnce()).print(anyChar());
     verify(stream, atLeastOnce()).print(anyString());
   }
-
-  @Test
-  public void testWithFunction() {
-
-    int maxValue = 10;
-    Long result;
-
-    ProgressBar progressbar = new ProgressBar(stream, maxValue);
-
-    Function<Long, String> task = (Long l) -> {
-      IntStream.range(0, maxValue).forEach(
-          counter -> {
-            progressbar.incrementProgress();
-          }
-      );
-
-      return "dummy result"; //return the result of the dummy task
-    };
-
-    progressbar.start(1L, task);
-
-    verify(stream, atLeastOnce()).print(anyChar());
-    verify(stream, atLeastOnce()).print(anyString());
-  }
-
 }