Explorar o código

YARN-2958. Made RMStateStore not update the last sequence number when updating the delegation token. Contributed by Varun Saxena.

(cherry picked from commit 562a701945be3a672f9cb5a52cc6db2c1589ba2b)
Zhijie Shen %!s(int64=10) %!d(string=hai) anos
pai
achega
c6cf748985
Modificáronse 11 ficheiros con 131 adicións e 144 borrados
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 25 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  3. 25 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
  4. 19 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  5. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  6. 18 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  7. 2 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDTEvent.java
  8. 18 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  9. 5 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
  10. 5 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  11. 5 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -279,6 +279,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
     YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
     (Rohith Sharmaks via ozawa)
     (Rohith Sharmaks via ozawa)
 
 
+    YARN-2958. Made RMStateStore not update the last sequence number when updating the
+    delegation token. (Varun Saxena via zjshen)
+
 Release 2.6.0 - 2014-11-18
 Release 2.6.0 - 2014-11-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 25 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -60,8 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 
 
 @Private
 @Private
@@ -452,11 +450,10 @@ public class FileSystemRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  public synchronized void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier identifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
-    storeOrUpdateRMDelegationTokenAndSequenceNumberState(
-        identifier, renewDate,latestSequenceNumber, false);
+  public synchronized void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier identifier, Long renewDate)
+      throws Exception {
+    storeOrUpdateRMDelegationTokenState(identifier, renewDate, false);
   }
   }
 
 
   @Override
   @Override
@@ -469,16 +466,15 @@ public class FileSystemRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
-    storeOrUpdateRMDelegationTokenAndSequenceNumberState(
-        rmDTIdentifier, renewDate,latestSequenceNumber, true);
+  protected void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
+    storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
   }
   }
 
 
-  private void storeOrUpdateRMDelegationTokenAndSequenceNumberState(
+  private void storeOrUpdateRMDelegationTokenState(
       RMDelegationTokenIdentifier identifier, Long renewDate,
       RMDelegationTokenIdentifier identifier, Long renewDate,
-      int latestSequenceNumber, boolean isUpdate) throws Exception {
+      boolean isUpdate) throws Exception {
     Path nodeCreatePath =
     Path nodeCreatePath =
         getNodePath(rmDTSecretManagerRoot,
         getNodePath(rmDTSecretManagerRoot,
           DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
           DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
@@ -490,23 +486,24 @@ public class FileSystemRMStateStore extends RMStateStore {
     } else {
     } else {
       LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
       LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
       writeFile(nodeCreatePath, identifierData.toByteArray());
       writeFile(nodeCreatePath, identifierData.toByteArray());
-    }
 
 
-    // store sequence number
-    Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
-          DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
-    LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
-        + latestSequenceNumber);
-    if (dtSequenceNumberPath == null) {
-      if (!createFile(latestSequenceNumberPath)) {
-        throw new Exception("Failed to create " + latestSequenceNumberPath);
-      }
-    } else {
-      if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
-        throw new Exception("Failed to rename " + dtSequenceNumberPath);
+      // store sequence number
+      Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+            DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+            + identifier.getSequenceNumber());
+      LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+          + identifier.getSequenceNumber());
+      if (dtSequenceNumberPath == null) {
+        if (!createFile(latestSequenceNumberPath)) {
+          throw new Exception("Failed to create " + latestSequenceNumberPath);
+        }
+      } else {
+        if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
+          throw new Exception("Failed to rename " + dtSequenceNumberPath);
+        }
       }
       }
+      dtSequenceNumberPath = latestSequenceNumberPath;
     }
     }
-    dtSequenceNumberPath = latestSequenceNumberPath;
   }
   }
 
 
   @Override
   @Override

+ 25 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -544,31 +544,30 @@ public class LeveldbRMStateStore extends RMStateStore {
       throw new IOException(e);
       throw new IOException(e);
     }
     }
   }
   }
