@@ -17,24 +17,40 @@
 */

package org.apache.hadoop.hdfs.server.datanode;

-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeSet;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;

/**************************************************
 * FSDataset manages a set of data blocks. Each block
@@ -485,9 +501,25 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
    FSVolumeSet(FSVolume[] volumes) {
      this.volumes = volumes;
    }
+
+    private int numberOfVolumes() {
+      return volumes.length;
+    }

    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+
+      if(volumes.length < 1) {
+        throw new DiskOutOfSpaceException("No more available volumes");
+      }
+
+      // since volumes could have been removed because of a failure,
+      // make sure we are not out of bounds
+      if(curVolume >= volumes.length) {
+        curVolume = 0;
+      }
+
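+      // remember the starting point of the round-robin scan so a full,
+      // unsuccessful pass over the volumes can be detected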
      int startVolume = curVolume;
+
      while (true) {
        FSVolume volume = volumes[curVolume];
        curVolume = (curVolume + 1) % volumes.length;
@@ -534,10 +566,46 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
      }
    }

-    synchronized void checkDirs() throws DiskErrorException {
+    /**
+     * Goes over all the volumes and calls checkDir on each of them.
+     * If a volume throws a DiskErrorException, it is removed from the
+     * set of active volumes.
+     * @return list of all the removed volumes, or null if none were removed
+     */
+    synchronized List<FSVolume> checkDirs() {
+
+      ArrayList<FSVolume> removed_vols = null;
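+      // allocated lazily; remains null when every volume passes checkDirs()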
+
      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].checkDirs();
+        FSVolume fsv = volumes[idx];
+        try {
+          fsv.checkDirs();
+        } catch (DiskErrorException e) {
+          DataNode.LOG.warn("Removing failed volume " + fsv + ": ", e);
+          if(removed_vols == null) {
+            removed_vols = new ArrayList<FSVolume>(1);
+          }
+          removed_vols.add(volumes[idx]);
+          volumes[idx] = null; // remove the volume
+        }
+      }
+
+      // compact the array: copy the surviving (non-null) volumes
+      int removed_size = (removed_vols == null) ? 0 : removed_vols.size();
+      if(removed_size > 0) {
+        FSVolume[] fsvs = new FSVolume[volumes.length - removed_size];
+        for(int idx = 0, idy = 0; idx < volumes.length; idx++) {
+          if(volumes[idx] != null) {
+            fsvs[idy] = volumes[idx];
+            idy++;
+          }
+        }
+        volumes = fsvs; // replace the array of volumes
      }
+      DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed " + removed_size +
+          " volumes. Current volumes: " + toString());
+
+      return removed_vols;
    }

    public String toString() {
@@ -700,7 +768,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
  public long getDfsUsed() throws IOException {
    return volumes.getDfsUsed();
  }
-
+  /**
+   * Return true if there are still valid volumes on the DataNode.
+   */
+  public boolean hasEnoughResource() {
+    return volumes.numberOfVolumes() >= MIN_NUM_OF_VALID_VOLUMES;
+  }
+
  /**
   * Return total capacity, used and unused
   */
@@ -1226,17 +1301,32 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   * Check whether the given block is a valid one.
   */
  public boolean isValidBlock(Block b) {
-    return validateBlockFile(b) != null;
+    File f = null;
+    try {
+      f = validateBlockFile(b);
+    } catch(IOException e) {
+      DataNode.LOG.warn("Block " + b + " is not valid: ", e);
+    }
+
+    return f != null;
  }

  /**
   * Find the file corresponding to the block and return it if it exists.
   */
-  File validateBlockFile(Block b) {
+  File validateBlockFile(Block b) throws IOException {
    //Should we check for metadata file too?
    File f = getFile(b);
-    if(f != null && f.exists())
-      return f;
+
+    if(f != null) {
+      if(f.exists())
+        return f;
+
+      // the file object is not null but the file does not exist -
+      // possibly the disk failed
+      DataNode datanode = DataNode.getDataNode();
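+      // ask the DataNode to check its disks; a failed volume would explain
+      // the missing block file and will be removed by checkDataDir()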
+      datanode.checkDiskError();
+    }
+
    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
    }
@@ -1375,10 +1465,51 @@ public class FSDataset implements FSConstants, FSDatasetInterface {

  /**
   * check if a data directory is healthy
+   * if some volumes have failed, also removes all the blocks that belong
+   * to those volumes
   * @throws DiskErrorException
   */
  public void checkDataDir() throws DiskErrorException {
-    volumes.checkDirs();
+    long total_blocks = 0, removed_blocks = 0;
+    List<FSVolume> failed_vols = volumes.checkDirs();
+
+    // if there are no failed volumes, there is nothing to do
+    if(failed_vols == null)
+      return;
+
+    // otherwise remove the blocks that belong to the failed volumes
+    long mlsec = System.currentTimeMillis();
+    synchronized (this) {
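+      // lock the dataset while sweeping volumeMap so that no block is
+      // added or resolved mid-scan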
+      Iterator<Block> ib = volumeMap.keySet().iterator();
+      while(ib.hasNext()) {
+        Block b = ib.next();
+        total_blocks++;
+        // check if the volume this block belongs to is still valid
+        FSVolume vol = volumeMap.get(b).getVolume();
+        for(FSVolume fv : failed_vols) {
+          if(vol == fv) {
+            DataNode.LOG.warn("Removing block " + b.getBlockId() + " from vol "
+                + vol.dataDir.dir.getAbsolutePath());
+            ib.remove();
+            removed_blocks++;
+            break;
+          }
+        }
+      }
+    } // end of sync
+    mlsec = System.currentTimeMillis() - mlsec;
+    DataNode.LOG.warn("Removed " + removed_blocks + " out of " + total_blocks +
+        " blocks (took " + mlsec + " ms)");
+
+    // report the failed volumes
+    StringBuilder sb = new StringBuilder();
+    for(FSVolume fv : failed_vols) {
+      sb.append(fv.dataDir.dir.getAbsolutePath()).append(";");
+    }
+
+    throw new DiskErrorException("DataNode failed volumes: " + sb);
+
  }
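
For reference, the checkDirs() hunk above relies on a null-and-compact idiom: failed entries are nulled out in place during the scan, and the survivors are then copied into a right-sized array. Below is a minimal stand-alone sketch of the same pattern; Volume and sweep() are hypothetical names for illustration, not Hadoop APIs.

import java.util.ArrayList;
import java.util.List;

class VolumeSweepSketch {
  interface Volume {
    void checkDir() throws Exception;
  }

  static Volume[] sweep(Volume[] volumes, List<Volume> removed) {
    for (int i = 0; i < volumes.length; i++) {
      try {
        volumes[i].checkDir();
      } catch (Exception e) {
        removed.add(volumes[i]);
        volumes[i] = null;            // null out the failed slot in place
      }
    }
    if (removed.isEmpty()) {
      return volumes;                 // nothing failed; keep the original array
    }
    Volume[] healthy = new Volume[volumes.length - removed.size()];
    for (int i = 0, j = 0; i < volumes.length; i++) {
      if (volumes[i] != null) {
        healthy[j++] = volumes[i];    // copy survivors, preserving order
      }
    }
    return healthy;
  }

  public static void main(String[] args) {
    Volume good = () -> { };
    Volume bad = () -> { throw new Exception("simulated disk failure"); };
    List<Volume> removed = new ArrayList<Volume>();
    Volume[] after = sweep(new Volume[] { good, bad, good }, removed);
    System.out.println("kept=" + after.length + ", removed=" + removed.size());
  }
}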