Browse Source

YARN-1984. LeveldbTimelineStore does not handle db exceptions properly. Contributed by Varun Saxena
(cherry picked from commit 1ce4d33c2dc86d711b227a04d2f9a2ab696a24a1)

(cherry picked from commit 89ef49fb0814baea3640798fdf66d2ae3a550896)

Jason Lowe 10 năm trước cách đây
mục cha
commit
17a23b8191

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -48,6 +48,9 @@ Release 2.6.1 - UNRELEASED
     YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
     (Jian He via jlowe)
 
+    YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
+    (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 117 - 89
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java

@@ -66,10 +66,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBException;
-import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.ReadOptions;
 import org.iq80.leveldb.WriteBatch;
@@ -438,13 +438,15 @@ public class LeveldbTimelineStore extends AbstractService
         .add(entityType).add(writeReverseOrderedLong(revStartTime))
         .add(entityId).getBytesForLookup();
 
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(prefix);
 
       return getEntity(entityId, entityType, revStartTime, fields, iterator,
           prefix, prefix.length);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -455,7 +457,7 @@ public class LeveldbTimelineStore extends AbstractService
    * specified fields for this entity, return null.
    */
   private static TimelineEntity getEntity(String entityId, String entityType,
-      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
       byte[] prefix, int prefixlen) throws IOException {
     if (fields == null) {
       fields = EnumSet.allOf(Field.class);
@@ -562,7 +564,7 @@ public class LeveldbTimelineStore extends AbstractService
                 o2.length);
           }
         });
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       // look up start times for the specified entities
       // skip entities with no start time
@@ -606,7 +608,7 @@ public class LeveldbTimelineStore extends AbstractService
           if (limit == null) {
             limit = DEFAULT_LIMIT;
           }
-          iterator = db.iterator();
+          iterator = new LeveldbIterator(db);
           for (iterator.seek(first); entity.getEvents().size() < limit &&
               iterator.hasNext(); iterator.next()) {
             byte[] key = iterator.peekNext().getKey();
@@ -623,6 +625,8 @@ public class LeveldbTimelineStore extends AbstractService
           }
         }
       }
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -683,7 +687,7 @@ public class LeveldbTimelineStore extends AbstractService
       String entityType, Long limit, Long starttime, Long endtime,
       String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fields) throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
       // only db keys matching the prefix (base + entity type) will be parsed
@@ -724,7 +728,7 @@ public class LeveldbTimelineStore extends AbstractService
       }
 
       TimelineEntities entities = new TimelineEntities();
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(first);
       // iterate until one of the following conditions is met: limit is
       // reached, there are no more keys, the key prefix no longer matches,
@@ -783,10 +787,23 @@ public class LeveldbTimelineStore extends AbstractService
         }
       }
       return entities;
+    } catch(DBException e) {
+      throw new IOException(e);   	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
   }
