|
@@ -62,6 +62,7 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
|
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
|
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
|
|
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
|
|
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
|
|
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
|
|
|
|
+import org.apache.hadoop.util.AutoCloseableLock;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -70,6 +71,7 @@ import java.util.HashMap;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
.Result.CONTAINER_INTERNAL_ERROR;
|
|
.Result.CONTAINER_INTERNAL_ERROR;
|
|
@@ -102,6 +104,7 @@ public class KeyValueHandler extends Handler {
|
|
private final ChunkManager chunkManager;
|
|
private final ChunkManager chunkManager;
|
|
private VolumeChoosingPolicy volumeChoosingPolicy;
|
|
private VolumeChoosingPolicy volumeChoosingPolicy;
|
|
private final int maxContainerSizeGB;
|
|
private final int maxContainerSizeGB;
|
|
|
|
+ private final AutoCloseableLock handlerLock;
|
|
|
|
|
|
|
|
|
|
public KeyValueHandler(Configuration config, ContainerSet contSet,
|
|
public KeyValueHandler(Configuration config, ContainerSet contSet,
|
|
@@ -115,6 +118,9 @@ public class KeyValueHandler extends Handler {
|
|
maxContainerSizeGB = config.getInt(ScmConfigKeys
|
|
maxContainerSizeGB = config.getInt(ScmConfigKeys
|
|
.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys
|
|
.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys
|
|
.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
|
|
.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
|
|
|
|
+ // this handler lock is used for synchronizing createContainer Requests,
|
|
|
|
+ // so using a fair lock here.
|
|
|
|
+ handlerLock = new AutoCloseableLock(new ReentrantLock(true));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -159,7 +165,6 @@ public class KeyValueHandler extends Handler {
|
|
case GetSmallFile:
|
|
case GetSmallFile:
|
|
return handleGetSmallFile(request, kvContainer);
|
|
return handleGetSmallFile(request, kvContainer);
|
|
}
|
|
}
|
|
-
|
|
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -204,10 +209,19 @@ public class KeyValueHandler extends Handler {
|
|
newContainerData, conf);
|
|
newContainerData, conf);
|
|
|
|
|
|
try {
|
|
try {
|
|
- newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
|
|
|
|
- containerSet.addContainer(newContainer);
|
|
|
|
|
|
+ handlerLock.acquire();
|
|
|
|
+ if (containerSet.getContainer(containerID) == null) {
|
|
|
|
+ newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
|
|
|
|
+ containerSet.addContainer(newContainer);
|
|
|
|
+ } else {
|
|
|
|
+ throw new StorageContainerException("Container already exists with " +
|
|
|
|
+ "container Id " + containerID, ContainerProtos.Result
|
|
|
|
+ .CONTAINER_EXISTS);
|
|
|
|
+ }
|
|
} catch (StorageContainerException ex) {
|
|
} catch (StorageContainerException ex) {
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
|
|
+ } finally {
|
|
|
|
+ handlerLock.release();
|
|
}
|
|
}
|
|
|
|
|
|
return ContainerUtils.getSuccessResponse(request);
|
|
return ContainerUtils.getSuccessResponse(request);
|