-
-  @Override
-  protected void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier tokenId, Long renewDate,
-      int latestSequenceNumber) throws IOException {
+  
+  private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
+      Long renewDate, boolean isUpdate) throws IOException {
     String tokenKey = getRMDTTokenNodeKey(tokenId);
     String tokenKey = getRMDTTokenNodeKey(tokenId);
     RMDelegationTokenIdentifierData tokenData =
     RMDelegationTokenIdentifierData tokenData =
         new RMDelegationTokenIdentifierData(tokenId, renewDate);
         new RMDelegationTokenIdentifierData(tokenId, renewDate);
-    ByteArrayOutputStream bs = new ByteArrayOutputStream();
-    DataOutputStream ds = new DataOutputStream(bs);
-    try {
-      ds.writeInt(latestSequenceNumber);
-    } finally {
-      ds.close();
-    }
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing token to " + tokenKey);
       LOG.debug("Storing token to " + tokenKey);
-      LOG.debug("Storing " + latestSequenceNumber + " to "
-          + RM_DT_SEQUENCE_NUMBER_KEY);
     }
     }
     try {
     try {
       WriteBatch batch = db.createWriteBatch();
       WriteBatch batch = db.createWriteBatch();
       try {
       try {
         batch.put(bytes(tokenKey), tokenData.toByteArray());
         batch.put(bytes(tokenKey), tokenData.toByteArray());
-        batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
+        if(!isUpdate) {
+          ByteArrayOutputStream bs = new ByteArrayOutputStream();
+          try (DataOutputStream ds = new DataOutputStream(bs)) {
+            ds.writeInt(tokenId.getSequenceNumber());
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Storing " + tokenId.getSequenceNumber() + " to "
+                + RM_DT_SEQUENCE_NUMBER_KEY);   
+          }
+          batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
+        }
         db.write(batch);
         db.write(batch);
       } finally {
       } finally {
         batch.close();
         batch.close();
@@ -579,11 +578,17 @@ public class LeveldbRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier tokenId, Long renewDate,
-      int latestSequenceNumber) throws IOException {
-    storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate,
-        latestSequenceNumber);
+  protected void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    storeOrUpdateRMDT(tokenId, renewDate, false);
+  }
+
+  @Override
+  protected void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    storeOrUpdateRMDT(tokenId, renewDate, true);
   }
   }
 
 
   @Override
   @Override

+ 19 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -149,23 +149,30 @@ public class MemoryRMStateStore extends RMStateStore {
     }
     }
   }
   }
 
 
-  @Override
-  public synchronized void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  private void storeOrUpdateRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
+      Long renewDate, boolean isUpdate) throws Exception {
     Map<RMDelegationTokenIdentifier, Long> rmDTState =
     Map<RMDelegationTokenIdentifier, Long> rmDTState =
         state.rmSecretManagerState.getTokenState();
         state.rmSecretManagerState.getTokenState();
     if (rmDTState.containsKey(rmDTIdentifier)) {
     if (rmDTState.containsKey(rmDTIdentifier)) {
       IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
       IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
-              + "is already stored.");
+          + "is already stored.");
       LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
       LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
       throw e;
       throw e;
     }
     }
     rmDTState.put(rmDTIdentifier, renewDate);
     rmDTState.put(rmDTIdentifier, renewDate);
-    state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
+    if(!isUpdate) {
+      state.rmSecretManagerState.dtSequenceNumber = 
+          rmDTIdentifier.getSequenceNumber();
+    }
     LOG.info("Store RMDT with sequence number "
     LOG.info("Store RMDT with sequence number "
-        + rmDTIdentifier.getSequenceNumber()
-        + ". And the latest sequence number is " + latestSequenceNumber);
+             + rmDTIdentifier.getSequenceNumber());
+  }
+
+  @Override
+  public synchronized void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
+    storeOrUpdateRMDT(rmDTIdentifier, renewDate, false);
   }
   }
 
 
   @Override
   @Override
@@ -179,12 +186,11 @@ public class MemoryRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  protected void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
     removeRMDelegationTokenState(rmDTIdentifier);
     removeRMDelegationTokenState(rmDTIdentifier);
-    storeRMDelegationTokenAndSequenceNumberState(
-        rmDTIdentifier, renewDate, latestSequenceNumber);
+    storeOrUpdateRMDT(rmDTIdentifier, renewDate, true);
     LOG.info("Update RMDT with sequence number "
     LOG.info("Update RMDT with sequence number "
         + rmDTIdentifier.getSequenceNumber());
         + rmDTIdentifier.getSequenceNumber());
   }
   }

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -77,9 +77,9 @@ public class NullRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  public void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  public void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
     // Do nothing
     // Do nothing
   }
   }
 
 
@@ -90,9 +90,9 @@ public class NullRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  protected void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
     // Do nothing
     // Do nothing
   }
   }
 
 