+  
+  /**
+   * Handle error and set it in response.
+   */
+  private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) {
+    TimelinePutError error = new TimelinePutError();
+    error.setEntityId(entity.getEntityId());
+    error.setEntityType(entity.getEntityType());
+    error.setErrorCode(errorCode);
+    response.addError(error);
+  }
 
   /**
    * Put a single entity.  If there is an error, add a TimelinePutError to the
@@ -812,11 +829,7 @@ public class LeveldbTimelineStore extends AbstractService
           entity.getStartTime(), events);
       if (startAndInsertTime == null) {
         // if no start time is found, add an error and return
-        TimelinePutError error = new TimelinePutError();
-        error.setEntityId(entity.getEntityId());
-        error.setEntityType(entity.getEntityType());
-        error.setErrorCode(TimelinePutError.NO_START_TIME);
-        response.addError(error);
+        handleError(entity, response, TimelinePutError.NO_START_TIME);   
         return;
       }
       revStartTime = writeReverseOrderedLong(startAndInsertTime
@@ -883,11 +896,7 @@ public class LeveldbTimelineStore extends AbstractService
               if (!domainId.equals(entity.getDomainId())) {
                 // in this case the entity will be put, but the relation will be
                 // ignored
-                TimelinePutError error = new TimelinePutError();
-                error.setEntityId(entity.getEntityId());
-                error.setEntityType(entity.getEntityType());
-                error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
-                response.addError(error);
+                handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
                 continue;
               }
             }
@@ -933,11 +942,7 @@ public class LeveldbTimelineStore extends AbstractService
       if (entity.getDomainId() == null ||
           entity.getDomainId().length() == 0) {
         if (!allowEmptyDomainId) {
-          TimelinePutError error = new TimelinePutError();
-          error.setEntityId(entity.getEntityId());
-          error.setEntityType(entity.getEntityType());
-          error.setErrorCode(TimelinePutError.NO_DOMAIN);
-          response.addError(error);
+          handleError(entity, response, TimelinePutError.NO_DOMAIN);
           return;
         }
       } else {
@@ -946,14 +951,14 @@ public class LeveldbTimelineStore extends AbstractService
             entity.getDomainId().getBytes());
       }
       db.write(writeBatch);
+    } catch (DBException de) {
+      LOG.error("Error putting entity " + entity.getEntityId() +
+                " of type " + entity.getEntityType(), de);
+      handleError(entity, response, TimelinePutError.IO_EXCEPTION);
     } catch (IOException e) {
       LOG.error("Error putting entity " + entity.getEntityId() +
           " of type " + entity.getEntityType(), e);
-      TimelinePutError error = new TimelinePutError();
-      error.setEntityId(entity.getEntityId());
-      error.setEntityType(entity.getEntityType());
-      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
-      response.addError(error);
+      handleError(entity, response, TimelinePutError.IO_EXCEPTION);
     } finally {
       lock.unlock();
       writeLocks.returnLock(lock);
@@ -983,15 +988,16 @@ public class LeveldbTimelineStore extends AbstractService
             relatedEntity.getType(), relatedEntityStartTime),
             writeReverseOrderedLong(relatedEntityStartAndInsertTime
                 .insertTime));
+      } catch (DBException de) {
+        LOG.error("Error putting related entity " + relatedEntity.getId() +
+            " of type " + relatedEntity.getType() + " for entity " +
+            entity.getEntityId() + " of type " + entity.getEntityType(), de);
+        handleError(entity, response, TimelinePutError.IO_EXCEPTION);
       } catch (IOException e) {
         LOG.error("Error putting related entity " + relatedEntity.getId() +
             " of type " + relatedEntity.getType() + " for entity " +
             entity.getEntityId() + " of type " + entity.getEntityType(), e);
-        TimelinePutError error = new TimelinePutError();
-        error.setEntityId(entity.getEntityId());
-        error.setEntityType(entity.getEntityType());
-        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
-        response.addError(error);
+        handleError(entity, response, TimelinePutError.IO_EXCEPTION);
       } finally {
         lock.unlock();
         writeLocks.returnLock(lock);
@@ -1072,23 +1078,27 @@ public class LeveldbTimelineStore extends AbstractService
   private Long getStartTimeLong(String entityId, String entityType)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
-    // start time is not provided, so try to look it up
-    if (startTimeReadCache.containsKey(entity)) {
-      // found the start time in the cache
-      return startTimeReadCache.get(entity);
-    } else {
-      // try to look up the start time in the db
-      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-      byte[] v = db.get(b);
-      if (v == null) {
-        // did not find the start time in the db
-        return null;
+    try {
+      // start time is not provided, so try to look it up
+      if (startTimeReadCache.containsKey(entity)) {
+        // found the start time in the cache
+        return startTimeReadCache.get(entity);
       } else {
-        // found the start time in the db
-        Long l = readReverseOrderedLong(v, 0);
-        startTimeReadCache.put(entity, l);
-        return l;
+        // try to look up the start time in the db
+        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+        byte[] v = db.get(b);
+        if (v == null) {
+          // did not find the start time in the db
+          return null;
+        } else {
+          // found the start time in the db
+          Long l = readReverseOrderedLong(v, 0);
+          startTimeReadCache.put(entity, l);
+          return l;
+        }
       }
+    } catch(DBException e) {
+      throw new IOException(e);   
     }
   }
 
@@ -1152,28 +1162,32 @@ public class LeveldbTimelineStore extends AbstractService
     StartAndInsertTime startAndInsertTime = null;
     // create lookup key for start time
     byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-    // retrieve value for key
-    byte[] v = db.get(b);
-    if (v == null) {
-      // start time doesn't exist in db
-      if (suggestedStartTime == null) {
-        return null;
+    try {
+      // retrieve value for key
+      byte[] v = db.get(b);
+      if (v == null) {
+        // start time doesn't exist in db
+        if (suggestedStartTime == null) {
+          return null;
+        }
+        startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
+            System.currentTimeMillis());
+        
+        // write suggested start time
+        v = new byte[16];
+        writeReverseOrderedLong(suggestedStartTime, v, 0);
+        writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
+        WriteOptions writeOptions = new WriteOptions();
+        writeOptions.sync(true);
+        db.put(b, v, writeOptions);
+      } else {
+        // found start time in db, so ignore suggested start time
+        startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
+            readReverseOrderedLong(v, 8));
       }
-      startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
-          System.currentTimeMillis());
-
-      // write suggested start time
-      v = new byte[16];
-      writeReverseOrderedLong(suggestedStartTime, v, 0);
-      writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
-      WriteOptions writeOptions = new WriteOptions();
-      writeOptions.sync(true);
-      db.put(b, v, writeOptions);
-    } else {
-      // found start time in db, so ignore suggested start time
-      startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
-          readReverseOrderedLong(v, 8));
-    }
+    } catch(DBException e) {
+      throw new IOException(e);            	
+    } 
     startTimeWriteCache.put(entity, startAndInsertTime);
     startTimeReadCache.put(entity, startAndInsertTime.startTime);
     return startAndInsertTime;
@@ -1373,7 +1387,7 @@ public class LeveldbTimelineStore extends AbstractService
 
   @VisibleForTesting
   List<String> getEntityTypes() throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       iterator = getDbIterator(false);
       List<String> entityTypes = new ArrayList<String>();
@@ -1396,6 +1410,8 @@ public class LeveldbTimelineStore extends AbstractService
         iterator.seek(lookupKey);
       }
       return entityTypes;
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -1406,7 +1422,7 @@ public class LeveldbTimelineStore extends AbstractService
    * the given write batch.
    */
   private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
