|
@@ -31,6 +31,7 @@ import java.nio.channels.FileChannel;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -721,96 +722,116 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
static class FSVolumeSet {
|
|
|
- FSVolume[] volumes = null;
|
|
|
- int curVolume = 0;
|
|
|
+ /*
|
|
|
+ * Read access to this unmodifiable list is not synchronized.
|
|
|
+ * This list is replaced on modification holding "this" lock.
|
|
|
+ */
|
|
|
+ private volatile List<FSVolume> volumes = null;
|
|
|
+ private int curVolume = 0; // Synchronized using "this"
|
|
|
|
|
|
FSVolumeSet(FSVolume[] volumes) {
|
|
|
- this.volumes = volumes;
|
|
|
+ List<FSVolume> list = Arrays.asList(volumes);
|
|
|
+ this.volumes = Collections.unmodifiableList(list);
|
|
|
}
|
|
|
|
|
|
private int numberOfVolumes() {
|
|
|
- return volumes.length;
|
|
|
+ return volumes.size();
|
|
|
}
|
|
|
|
|
|
- synchronized FSVolume getNextVolume(long blockSize) throws IOException {
|
|
|
-
|
|
|
- if(volumes.length < 1) {
|
|
|
- throw new DiskOutOfSpaceException("No more available volumes");
|
|
|
- }
|
|
|
-
|
|
|
- // since volumes could've been removed because of the failure
|
|
|
- // make sure we are not out of bounds
|
|
|
- if(curVolume >= volumes.length) {
|
|
|
- curVolume = 0;
|
|
|
- }
|
|
|
-
|
|
|
- int startVolume = curVolume;
|
|
|
-
|
|
|
- while (true) {
|
|
|
- FSVolume volume = volumes[curVolume];
|
|
|
- curVolume = (curVolume + 1) % volumes.length;
|
|
|
- if (volume.getAvailable() > blockSize) { return volume; }
|
|
|
- if (curVolume == startVolume) {
|
|
|
- throw new DiskOutOfSpaceException("Insufficient space for an additional block");
|
|
|
+ /**
|
|
|
+ * Get next volume. Synchronized to ensure {@link #curVolume} is updated
|
|
|
+ * by a single thread and next volume is chosen with no concurrent
|
|
|
+ * update to {@link #volumes}.
|
|
|
+ * @param blockSize free space needed on the volume
|
|
|
+ * @return next volume to store the block in.
|
|
|
+ */
|
|
|
+ FSVolume getNextVolume(long blockSize) throws IOException {
|
|
|
+ synchronized(this) {
|
|
|
+ if(volumes.size() < 1) {
|
|
|
+ throw new DiskOutOfSpaceException("No more available volumes");
|
|
|
+ }
|
|
|
+ // since volumes could've been removed because of the failure
|
|
|
+ // make sure we are not out of bounds
|
|
|
+ if(curVolume >= volumes.size()) {
|
|
|
+ curVolume = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ int startVolume = curVolume;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ FSVolume volume = volumes.get(curVolume);
|
|
|
+ curVolume = (curVolume + 1) % volumes.size();
|
|
|
+ if (volume.getAvailable() > blockSize) { return volume; }
|
|
|
+ if (curVolume == startVolume) {
|
|
|
+ throw new DiskOutOfSpaceException(
|
|
|
+ "Insufficient space for an additional block");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- long getDfsUsed() throws IOException {
|
|
|
+ private long getDfsUsed() throws IOException {
|
|
|
long dfsUsed = 0L;
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- dfsUsed += volumes[idx].getDfsUsed();
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ dfsUsed += vol.getDfsUsed();
|
|
|
}
|
|
|
return dfsUsed;
|
|
|
}
|
|
|
|
|
|
- long getBlockPoolUsed(String bpid) throws IOException {
|
|
|
+ private long getBlockPoolUsed(String bpid) throws IOException {
|
|
|
long dfsUsed = 0L;
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- dfsUsed += volumes[idx].getBlockPoolUsed(bpid);
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ dfsUsed += vol.getBlockPoolUsed(bpid);
|
|
|
}
|
|
|
return dfsUsed;
|
|
|
}
|
|
|
|
|
|
- long getCapacity() throws IOException {
|
|
|
+ private long getCapacity() throws IOException {
|
|
|
long capacity = 0L;
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- capacity += volumes[idx].getCapacity();
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ capacity += vol.getCapacity();
|
|
|
}
|
|
|
return capacity;
|
|
|
}
|
|
|
|
|
|
- long getRemaining() throws IOException {
|
|
|
+ private long getRemaining() throws IOException {
|
|
|
long remaining = 0L;
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- remaining += volumes[idx].getAvailable();
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ remaining += vol.getAvailable();
|
|
|
}
|
|
|
return remaining;
|
|
|
}
|
|
|
|
|
|
- synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- volumes[idx].getVolumeMap(volumeMap);
|
|
|
+ private void getVolumeMap(ReplicasMap volumeMap)
|
|
|
+ throws IOException {
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ vol.getVolumeMap(volumeMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized void getVolumeMap(String bpid, ReplicasMap volumeMap)
|
|
|
+ private void getVolumeMap(String bpid, ReplicasMap volumeMap)
|
|
|
throws IOException {
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- volumes[idx].getVolumeMap(bpid, volumeMap);
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ vol.getVolumeMap(bpid, volumeMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Calls {@link FSVolume#checkDirs()} on each volume, removing any
|
|
|
* volumes from the active list that result in a DiskErrorException.
|
|
|
+ *
|
|
|
+ * This method is synchronized to allow only one instance of checkDirs()
|
|
|
+ * call
|
|
|
* @return list of all the removed volumes.
|
|
|
*/
|
|
|
- synchronized List<FSVolume> checkDirs() {
|
|
|
- ArrayList<FSVolume> removedVols = null;
|
|
|
+ private synchronized List<FSVolume> checkDirs() {
|
|
|
+ ArrayList<FSVolume> removedVols = null;
|
|
|
+
|
|
|
+ // Make a copy of volumes for performing modification
|
|
|
+ List<FSVolume> volumeList = new ArrayList<FSVolume>(getVolumes());
|
|
|
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- FSVolume fsv = volumes[idx];
|
|
|
+ for (int idx = 0; idx < volumeList.size(); idx++) {
|
|
|
+ FSVolume fsv = volumeList.get(idx);
|
|
|
try {
|
|
|
fsv.checkDirs();
|
|
|
} catch (DiskErrorException e) {
|
|
@@ -818,21 +839,20 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
if (removedVols == null) {
|
|
|
removedVols = new ArrayList<FSVolume>(1);
|
|
|
}
|
|
|
- removedVols.add(volumes[idx]);
|
|
|
- volumes[idx] = null; // Remove the volume
|
|
|
+ removedVols.add(volumeList.get(idx));
|
|
|
+ volumeList.set(idx, null); // Remove the volume
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Remove null volumes from the volumes array
|
|
|
if (removedVols != null && removedVols.size() > 0) {
|
|
|
- FSVolume newVols[] = new FSVolume[volumes.length - removedVols.size()];
|
|
|
- int i = 0;
|
|
|
- for (FSVolume vol : volumes) {
|
|
|
+ List<FSVolume> newVols = new ArrayList<FSVolume>();
|
|
|
+ for (FSVolume vol : volumeList) {
|
|
|
if (vol != null) {
|
|
|
- newVols[i++] = vol;
|
|
|
+ newVols.add(vol);
|
|
|
}
|
|
|
}
|
|
|
- volumes = newVols; // Replace array of volumes
|
|
|
+ volumes = Collections.unmodifiableList(newVols); // Replace volume list
|
|
|
DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
|
|
|
+ removedVols.size() + " volumes. List of current volumes: "
|
|
|
+ this);
|
|
@@ -842,29 +862,39 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
public String toString() {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- sb.append(volumes[idx].toString());
|
|
|
- if (idx != volumes.length - 1) { sb.append(","); }
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
+ return volumes.toString();
|
|
|
}
|
|
|
|
|
|
- public boolean isValid(FSVolume volume) {
|
|
|
- for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
- if (volumes[idx] == volume) {
|
|
|
+ boolean isValid(FSVolume volume) {
|
|
|
+ for (FSVolume vol : volumes) {
|
|
|
+ if (vol == volume) {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- public void addBlockPool(String bpid, Configuration conf)
|
|
|
+ private void addBlockPool(String bpid, Configuration conf)
|
|
|
throws IOException {
|
|
|
for (FSVolume v : volumes) {
|
|
|
v.addBlockPool(bpid, conf);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return unmodifiable list of volumes
|
|
|
+ */
|
|
|
+ public List<FSVolume> getVolumes() {
|
|
|
+ return volumes;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void shutdown() {
|
|
|
+ for (FSVolume volume : volumes) {
|
|
|
+ if(volume != null) {
|
|
|
+ volume.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//////////////////////////////////////////////////////
|
|
@@ -2018,11 +2048,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
if(volumes != null) {
|
|
|
- for (FSVolume volume : volumes.volumes) {
|
|
|
- if(volume != null) {
|
|
|
- volume.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
+ volumes.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2393,28 +2419,23 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized Collection<VolumeInfo> getVolumeInfo() {
|
|
|
+ Collection<VolumeInfo> getVolumeInfo() {
|
|
|
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
|
|
|
- synchronized(volumes.volumes) {
|
|
|
- for (FSVolume volume : volumes.volumes) {
|
|
|
- long used = 0;
|
|
|
- try {
|
|
|
- used = volume.getDfsUsed();
|
|
|
- } catch (IOException e) {
|
|
|
- DataNode.LOG.warn(e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- long free= 0;
|
|
|
- try {
|
|
|
- free = volume.getAvailable();
|
|
|
- } catch (IOException e) {
|
|
|
- DataNode.LOG.warn(e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- info.add(new VolumeInfo(volume.toString(), used, free,
|
|
|
- volume.getReserved()));
|
|
|
+ for (FSVolume volume : volumes.volumes) {
|
|
|
+ long used = 0;
|
|
|
+ long free = 0;
|
|
|
+ try {
|
|
|
+ used = volume.getDfsUsed();
|
|
|
+ free = volume.getAvailable();
|
|
|
+ } catch (IOException e) {
|
|
|
+ DataNode.LOG.warn(e.getMessage());
|
|
|
+ used = 0;
|
|
|
+ free = 0;
|
|
|
}
|
|
|
- return info;
|
|
|
+
|
|
|
+ info.add(new VolumeInfo(volume.toString(), used, free,
|
|
|
+ volume.getReserved()));
|
|
|
}
|
|
|
+ return info;
|
|
|
}
|
|
|
}
|