|
@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
|
|
|
|
|
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URL;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
@@ -65,7 +68,6 @@ public class StandbyCheckpointer {
|
|
|
private final Configuration conf;
|
|
|
private final FSNamesystem namesystem;
|
|
|
private long lastCheckpointTime;
|
|
|
- private long lastUploadTime;
|
|
|
private final CheckpointerThread thread;
|
|
|
private final ThreadFactory uploadThreadFactory;
|
|
|
private List<URL> activeNNAddresses;
|
|
@@ -73,12 +75,14 @@ public class StandbyCheckpointer {
|
|
|
|
|
|
private final Object cancelLock = new Object();
|
|
|
private Canceler canceler;
|
|
|
- private boolean isPrimaryCheckPointer = true;
|
|
|
|
|
|
// Keep track of how many checkpoints were canceled.
|
|
|
// This is for use in tests.
|
|
|
private static int canceledCount = 0;
|
|
|
-
|
|
|
+
|
|
|
+ // A map from NN url to the most recent image upload time.
|
|
|
+ private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
|
|
|
+
|
|
|
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
|
|
|
throws IOException {
|
|
|
this.namesystem = ns;
|
|
@@ -89,6 +93,37 @@ public class StandbyCheckpointer {
|
|
|
.setNameFormat("TransferFsImageUpload-%d").build();
|
|
|
|
|
|
setNameNodeAddresses(conf);
|
|
|
+ this.checkpointReceivers = new HashMap<>();
|
|
|
+ for (URL address : activeNNAddresses) {
|
|
|
+ this.checkpointReceivers.put(address.toString(),
|
|
|
+ new CheckpointReceiverEntry());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final class CheckpointReceiverEntry {
|
|
|
+ private long lastUploadTime;
|
|
|
+ private boolean isPrimary;
|
|
|
+
|
|
|
+ CheckpointReceiverEntry() {
|
|
|
+ this.lastUploadTime = 0L;
|
|
|
+ this.isPrimary = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setLastUploadTime(long lastUploadTime) {
|
|
|
+ this.lastUploadTime = lastUploadTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setIsPrimary(boolean isPrimaryFor) {
|
|
|
+ this.isPrimary = isPrimaryFor;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getLastUploadTime() {
|
|
|
+ return lastUploadTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isPrimary() {
|
|
|
+ return isPrimary;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -156,7 +191,7 @@ public class StandbyCheckpointer {
|
|
|
thread.interrupt();
|
|
|
}
|
|
|
|
|
|
- private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
|
|
|
+ private void doCheckpoint() throws InterruptedException, IOException {
|
|
|
assert canceler != null;
|
|
|
final long txid;
|
|
|
final NameNodeFile imageType;
|
|
@@ -208,11 +243,6 @@ public class StandbyCheckpointer {
|
|
|
namesystem.cpUnlock();
|
|
|
}
|
|
|
|
|
|
- //early exit if we shouldn't actually send the checkpoint to the ANN
|
|
|
- if(!sendCheckpoint){
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
// Upload the saved checkpoint back to the active
|
|
|
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
|
|
|
// than the expected number of tasks to run or queue up
|
|
@@ -222,54 +252,67 @@ public class StandbyCheckpointer {
|
|
|
uploadThreadFactory);
|
|
|
// for right now, just match the upload to the nn address by convention. There is no need to
|
|
|
// directly tie them together by adding a pair class.
|
|
|
- List<Future<TransferFsImage.TransferResult>> uploads =
|
|
|
- new ArrayList<Future<TransferFsImage.TransferResult>>();
|
|
|
+ HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
|
|
|
+ new HashMap<>();
|
|
|
for (final URL activeNNAddress : activeNNAddresses) {
|
|
|
- Future<TransferFsImage.TransferResult> upload =
|
|
|
- executor.submit(new Callable<TransferFsImage.TransferResult>() {
|
|
|
- @Override
|
|
|
- public TransferFsImage.TransferResult call() throws IOException {
|
|
|
- return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
|
|
|
- .getFSImage().getStorage(), imageType, txid, canceler);
|
|
|
- }
|
|
|
- });
|
|
|
- uploads.add(upload);
|
|
|
+ // Upload image if at least 1 of 2 following conditions met:
|
|
|
+ // 1. has been quiet for long enough, try to contact the node.
|
|
|
+ // 2. this standby IS the primary checkpointer of target NN.
|
|
|
+ String addressString = activeNNAddress.toString();
|
|
|
+ assert checkpointReceivers.containsKey(addressString);
|
|
|
+ CheckpointReceiverEntry receiverEntry =
|
|
|
+ checkpointReceivers.get(addressString);
|
|
|
+ long secsSinceLastUpload =
|
|
|
+ TimeUnit.MILLISECONDS.toSeconds(
|
|
|
+ monotonicNow() - receiverEntry.getLastUploadTime());
|
|
|
+ boolean shouldUpload = receiverEntry.isPrimary() ||
|
|
|
+ secsSinceLastUpload >= checkpointConf.getQuietPeriod();
|
|
|
+ if (shouldUpload) {
|
|
|
+ Future<TransferFsImage.TransferResult> upload =
|
|
|
+ executor.submit(new Callable<TransferFsImage.TransferResult>() {
|
|
|
+ @Override
|
|
|
+ public TransferFsImage.TransferResult call() throws IOException {
|
|
|
+ return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
|
|
|
+ .getFSImage().getStorage(), imageType, txid, canceler);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ uploads.put(addressString, upload);
|
|
|
+ }
|
|
|
}
|
|
|
InterruptedException ie = null;
|
|
|
- IOException ioe= null;
|
|
|
- int i = 0;
|
|
|
- boolean success = false;
|
|
|
- for (; i < uploads.size(); i++) {
|
|
|
- Future<TransferFsImage.TransferResult> upload = uploads.get(i);
|
|
|
+ List<IOException> ioes = Lists.newArrayList();
|
|
|
+ for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
|
|
|
+ uploads.entrySet()) {
|
|
|
+ String url = entry.getKey();
|
|
|
+ Future<TransferFsImage.TransferResult> upload = entry.getValue();
|
|
|
try {
|
|
|
- // TODO should there be some smarts here about retries nodes that are not the active NN?
|
|
|
+ // TODO should there be some smarts here about retries nodes that
|
|
|
+ // are not the active NN?
|
|
|
+ CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
|
|
|
if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
|
|
|
- success = true;
|
|
|
- //avoid getting the rest of the results - we don't care since we had a successful upload
|
|
|
- break;
|
|
|
+ receiverEntry.setLastUploadTime(monotonicNow());
|
|
|
+ receiverEntry.setIsPrimary(true);
|
|
|
+ } else {
|
|
|
+ receiverEntry.setIsPrimary(false);
|
|
|
}
|
|
|
-
|
|
|
} catch (ExecutionException e) {
|
|
|
- ioe = new IOException("Exception during image upload: " + e.getMessage(),
|
|
|
- e.getCause());
|
|
|
- break;
|
|
|
+ // Even if exception happens, still proceeds to next NN url.
|
|
|
+ // so that fail to upload to previous NN does not cause the
|
|
|
+ // remaining NN not getting the fsImage.
|
|
|
+ ioes.add(new IOException("Exception during image upload", e));
|
|
|
} catch (InterruptedException e) {
|
|
|
ie = e;
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- lastUploadTime = monotonicNow();
|
|
|
-
|
|
|
- // we are primary if we successfully updated the ANN
|
|
|
- this.isPrimaryCheckPointer = success;
|
|
|
-
|
|
|
// cleaner than copying code for multiple catch statements and better than catching all
|
|
|
// exceptions, so we just handle the ones we expect.
|
|
|
- if (ie != null || ioe != null) {
|
|
|
+ if (ie != null) {
|
|
|
|
|
|
// cancel the rest of the tasks, and close the pool
|
|
|
- for (; i < uploads.size(); i++) {
|
|
|
- Future<TransferFsImage.TransferResult> upload = uploads.get(i);
|
|
|
+ for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
|
|
|
+ uploads.entrySet()) {
|
|
|
+ Future<TransferFsImage.TransferResult> upload = entry.getValue();
|
|
|
// The background thread may be blocked waiting in the throttler, so
|
|
|
// interrupt it.
|
|
|
upload.cancel(true);
|
|
@@ -282,11 +325,11 @@ public class StandbyCheckpointer {
|
|
|
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
// re-throw the exception we got, since one of these two must be non-null
|
|
|
- if (ie != null) {
|
|
|
- throw ie;
|
|
|
- } else if (ioe != null) {
|
|
|
- throw ioe;
|
|
|
- }
|
|
|
+ throw ie;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!ioes.isEmpty()) {
|
|
|
+ throw MultipleIOException.createIOException(ioes);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -369,7 +412,6 @@ public class StandbyCheckpointer {
|
|
|
// Reset checkpoint time so that we don't always checkpoint
|
|
|
// on startup.
|
|
|
lastCheckpointTime = monotonicNow();
|
|
|
- lastUploadTime = monotonicNow();
|
|
|
while (shouldRun) {
|
|
|
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
|
|
if (!needRollbackCheckpoint) {
|
|
@@ -422,10 +464,7 @@ public class StandbyCheckpointer {
|
|
|
|
|
|
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
|
|
|
// rollback request, are the checkpointer, are outside the quiet period.
|
|
|
- final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
|
|
|
- boolean sendRequest = isPrimaryCheckPointer
|
|
|
- || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
|
|
|
- doCheckpoint(sendRequest);
|
|
|
+ doCheckpoint();
|
|
|
|
|
|
// reset needRollbackCheckpoint to false only when we finish a ckpt
|
|
|
// for rollback image
|