+ 18 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -296,9 +296,8 @@ public abstract class RMStateStore extends AbstractService {
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       try {
       try {
         LOG.info("Storing RMDelegationToken and SequenceNumber");
         LOG.info("Storing RMDelegationToken and SequenceNumber");
-        store.storeRMDelegationTokenAndSequenceNumberState(
-            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
-            dtEvent.getLatestSequenceNumber());
+        store.storeRMDelegationTokenState(
+            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
       } catch (Exception e) {
       } catch (Exception e) {
         LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
         LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
             e);
             e);
@@ -341,9 +340,8 @@ public abstract class RMStateStore extends AbstractService {
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
       try {
       try {
         LOG.info("Updating RMDelegationToken and SequenceNumber");
         LOG.info("Updating RMDelegationToken and SequenceNumber");
-        store.updateRMDelegationTokenAndSequenceNumberInternal(
-            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(),
-            dtEvent.getLatestSequenceNumber());
+        store.updateRMDelegationTokenState(
+            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
       } catch (Exception e) {
       } catch (Exception e) {
         LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
         LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
             e);
             e);
@@ -672,11 +670,10 @@ public abstract class RMStateStore extends AbstractService {
    * RMDTSecretManager call this to store the state of a delegation token
    * RMDTSecretManager call this to store the state of a delegation token
    * and sequence number
    * and sequence number
    */
    */
-  public void storeRMDelegationTokenAndSequenceNumber(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) {
+  public void storeRMDelegationToken(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
-        latestSequenceNumber, RMStateStoreEventType.STORE_DELEGATION_TOKEN));
+        RMStateStoreEventType.STORE_DELEGATION_TOKEN));
   }
   }
 
 
   /**
   /**
@@ -684,17 +681,17 @@ public abstract class RMStateStore extends AbstractService {
    * Derived classes must implement this method to store the state of
    * Derived classes must implement this method to store the state of
    * RMDelegationToken and sequence number
    * RMDelegationToken and sequence number
    */
    */
-  protected abstract void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception;
+  protected abstract void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception;
 
 
   /**
   /**
    * RMDTSecretManager call this to remove the state of a delegation token
    * RMDTSecretManager call this to remove the state of a delegation token
    */
    */
   public void removeRMDelegationToken(
   public void removeRMDelegationToken(
-      RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+      RMDelegationTokenIdentifier rmDTIdentifier) {
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
-        sequenceNumber, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
+        RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
   }
   }
 
 
   /**
   /**
@@ -708,11 +705,10 @@ public abstract class RMStateStore extends AbstractService {
    * RMDTSecretManager call this to update the state of a delegation token
    * RMDTSecretManager call this to update the state of a delegation token
    * and sequence number
    * and sequence number
    */
    */
-  public void updateRMDelegationTokenAndSequenceNumber(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) {
+  public void updateRMDelegationToken(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
     handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
-        latestSequenceNumber, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
+        RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
   }
   }
 
 
   /**
   /**
@@ -720,9 +716,9 @@ public abstract class RMStateStore extends AbstractService {
    * Derived classes must implement this method to update the state of
    * Derived classes must implement this method to update the state of
    * RMDelegationToken and sequence number
    * RMDelegationToken and sequence number
    */
    */
-  protected abstract void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception;
+  protected abstract void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception;
 
 
   /**
   /**
    * RMDTSecretManager call this to store the state of a master key
    * RMDTSecretManager call this to store the state of a master key

+ 2 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRMDTEvent.java

@@ -23,18 +23,16 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
 public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
   private RMDelegationTokenIdentifier rmDTIdentifier;
   private RMDelegationTokenIdentifier rmDTIdentifier;
   private Long renewDate;
   private Long renewDate;
-  private int latestSequenceNumber;
 
 
   public RMStateStoreRMDTEvent(RMStateStoreEventType type) {
   public RMStateStoreRMDTEvent(RMStateStoreEventType type) {
     super(type);
     super(type);
   }
   }
 
 
   public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier,
   public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier,
-      Long renewDate, int latestSequenceNumber, RMStateStoreEventType type) {
+      Long renewDate, RMStateStoreEventType type) {
     this(type);
     this(type);
     this.rmDTIdentifier = rmDTIdentifier;
     this.rmDTIdentifier = rmDTIdentifier;
     this.renewDate = renewDate;
     this.renewDate = renewDate;
-    this.latestSequenceNumber = latestSequenceNumber;
   }
   }
 
 
   public RMDelegationTokenIdentifier getRmDTIdentifier() {
   public RMDelegationTokenIdentifier getRmDTIdentifier() {
@@ -44,8 +42,4 @@ public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
   public Long getRenewDate() {
   public Long getRenewDate() {
     return renewDate;
     return renewDate;
   }
   }
-
-  public int getLatestSequenceNumber() {
-    return latestSequenceNumber;
-  }
-}
+}

+ 18 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -698,12 +698,11 @@ public class ZKRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  protected synchronized void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
     ArrayList<Op> opList = new ArrayList<Op>();
     ArrayList<Op> opList = new ArrayList<Op>();
-    addStoreOrUpdateOps(
-        opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
+    addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
     doMultiWithRetries(opList);
     doMultiWithRetries(opList);
   }
   }
 
 
@@ -727,29 +726,27 @@ public class ZKRMStateStore extends RMStateStore {
   }
   }
 
 
   @Override
   @Override
-  protected synchronized void updateRMDelegationTokenAndSequenceNumberInternal(
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber) throws Exception {
+  protected synchronized void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
     ArrayList<Op> opList = new ArrayList<Op>();
     ArrayList<Op> opList = new ArrayList<Op>();
     String nodeRemovePath =
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
             + rmDTIdentifier.getSequenceNumber());
     if (existsWithRetries(nodeRemovePath, true) == null) {
     if (existsWithRetries(nodeRemovePath, true) == null) {
       // in case znode doesn't exist
       // in case znode doesn't exist
-      addStoreOrUpdateOps(
-          opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
+      addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
       LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
       LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
     } else {
     } else {
       // in case znode exists
       // in case znode exists
-      addStoreOrUpdateOps(
-          opList, rmDTIdentifier, renewDate, latestSequenceNumber, true);
+      addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
     }
     }
     doMultiWithRetries(opList);
     doMultiWithRetries(opList);
   }
   }
 
 
   private void addStoreOrUpdateOps(ArrayList<Op> opList,
   private void addStoreOrUpdateOps(ArrayList<Op> opList,
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      int latestSequenceNumber, boolean isUpdate) throws Exception {
+      boolean isUpdate) throws Exception {
     // store RM delegation token
     // store RM delegation token
     String nodeCreatePath =
     String nodeCreatePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
@@ -769,16 +766,15 @@ public class ZKRMStateStore extends RMStateStore {
       } else {
       } else {
         opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
         opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
             CreateMode.PERSISTENT));
             CreateMode.PERSISTENT));
+        // Update Sequence number only while storing DT
+        seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug((isUpdate ? "Storing " : "Updating ") +
+                    dtSequenceNumberPath + ". SequenceNumber: "
+                    + rmDTIdentifier.getSequenceNumber());
+        }
+        opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
       }
       }
-
-
-     seqOut.writeInt(latestSequenceNumber);
-     if (LOG.isDebugEnabled()) {
-        LOG.debug((isUpdate ? "Storing " : "Updating ") + dtSequenceNumberPath +
-            ". SequenceNumber: " + latestSequenceNumber);
-      }
-
-     opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
     } finally {
     } finally {
       seqOs.close();
       seqOs.close();
     }
     }

+ 5 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java

@@ -29,10 +29,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -109,8 +107,7 @@ public class RMDelegationTokenSecretManager extends
     try {
     try {
       LOG.info("storing RMDelegation token with sequence number: "
       LOG.info("storing RMDelegation token with sequence number: "
           + identifier.getSequenceNumber());
           + identifier.getSequenceNumber());
-      rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
-        identifier, renewDate, identifier.getSequenceNumber());
+      rmContext.getStateStore().storeRMDelegationToken(identifier, renewDate);
     } catch (Exception e) {
     } catch (Exception e) {
       LOG.error("Error in storing RMDelegationToken with sequence number: "
       LOG.error("Error in storing RMDelegationToken with sequence number: "
           + identifier.getSequenceNumber());
           + identifier.getSequenceNumber());
@@ -124,11 +121,10 @@ public class RMDelegationTokenSecretManager extends
     try {
     try {
       LOG.info("updating RMDelegation token with sequence number: "
       LOG.info("updating RMDelegation token with sequence number: "
           + id.getSequenceNumber());
           + id.getSequenceNumber());
-      rmContext.getStateStore().updateRMDelegationTokenAndSequenceNumber(id,
-        renewDate, id.getSequenceNumber());
+      rmContext.getStateStore().updateRMDelegationToken(id, renewDate);
     } catch (Exception e) {
     } catch (Exception e) {
-      LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
-            + id.getSequenceNumber());
+      LOG.error("Error in updating persisted RMDelegationToken" +
+                " with sequence number: " + id.getSequenceNumber());
       ExitUtil.terminate(1, e);
       ExitUtil.terminate(1, e);
     }
     }
   }
   }
@@ -139,8 +135,7 @@ public class RMDelegationTokenSecretManager extends
     try {
     try {
       LOG.info("removing RMDelegation token with sequence number: "
       LOG.info("removing RMDelegation token with sequence number: "
           + ident.getSequenceNumber());
           + ident.getSequenceNumber());
-      rmContext.getStateStore().removeRMDelegationToken(ident,
-        delegationTokenSequenceNumber);
+      rmContext.getStateStore().removeRMDelegationToken(ident);
     } catch (Exception e) {
     } catch (Exception e) {
       LOG.error("Error in removing RMDelegationToken with sequence number: "
       LOG.error("Error in removing RMDelegationToken with sequence number: "
           + ident.getSequenceNumber());
           + ident.getSequenceNumber());

+ 5 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -411,16 +411,15 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     RMDelegationTokenIdentifier dtId1 =
     RMDelegationTokenIdentifier dtId1 =
         new RMDelegationTokenIdentifier(new Text("owner1"),
         new RMDelegationTokenIdentifier(new Text("owner1"),
           new Text("renewer1"), new Text("realuser1"));
           new Text("renewer1"), new Text("realuser1"));
+    int sequenceNumber = 1111;
+    dtId1.setSequenceNumber(sequenceNumber);
     byte[] tokenBeforeStore = dtId1.getBytes();
     byte[] tokenBeforeStore = dtId1.getBytes();
     Long renewDate1 = new Long(System.currentTimeMillis());
     Long renewDate1 = new Long(System.currentTimeMillis());
-    int sequenceNumber = 1111;
-    store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
-      sequenceNumber);
+    store.storeRMDelegationToken(dtId1, renewDate1);
     modifyRMDelegationTokenState();
     modifyRMDelegationTokenState();
     Map<RMDelegationTokenIdentifier, Long> token1 =
     Map<RMDelegationTokenIdentifier, Long> token1 =
         new HashMap<RMDelegationTokenIdentifier, Long>();
         new HashMap<RMDelegationTokenIdentifier, Long>();
     token1.put(dtId1, renewDate1);
     token1.put(dtId1, renewDate1);
-
     // store delegation key;
     // store delegation key;
     DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
     DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
     HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
     HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
@@ -440,9 +439,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
 
     // update RM delegation token;
     // update RM delegation token;
     renewDate1 = new Long(System.currentTimeMillis());
     renewDate1 = new Long(System.currentTimeMillis());
-    ++sequenceNumber;
-    store.updateRMDelegationTokenAndSequenceNumber(
-        dtId1, renewDate1, sequenceNumber);
+    store.updateRMDelegationToken(dtId1, renewDate1);
     token1.put(dtId1, renewDate1);
     token1.put(dtId1, renewDate1);
 
 
     RMDTSecretManagerState updateSecretManagerState =
     RMDTSecretManagerState updateSecretManagerState =
@@ -463,7 +460,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
         noKeySecretManagerState.getDTSequenceNumber());
         noKeySecretManagerState.getDTSequenceNumber());
 
 
     // check to delete delegationToken
     // check to delete delegationToken
-    store.removeRMDelegationToken(dtId1, sequenceNumber);
+    store.removeRMDelegationToken(dtId1);
     RMDTSecretManagerState noKeyAndTokenSecretManagerState =
     RMDTSecretManagerState noKeyAndTokenSecretManagerState =
         store.loadState().getRMDTSecretManagerState();
         store.loadState().getRMDTSecretManagerState();
     token1.clear();
     token1.clear();

+ 5 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -337,20 +337,18 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     RMDelegationTokenIdentifier dtId1 =
     RMDelegationTokenIdentifier dtId1 =
         new RMDelegationTokenIdentifier(new Text("owner1"),
         new RMDelegationTokenIdentifier(new Text("owner1"),
             new Text("renewer1"), new Text("realuser1"));
             new Text("renewer1"), new Text("realuser1"));
-    Long renewDate1 = new Long(System.currentTimeMillis());
-    int sequenceNumber = 1111;
-    store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
-        sequenceNumber);
+    Long renewDate1 = new Long(System.currentTimeMillis()); 
+    dtId1.setSequenceNumber(1111);
+    store.storeRMDelegationToken(dtId1, renewDate1);
     assertEquals("RMStateStore should have been in fenced state", true,
     assertEquals("RMStateStore should have been in fenced state", true,
         store.isFencedState());
         store.isFencedState());
 
 
-    store.updateRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
-        sequenceNumber);
+    store.updateRMDelegationToken(dtId1, renewDate1);
     assertEquals("RMStateStore should have been in fenced state", true,
     assertEquals("RMStateStore should have been in fenced state", true,
         store.isFencedState());
         store.isFencedState());
 
 
     // remove delegation key;
     // remove delegation key;
-    store.removeRMDelegationToken(dtId1, sequenceNumber);
+    store.removeRMDelegationToken(dtId1);
     assertEquals("RMStateStore should have been in fenced state", true,
     assertEquals("RMStateStore should have been in fenced state", true,
         store.isFencedState());
         store.isFencedState());