瀏覽代碼

YARN-2819. NPE in ATS Timeline Domains when upgrading from 2.4 to 2.6. Contributed by Zhijie Shen

(cherry picked from commit 4a114dd67aae83e5bb2d65470166de954acf36a2)
(cherry picked from commit 7d26cffb0cb21c16b335e8ae9b02523565ad3b5d)
Xuan 10 年之前
父節點
當前提交
629274c09b

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -839,6 +839,9 @@ Release 2.6.0 - UNRELEASED
 
     YARN-2825. Container leak on NM (Jian He via jlowe)
 
+    YARN-2819. NPE in ATS Timeline Domains when upgrading from 2.4 to 2.6.
+    (Zhijie Shen via xgong)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 36 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java

@@ -792,7 +792,8 @@ public class LeveldbTimelineStore extends AbstractService
    * Put a single entity.  If there is an error, add a TimelinePutError to the
    * given response.
    */
-  private void put(TimelineEntity entity, TimelinePutResponse response) {
+  private void put(TimelineEntity entity, TimelinePutResponse response,
+      boolean allowEmptyDomainId) {
     LockMap.CountingReentrantLock<EntityIdentifier> lock =
         writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
             entity.getEntityType()));
@@ -867,10 +868,18 @@ public class LeveldbTimelineStore extends AbstractService
                   new EntityIdentifier(relatedEntityId, relatedEntityType));
               continue;
             } else {
+              // This is the existing entity
               byte[] domainIdBytes = db.get(createDomainIdKey(
                   relatedEntityId, relatedEntityType, relatedEntityStartTime));
-              // This is the existing entity
-              String domainId = new String(domainIdBytes);
+              // The timeline data created by the server before 2.6 won't have
+              // the domain field. We assume this timeline data is in the
+              // default timeline domain.
+              String domainId = null;
+              if (domainIdBytes == null) {
+                domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
+              } else {
+                domainId = new String(domainIdBytes);
+              }
               if (!domainId.equals(entity.getDomainId())) {
                 // in this case the entity will be put, but the relation will be
                 // ignored
@@ -923,12 +932,14 @@ public class LeveldbTimelineStore extends AbstractService
           entity.getEntityType(), revStartTime);
       if (entity.getDomainId() == null ||
           entity.getDomainId().length() == 0) {
-        TimelinePutError error = new TimelinePutError();
-        error.setEntityId(entity.getEntityId());
-        error.setEntityType(entity.getEntityType());
-        error.setErrorCode(TimelinePutError.NO_DOMAIN);
-        response.addError(error);
-        return;
+        if (!allowEmptyDomainId) {
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entity.getEntityId());
+          error.setEntityType(entity.getEntityType());
+          error.setErrorCode(TimelinePutError.NO_DOMAIN);
+          response.addError(error);
+          return;
+        }
       } else {
         writeBatch.put(key, entity.getDomainId().getBytes());
         writePrimaryFilterEntries(writeBatch, primaryFilters, key,
@@ -1011,7 +1022,22 @@ public class LeveldbTimelineStore extends AbstractService
       deleteLock.readLock().lock();
       TimelinePutResponse response = new TimelinePutResponse();
       for (TimelineEntity entity : entities.getEntities()) {
-        put(entity, response);
+        put(entity, response, false);
+      }
+      return response;
+    } finally {
+      deleteLock.readLock().unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
+    try {
+      deleteLock.readLock().lock();
+      TimelinePutResponse response = new TimelinePutResponse();
+      for (TimelineEntity entity : entities.getEntities()) {
+        put(entity, response, true);
       }
       return response;
     } finally {

+ 16 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java

@@ -124,6 +124,7 @@ public class TimelineDataManager extends AbstractService {
           entities.getEntities().iterator();
       while (entitiesItr.hasNext()) {
         TimelineEntity entity = entitiesItr.next();
+        addDefaultDomainIdIfAbsent(entity);
         try {
           // check ACLs
           if (!timelineACLsManager.checkAccess(
@@ -161,6 +162,7 @@ public class TimelineDataManager extends AbstractService {
     entity =
         store.getEntity(entityId, entityType, fields);
     if (entity != null) {
+      addDefaultDomainIdIfAbsent(entity);
       // check ACLs
       if (!timelineACLsManager.checkAccess(
           callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
@@ -203,6 +205,7 @@ public class TimelineDataManager extends AbstractService {
               eventsOfOneEntity.getEntityId(),
               eventsOfOneEntity.getEntityType(),
               EnumSet.of(Field.PRIMARY_FILTERS));
+          addDefaultDomainIdIfAbsent(entity);
           // check ACLs
           if (!timelineACLsManager.checkAccess(
               callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
@@ -254,10 +257,12 @@ public class TimelineDataManager extends AbstractService {
         existingEntity =
             store.getEntity(entityID.getId(), entityID.getType(),
                 EnumSet.of(Field.PRIMARY_FILTERS));
-        if (existingEntity != null &&
-            !existingEntity.getDomainId().equals(entity.getDomainId())) {
-          throw new YarnException("The domain of the timeline entity "
-            + entityID + " is not allowed to be changed.");
+        if (existingEntity != null) {
+          addDefaultDomainIdIfAbsent(existingEntity);
+          if (!existingEntity.getDomainId().equals(entity.getDomainId())) {
+            throw new YarnException("The domain of the timeline entity "
+              + entityID + " is not allowed to be changed.");
+          }
         }
         if (!timelineACLsManager.checkAccess(
             callerUGI, ApplicationAccessType.MODIFY_APP, entity)) {
@@ -355,4 +360,11 @@ public class TimelineDataManager extends AbstractService {
     }
   }
 
+  private static void addDefaultDomainIdIfAbsent(TimelineEntity entity) {
+    // be compatible with the timeline data created before 2.6
+    if (entity.getDomainId() == null) {
+      entity.setDomainId(DEFAULT_DOMAIN_ID);
+    }
+  }
+
 }

+ 74 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
@@ -160,12 +161,13 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
   @Test
   public void testGetEntityTypes() throws IOException {
     List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
-    assertEquals(5, entityTypes.size());
-    assertEquals(entityType1, entityTypes.get(0));
-    assertEquals(entityType2, entityTypes.get(1));
-    assertEquals(entityType4, entityTypes.get(2));
-    assertEquals(entityType5, entityTypes.get(3));
-    assertEquals(entityType7, entityTypes.get(4));
+    assertEquals(6, entityTypes.size());
+    assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(0));
+    assertEquals(entityType1, entityTypes.get(1));
+    assertEquals(entityType2, entityTypes.get(2));
+    assertEquals(entityType4, entityTypes.get(3));
+    assertEquals(entityType5, entityTypes.get(4));
+    assertEquals(entityType7, entityTypes.get(5));
   }
 
   @Test
@@ -196,7 +198,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     ((LeveldbTimelineStore)store).discardOldEntities(-123l);
     assertEquals(2, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
-    assertEquals(4, ((LeveldbTimelineStore)store).getEntityTypes().size());
+    assertEquals(5, ((LeveldbTimelineStore)store).getEntityTypes().size());
 
     ((LeveldbTimelineStore)store).discardOldEntities(123l);
     assertEquals(0, getEntities("type_1").size());
@@ -327,4 +329,69 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     super.testGetDomains();
   }
 
+  @Test
+  public void testRelatingToNonExistingEntity() throws IOException {
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
+    entityToStore.setEntityId("TEST_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+    TimelineEntity entityToGet =
+        store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    Assert.assertEquals("TEST_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("TEST_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+  }
+
+  @Test
+  public void testRelatingToOldEntityWithoutDomainId() throws IOException {
+    // New entity is put in the default domain
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
+    entityToStore.setEntityId("NEW_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+
+    TimelineEntity entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertNull(entityToGet.getDomainId());
+    Assert.assertEquals("NEW_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("NEW_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+
+    // New entity is not put in the default domain
+    entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
+    entityToStore.setEntityId("NEW_ENTITY_ID_2");
+    entityToStore.setDomainId("NON_DEFAULT");
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    TimelinePutResponse response = store.put(entities);
+    Assert.assertEquals(1, response.getErrors().size());
+    Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
+        response.getErrors().get(0).getErrorCode());
+    entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertNull(entityToGet.getDomainId());
+    // Still have one related entity
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
+        .iterator().next().size());
+  }
+
 }

+ 152 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestTimelineDataManager extends TimelineStoreTestUtils {
+
+  private FileContext fsContext;
+  private File fsPath;
+  private TimelineDataManager dataManaer;
+
+  @Before
+  public void setup() throws Exception {
+    fsPath = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir").getAbsoluteFile();
+    fsContext = FileContext.getLocalFSFileContext();
+    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+    Configuration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        fsPath.getAbsolutePath());
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    store = new LeveldbTimelineStore();
+    store.init(conf);
+    store.start();
+    loadTestEntityData();
+    loadVerificationEntityData();
+    loadTestDomainData();
+
+    TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
+    dataManaer = new TimelineDataManager(store, aclsManager);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (store != null) {
+      store.stop();
+    }
+    if (fsContext != null) {
+      fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+    }
+  }
+
+  @Test
+  public void testGetOldEntityWithOutDomainId() throws Exception {
+    TimelineEntity entity = dataManaer.getEntity(
+        "OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1", null,
+        UserGroupInformation.getCurrentUser());
+    Assert.assertNotNull(entity);
+    Assert.assertEquals("OLD_ENTITY_ID_1", entity.getEntityId());
+    Assert.assertEquals("OLD_ENTITY_TYPE_1", entity.getEntityType());
+    Assert.assertEquals(
+        TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
+  }
+
+  @Test
+  public void testGetOldEntitiesWithOutDomainId() throws Exception {
+    TimelineEntities entities = dataManaer.getEntities(
+        "OLD_ENTITY_TYPE_1", null, null, null, null, null, null, null, null,
+        UserGroupInformation.getCurrentUser());
+    Assert.assertEquals(2, entities.getEntities().size());
+    Assert.assertEquals("OLD_ENTITY_ID_2",
+        entities.getEntities().get(0).getEntityId());
+    Assert.assertEquals("OLD_ENTITY_TYPE_1",
+        entities.getEntities().get(0).getEntityType());
+    Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID,
+        entities.getEntities().get(0).getDomainId());
+    Assert.assertEquals("OLD_ENTITY_ID_1",
+        entities.getEntities().get(1).getEntityId());
+    Assert.assertEquals("OLD_ENTITY_TYPE_1",
+        entities.getEntities().get(1).getEntityType());
+    Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID,
+        entities.getEntities().get(1).getDomainId());
+  }
+
+  @Test
+  public void testUpdatingOldEntityWithoutDomainId() throws Exception {
+    // Set the domain to the default domain when updating
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityType("OLD_ENTITY_TYPE_1");
+    entity.setEntityId("OLD_ENTITY_ID_1");
+    entity.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entity.addOtherInfo("NEW_OTHER_INFO_KEY", "NEW_OTHER_INFO_VALUE");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+    TimelinePutResponse response = dataManaer.postEntities(
+        entities, UserGroupInformation.getCurrentUser());
+    Assert.assertEquals(0, response.getErrors().size());
+    entity = store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entity);
+    // Even in leveldb, the domain is updated to the default domain Id
+    Assert.assertEquals(
+        TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
+    Assert.assertEquals(1, entity.getOtherInfo().size());
+    Assert.assertEquals("NEW_OTHER_INFO_KEY",
+        entity.getOtherInfo().keySet().iterator().next());
+    Assert.assertEquals("NEW_OTHER_INFO_VALUE",
+        entity.getOtherInfo().values().iterator().next());
+    
+    // Set the domain to the non-default domain when updating
+    entity = new TimelineEntity();
+    entity.setEntityType("OLD_ENTITY_TYPE_1");
+    entity.setEntityId("OLD_ENTITY_ID_2");
+    entity.setDomainId("NON_DEFAULT");
+    entity.addOtherInfo("NEW_OTHER_INFO_KEY", "NEW_OTHER_INFO_VALUE");
+    entities = new TimelineEntities();
+    entities.addEntity(entity);
+    response = dataManaer.postEntities(
+        entities, UserGroupInformation.getCurrentUser());
+    Assert.assertEquals(1, response.getErrors().size());
+    Assert.assertEquals(TimelinePutResponse.TimelinePutError.ACCESS_DENIED,
+        response.getErrors().get(0).getErrorCode());
+    entity = store.getEntity("OLD_ENTITY_ID_2", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entity);
+    // In leveldb, the domain Id is still null
+    Assert.assertNull(entity.getDomainId());
+    // Updating is not executed
+    Assert.assertEquals(0, entity.getOtherInfo().size());
+  }
+  
+}

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java

@@ -210,6 +210,18 @@ public class TimelineStoreTestUtils {
     assertEquals(entityId7, response.getErrors().get(0).getEntityId());
     assertEquals(TimelinePutError.FORBIDDEN_RELATION,
         response.getErrors().get(0).getErrorCode());
+
+    if (store instanceof LeveldbTimelineStore) {
+      LeveldbTimelineStore leveldb = (LeveldbTimelineStore) store;
+      entities.setEntities(Collections.singletonList(createEntity(
+          "OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", 63l, null, null, null, null,
+          null)));
+      leveldb.putWithNoDomainId(entities);
+      entities.setEntities(Collections.singletonList(createEntity(
+          "OLD_ENTITY_ID_2", "OLD_ENTITY_TYPE_1", 64l, null, null, null, null,
+          null)));
+      leveldb.putWithNoDomainId(entities);
+    }
   }
 
   /**