|
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
@@ -64,7 +65,7 @@ public class DataBlockScanner implements Runnable {
|
|
|
static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
|
|
|
private static final long ONE_DAY = 24*3600*1000L;
|
|
|
|
|
|
- static DateFormat dateFormat =
|
|
|
+ static final DateFormat dateFormat =
|
|
|
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
|
|
|
|
|
|
static final String verificationLogFile = "dncp_block_verification.log";
|
|
@@ -177,6 +178,19 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** Update blockMap by the given LogEntry */
|
|
|
+ private synchronized void updateBlockInfo(LogEntry e) {
|
|
|
+ BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
|
|
|
+
|
|
|
+ if(info != null && e.verificationTime > 0 &&
|
|
|
+ info.lastScanTime < e.verificationTime) {
|
|
|
+ delBlockInfo(info);
|
|
|
+ info.lastScanTime = e.verificationTime;
|
|
|
+ info.lastScanType = ScanType.VERIFICATION_SCAN;
|
|
|
+ addBlockInfo(info);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void init() {
|
|
|
|
|
|
// get the list of blocks and arrange them in random order
|
|
@@ -353,12 +367,17 @@ public class DataBlockScanner implements Runnable {
|
|
|
String name = matcher.group(1);
|
|
|
String value = matcher.group(2);
|
|
|
|
|
|
- if (name.equals("id")) {
|
|
|
- entry.blockId = Long.valueOf(value);
|
|
|
- } else if (name.equals("time")) {
|
|
|
- entry.verificationTime = Long.valueOf(value);
|
|
|
- } else if (name.equals("genstamp")) {
|
|
|
- entry.genStamp = Long.valueOf(value);
|
|
|
+ try {
|
|
|
+ if (name.equals("id")) {
|
|
|
+ entry.blockId = Long.valueOf(value);
|
|
|
+ } else if (name.equals("time")) {
|
|
|
+ entry.verificationTime = Long.valueOf(value);
|
|
|
+ } else if (name.equals("genstamp")) {
|
|
|
+ entry.genStamp = Long.valueOf(value);
|
|
|
+ }
|
|
|
+ } catch(NumberFormatException nfe) {
|
|
|
+ LOG.warn("Cannot parse line: " + line, nfe);
|
|
|
+ return null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -456,11 +475,10 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** returns false if the process was interrupted
|
|
|
+ * because the thread is marked to exit.
|
|
|
+ */
|
|
|
private boolean assignInitialVerificationTimes() {
|
|
|
- /* returns false if the process was interrupted
|
|
|
- * because the thread is marked to exit.
|
|
|
- */
|
|
|
-
|
|
|
int numBlocks = 1;
|
|
|
synchronized (this) {
|
|
|
numBlocks = Math.max(blockMap.size(), 1);
|
|
@@ -470,7 +488,7 @@ public class DataBlockScanner implements Runnable {
|
|
|
LogFileHandler.Reader logReader = null;
|
|
|
try {
|
|
|
if (verificationLog != null) {
|
|
|
- logReader = verificationLog.newReader();
|
|
|
+ logReader = verificationLog.new Reader(false);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Could not read previous verification times : " +
|
|
@@ -482,24 +500,13 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
|
|
|
// update verification times from the verificationLog.
|
|
|
- Block tmpBlock = new Block();
|
|
|
while (logReader != null && logReader.hasNext()) {
|
|
|
if (!datanode.shouldRun || Thread.interrupted()) {
|
|
|
return false;
|
|
|
}
|
|
|
- String line = logReader.next();
|
|
|
- LogEntry entry = LogEntry.parseEntry(line);
|
|
|
- synchronized (this) {
|
|
|
- tmpBlock.set(entry.blockId, 0, entry.genStamp);
|
|
|
- BlockScanInfo info = blockMap.get(tmpBlock);
|
|
|
-
|
|
|
- if(info != null && entry.verificationTime > 0 &&
|
|
|
- info.lastScanTime < entry.verificationTime) {
|
|
|
- delBlockInfo(info);
|
|
|
- info.lastScanTime = entry.verificationTime;
|
|
|
- info.lastScanType = ScanType.VERIFICATION_SCAN;
|
|
|
- addBlockInfo(info);
|
|
|
- }
|
|
|
+ LogEntry entry = LogEntry.parseEntry(logReader.next());
|
|
|
+ if (entry != null) {
|
|
|
+ updateBlockInfo(entry);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -669,7 +676,7 @@ public class DataBlockScanner implements Runnable {
|
|
|
private static final String curFileSuffix = ".curr";
|
|
|
private static final String prevFileSuffix = ".prev";
|
|
|
|
|
|
- /// Don't roll files more aften than this
|
|
|
+ // Don't roll files more often than this
|
|
|
private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
|
|
|
private static final long minWarnPeriod = minRollingPeriod;
|
|
|
private static final int minLineLimit = 1000;
|
|
@@ -687,14 +694,14 @@ public class DataBlockScanner implements Runnable {
|
|
|
|
|
|
long lastWarningTime = 0;
|
|
|
|
|
|
- PrintStream out;
|
|
|
+ private PrintStream out;
|
|
|
|
|
|
int numReaders = 0;
|
|
|
|
|
|
/**
|
|
|
* Opens the log file for appending.
|
|
|
* Note that rolling will happen only after "updateLineCount()" is
|
|
|
- * called. This is so that line count could be updated in a seprate
|
|
|
+ * called. This is so that line count could be updated in a separate
|
|
|
* thread without delaying start up.
|
|
|
*
|
|
|
* @param dir where the logs files are located.
|
|
@@ -717,7 +724,7 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This appends "line\n". Note "\n".
|
|
|
+ * Append "\n" + line.
|
|
|
* If the log file need to be rolled, it will done after
|
|
|
* appending the text.
|
|
|
* This does not throw IOException when there is an error while
|
|
@@ -726,7 +733,8 @@ public class DataBlockScanner implements Runnable {
|
|
|
* return true if append was successful.
|
|
|
*/
|
|
|
synchronized boolean appendLine(String line) {
|
|
|
- out.println(line);
|
|
|
+ out.println();
|
|
|
+ out.print(line);
|
|
|
curNumLines += (curNumLines < 0) ? -1 : 1;
|
|
|
try {
|
|
|
rollIfRequired();
|
|
@@ -746,10 +754,8 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void openCurFile() throws IOException {
|
|
|
- if (out != null) {
|
|
|
- out.close();
|
|
|
- }
|
|
|
+ private synchronized void openCurFile() throws FileNotFoundException {
|
|
|
+ close();
|
|
|
out = new PrintStream(new FileOutputStream(curFile, true));
|
|
|
}
|
|
|
|
|
@@ -783,7 +789,7 @@ public class DataBlockScanner implements Runnable {
|
|
|
throw new IOException("Could not delete " + prevFile);
|
|
|
}
|
|
|
|
|
|
- out.close();
|
|
|
+ close();
|
|
|
|
|
|
if (!curFile.renameTo(prevFile)) {
|
|
|
openCurFile();
|
|
@@ -802,10 +808,6 @@ public class DataBlockScanner implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Reader newReader() throws IOException {
|
|
|
- return new Reader(false);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* This is used to read the lines in order.
|
|
|
* If the data is not read completely (i.e, untill hasNext() returns
|