|
@@ -17,7 +17,6 @@
|
|
|
package org.apache.hadoop.ozone.ksm;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
-import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
|
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
|
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
|
@@ -64,6 +63,62 @@ public class VolumeManagerImpl implements VolumeManager {
|
|
|
OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
|
|
|
}
|
|
|
|
|
|
+ // Helpers to add and delete volume from user list
|
|
|
+ private void addVolumeToOwnerList(String volume, String owner,
|
|
|
+ List<Map.Entry<byte[], byte[]>> putBatch)
|
|
|
+ throws IOException {
|
|
|
+ // Get the volume list
|
|
|
+ byte[] dbUserKey = metadataManager.getUserKey(owner);
|
|
|
+ byte[] volumeList = metadataManager.get(dbUserKey);
|
|
|
+ List<String> prevVolList = new LinkedList<>();
|
|
|
+ if (volumeList != null) {
|
|
|
+ VolumeList vlist = VolumeList.parseFrom(volumeList);
|
|
|
+ prevVolList.addAll(vlist.getVolumeNamesList());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check the volume count
|
|
|
+ if (prevVolList.size() >= maxUserVolumeCount) {
|
|
|
+ LOG.error("Too many volumes for user:{}", owner);
|
|
|
+ throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add the new volume to the list
|
|
|
+ prevVolList.add(volume);
|
|
|
+ VolumeList newVolList = VolumeList.newBuilder()
|
|
|
+ .addAllVolumeNames(prevVolList).build();
|
|
|
+ putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void delVolumeFromOwnerList(String volume, String owner,
|
|
|
+ List<Map.Entry<byte[], byte[]>> putBatch,
|
|
|
+ List<byte[]> deleteBatch)
|
|
|
+ throws IOException {
|
|
|
+ // Get the volume list
|
|
|
+ byte[] dbUserKey = metadataManager.getUserKey(owner);
|
|
|
+ byte[] volumeList = metadataManager.get(dbUserKey);
|
|
|
+ List<String> prevVolList = new LinkedList<>();
|
|
|
+ if (volumeList != null) {
|
|
|
+ VolumeList vlist = VolumeList.parseFrom(volumeList);
|
|
|
+ prevVolList.addAll(vlist.getVolumeNamesList());
|
|
|
+ } else {
|
|
|
+ throw new KSMException(ResultCodes.FAILED_USER_NOT_FOUND);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove the volume from the list
|
|
|
+ prevVolList.remove(volume);
|
|
|
+ if (prevVolList.size() == 0) {
|
|
|
+ deleteBatch.add(dbUserKey);
|
|
|
+ } else {
|
|
|
+ VolumeList newVolList = VolumeList.newBuilder()
|
|
|
+ .addAllVolumeNames(prevVolList).build();
|
|
|
+ putBatch.add(batchEntry(dbUserKey, newVolList.toByteArray()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map.Entry<byte[], byte[]> batchEntry(byte[] key, byte[] value) {
|
|
|
+ return new AbstractMap.SimpleEntry<>(key, value);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Creates a volume.
|
|
|
* @param args - KsmVolumeArgs.
|
|
@@ -74,42 +129,21 @@ public class VolumeManagerImpl implements VolumeManager {
|
|
|
metadataManager.writeLock().lock();
|
|
|
List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
|
|
|
try {
|
|
|
- byte[] volumeName = metadataManager.
|
|
|
- get(DFSUtil.string2Bytes(args.getVolume()));
|
|
|
+ byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
|
|
|
+ byte[] volumeInfo = metadataManager.get(dbVolumeKey);
|
|
|
|
|
|
// Check of the volume already exists
|
|
|
- if(volumeName != null) {
|
|
|
+ if (volumeInfo != null) {
|
|
|
LOG.error("volume:{} already exists", args.getVolume());
|
|
|
throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
|
|
|
}
|
|
|
|
|
|
- // Next count the number of volumes for the user
|
|
|
- String dbUserName = "$" + args.getOwnerName();
|
|
|
- byte[] volumeList = metadataManager
|
|
|
- .get(DFSUtil.string2Bytes(dbUserName));
|
|
|
- List prevVolList;
|
|
|
- if (volumeList != null) {
|
|
|
- VolumeList vlist = VolumeList.parseFrom(volumeList);
|
|
|
- prevVolList = vlist.getVolumeNamesList();
|
|
|
- } else {
|
|
|
- prevVolList = new LinkedList();
|
|
|
- }
|
|
|
-
|
|
|
- if (prevVolList.size() >= maxUserVolumeCount) {
|
|
|
- LOG.error("Too many volumes for user:{}", args.getOwnerName());
|
|
|
- throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
|
|
|
- }
|
|
|
-
|
|
|
- // Commit the volume information to metadataManager
|
|
|
- VolumeInfo volumeInfo = args.getProtobuf();
|
|
|
- batch.add(new AbstractMap.SimpleEntry<>(
|
|
|
- DFSUtil.string2Bytes(args.getVolume()), volumeInfo.toByteArray()));
|
|
|
+ // Write the vol info
|
|
|
+ VolumeInfo newVolumeInfo = args.getProtobuf();
|
|
|
+ batch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
|
|
|
|
|
|
- prevVolList.add(args.getVolume());
|
|
|
- VolumeList newVolList = VolumeList.newBuilder()
|
|
|
- .addAllVolumeNames(prevVolList).build();
|
|
|
- batch.add(new AbstractMap.SimpleEntry<>(
|
|
|
- DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray()));
|
|
|
+ // Add volume to user list
|
|
|
+ addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
|
|
|
metadataManager.batchPut(batch);
|
|
|
LOG.info("created volume:{} user:{}",
|
|
|
args.getVolume(), args.getOwnerName());
|
|
@@ -121,4 +155,120 @@ public class VolumeManagerImpl implements VolumeManager {
|
|
|
metadataManager.writeLock().unlock();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Changes the owner of a volume.
|
|
|
+ *
|
|
|
+ * @param volume - Name of the volume.
|
|
|
+ * @param owner - Name of the owner.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void setOwner(String volume, String owner) throws IOException {
|
|
|
+ Preconditions.checkNotNull(volume);
|
|
|
+ Preconditions.checkNotNull(owner);
|
|
|
+ List<Map.Entry<byte[], byte[]>> putbatch = new LinkedList<>();
|
|
|
+ List<byte[]> deletebatch = new LinkedList<>();
|
|
|
+ metadataManager.writeLock().lock();
|
|
|
+ try {
|
|
|
+ byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
|
|
+ byte[] volInfo = metadataManager.get(dbVolumeKey);
|
|
|
+ if (volInfo == null) {
|
|
|
+ throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
|
|
+ }
|
|
|
+
|
|
|
+ VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
|
|
+ KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
|
|
|
+ Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
|
|
|
+
|
|
|
+ delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(),
|
|
|
+ putbatch, deletebatch);
|
|
|
+ addVolumeToOwnerList(volume, owner, putbatch);
|
|
|
+
|
|
|
+ KsmVolumeArgs newVolumeArgs =
|
|
|
+ KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
|
|
|
+ .setAdminName(volumeArgs.getAdminName())
|
|
|
+ .setOwnerName(owner)
|
|
|
+ .setQuotaInBytes(volumeArgs.getQuotaInBytes())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
|
|
|
+ putbatch.add(batchEntry(dbVolumeKey, newVolumeInfo.toByteArray()));
|
|
|
+
|
|
|
+ metadataManager.batchPutDelete(putbatch, deletebatch);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Changing volume ownership failed for user:{} volume:{}",
|
|
|
+ owner, volume, ex);
|
|
|
+ throw ex;
|
|
|
+ } finally {
|
|
|
+ metadataManager.writeLock().unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Changes the Quota on a volume.
|
|
|
+ *
|
|
|
+ * @param volume - Name of the volume.
|
|
|
+ * @param quota - Quota in bytes.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void setQuota(String volume, long quota) throws IOException {
|
|
|
+ Preconditions.checkNotNull(volume);
|
|
|
+ metadataManager.writeLock().lock();
|
|
|
+ try {
|
|
|
+ byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
|
|
+ byte[] volInfo = metadataManager.get(dbVolumeKey);
|
|
|
+ if (volInfo == null) {
|
|
|
+ throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
|
|
+ }
|
|
|
+
|
|
|
+ VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
|
|
+ KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
|
|
|
+ Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
|
|
|
+
|
|
|
+ KsmVolumeArgs newVolumeArgs =
|
|
|
+ KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
|
|
|
+ .setAdminName(volumeArgs.getAdminName())
|
|
|
+ .setOwnerName(volumeArgs.getOwnerName())
|
|
|
+ .setQuotaInBytes(quota)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
|
|
|
+ metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray());
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Changing volume quota failed for volume:{} quota:{}",
|
|
|
+ volume, quota, ex);
|
|
|
+ throw ex;
|
|
|
+ } finally {
|
|
|
+ metadataManager.writeLock().unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gets the volume information.
|
|
|
+ * @param volume - Volume name.
|
|
|
+ * @return VolumeArgs or exception is thrown.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
|
|
|
+ Preconditions.checkNotNull(volume);
|
|
|
+ metadataManager.readLock().lock();
|
|
|
+ try {
|
|
|
+ byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
|
|
+ byte[] volInfo = metadataManager.get(dbVolumeKey);
|
|
|
+ if (volInfo == null) {
|
|
|
+ throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
|
|
+ }
|
|
|
+
|
|
|
+ VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
|
|
+ KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
|
|
|
+ Preconditions.checkState(volume.equalsIgnoreCase(volumeInfo.getVolume()));
|
|
|
+ return volumeArgs;
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Info volume failed for volume:{}", volume, ex);
|
|
|
+ throw ex;
|
|
|
+ } finally {
|
|
|
+ metadataManager.readLock().unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|