|
@@ -65,8 +65,32 @@ public abstract class AbstractFSNodeStore<M> {
|
|
this.fsWorkingPath = fsStorePath;
|
|
this.fsWorkingPath = fsStorePath;
|
|
this.manager = mgr;
|
|
this.manager = mgr;
|
|
initFileSystem(conf);
|
|
initFileSystem(conf);
|
|
- // mkdir of root dir path
|
|
|
|
- fs.mkdirs(fsWorkingPath);
|
|
|
|
|
|
+ // mkdir of root dir path with retry logic
|
|
|
|
+ int maxRetries = conf.getInt(YarnConfiguration.NODE_STORE_ROOT_DIR_NUM_RETRIES,
|
|
|
|
+ YarnConfiguration.NODE_STORE_ROOT_DIR_NUM_DEFAULT_RETRIES);
|
|
|
|
+ int retryCount = 0;
|
|
|
|
+ boolean success = fs.mkdirs(fsWorkingPath);
|
|
|
|
+
|
|
|
|
+ while (!success && retryCount < maxRetries) {
|
|
|
|
+ try {
|
|
|
|
+ if (!fs.exists(fsWorkingPath)) {
|
|
|
|
+ success = fs.mkdirs(fsWorkingPath);
|
|
|
|
+ } else {
|
|
|
|
+ success = true;
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ retryCount++;
|
|
|
|
+ if (retryCount >= maxRetries) {
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(conf.getInt(YarnConfiguration.NODE_STORE_ROOT_DIR_RETRY_INTERVAL,
|
|
|
|
+ YarnConfiguration.NODE_STORE_ROOT_DIR_RETRY_DEFAULT_INTERVAL));
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ throw new RuntimeException(ie);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
this.replication = conf.getInt(YarnConfiguration.FS_STORE_FILE_REPLICATION,
|
|
this.replication = conf.getInt(YarnConfiguration.FS_STORE_FILE_REPLICATION,
|
|
YarnConfiguration.DEFAULT_FS_STORE_FILE_REPLICATION);
|
|
YarnConfiguration.DEFAULT_FS_STORE_FILE_REPLICATION);
|
|
LOG.info("Created store directory :" + fsWorkingPath);
|
|
LOG.info("Created store directory :" + fsWorkingPath);
|