|
@@ -17,6 +17,9 @@
|
|
|
|
|
|
package org.apache.hadoop.ozone.om;
|
|
|
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.databind.ObjectReader;
|
|
|
+import com.fasterxml.jackson.databind.ObjectWriter;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.protobuf.BlockingService;
|
|
@@ -41,6 +44,7 @@ import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.ozone.OmUtils;
|
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
import org.apache.hadoop.ozone.audit.AuditAction;
|
|
|
import org.apache.hadoop.ozone.audit.AuditEventStatus;
|
|
@@ -67,19 +71,29 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Service
|
|
|
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.GenericOptionsParser;
|
|
|
+import org.apache.hadoop.util.ShutdownHookManager;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import javax.management.ObjectName;
|
|
|
+import java.io.BufferedWriter;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStreamWriter;
|
|
|
import java.io.PrintStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.StandardCopyOption;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
|
|
|
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
|
|
@@ -87,10 +101,19 @@ import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
|
|
|
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
|
|
|
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
|
|
|
import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
|
|
|
+import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
|
|
|
+import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
|
|
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
|
|
|
+
|
|
|
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
|
|
|
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
|
|
|
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
|
|
|
+import static org.apache.hadoop.ozone.om.OMConfigKeys
|
|
|
+ .OZONE_OM_HANDLER_COUNT_DEFAULT;
|
|
|
+import static org.apache.hadoop.ozone.om.OMConfigKeys
|
|
|
+ .OZONE_OM_HANDLER_COUNT_KEY;
|
|
|
+import static org.apache.hadoop.ozone.om.OMConfigKeys
|
|
|
+ .OZONE_OM_METRICS_SAVE_INTERVAL;
|
|
|
+import static org.apache.hadoop.ozone.om.OMConfigKeys
|
|
|
+ .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
|
|
|
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
|
|
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
|
|
|
@@ -111,19 +134,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
+ StartupOption.INIT.getName() + " ]\n " + "ozone om [ "
|
|
|
+ StartupOption.HELP.getName() + " ]\n";
|
|
|
private final OzoneConfiguration configuration;
|
|
|
- private final RPC.Server omRpcServer;
|
|
|
- private final InetSocketAddress omRpcAddress;
|
|
|
+ private RPC.Server omRpcServer;
|
|
|
+ private InetSocketAddress omRpcAddress;
|
|
|
private final OMMetadataManager metadataManager;
|
|
|
private final VolumeManager volumeManager;
|
|
|
private final BucketManager bucketManager;
|
|
|
private final KeyManager keyManager;
|
|
|
private final OMMetrics metrics;
|
|
|
- private final OzoneManagerHttpServer httpServer;
|
|
|
+ private OzoneManagerHttpServer httpServer;
|
|
|
private final OMStorage omStorage;
|
|
|
private final ScmBlockLocationProtocol scmBlockClient;
|
|
|
private final StorageContainerLocationProtocol scmContainerClient;
|
|
|
private ObjectName omInfoBeanName;
|
|
|
private final S3BucketManager s3BucketManager;
|
|
|
+ private Timer metricsTimer;
|
|
|
+ private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask;
|
|
|
+ private static final ObjectWriter WRITER =
|
|
|
+ new ObjectMapper().writerWithDefaultPrettyPrinter();
|
|
|
+ private static final ObjectReader READER =
|
|
|
+ new ObjectMapper().readerFor(OmMetricsInfo.class);
|
|
|
+ private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
+ private final Runnable shutdownHook;
|
|
|
+ private final File omMetaDir;
|
|
|
|
|
|
private OzoneManager(OzoneConfiguration conf) throws IOException {
|
|
|
Preconditions.checkNotNull(conf);
|
|
@@ -143,33 +175,79 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
throw new OMException("SCM version info mismatch.",
|
|
|
ResultCodes.SCM_VERSION_MISMATCH_ERROR);
|
|
|
}
|
|
|
- final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
|
|
|
- OZONE_OM_HANDLER_COUNT_DEFAULT);
|
|
|
|
|
|
RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
|
|
|
ProtobufRpcEngine.class);
|
|
|
|
|
|
- BlockingService omService = newReflectiveBlockingService(
|
|
|
- new OzoneManagerProtocolServerSideTranslatorPB(this));
|
|
|
- final InetSocketAddress omNodeRpcAddr =
|
|
|
- getOmAddress(configuration);
|
|
|
- omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
|
|
|
- OzoneManagerProtocolPB.class, omService,
|
|
|
- handlerCount);
|
|
|
- omRpcAddress = updateRPCListenAddress(configuration,
|
|
|
- OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
|
|
|
metadataManager = new OmMetadataManagerImpl(configuration);
|
|
|
volumeManager = new VolumeManagerImpl(metadataManager, configuration);
|
|
|
bucketManager = new BucketManagerImpl(metadataManager);
|
|
|
+ metrics = OMMetrics.create();
|
|
|
+
|
|
|
s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager,
|
|
|
volumeManager, bucketManager);
|
|
|
- metrics = OMMetrics.create();
|
|
|
keyManager =
|
|
|
new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
|
|
|
omStorage.getOmId());
|
|
|
- httpServer = new OzoneManagerHttpServer(configuration, this);
|
|
|
+
|
|
|
+ shutdownHook = () -> {
|
|
|
+ saveOmMetrics();
|
|
|
+ };
|
|
|
+ ShutdownHookManager.get().addShutdownHook(shutdownHook,
|
|
|
+ SHUTDOWN_HOOK_PRIORITY);
|
|
|
+
|
|
|
+ omMetaDir = OmUtils.getOmDbDir(configuration);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Class which schedule saving metrics to a file.
|
|
|
+ */
|
|
|
+ private class ScheduleOMMetricsWriteTask extends TimerTask {
|
|
|
+ public void run() {
|
|
|
+ saveOmMetrics();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveOmMetrics() {
|
|
|
+ try {
|
|
|
+ boolean success;
|
|
|
+ try (BufferedWriter writer = new BufferedWriter(
|
|
|
+ new OutputStreamWriter(new FileOutputStream(
|
|
|
+ getTempMetricsStorageFile()), "UTF-8"))) {
|
|
|
+ OmMetricsInfo metricsInfo = new OmMetricsInfo();
|
|
|
+ metricsInfo.setNumKeys(metrics.getNumKeys());
|
|
|
+ WRITER.writeValue(writer, metricsInfo);
|
|
|
+ success = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (success) {
|
|
|
+ Files.move(getTempMetricsStorageFile().toPath(),
|
|
|
+ getMetricsStorageFile().toPath(), StandardCopyOption
|
|
|
+ .ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
|
|
|
+ }
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Unable to write the om Metrics file", ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns temporary metrics storage file.
|
|
|
+ * @return File
|
|
|
+ */
|
|
|
+ private File getTempMetricsStorageFile() {
|
|
|
+ return new File(omMetaDir, OM_METRICS_TEMP_FILE);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns metrics storage file.
|
|
|
+ * @return File
|
|
|
+ */
|
|
|
+ private File getMetricsStorageFile() {
|
|
|
+ return new File(omMetaDir, OM_METRICS_FILE);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* Create a scm block client, used by putKey() and getKey().
|
|
|
*
|
|
@@ -448,12 +526,49 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
* Start service.
|
|
|
*/
|
|
|
public void start() throws IOException {
|
|
|
+
|
|
|
+ InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
|
|
|
+ int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
|
|
|
+ OZONE_OM_HANDLER_COUNT_DEFAULT);
|
|
|
+ BlockingService omService = newReflectiveBlockingService(
|
|
|
+ new OzoneManagerProtocolServerSideTranslatorPB(this));
|
|
|
+ omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
|
|
|
+ OzoneManagerProtocolPB.class, omService,
|
|
|
+ handlerCount);
|
|
|
+ omRpcAddress = updateRPCListenAddress(configuration,
|
|
|
+ OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
|
|
|
+ omRpcServer.start();
|
|
|
+
|
|
|
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
|
|
|
omRpcAddress));
|
|
|
+
|
|
|
+
|
|
|
DefaultMetricsSystem.initialize("OzoneManager");
|
|
|
- metadataManager.start();
|
|
|
- keyManager.start();
|
|
|
- omRpcServer.start();
|
|
|
+
|
|
|
+ metadataManager.start(configuration);
|
|
|
+
|
|
|
+
|
|
|
+ // Set metrics and start metrics back ground thread
|
|
|
+ metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
|
|
|
+ .getVolumeTable()));
|
|
|
+ metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager
|
|
|
+ .getBucketTable()));
|
|
|
+
|
|
|
+ if (getMetricsStorageFile().exists()) {
|
|
|
+ OmMetricsInfo metricsInfo = READER.readValue(getMetricsStorageFile());
|
|
|
+ metrics.setNumKeys(metricsInfo.getNumKeys());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Schedule save metrics
|
|
|
+ long period = configuration.getTimeDuration(OZONE_OM_METRICS_SAVE_INTERVAL,
|
|
|
+ OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
|
|
+ scheduleOMMetricsWriteTask = new ScheduleOMMetricsWriteTask();
|
|
|
+ metricsTimer = new Timer();
|
|
|
+ metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
|
|
|
+
|
|
|
+ keyManager.start(configuration);
|
|
|
+
|
|
|
+ httpServer = new OzoneManagerHttpServer(configuration, this);
|
|
|
httpServer.start();
|
|
|
registerMXBean();
|
|
|
setStartTime();
|
|
@@ -464,10 +579,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
*/
|
|
|
public void stop() {
|
|
|
try {
|
|
|
- metadataManager.stop();
|
|
|
+ // Cancel the metrics timer and set to null.
|
|
|
+ metricsTimer.cancel();
|
|
|
+ metricsTimer = null;
|
|
|
+ scheduleOMMetricsWriteTask = null;
|
|
|
omRpcServer.stop();
|
|
|
keyManager.stop();
|
|
|
httpServer.stop();
|
|
|
+ metadataManager.stop();
|
|
|
metrics.unRegister();
|
|
|
unregisterMXBean();
|
|
|
} catch (Exception e) {
|
|
@@ -500,6 +619,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
volumeManager.createVolume(args);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.CREATE_VOLUME,
|
|
|
(args == null) ? null : args.toAuditMap()));
|
|
|
+ metrics.incNumVolumes();
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumVolumeCreateFails();
|
|
|
AUDIT.logWriteFailure(
|
|
@@ -633,6 +753,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
volumeManager.deleteVolume(volume);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_VOLUME,
|
|
|
buildAuditMap(volume)));
|
|
|
+ metrics.decNumVolumes();
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumVolumeDeleteFails();
|
|
|
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.DELETE_VOLUME,
|
|
@@ -727,6 +848,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
bucketManager.createBucket(bucketInfo);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.CREATE_BUCKET,
|
|
|
(bucketInfo == null) ? null : bucketInfo.toAuditMap()));
|
|
|
+ metrics.incNumBuckets();
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumBucketCreateFails();
|
|
|
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.CREATE_BUCKET,
|
|
@@ -835,6 +957,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
keyManager.commitKey(args, clientID);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.COMMIT_KEY,
|
|
|
auditMap));
|
|
|
+ // As when we commit the key it is visible, so we should increment here.
|
|
|
+ // As key also can have multiple versions, we need to increment keys
|
|
|
+ // only if version is 0. Currently we have not complete support of
|
|
|
+ // versioning of keys. So, this can be revisited later.
|
|
|
+ if (args != null && args.getLocationInfoList() != null &&
|
|
|
+ args.getLocationInfoList().size() > 0 &&
|
|
|
+ args.getLocationInfoList().get(0) != null &&
|
|
|
+ args.getLocationInfoList().get(0).getCreateVersion() == 0) {
|
|
|
+ metrics.incNumKeys();
|
|
|
+ }
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumKeyCommitFails();
|
|
|
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.COMMIT_KEY,
|
|
@@ -925,6 +1057,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
keyManager.deleteKey(args);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_KEY,
|
|
|
(args == null) ? null : args.toAuditMap()));
|
|
|
+ metrics.decNumKeys();
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumKeyDeleteFails();
|
|
|
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.DELETE_KEY,
|
|
@@ -998,6 +1131,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
bucketManager.deleteBucket(volume, bucket);
|
|
|
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_BUCKET,
|
|
|
auditMap));
|
|
|
+ metrics.decNumBuckets();
|
|
|
} catch (Exception ex) {
|
|
|
metrics.incNumBucketDeleteFails();
|
|
|
AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.DELETE_BUCKET,
|
|
@@ -1136,7 +1270,30 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
*/
|
|
|
public void createS3Bucket(String userName, String s3BucketName)
|
|
|
throws IOException {
|
|
|
- s3BucketManager.createS3Bucket(userName, s3BucketName);
|
|
|
+ try {
|
|
|
+ metrics.incNumBucketCreates();
|
|
|
+ try {
|
|
|
+ boolean newVolumeCreate = s3BucketManager.createOzoneVolumeIfNeeded(
|
|
|
+ userName);
|
|
|
+ if (newVolumeCreate) {
|
|
|
+ metrics.incNumVolumeCreates();
|
|
|
+ metrics.incNumVolumes();
|
|
|
+ }
|
|
|
+ } catch (IOException ex) {
|
|
|
+ // We need to increment volume creates also because this is first
|
|
|
+ // time we are trying to create a volume, it failed. As we increment
|
|
|
+ // ops and create when we try to do that operation.
|
|
|
+ metrics.incNumVolumeCreates();
|
|
|
+ metrics.incNumVolumeCreateFails();
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
+ s3BucketManager.createS3Bucket(userName, s3BucketName);
|
|
|
+ metrics.incNumBuckets();
|
|
|
+ } catch (IOException ex) {
|
|
|
+ metrics.incNumBucketCreateFails();
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1145,7 +1302,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
|
|
|
*/
|
|
|
public void deleteS3Bucket(String s3BucketName)
|
|
|
throws IOException {
|
|
|
- s3BucketManager.deleteS3Bucket(s3BucketName);
|
|
|
+ try {
|
|
|
+ metrics.incNumBucketDeletes();
|
|
|
+ s3BucketManager.deleteS3Bucket(s3BucketName);
|
|
|
+ metrics.decNumBuckets();
|
|
|
+ } catch (IOException ex) {
|
|
|
+ metrics.incNumBucketDeleteFails();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|