Browse Source

Revert "HDDS-692. Use the ProgressBar class in the RandomKeyGenerator freon test. Contributed by Zsolt Horvath."

This reverts commit 2a7f4859912e83910f9a418f69ce6d4bd4a37815.
Márton Elek 6 years ago
parent
commit
c946f1b121

+ 145 - 82
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ProgressBar.java

@@ -14,134 +14,197 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.ozone.freon;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * Creates and runs a ProgressBar in new Thread which gets printed on
- * the provided PrintStream.
+ * Run an arbitrary code and print progress on the provided stream. The
+ * progressbar stops when: - the provided currentvalue is less the the maxvalue
+ * - exception thrown
  */
 public class ProgressBar {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ProgressBar.class);
   private static final long REFRESH_INTERVAL = 1000L;
 
-  private final long maxValue;
-  private final Supplier<Long> currentValue;
-  private final Thread progressBar;
-
-  private volatile boolean running;
-
-  private volatile long startTime;
+  private PrintStream stream;
+  private AtomicLong currentValue;
+  private long maxValue;
+  private Thread progressBar;
+  private volatile boolean exception = false;
+  private long startTime;
 
   /**
-   * Creates a new ProgressBar instance which prints the progress on the given
-   * PrintStream when started.
-   *
-   * @param stream to display the progress
+   * @param stream Used to display the progress
    * @param maxValue Maximum value of the progress
-   * @param currentValue Supplier that provides the current value
    */
