|
@@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hbase.client.Connection;
|
|
import org.apache.hadoop.hbase.client.Connection;
|
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|
import org.apache.hadoop.service.AbstractService;
|
|
import org.apache.hadoop.service.AbstractService;
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
|
@@ -47,6 +53,12 @@ public class HBaseTimelineReaderImpl
|
|
|
|
|
|
private Configuration hbaseConf = null;
|
|
private Configuration hbaseConf = null;
|
|
private Connection conn;
|
|
private Connection conn;
|
|
|
|
+ private Configuration monitorHBaseConf = null;
|
|
|
|
+ private Connection monitorConn;
|
|
|
|
+ private ScheduledExecutorService monitorExecutorService;
|
|
|
|
+ private TimelineReaderContext monitorContext;
|
|
|
|
+ private long monitorInterval;
|
|
|
|
+ private AtomicBoolean hbaseDown = new AtomicBoolean();
|
|
|
|
|
|
public HBaseTimelineReaderImpl() {
|
|
public HBaseTimelineReaderImpl() {
|
|
super(HBaseTimelineReaderImpl.class.getName());
|
|
super(HBaseTimelineReaderImpl.class.getName());
|
|
@@ -55,22 +67,72 @@ public class HBaseTimelineReaderImpl
|
|
@Override
|
|
@Override
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
super.serviceInit(conf);
|
|
super.serviceInit(conf);
|
|
|
|
+
|
|
|
|
+ String clusterId = conf.get(
|
|
|
|
+ YarnConfiguration.RM_CLUSTER_ID,
|
|
|
|
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
|
|
|
|
+ monitorContext =
|
|
|
|
+ new TimelineReaderContext(clusterId, null, null, null, null,
|
|
|
|
+ TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
|
|
|
|
+ monitorInterval = conf.getLong(
|
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
|
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
|
|
|
|
+
|
|
|
|
+ monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
|
|
|
+ monitorHBaseConf.setInt("hbase.client.retries.number", 3);
|
|
|
|
+ monitorHBaseConf.setLong("hbase.client.pause", 1000);
|
|
|
|
+ monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
|
|
|
|
+ monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
|
|
|
|
+ monitorInterval);
|
|
|
|
+ monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
|
|
|
|
+ monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
|
|
|
|
+
|
|
|
|
+ monitorExecutorService = Executors.newScheduledThreadPool(1);
|
|
|
|
+
|
|
hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
|
hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
|
+ super.serviceStart();
|
|
|
|
+ LOG.info("Scheduling HBase liveness monitor at interval {}",
|
|
|
|
+ monitorInterval);
|
|
|
|
+ monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
|
|
|
|
+ monitorInterval, TimeUnit.MILLISECONDS);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected void serviceStop() throws Exception {
|
|
protected void serviceStop() throws Exception {
|
|
if (conn != null) {
|
|
if (conn != null) {
|
|
LOG.info("closing the hbase Connection");
|
|
LOG.info("closing the hbase Connection");
|
|
conn.close();
|
|
conn.close();
|
|
}
|
|
}
|
|
|
|
+ if (monitorExecutorService != null) {
|
|
|
|
+ monitorExecutorService.shutdownNow();
|
|
|
|
+ if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
|
+ LOG.warn("failed to stop the monitir task in time. " +
|
|
|
|
+ "will still proceed to close the monitor.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ monitorConn.close();
|
|
super.serviceStop();
|
|
super.serviceStop();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void checkHBaseDown() throws IOException {
|
|
|
|
+ if (hbaseDown.get()) {
|
|
|
|
+ throw new IOException("HBase is down");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public boolean isHBaseDown() {
|
|
|
|
+ return hbaseDown.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public TimelineEntity getEntity(TimelineReaderContext context,
|
|
public TimelineEntity getEntity(TimelineReaderContext context,
|
|
TimelineDataToRetrieve dataToRetrieve) throws IOException {
|
|
TimelineDataToRetrieve dataToRetrieve) throws IOException {
|
|
|
|
+ checkHBaseDown();
|
|
TimelineEntityReader reader =
|
|
TimelineEntityReader reader =
|
|
TimelineEntityReaderFactory.createSingleEntityReader(context,
|
|
TimelineEntityReaderFactory.createSingleEntityReader(context,
|
|
dataToRetrieve);
|
|
dataToRetrieve);
|
|
@@ -81,6 +143,7 @@ public class HBaseTimelineReaderImpl
|
|
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
|
|
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
|
|
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
|
|
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ checkHBaseDown();
|
|
TimelineEntityReader reader =
|
|
TimelineEntityReader reader =
|
|
TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
|
|
TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
|
|
filters, dataToRetrieve);
|
|
filters, dataToRetrieve);
|
|
@@ -90,7 +153,37 @@ public class HBaseTimelineReaderImpl
|
|
@Override
|
|
@Override
|
|
public Set<String> getEntityTypes(TimelineReaderContext context)
|
|
public Set<String> getEntityTypes(TimelineReaderContext context)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ checkHBaseDown();
|
|
EntityTypeReader reader = new EntityTypeReader(context);
|
|
EntityTypeReader reader = new EntityTypeReader(context);
|
|
return reader.readEntityTypes(hbaseConf, conn);
|
|
return reader.readEntityTypes(hbaseConf, conn);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ protected static final TimelineEntityFilters MONITOR_FILTERS =
|
|
|
|
+ new TimelineEntityFilters.Builder().entityLimit(1L).build();
|
|
|
|
+ protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
|
|
|
|
+ new TimelineDataToRetrieve(null, null, null, null, null, null);
|
|
|
|
+
|
|
|
|
+ private class HBaseMonitor implements Runnable {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ LOG.info("Running HBase liveness monitor");
|
|
|
|
+ TimelineEntityReader reader =
|
|
|
|
+ TimelineEntityReaderFactory.createMultipleEntitiesReader(
|
|
|
|
+ monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
|
|
|
|
+ reader.readEntities(monitorHBaseConf, monitorConn);
|
|
|
|
+
|
|
|
|
+ // on success, reset hbase down flag
|
|
|
|
+ if (hbaseDown.getAndSet(false)) {
|
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("HBase request succeeded, assuming HBase up");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.warn("Got failure attempting to read from timeline storage, " +
|
|
|
|
+ "assuming HBase down", e);
|
|
|
|
+ hbaseDown.getAndSet(true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|