|
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -365,7 +366,7 @@ public class Balancer {
|
|
|
|
|
|
sendRequest(out);
|
|
|
receiveResponse(in);
|
|
|
- bytesMoved.inc(block.getNumBytes());
|
|
|
+ bytesMoved.addAndGet(block.getNumBytes());
|
|
|
LOG.info("Successfully moved " + this);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Failed to move " + this + ": " + e.getMessage());
|
|
@@ -1111,17 +1112,7 @@ public class Balancer {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- private static class BytesMoved {
|
|
|
- private long bytesMoved = 0L;;
|
|
|
- private synchronized void inc( long bytes ) {
|
|
|
- bytesMoved += bytes;
|
|
|
- }
|
|
|
-
|
|
|
- private synchronized long get() {
|
|
|
- return bytesMoved;
|
|
|
- }
|
|
|
- };
|
|
|
- private final BytesMoved bytesMoved = new BytesMoved();
|
|
|
+ private final AtomicLong bytesMoved = new AtomicLong();
|
|
|
|
|
|
/* Start a thread to dispatch block moves for each source.
|
|
|
* The thread selects blocks to move & sends request to proxy source to
|