-      DBIterator iterator) {
+      LeveldbIterator iterator) {
     for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
       byte[] key = iterator.peekNext().getKey();
       if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1418,7 +1434,7 @@ public class LeveldbTimelineStore extends AbstractService
 
   @VisibleForTesting
   boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
-      DBIterator iterator, DBIterator pfIterator, boolean seeked)
+      LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
       throws IOException {
     WriteBatch writeBatch = null;
     try {
@@ -1524,6 +1540,8 @@ public class LeveldbTimelineStore extends AbstractService
       writeOptions.sync(true);
       db.write(writeBatch, writeOptions);
       return true;
+    } catch(DBException e) {
+      throw new IOException(e);
     } finally {
       IOUtils.cleanup(LOG, writeBatch);
     }
@@ -1542,8 +1560,8 @@ public class LeveldbTimelineStore extends AbstractService
     try {
       List<String> entityTypes = getEntityTypes();
       for (String entityType : entityTypes) {
-        DBIterator iterator = null;
-        DBIterator pfIterator = null;
+        LeveldbIterator iterator = null;
+        LeveldbIterator pfIterator = null;
         long typeCount = 0;
         try {
           deleteLock.writeLock().lock();
@@ -1583,21 +1601,25 @@ public class LeveldbTimelineStore extends AbstractService
   }
 
   @VisibleForTesting
-  DBIterator getDbIterator(boolean fillCache) {
+  LeveldbIterator getDbIterator(boolean fillCache) {
     ReadOptions readOptions = new ReadOptions();
     readOptions.fillCache(fillCache);
-    return db.iterator(readOptions);
+    return new LeveldbIterator(db, readOptions);
   }
   
   Version loadVersion() throws IOException {
-    byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
-    // if version is not stored previously, treat it as 1.0.
-    if (data == null || data.length == 0) {
-      return Version.newInstance(1, 0);
+    try {
+      byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
+      // if version is not stored previously, treat it as 1.0.
+      if (data == null || data.length == 0) {
+        return Version.newInstance(1, 0);
+      }
+      Version version =
+          new VersionPBImpl(VersionProto.parseFrom(data));
+      return version;
+    } catch(DBException e) {
+      throw new IOException(e);    	
     }
-    Version version =
-        new VersionPBImpl(VersionProto.parseFrom(data));
-    return version;
   }
   
   // Only used for test
@@ -1726,6 +1748,8 @@ public class LeveldbTimelineStore extends AbstractService
       writeBatch.put(domainEntryKey, timestamps);
       writeBatch.put(ownerLookupEntryKey, timestamps);
       db.write(writeBatch);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, writeBatch);
     }
@@ -1754,13 +1778,15 @@ public class LeveldbTimelineStore extends AbstractService
   @Override
   public TimelineDomain getDomain(String domainId)
       throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       byte[] prefix = KeyBuilder.newInstance()
           .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
-      iterator = db.iterator();
+      iterator = new LeveldbIterator(db);
       iterator.seek(prefix);
       return getTimelineDomain(iterator, domainId, prefix);
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
@@ -1769,12 +1795,12 @@ public class LeveldbTimelineStore extends AbstractService
   @Override
   public TimelineDomains getDomains(String owner)
       throws IOException {
-    DBIterator iterator = null;
+    LeveldbIterator iterator = null;
     try {
       byte[] prefix = KeyBuilder.newInstance()
           .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
       List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
-      for (iterator = db.iterator(), iterator.seek(prefix);
+      for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
           iterator.hasNext();) {
         byte[] key = iterator.peekNext().getKey();
         if (!prefixMatches(prefix, prefix.length, key)) {
@@ -1809,13 +1835,15 @@ public class LeveldbTimelineStore extends AbstractService
       TimelineDomains domainsToReturn = new TimelineDomains();
       domainsToReturn.addDomains(domains);
       return domainsToReturn;
+    } catch(DBException e) {
+      throw new IOException(e);            	
     } finally {
       IOUtils.cleanup(LOG, iterator);
     }
   }
 
   private static TimelineDomain getTimelineDomain(
-      DBIterator iterator, String domainId, byte[] prefix) throws IOException {
+      LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
     // Iterate over all the rows whose key starts with prefix to retrieve the
     // domain information.
     TimelineDomain domain = new TimelineDomain();
@@ -1852,5 +1880,5 @@ public class LeveldbTimelineStore extends AbstractService
     } else {
       return domain;
     }
-  }
+  }    
 }

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java

@@ -45,7 +45,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
-import org.iq80.leveldb.DBIterator;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.iq80.leveldb.DBException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -146,13 +147,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
 
   private boolean deleteNextEntity(String entityType, byte[] ts)
       throws IOException, InterruptedException {
-    DBIterator iterator = null;
-    DBIterator pfIterator = null;
+    LeveldbIterator iterator = null;
+    LeveldbIterator pfIterator = null;
     try {
       iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
       pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
       return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
           iterator, pfIterator, false);
+    } catch(DBException e) {
+      throw new IOException(e);   	
     } finally {
       IOUtils.cleanup(null, iterator, pfIterator);
     }