|
@@ -111,7 +111,9 @@ public class KeyValueHandler extends Handler {
|
|
private final BlockDeletingService blockDeletingService;
|
|
private final BlockDeletingService blockDeletingService;
|
|
private final VolumeChoosingPolicy volumeChoosingPolicy;
|
|
private final VolumeChoosingPolicy volumeChoosingPolicy;
|
|
private final long maxContainerSize;
|
|
private final long maxContainerSize;
|
|
- private final AutoCloseableLock handlerLock;
|
|
|
|
|
|
+
|
|
|
|
+ // A lock that is held during container creation.
|
|
|
|
+ private final AutoCloseableLock containerCreationLock;
|
|
private final boolean doSyncWrite;
|
|
private final boolean doSyncWrite;
|
|
|
|
|
|
public KeyValueHandler(Configuration config, StateContext context,
|
|
public KeyValueHandler(Configuration config, StateContext context,
|
|
@@ -143,7 +145,7 @@ public class KeyValueHandler extends Handler {
|
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
|
|
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
|
|
// this handler lock is used for synchronizing createContainer Requests,
|
|
// this handler lock is used for synchronizing createContainer Requests,
|
|
// so using a fair lock here.
|
|
// so using a fair lock here.
|
|
- handlerLock = new AutoCloseableLock(new ReentrantLock(true));
|
|
|
|
|
|
+ containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -212,7 +214,7 @@ public class KeyValueHandler extends Handler {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Handles Create Container Request. If successful, adds the container to
|
|
* Handles Create Container Request. If successful, adds the container to
|
|
- * ContainerSet.
|
|
|
|
|
|
+ * ContainerSet and sends an ICR to the SCM.
|
|
*/
|
|
*/
|
|
ContainerCommandResponseProto handleCreateContainer(
|
|
ContainerCommandResponseProto handleCreateContainer(
|
|
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
|
|
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
|
|
@@ -235,14 +237,12 @@ public class KeyValueHandler extends Handler {
|
|
KeyValueContainer newContainer = new KeyValueContainer(
|
|
KeyValueContainer newContainer = new KeyValueContainer(
|
|
newContainerData, conf);
|
|
newContainerData, conf);
|
|
|
|
|
|
- try {
|
|
|
|
- handlerLock.acquire();
|
|
|
|
|
|
+ boolean created = false;
|
|
|
|
+ try (AutoCloseableLock l = containerCreationLock.acquire()) {
|
|
if (containerSet.getContainer(containerID) == null) {
|
|
if (containerSet.getContainer(containerID) == null) {
|
|
newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
|
|
newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
|
|
- containerSet.addContainer(newContainer);
|
|
|
|
- sendICR(newContainer);
|
|
|
|
|
|
+ created = containerSet.addContainer(newContainer);
|
|
} else {
|
|
} else {
|
|
-
|
|
|
|
// The create container request for an already existing container can
|
|
// The create container request for an already existing container can
|
|
// arrive in case the ContainerStateMachine reapplies the transaction
|
|
// arrive in case the ContainerStateMachine reapplies the transaction
|
|
// on datanode restart. Just log a warning msg here.
|
|
// on datanode restart. Just log a warning msg here.
|
|
@@ -251,10 +251,15 @@ public class KeyValueHandler extends Handler {
|
|
}
|
|
}
|
|
} catch (StorageContainerException ex) {
|
|
} catch (StorageContainerException ex) {
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
- } finally {
|
|
|
|
- handlerLock.release();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (created) {
|
|
|
|
+ try {
|
|
|
|
+ sendICR(newContainer);
|
|
|
|
+ } catch (StorageContainerException ex) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
return ContainerUtils.getSuccessResponse(request);
|
|
return ContainerUtils.getSuccessResponse(request);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -282,6 +287,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
KeyValueContainerData containerData = kvContainer.getContainerData();
|
|
KeyValueContainerData containerData = kvContainer.getContainerData();
|
|
return KeyValueContainerUtil.getReadContainerResponse(
|
|
return KeyValueContainerUtil.getReadContainerResponse(
|
|
request, containerData);
|
|
request, containerData);
|
|
@@ -420,6 +433,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
BlockData responseData;
|
|
BlockData responseData;
|
|
try {
|
|
try {
|
|
BlockID blockID = BlockID.getFromProtobuf(
|
|
BlockID blockID = BlockID.getFromProtobuf(
|
|
@@ -451,6 +472,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
long blockLength;
|
|
long blockLength;
|
|
try {
|
|
try {
|
|
BlockID blockID = BlockID
|
|
BlockID blockID = BlockID
|
|
@@ -510,6 +539,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
ChunkInfo chunkInfo;
|
|
ChunkInfo chunkInfo;
|
|
byte[] data;
|
|
byte[] data;
|
|
try {
|
|
try {
|
|
@@ -537,6 +574,27 @@ public class KeyValueHandler extends Handler {
|
|
return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
|
|
return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Throw an exception if the container is unhealthy.
|
|
|
|
+ *
|
|
|
|
+ * @throws StorageContainerException if the container is unhealthy.
|
|
|
|
+ * @param kvContainer
|
|
|
|
+ */
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ void checkContainerIsHealthy(KeyValueContainer kvContainer)
|
|
|
|
+ throws StorageContainerException {
|
|
|
|
+ kvContainer.readLock();
|
|
|
|
+ try {
|
|
|
|
+ if (kvContainer.getContainerData().getState() == State.UNHEALTHY) {
|
|
|
|
+ throw new StorageContainerException(
|
|
|
|
+ "The container replica is unhealthy.",
|
|
|
|
+ CONTAINER_UNHEALTHY);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ kvContainer.readUnlock();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Handle Delete Chunk operation. Calls ChunkManager to process the request.
|
|
* Handle Delete Chunk operation. Calls ChunkManager to process the request.
|
|
*/
|
|
*/
|
|
@@ -549,6 +607,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
try {
|
|
try {
|
|
checkContainerOpen(kvContainer);
|
|
checkContainerOpen(kvContainer);
|
|
|
|
|
|
@@ -697,6 +763,14 @@ public class KeyValueHandler extends Handler {
|
|
return ContainerUtils.malformedRequest(request);
|
|
return ContainerUtils.malformedRequest(request);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // The container can become unhealthy after the lock is released.
|
|
|
|
+ // The operation will likely fail/timeout in that happens.
|
|
|
|
+ try {
|
|
|
|
+ checkContainerIsHealthy(kvContainer);
|
|
|
|
+ } catch (StorageContainerException sce) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
|
|
|
|
+ }
|
|
|
|
+
|
|
GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
|
|
GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
|
|
|
|
|
|
try {
|
|
try {
|