-  public ProgressBar(final PrintStream stream, final Long maxValue,
-                     final Supplier<Long> currentValue) {
+  ProgressBar(PrintStream stream, long maxValue) {
+    this.stream = stream;
     this.maxValue = maxValue;
-    this.currentValue = currentValue;
-    this.progressBar = new Thread(getProgressBar(stream));
-    this.running = false;
+    this.currentValue = new AtomicLong(0);
+    this.progressBar = new Thread(new ProgressBarThread());
   }
 
   /**
-   * Starts the ProgressBar in a new Thread.
-   * This is a non blocking call.
+   * Start a task with a progessbar without any in/out parameters Runnable used
+   * just a task wrapper.
+   *
+   * @param task Runnable
    */
-  public synchronized void start() {
-    if (!running) {
-      running = true;
-      startTime = System.nanoTime();
+  public void start(Runnable task) {
+
+    startTime = System.nanoTime();
+
+    try {
+
       progressBar.start();
+      task.run();
+
+    } catch (Exception e) {
+      exception = true;
+    } finally {
+
+      try {
+        progressBar.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
     }
   }
 
   /**
-   * Graceful shutdown, waits for the progress bar to complete.
-   * This is a blocking call.
+   * Start a task with only out parameters.
+   *
+   * @param task Supplier that represents the task
+   * @param <T> Generic return type
+   * @return Whatever the supllier produces
    */
-  public synchronized void shutdown() {
-    if (running) {
+  public <T> T start(Supplier<T> task) {
+
+    startTime = System.nanoTime();
+    T result = null;
+
+    try {
+
+      progressBar.start();
+      result = task.get();
+
+    } catch (Exception e) {
+      exception = true;
+    } finally {
+
       try {
         progressBar.join();
-        running = false;
       } catch (InterruptedException e) {
-        LOG.warn("Got interrupted while waiting for the progress bar to " +
-                "complete.");
+        e.printStackTrace();
       }
+
+      return result;
     }
   }
 
   /**
-   * Terminates the progress bar. This doesn't wait for the progress bar
-   * to complete.
+   * Start a task with in/out parameters.
+   *
+   * @param input Input of the function
+   * @param task A Function that does the task
+   * @param <T> type of the input
+   * @param <R> return type
+   * @return Whatever the Function returns
    */
-  public synchronized void terminate() {
-    if (running) {
+  public <T, R> R start(T input, Function<T, R> task) {
+
+    startTime = System.nanoTime();
+    R result = null;
+
+    try {
+
+      progressBar.start();
+      result = task.apply(input);
+
+    } catch (Exception e) {
+      exception = true;
+      throw e;
+    } finally {
+
       try {
-        running = false;
         progressBar.join();
       } catch (InterruptedException e) {
-        LOG.warn("Got interrupted while waiting for the progress bar to " +
-                "complete.");
+        e.printStackTrace();
       }
+
+      return result;
     }
   }
 
-  private Runnable getProgressBar(final PrintStream stream) {
-    return () -> {
-      stream.println();
-      while (running && currentValue.get() < maxValue) {
-        print(stream, currentValue.get());
-        try {
+  /**
+   * Increment the progress with one step.
+   */
+  public void incrementProgress() {
+    currentValue.incrementAndGet();
+  }
+
+  private class ProgressBarThread implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+
+        stream.println();
+        long value;
+
+        while ((value = currentValue.get()) < maxValue) {
+          print(value);
+
+          if (exception) {
+            break;
+          }
           Thread.sleep(REFRESH_INTERVAL);
-        } catch (InterruptedException e) {
-          LOG.warn("ProgressBar was interrupted.");
         }
-      }
-      print(stream, maxValue);
-      stream.println();
-      running = false;
-    };
-  }
 
-  /**
-   * Given current value prints the progress bar.
-   *
-   * @param value current progress position
-   */
-  private void print(final PrintStream stream, final long value) {
-    stream.print('\r');
-    double percent = 100.0 * value / maxValue;
-    StringBuilder sb = new StringBuilder();
-    sb.append(" " + String.format("%.2f", percent) + "% |");
-
-    for (int i = 0; i <= percent; i++) {
-      sb.append('█');
+        if (exception) {
+          stream.println();
+          stream.println("Incomplete termination, " + "check log for " +
+              "exception.");
+        } else {
+          print(maxValue);
+        }
+        stream.println();
+      } catch (InterruptedException e) {
+        stream.println(e);
+      }
     }
-    for (int j = 0; j < 100 - percent; j++) {
-      sb.append(' ');
+
+    /**
+     * Given current value prints the progress bar.
+     *
+     * @param value current progress position
+     */
+    private void print(long value) {
+      stream.print('\r');
+      double percent = 100.0 * value / maxValue;
+      StringBuilder sb = new StringBuilder();
+      sb.append(" " + String.format("%.2f", percent) + "% |");
+
+      for (int i = 0; i <= percent; i++) {
+        sb.append('█');
+      }
+      for (int j = 0; j < 100 - percent; j++) {
+        sb.append(' ');
+      }
+      sb.append("|  ");
+      sb.append(value + "/" + maxValue);
+      long timeInSec = TimeUnit.SECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
+          (timeInSec % 3600) / 60, timeInSec % 60);
+      sb.append(" Time: " + timeToPrint);
+      stream.print(sb.toString());
     }
-    sb.append("|  ");
-    sb.append(value + "/" + maxValue);
-    long timeInSec = TimeUnit.SECONDS.convert(
-            System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-    String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
-            (timeInSec % 3600) / 60, timeInSec % 60);
-    sb.append(" Time: " + timeToPrint);
-    stream.print(sb.toString());
   }
-}
+
+}

+ 70 - 17
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

@@ -185,7 +185,6 @@ public final class RandomKeyGenerator implements Callable<Void> {
   private ArrayList<Histogram> histograms = new ArrayList<>();
 
   private OzoneConfiguration ozoneConfiguration;
-  private ProgressBar progressbar;
 
   RandomKeyGenerator() {
   }
@@ -252,26 +251,13 @@ public final class RandomKeyGenerator implements Callable<Void> {
       validator.start();
       LOG.info("Data validation is enabled.");
     }
-
-    Supplier<Long> currentValue;
-    long maxValue;
-
-    currentValue = () -> numberOfKeysAdded.get();
-    maxValue = numOfVolumes *
-            numOfBuckets *
-            numOfKeys;
-
-    progressbar = new ProgressBar(System.out, maxValue, currentValue);
-
+    Thread progressbar = getProgressBarThread();
     LOG.info("Starting progress bar Thread.");
-
     progressbar.start();
-
     processor.shutdown();
     processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
-
-    progressbar.shutdown();
-
+    completed = true;
+    progressbar.join();
     if (validateWrites) {
       validator.join();
     }
@@ -910,6 +896,73 @@ public final class RandomKeyGenerator implements Callable<Void> {
     }
   }
 
+  private class ProgressBar implements Runnable {
+
+    private static final long REFRESH_INTERVAL = 1000L;
+
+    private PrintStream stream;
+    private Supplier<Long> currentValue;
+    private long maxValue;
+
+    ProgressBar(PrintStream stream, Supplier<Long> currentValue,
+        long maxValue) {
+      this.stream = stream;
+      this.currentValue = currentValue;
+      this.maxValue = maxValue;
+    }
+
+    @Override
+    public void run() {
+      try {
+        stream.println();
+        long value;
+        while ((value = currentValue.get()) < maxValue) {
+          print(value);
+          if (completed) {
+            break;
+          }
+          Thread.sleep(REFRESH_INTERVAL);
+        }
+        if (exception) {
+          stream.println();
+          stream.println("Incomplete termination, " +
+              "check log for exception.");
+        } else {
+          print(maxValue);
+        }
+        stream.println();
+      } catch (InterruptedException e) {
+      }
+    }
+
+    /**
+     * Given current value prints the progress bar.
+     *
+     * @param value
+     */
+    private void print(long value) {
+      stream.print('\r');
+      double percent = 100.0 * value / maxValue;
+      StringBuilder sb = new StringBuilder();
+      sb.append(" " + String.format("%.2f", percent) + "% |");
+
+      for (int i = 0; i <= percent; i++) {
+        sb.append('█');
+      }
+      for (int j = 0; j < 100 - percent; j++) {
+        sb.append(' ');
+      }
+      sb.append("|  ");
+      sb.append(value + "/" + maxValue);
+      long timeInSec = TimeUnit.SECONDS.convert(
+          System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
+          (timeInSec % 3600) / 60, timeInSec % 60);
+      sb.append(" Time: " + timeToPrint);
+      stream.print(sb);
+    }
+  }
+
   /**
    * Validates the write done in ozone cluster.
    */

+ 55 - 16
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestProgressBar.java

@@ -22,15 +22,12 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.PrintStream;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.LongStream;
+import java.util.stream.IntStream;
 
 import static org.mockito.Mockito.*;
 
-/**
- * Using Mockito runner.
- */
 @RunWith(MockitoJUnitRunner.class)
 /**
  * Tests for the Progressbar class for Freon.
@@ -38,36 +35,78 @@ import static org.mockito.Mockito.*;
 public class TestProgressBar {
 
   private PrintStream stream;
-  private AtomicLong numberOfKeysAdded;
-  private Supplier<Long> currentValue;
 
   @Before
   public void setupMock() {
-    numberOfKeysAdded = new AtomicLong(0L);
-    currentValue = () -> numberOfKeysAdded.get();
     stream = mock(PrintStream.class);
   }
 
   @Test
   public void testWithRunnable() {
 
-    Long maxValue = 10L;
+    int maxValue = 10;
 
-    ProgressBar progressbar = new ProgressBar(stream, maxValue, currentValue);
+    ProgressBar progressbar = new ProgressBar(stream, maxValue);
 
     Runnable task = () -> {
-      LongStream.range(0, maxValue).forEach(
+      IntStream.range(0, maxValue).forEach(
           counter -> {
-            numberOfKeysAdded.getAndIncrement();
+            progressbar.incrementProgress();
           }
       );
     };
 
-    progressbar.start();
-    task.run();
-    progressbar.shutdown();
+    progressbar.start(task);
+
+    verify(stream, atLeastOnce()).print(anyChar());
+    verify(stream, atLeastOnce()).print(anyString());
+  }
+
+  @Test
+  public void testWithSupplier() {
+
+    int maxValue = 10;
+
+    ProgressBar progressbar = new ProgressBar(stream, maxValue);
+
+    Supplier<Long> tasks = () -> {
+      IntStream.range(0, maxValue).forEach(
+          counter -> {
+            progressbar.incrementProgress();
+          }
+      );
+
+      return 1L; //return the result of the dummy task
+    };
+
+    progressbar.start(tasks);
 
     verify(stream, atLeastOnce()).print(anyChar());
     verify(stream, atLeastOnce()).print(anyString());
   }
+
+  @Test
+  public void testWithFunction() {
+
+    int maxValue = 10;
+    Long result;
+
+    ProgressBar progressbar = new ProgressBar(stream, maxValue);
+
+    Function<Long, String> task = (Long l) -> {
+      IntStream.range(0, maxValue).forEach(
+          counter -> {
+            progressbar.incrementProgress();
+          }
+      );
+
+      return "dummy result"; //return the result of the dummy task
+    };
+
+    progressbar.start(1L, task);
+
+    verify(stream, atLeastOnce()).print(anyChar());
+    verify(stream, atLeastOnce()).print(anyString());
+  }
+
 }