|
@@ -28,6 +28,7 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.EnumSet;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
@@ -74,6 +75,7 @@ import org.iq80.leveldb.Options;
|
|
|
import org.iq80.leveldb.ReadOptions;
|
|
|
import org.iq80.leveldb.WriteBatch;
|
|
|
import org.nustaq.serialization.FSTConfiguration;
|
|
|
+import org.nustaq.serialization.FSTClazzNameRegistry;
|
|
|
|
|
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
|
|
|
|
@@ -170,9 +172,22 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
.getLog(RollingLevelDBTimelineStore.class);
|
|
|
private static FSTConfiguration fstConf =
|
|
|
FSTConfiguration.createDefaultConfiguration();
|
|
|
+ // Fall back to 2.24 parsing if 2.50 parsing fails
|
|
|
+ private static FSTConfiguration fstConf224 =
|
|
|
+ FSTConfiguration.createDefaultConfiguration();
|
|
|
+ // Static class code for 2.24
|
|
|
+ private static final int LINKED_HASH_MAP_224_CODE = 83;
|
|
|
|
|
|
static {
|
|
|
fstConf.setShareReferences(false);
|
|
|
+ fstConf224.setShareReferences(false);
|
|
|
+ // YARN-6654 unable to find class for code 83 (LinkedHashMap)
|
|
|
+ // The linked hash map was changed between 2.24 and 2.50 so that
|
|
|
+ // the static code for LinkedHashMap (83) was changed to a dynamic
|
|
|
+ // code.
|
|
|
+ FSTClazzNameRegistry registry = fstConf224.getClassRegistry();
|
|
|
+ registry.registerClass(
|
|
|
+ LinkedHashMap.class, LINKED_HASH_MAP_224_CODE, fstConf224);
|
|
|
}
|
|
|
|
|
|
@Private
|
|
@@ -339,7 +354,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
deletionThread.start();
|
|
|
}
|
|
|
super.serviceStart();
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
@@ -365,7 +380,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
private final long ttl;
|
|
|
private final long ttlInterval;
|
|
|
|
|
|
- public EntityDeletionThread(Configuration conf) {
|
|
|
+ EntityDeletionThread(Configuration conf) {
|
|
|
ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
|
|
|
DEFAULT_TIMELINE_SERVICE_TTL_MS);
|
|
|
ttlInterval = conf.getLong(
|
|
@@ -479,9 +494,15 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
try {
|
|
|
o = fstConf.asObject(iterator.peekNext().getValue());
|
|
|
entity.addOtherInfo(keyStr, o);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Error while decoding "
|
|
|
- + entityId + ":otherInfo:" + keyStr, e);
|
|
|
+ } catch (Exception ignore) {
|
|
|
+ try {
|
|
|
+ // Fall back to 2.24 parser
|
|
|
+ o = fstConf224.asObject(iterator.peekNext().getValue());
|
|
|
+ entity.addOtherInfo(keyStr, o);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Error while decoding "
|
|
|
+ + entityId + ":otherInfo:" + keyStr, e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
} else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
|
|
@@ -1348,8 +1369,13 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
Object o = null;
|
|
|
try {
|
|
|
o = fstConf.asObject(value);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Error while decoding " + tstype, e);
|
|
|
+ } catch (Exception ignore) {
|
|
|
+ try {
|
|
|
+ // Fall back to 2.24 parser
|
|
|
+ o = fstConf224.asObject(value);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Error while decoding " + tstype, e);
|
|
|
+ }
|
|
|
}
|
|
|
if (o == null) {
|
|
|
event.setEventInfo(null);
|
|
@@ -1378,8 +1404,14 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
|
|
try {
|
|
|
value = fstConf.asObject(bytes);
|
|
|
entity.addPrimaryFilter(name, value);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Error while decoding " + name, e);
|
|
|
+ } catch (Exception ignore) {
|
|
|
+ try {
|
|
|
+ // Fall back to 2.24 parser
|
|
|
+ value = fstConf224.asObject(bytes);
|
|
|
+ entity.addPrimaryFilter(name, value);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Error while decoding " + name, e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|