|
@@ -34,7 +34,7 @@ import org.apache.hadoop.conf.*;
|
|
|
***************************************************/
|
|
|
class FSDataset implements FSConstants {
|
|
|
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* A node type that can be built into a tree reflecting the
|
|
|
* hierarchy of blocks on the local disk.
|
|
@@ -274,7 +274,7 @@ class FSDataset implements FSConstants {
|
|
|
this.volumes = volumes;
|
|
|
}
|
|
|
|
|
|
- FSVolume getNextVolume(long blockSize) throws IOException {
|
|
|
+ synchronized FSVolume getNextVolume(long blockSize) throws IOException {
|
|
|
int startVolume = curVolume;
|
|
|
while (true) {
|
|
|
FSVolume volume = volumes[curVolume];
|
|
@@ -302,25 +302,25 @@ class FSDataset implements FSConstants {
|
|
|
return remaining;
|
|
|
}
|
|
|
|
|
|
- void getBlockInfo(TreeSet<Block> blockSet) {
|
|
|
+ synchronized void getBlockInfo(TreeSet<Block> blockSet) {
|
|
|
for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
volumes[idx].getBlockInfo(blockSet);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
|
|
|
+ synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
|
|
|
for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
volumes[idx].getVolumeMap(volumeMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void getBlockMap(HashMap<Block, File> blockMap) {
|
|
|
+ synchronized void getBlockMap(HashMap<Block, File> blockMap) {
|
|
|
for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
volumes[idx].getBlockMap(blockMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void checkDirs() throws DiskErrorException {
|
|
|
+ synchronized void checkDirs() throws DiskErrorException {
|
|
|
for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
volumes[idx].checkDirs();
|
|
|
}
|
|
@@ -391,10 +391,11 @@ class FSDataset implements FSConstants {
|
|
|
/**
|
|
|
* Get a stream of data from the indicated block.
|
|
|
*/
|
|
|
- public InputStream getBlockData(Block b) throws IOException {
|
|
|
+ public synchronized InputStream getBlockData(Block b) throws IOException {
|
|
|
if (! isValidBlock(b)) {
|
|
|
throw new IOException("Block " + b + " is not valid.");
|
|
|
}
|
|
|
+ // File should be opened with the lock.
|
|
|
return new FileInputStream(getFile(b));
|
|
|
}
|
|
|
|
|
@@ -414,7 +415,7 @@ class FSDataset implements FSConstants {
|
|
|
// Serialize access to /tmp, and check if file already there.
|
|
|
//
|
|
|
File f = null;
|
|
|
- synchronized (ongoingCreates) {
|
|
|
+ synchronized ( this ) {
|
|
|
//
|
|
|
// Is it already in the create process?
|
|
|
//
|
|
@@ -422,11 +423,12 @@ class FSDataset implements FSConstants {
|
|
|
throw new IOException("Block " + b +
|
|
|
" has already been started (though not completed), and thus cannot be created.");
|
|
|
}
|
|
|
-
|
|
|
- FSVolume v = volumes.getNextVolume(blockSize);
|
|
|
-
|
|
|
- // create temporary file to hold block in the designated volume
|
|
|
- f = v.createTmpFile(b);
|
|
|
+ FSVolume v = null;
|
|
|
+ synchronized ( volumes ) {
|
|
|
+ v = volumes.getNextVolume(blockSize);
|
|
|
+ // create temporary file to hold block in the designated volume
|
|
|
+ f = v.createTmpFile(b);
|
|
|
+ }
|
|
|
ongoingCreates.put(b, f);
|
|
|
volumeMap.put(b, v);
|
|
|
}
|
|
@@ -450,8 +452,7 @@ class FSDataset implements FSConstants {
|
|
|
/**
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
- public void finalizeBlock(Block b) throws IOException {
|
|
|
- synchronized (ongoingCreates) {
|
|
|
+ public synchronized void finalizeBlock(Block b) throws IOException {
|
|
|
File f = ongoingCreates.get(b);
|
|
|
if (f == null || ! f.exists()) {
|
|
|
throw new IOException("No temporary file " + f + " for block " + b);
|
|
@@ -460,10 +461,12 @@ class FSDataset implements FSConstants {
|
|
|
b.setNumBytes(finalLen);
|
|
|
FSVolume v = volumeMap.get(b);
|
|
|
|
|
|
- File dest = v.addBlock(b, f);
|
|
|
+ File dest = null;
|
|
|
+ synchronized ( volumes ) {
|
|
|
+ dest = v.addBlock(b, f);
|
|
|
+ }
|
|
|
blockMap.put(b, dest);
|
|
|
ongoingCreates.remove(b);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -495,12 +498,15 @@ class FSDataset implements FSConstants {
|
|
|
*/
|
|
|
public void invalidate(Block invalidBlks[]) throws IOException {
|
|
|
for (int i = 0; i < invalidBlks.length; i++) {
|
|
|
- File f = getFile(invalidBlks[i]);
|
|
|
- if (!f.delete()) {
|
|
|
- throw new IOException("Unexpected error trying to delete block "
|
|
|
- + invalidBlks[i] + " at file " + f);
|
|
|
- }
|
|
|
- blockMap.remove(invalidBlks[i]);
|
|
|
+ synchronized ( this ) {
|
|
|
+ File f = getFile(invalidBlks[i]);
|
|
|
+ if (!f.delete()) {
|
|
|
+ throw new IOException("Unexpected error trying to delete block "
|
|
|
+ + invalidBlks[i] + " at file " + f);
|
|
|
+ }
|
|
|
+ blockMap.remove(invalidBlks[i]);
|
|
|
+ volumeMap.remove(invalidBlks[i]);
|
|
|
+ }
|
|
|
DataNode.LOG.info("Deleting block " + invalidBlks[i]);
|
|
|
}
|
|
|
}
|
|
@@ -508,7 +514,7 @@ class FSDataset implements FSConstants {
|
|
|
/**
|
|
|
* Turn the block identifier into a filename.
|
|
|
*/
|
|
|
- File getFile(Block b) {
|
|
|
+ synchronized File getFile(Block b) {
|
|
|
return blockMap.get(b);
|
|
|
}
|
|
|
|