|
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
|
|
|
|
|
|
import java.io.EOFException;
|
|
@@ -47,6 +48,7 @@ public abstract class AbstractFSNodeStore<M> {
|
|
|
private FSDataOutputStream editlogOs;
|
|
|
|
|
|
private Path editLogPath;
|
|
|
+ private int replication;
|
|
|
private StoreSchema schema;
|
|
|
|
|
|
protected M manager;
|
|
@@ -65,6 +67,8 @@ public abstract class AbstractFSNodeStore<M> {
|
|
|
initFileSystem(conf);
|
|
|
// mkdir of root dir path
|
|
|
fs.mkdirs(fsWorkingPath);
|
|
|
+ this.replication = conf.getInt(YarnConfiguration.FS_STORE_FILE_REPLICATION,
|
|
|
+ YarnConfiguration.DEFAULT_FS_STORE_FILE_REPLICATION);
|
|
|
LOG.info("Created store directory :" + fsWorkingPath);
|
|
|
}
|
|
|
|
|
@@ -162,6 +166,7 @@ public abstract class AbstractFSNodeStore<M> {
|
|
|
StoreOp op = FSStoreOpHandler.getMirrorOp(storeType);
|
|
|
op.write(os, manager);
|
|
|
}
|
|
|
+ checkAvailability(writingMirrorPath);
|
|
|
// Move mirror to mirror.old
|
|
|
if (fs.exists(mirrorPath)) {
|
|
|
fs.delete(oldMirrorPath, false);
|
|
@@ -178,11 +183,27 @@ public abstract class AbstractFSNodeStore<M> {
|
|
|
// create a new editlog file
|
|
|
editlogOs = fs.create(editLogPath, true);
|
|
|
editlogOs.close();
|
|
|
-
|
|
|
+ checkAvailability(editLogPath);
|
|
|
LOG.info("Finished write mirror at:" + mirrorPath.toString());
|
|
|
LOG.info("Finished create editlog file at:" + editLogPath.toString());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Make sure replica is highly available. It will avoid setting replication,
|
|
|
+ * if the value configured for
|
|
|
+ * {@link YarnConfiguration#FS_STORE_FILE_REPLICATION} is 0.
|
|
|
+ */
|
|
|
+ private void checkAvailability(Path file) throws IOException {
|
|
|
+ try {
|
|
|
+ if (replication != 0
|
|
|
+ && fs.getFileStatus(file).getReplication() < replication) {
|
|
|
+ fs.setReplication(file, (short) replication);
|
|
|
+ }
|
|
|
+ } catch (UnsupportedOperationException e) {
|
|
|
+ LOG.error("Failed set replication for a file : {}", file);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
protected void loadManagerFromEditLog(Path editPath) throws IOException {
|
|
|
if (!fs.exists(editPath)) {
|
|
|
return;
|