@@ -0,0 +1,925 @@
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.dfs;
|
|
|
+
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.Closeable;
|
|
|
+import java.io.DataOutputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.FileReader;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.PrintStream;
|
|
|
+import java.text.DateFormat;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.TreeSet;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+
|
|
|
+import javax.servlet.http.HttpServlet;
|
|
|
+import javax.servlet.http.HttpServletRequest;
|
|
|
+import javax.servlet.http.HttpServletResponse;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.dfs.DataNode.BlockSender;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
+
|
|
|
+/*
|
|
|
+ * This keeps track of blocks and their last verification times.
|
|
|
+ * Currently it does not modify the metadata for the block.
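+ * Each block is re-verified roughly once per scan period. Scan reads are
+ * throttled between MIN_SCAN_RATE and MAX_SCAN_RATE, and verification
+ * times are persisted in a rolling log file in one of the data directories.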
|
|
|
+ */
|
|
|
+
|
|
|
+public class DataBlockScanner implements Runnable {
|
|
|
+
|
|
|
+ public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
|
|
|
+
|
|
|
+ private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
|
|
|
+ private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
|
|
|
+
|
|
|
+ static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
|
|
|
+ private static final long ONE_DAY = 24*3600*1000L;
|
|
|
+
|
|
|
+ static DateFormat dateFormat =
|
|
|
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
|
|
|
+
|
|
|
+ static final String verificationLogFile = "dncp_block_verification.log";
|
|
|
+ static final int verficationLogLimit = 5; // max log lines is this times numBlocks.
|
|
|
+
|
|
|
+ private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
|
|
|
+ DataNode datanode;
|
|
|
+ FSDataset dataset;
|
|
|
+
|
|
|
+ // sorted set
|
|
|
+ TreeSet<BlockScanInfo> blockInfoSet;
|
|
|
+ HashMap<Block, BlockScanInfo> blockMap;
|
|
|
+
|
|
|
+ long totalScans = 0;
|
|
|
+ long totalVerifications = 0; // includes remote verification by clients.
|
|
|
+ long totalScanErrors = 0;
|
|
|
+ long totalTransientErrors = 0;
|
|
|
+
|
|
|
+ long currentPeriodStart = System.currentTimeMillis();
|
|
|
+ long bytesLeft = 0; // Bytes to scan in this period
|
|
|
+ long totalBytesToScan = 0;
|
|
|
+
|
|
|
+ private LogFileHandler verificationLog;
|
|
|
+
|
|
|
+ Random random = new Random();
|
|
|
+
|
|
|
+ DataNode.Throttler throttler = new DataNode.Throttler(200, MAX_SCAN_RATE);
|
|
|
+
|
|
|
+ private static enum ScanType {
|
|
|
+ REMOTE_READ, // Verified when a block is read by a client, etc.
+ VERIFICATION_SCAN, // scanned as part of periodic verification
|
|
|
+ NONE,
|
|
|
+ }
|
|
|
+
|
|
|
+ static class BlockScanInfo implements Comparable<BlockScanInfo> {
|
|
|
+ Block block;
|
|
|
+ long lastScanTime = 0;
|
|
|
+ long lastLogTime = 0;
|
|
|
+ ScanType lastScanType = ScanType.NONE;
|
|
|
+ boolean lastScanOk = true;
|
|
|
+
|
|
|
+ BlockScanInfo(Block block) {
|
|
|
+ this.block = block;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int hashCode() {
|
|
|
+ return block.hashCode();
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean equals(Object other) {
|
|
|
+ return other instanceof BlockScanInfo &&
|
|
|
+ compareTo((BlockScanInfo)other) == 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getLastScanTime() {
|
|
|
+ return ( lastScanType == ScanType.NONE) ? 0 : lastScanTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int compareTo(BlockScanInfo other) {
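+ // Order by last scan time so the least recently scanned block sorts first;
+ // fall back to block ordering to keep compareTo consistent with equals.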
|
|
|
+ long t1 = lastScanTime;
|
|
|
+ long t2 = other.lastScanTime;
|
|
|
+ return ( t1 < t2 ) ? -1 :
|
|
|
+ (( t1 > t2 ) ? 1 : block.compareTo(other.block));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
|
|
|
+ this.datanode = datanode;
|
|
|
+ this.dataset = dataset;
|
|
|
+ scanPeriod = conf.getInt("dfs.datanode.scan.period.hours", 0);
|
|
|
+ if ( scanPeriod <= 0 ) {
|
|
|
+ scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
|
|
|
+ }
|
|
|
+ scanPeriod *= 3600 * 1000;
|
|
|
+ init();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateBytesToScan(long len, long lastScanTime) {
|
|
|
+ // len could be negative when a block is deleted.
|
|
|
+ totalBytesToScan += len;
|
|
|
+ if ( lastScanTime < currentPeriodStart ) {
|
|
|
+ bytesLeft += len;
|
|
|
+ }
|
|
|
+ // Should we change throttler bandwidth every time bytesLeft changes?
|
|
|
+ // not really required.
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void addBlockInfo(BlockScanInfo info) {
|
|
|
+ boolean added = blockInfoSet.add(info);
|
|
|
+ blockMap.put(info.block, info);
|
|
|
+
|
|
|
+ if ( added ) {
|
|
|
+ LogFileHandler log = verificationLog;
|
|
|
+ if (log != null) {
|
|
|
+ log.setMaxNumLines(blockMap.size() * verficationLogLimit);
|
|
|
+ }
|
|
|
+ updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void delBlockInfo(BlockScanInfo info) {
|
|
|
+ boolean exists = blockInfoSet.remove(info);
|
|
|
+ blockMap.remove(info.block);
|
|
|
+ if ( exists ) {
|
|
|
+ LogFileHandler log = verificationLog;
|
|
|
+ if (log != null) {
|
|
|
+ log.setMaxNumLines(blockMap.size() * verficationLogLimit);
|
|
|
+ }
|
|
|
+ updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void init() {
|
|
|
+
|
|
|
+ // get the list of blocks and arrange them in random order
|
|
|
+ Block arr[] = dataset.getBlockReport();
|
|
|
+ Collections.shuffle(Arrays.asList(arr));
|
|
|
+
|
|
|
+ blockInfoSet = new TreeSet<BlockScanInfo>();
|
|
|
+ blockMap = new HashMap<Block, BlockScanInfo>();
|
|
|
+
|
|
|
+ long scanTime = -1;
|
|
|
+ for (Block block : arr) {
|
|
|
+ BlockScanInfo info = new BlockScanInfo( block );
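+ // Use distinct negative scan times so that unscanned blocks sort ahead of
+ // scanned ones and no two entries collide in the sorted set.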
|
|
|
+ info.lastScanTime = scanTime--;
|
|
|
+ //still keep 'info.lastScanType' to NONE.
|
|
|
+ addBlockInfo(info);
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Pick the first directory that has any existing scanner log;
+ * otherwise, pick the first directory.
|
|
|
+ */
|
|
|
+ File dir = null;
|
|
|
+ FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
|
|
|
+ for(FSDataset.FSVolume vol : volumes) {
|
|
|
+ if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
|
|
|
+ dir = vol.getDir();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (dir == null) {
|
|
|
+ dir = volumes[0].getDir();
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // max lines will be updated later during initialization.
|
|
|
+ verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Could not open verfication log. " +
|
|
|
+ "Verification times are not stored.");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized long getNewBlockScanTime() {
|
|
|
+ /* If there are a lot of blocks, this returns a random time within
|
|
|
+ * the scan period. Otherwise something sooner.
|
|
|
+ */
|
|
|
+ long period = Math.min(scanPeriod,
|
|
|
+ Math.max(blockMap.size(),1) * 60 * 1000L);
|
|
|
+ return System.currentTimeMillis() - scanPeriod +
|
|
|
+ random.nextInt((int)period);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Adds block to list of blocks */
|
|
|
+ synchronized void addBlock(Block block) {
|
|
|
+
|
|
|
+ BlockScanInfo info = blockMap.get(block);
|
|
|
+ if ( info != null ) {
|
|
|
+ LOG.warn("Adding an already existing block " + block);
|
|
|
+ delBlockInfo(info);
|
|
|
+ }
|
|
|
+
|
|
|
+ info = new BlockScanInfo(block);
|
|
|
+ info.lastScanTime = getNewBlockScanTime();
|
|
|
+
|
|
|
+ addBlockInfo(info);
|
|
|
+ adjustThrottler();
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Deletes the block from internal structures */
|
|
|
+ synchronized void deleteBlock(Block block) {
|
|
|
+ BlockScanInfo info = blockMap.get(block);
|
|
|
+ if ( info != null ) {
|
|
|
+ delBlockInfo(info);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Deletes blocks from internal structures */
|
|
|
+ void deleteBlocks(Block[] blocks) {
|
|
|
+ for ( Block b : blocks ) {
|
|
|
+ deleteBlock(b);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void verifiedByClient(Block block) {
|
|
|
+ updateScanStatus(block, ScanType.REMOTE_READ, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void updateScanStatus(Block block,
|
|
|
+ ScanType type,
|
|
|
+ boolean scanOk) {
|
|
|
+ BlockScanInfo info = blockMap.get(block);
|
|
|
+
|
|
|
+ if ( info != null ) {
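+ // Remove the entry before mutating its scan time; it is re-added below so
+ // the sorted set re-orders it by the new lastScanTime.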
|
|
|
+ delBlockInfo(info);
|
|
|
+ } else {
|
|
|
+ // It might already be removed. That's OK; it will be caught next time.
|
|
|
+ info = new BlockScanInfo(block);
|
|
|
+ }
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ info.lastScanType = type;
|
|
|
+ info.lastScanTime = now;
|
|
|
+ info.lastScanOk = scanOk;
|
|
|
+ addBlockInfo(info);
|
|
|
+
|
|
|
+ if (type == ScanType.REMOTE_READ) {
|
|
|
+ totalVerifications++;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Don't update metadata too often in case of REMOTE_READ
+ // or if the verification failed.
|
|
|
+ long diff = now - info.lastLogTime;
|
|
|
+ if (!scanOk || (type == ScanType.REMOTE_READ &&
|
|
|
+ diff < scanPeriod/3 && diff < ONE_DAY)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ info.lastLogTime = now;
|
|
|
+ LogFileHandler log = verificationLog;
|
|
|
+ if (log != null) {
|
|
|
+ log.appendLine(LogEntry.newEnry(block, now));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleScanFailure(Block block) {
|
|
|
+
|
|
|
+ LOG.info("Reporting bad block " + block + " to namenode.");
|
|
|
+
|
|
|
+ try {
|
|
|
+ DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
|
|
|
+ LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
|
|
|
+ datanode.namenode.reportBadBlocks(blocks);
|
|
|
+ } catch (IOException e){
|
|
|
+ /* One common reason is that NameNode could be in safe mode.
|
|
|
+ * Should we keep on retrying in that case?
|
|
|
+ */
|
|
|
+ LOG.warn("Failed to report bad block " + block + " to namenode : " +
|
|
|
+ " Exception : " + StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static private class LogEntry {
|
|
|
+ long blockId = -1;
|
|
|
+ long verificationTime = -1;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The format consists of a single line with multiple entries. Each
+ * entry is in the form : name="value".
+ * This is simple text, easily extendable and easily parseable with a
+ * regex.
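+ * An illustrative (made-up) entry, with fields separated by a tab :
+ * date="2008-02-01 10:15:30,123" time="1201860930123" id="123456789"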
|
|
|
+ */
|
|
|
+ private static Pattern entryPattern =
|
|
|
+ Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
|
|
|
+
|
|
|
+ static String newEnry(Block block, long time) {
|
|
|
+ return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
|
|
|
+ "time=\"" + time + "\"\t " +
|
|
|
+ "id=\"" + block.getBlockId() +"\"";
|
|
|
+ }
|
|
|
+
|
|
|
+ static LogEntry parseEntry(String line) {
|
|
|
+ LogEntry entry = new LogEntry();
|
|
|
+
|
|
|
+ Matcher matcher = entryPattern.matcher(line);
|
|
|
+ while (matcher.find()) {
|
|
|
+ String name = matcher.group(1);
|
|
|
+ String value = matcher.group(2);
|
|
|
+
|
|
|
+ if (name.equals("id")) {
|
|
|
+ entry.blockId = Long.valueOf(value);
|
|
|
+ } else if (name.equals("time")) {
|
|
|
+ entry.verificationTime = Long.valueOf(value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return entry;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void adjustThrottler() {
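+ // Aim to finish the bytes left within the time left in the current period,
+ // clamped to [MIN_SCAN_RATE, MAX_SCAN_RATE].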
|
|
|
+ long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
|
|
|
+ long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
|
|
|
+ throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyBlock(Block block) {
|
|
|
+
|
|
|
+ BlockSender blockSender = null;
|
|
|
+
|
|
|
+ /* In case of failure, attempt to read a second time to reduce
|
|
|
+ * transient errors. How do we flush block data from kernel
|
|
|
+ * buffers before the second read?
|
|
|
+ */
|
|
|
+ for (int i=0; i<2; i++) {
|
|
|
+ boolean second = (i > 0);
|
|
|
+
|
|
|
+ try {
|
|
|
+ adjustThrottler();
|
|
|
+
|
|
|
+ blockSender = datanode.new BlockSender(block, 0, -1, false,
|
|
|
+ false, true);
|
|
|
+
|
|
|
+ DataOutputStream out =
|
|
|
+ new DataOutputStream(new IOUtils.NullOutputStream());
|
|
|
+
|
|
|
+ blockSender.sendBlock(out, throttler);
|
|
|
+
|
|
|
+ LOG.info((second ? "Second " : "") +
|
|
|
+ "Verification succeeded for " + block);
|
|
|
+
|
|
|
+ if ( second ) {
|
|
|
+ totalTransientErrors++;
|
|
|
+ }
|
|
|
+
|
|
|
+ updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);
|
|
|
+
|
|
|
+ return;
|
|
|
+ } catch (IOException e) {
|
|
|
+
|
|
|
+ totalScanErrors++;
|
|
|
+ updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
|
|
|
+
|
|
|
+ // If the block does not exist anymore, then it's not an error
|
|
|
+ if ( dataset.getFile(block) == null ) {
|
|
|
+ LOG.info("Verification failed for " + block + ". Its ok since " +
|
|
|
+ "it not in datanode dataset anymore.");
|
|
|
+ deleteBlock(block);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.warn((second ? "Second " : "First ") +
|
|
|
+ "Verification failed for " + block + ". Exception : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+
|
|
|
+ if (second) {
|
|
|
+ datanode.getMetrics().verificationFailures(1);
|
|
|
+ handleScanFailure(block);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(blockSender);
|
|
|
+ datanode.getMetrics().verifiedBlocks(1);
|
|
|
+ totalScans++;
|
|
|
+ totalVerifications++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized long getEarliestScanTime() {
|
|
|
+ if ( blockInfoSet.size() > 0 ) {
|
|
|
+ return blockInfoSet.first().lastScanTime;
|
|
|
+ }
|
|
|
+ return Long.MAX_VALUE;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Picks one block and verifies it
|
|
|
+ private void verifyFirstBlock() {
|
|
|
+ Block block = null;
|
|
|
+ synchronized (this) {
|
|
|
+ if ( blockInfoSet.size() > 0 ) {
|
|
|
+ block = blockInfoSet.first().block;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if ( block != null ) {
|
|
|
+ verifyBlock(block);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean assignInitialVerificationTimes() {
|
|
|
+ /* returns false if the process was interrupted
|
|
|
+ * because the thread is marked to exit.
|
|
|
+ */
|
|
|
+
|
|
|
+ int numBlocks = 1;
|
|
|
+ synchronized (this) {
|
|
|
+ numBlocks = Math.max(blockMap.size(), 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ // First update the last verification times from the log file.
|
|
|
+ LogFileHandler.Reader logReader = null;
|
|
|
+ try {
|
|
|
+ if (verificationLog != null) {
|
|
|
+ logReader = verificationLog.newReader();
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Could not read previous verification times : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (verificationLog != null) {
|
|
|
+ verificationLog.updateCurNumLines();
|
|
|
+ }
|
|
|
+
|
|
|
+ // update verification times from the verificationLog.
|
|
|
+ Block tmpBlock = new Block(0, 0);
|
|
|
+ while (logReader != null && logReader.hasNext()) {
|
|
|
+ if (!datanode.shouldRun || Thread.interrupted()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ String line = logReader.next();
|
|
|
+ LogEntry entry = LogEntry.parseEntry(line);
|
|
|
+ synchronized (this) {
|
|
|
+ tmpBlock.blkid = entry.blockId;
|
|
|
+ BlockScanInfo info = blockMap.get(tmpBlock);
|
|
|
+
|
|
|
+ if(info != null && entry.verificationTime > 0 &&
|
|
|
+ info.lastScanTime < entry.verificationTime) {
|
|
|
+ delBlockInfo(info);
|
|
|
+ info.lastScanTime = entry.verificationTime;
|
|
|
+ info.lastScanType = ScanType.VERIFICATION_SCAN;
|
|
|
+ addBlockInfo(info);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Initially spread the block reads over half of
|
|
|
+ * the scan period so that we don't keep scanning the
|
|
|
+ * blocks too quickly when restarted.
|
|
|
+ */
|
|
|
+ long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
|
|
|
+ 10*60*1000 ));
|
|
|
+ long lastScanTime = System.currentTimeMillis() - scanPeriod;
|
|
|
+
|
|
|
+ /* Before this loop, entries in blockInfoSet that are not
|
|
|
+ * updated above have lastScanTime of <= 0. Loop until the first entry has
+ * lastScanTime > 0.
|
|
|
+ */
|
|
|
+ synchronized (this) {
|
|
|
+ if (blockInfoSet.size() > 0 ) {
|
|
|
+ BlockScanInfo info;
|
|
|
+ while ((info = blockInfoSet.first()).lastScanTime < 0) {
|
|
|
+ delBlockInfo(info);
|
|
|
+ info.lastScanTime = lastScanTime;
|
|
|
+ lastScanTime += verifyInterval;
|
|
|
+ addBlockInfo(info);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void startNewPeriod() {
|
|
|
+ LOG.info("Starting a new period : work left in prev period : " +
|
|
|
+ String.format("%.2f%%", (bytesLeft * 100.0)/totalBytesToScan));
|
|
|
+ // reset the byte counts :
|
|
|
+ bytesLeft = totalBytesToScan;
|
|
|
+ currentPeriodStart = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ //Read last verification times
|
|
|
+ if (!assignInitialVerificationTimes()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ adjustThrottler();
|
|
|
+
|
|
|
+ while (datanode.shouldRun && !Thread.interrupted()) {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ synchronized (this) {
|
|
|
+ if ( now >= (currentPeriodStart + scanPeriod)) {
|
|
|
+ startNewPeriod();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if ( (now - getEarliestScanTime()) >= scanPeriod ) {
|
|
|
+ verifyFirstBlock();
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ shutdown();
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ LOG.warn("RuntimeException during DataBlockScanner.run() : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ LOG.info("Exiting DataBlockScanner thread.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void shutdown() {
|
|
|
+ LogFileHandler log = verificationLog;
|
|
|
+ verificationLog = null;
|
|
|
+ if (log != null) {
|
|
|
+ log.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void printBlockReport(StringBuilder buffer,
|
|
|
+ boolean summaryOnly) {
|
|
|
+ long oneHour = 3600*1000;
|
|
|
+ long oneDay = 24*oneHour;
|
|
|
+ long oneWeek = 7*oneDay;
|
|
|
+ long fourWeeks = 4*oneWeek;
|
|
|
+
|
|
|
+ int inOneHour = 0;
|
|
|
+ int inOneDay = 0;
|
|
|
+ int inOneWeek = 0;
|
|
|
+ int inFourWeeks = 0;
|
|
|
+ int inScanPeriod = 0;
|
|
|
+ int neverScanned = 0;
|
|
|
+
|
|
|
+ int total = blockInfoSet.size();
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ Date date = new Date();
|
|
|
+
|
|
|
+ for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
|
|
|
+ BlockScanInfo info = it.next();
|
|
|
+
|
|
|
+ long scanTime = info.getLastScanTime();
|
|
|
+ long diff = now - scanTime;
|
|
|
+
|
|
|
+ if (diff <= oneHour) inOneHour++;
|
|
|
+ if (diff <= oneDay) inOneDay++;
|
|
|
+ if (diff <= oneWeek) inOneWeek++;
|
|
|
+ if (diff <= fourWeeks) inFourWeeks++;
|
|
|
+ if (diff <= scanPeriod) inScanPeriod++;
|
|
|
+ if (scanTime <= 0) neverScanned++;
|
|
|
+
|
|
|
+ if (!summaryOnly) {
|
|
|
+ date.setTime(scanTime);
|
|
|
+ String scanType =
|
|
|
+ (info.lastScanType == ScanType.REMOTE_READ) ? "remote" :
|
|
|
+ ((info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" :
|
|
|
+ "none");
|
|
|
+ buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
|
|
|
+ " scan time : " +
|
|
|
+ "%-15d %s\n", info.block,
|
|
|
+ (info.lastScanOk ? "ok" : "failed"),
|
|
|
+ scanType, scanTime,
|
|
|
+ (scanTime <= 0) ? "not yet verified" :
|
|
|
+ dateFormat.format(date)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
|
|
|
+ *100.0/scanPeriod;
|
|
|
+ double pctProgress = (totalBytesToScan == 0) ? 100 :
|
|
|
+ (totalBytesToScan-bytesLeft)*10000.0/totalBytesToScan/
|
|
|
+ (100-pctPeriodLeft+1e-10);
|
|
|
+
|
|
|
+ buffer.append(String.format("\nTotal Blocks : %6d" +
|
|
|
+ "\nVerified in last hour : %6d" +
|
|
|
+ "\nVerified in last day : %6d" +
|
|
|
+ "\nVerified in last week : %6d" +
|
|
|
+ "\nVerified in last four weeks : %6d" +
|
|
|
+ "\nVerified in SCAN_PERIOD : %6d" +
|
|
|
+ "\nNot yet verified : %6d" +
|
|
|
+ "\nVerified since restart : %6d" +
|
|
|
+ "\nScans since restart : %6d" +
|
|
|
+ "\nScan errors since restart : %6d" +
|
|
|
+ "\nTransient scan errors : %6d" +
|
|
|
+ "\nCurrent scan rate limit KBps : %6d" +
|
|
|
+ "\nProgress this period : %6.0f%%" +
|
|
|
+ "\nTime left in cur period : %6.2f%%" +
|
|
|
+ "\n",
|
|
|
+ total, inOneHour, inOneDay, inOneWeek,
|
|
|
+ inFourWeeks, inScanPeriod, neverScanned,
|
|
|
+ totalVerifications, totalScans,
|
|
|
+ totalScanErrors, totalTransientErrors,
|
|
|
+ Math.round(throttler.getBandwidth()/1024.0),
|
|
|
+ pctProgress, pctPeriodLeft));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class takes care of the log file used to store the last verification
+ * times of the blocks. It rolls the current file when it is too big, etc.
+ * If there is an error while writing, it stops updating and logs an error
+ * message.
|
|
|
+ */
|
|
|
+ private static class LogFileHandler {
|
|
|
+
|
|
|
+ private static final String curFileSuffix = ".curr";
|
|
|
+ private static final String prevFileSuffix = ".prev";
|
|
|
+
|
|
|
+ // Don't roll files more often than this
|
|
|
+ private static final long minRollingPeriod = 6 * 3600 * 1000L; // 6 hours
|
|
|
+ private static final long minWarnPeriod = minRollingPeriod;
|
|
|
+ private static final int minLineLimit = 1000;
|
|
|
+
|
|
|
+
|
|
|
+ static boolean isFilePresent(File dir, String filePrefix) {
|
|
|
+ return new File(dir, filePrefix + curFileSuffix).exists() ||
|
|
|
+ new File(dir, filePrefix + prevFileSuffix).exists();
|
|
|
+ }
|
|
|
+ private File curFile;
|
|
|
+ private File prevFile;
|
|
|
+
|
|
|
+ private int maxNumLines = -1; // not a very hard limit on the number of lines.
|
|
|
+ private int curNumLines = -1;
|
|
|
+
|
|
|
+ long lastWarningTime = 0;
|
|
|
+
|
|
|
+ PrintStream out;
|
|
|
+
|
|
|
+ int numReaders = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Opens the log file for appending.
|
|
|
+ * Note that rolling will happen only after "updateCurNumLines()" is
+ * called. This is so that the line count can be updated in a separate
+ * thread without delaying start up.
|
|
|
+ *
|
|
|
+ * @param dir where the log files are located.
|
|
|
+ * @param filePrefix prefix of the file.
|
|
|
+ * @param maxNumLines max lines in a file (it's a soft limit).
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ LogFileHandler(File dir, String filePrefix, int maxNumLines)
|
|
|
+ throws IOException {
|
|
|
+ curFile = new File(dir, filePrefix + curFileSuffix);
|
|
|
+ prevFile = new File(dir, filePrefix + prevFileSuffix);
|
|
|
+ openCurFile();
|
|
|
+ curNumLines = -1;
|
|
|
+ setMaxNumLines(maxNumLines);
|
|
|
+ }
|
|
|
+
|
|
|
+ // setting takes effect when the next entry is added.
|
|
|
+ synchronized void setMaxNumLines(int maxNumLines) {
|
|
|
+ this.maxNumLines = Math.max(maxNumLines, minLineLimit);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This appends "line\n". Note "\n".
|
|
|
+ * If the log file need to be rolled, it will done after
|
|
|
+ * appending the text.
|
|
|
+ * This does not throw IOException when there is an error while
|
|
|
+ * appending. Currently does not throw an error even if rolling
|
|
|
+ * fails (may be it should?).
|
|
|
+ * return true if append was successful.
|
|
|
+ */
|
|
|
+ synchronized boolean appendLine(String line) {
|
|
|
+ out.println(line);
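+ // If the count is unknown (negative), keep it negative until
+ // updateCurNumLines() runs; otherwise count the appended line.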
|
|
|
+ curNumLines += (curNumLines < 0) ? -1 : 1;
|
|
|
+ try {
|
|
|
+ rollIfRequired();
|
|
|
+ } catch (IOException e) {
|
|
|
+ warn("Rolling failed for " + curFile + " : " + e.getMessage());
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ //warns only once in a while
|
|
|
+ synchronized private void warn(String msg) {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ if ((now - lastWarningTime) >= minWarnPeriod) {
|
|
|
+ lastWarningTime = now;
|
|
|
+ LOG.warn(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void openCurFile() throws IOException {
|
|
|
+ if (out != null) {
|
|
|
+ out.close();
|
|
|
+ }
|
|
|
+ out = new PrintStream(new FileOutputStream(curFile, true));
|
|
|
+ }
|
|
|
+
|
|
|
+ //This reads the current file and updates the count.
|
|
|
+ void updateCurNumLines() {
|
|
|
+ int count = 0;
|
|
|
+ try {
|
|
|
+ for(Reader it = new Reader(true); it.hasNext(); count++) {
|
|
|
+ it.next();
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
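+ // Ignored : curNumLines is simply set to however many lines were
+ // counted before the error.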
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ synchronized (this) {
|
|
|
+ curNumLines = count;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void rollIfRequired() throws IOException {
|
|
|
+ if (curNumLines < maxNumLines || numReaders > 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ if (now < minRollingPeriod) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!prevFile.delete() && prevFile.exists()) {
|
|
|
+ throw new IOException("Could not delete " + prevFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ if (!curFile.renameTo(prevFile)) {
|
|
|
+ openCurFile();
|
|
|
+ throw new IOException("Could not rename " + curFile +
|
|
|
+ " to " + prevFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ openCurFile();
|
|
|
+ updateCurNumLines();
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void close() {
|
|
|
+ if (out != null) {
|
|
|
+ out.close();
|
|
|
+ out = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Reader newReader() throws IOException {
|
|
|
+ return new Reader(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This is used to read the lines in order.
|
|
|
+ * If the data is not read completely (i.e., until hasNext() returns
+ * false), it needs to be explicitly closed.
|
|
|
+ */
|
|
|
+ class Reader implements Iterator<String>, Closeable {
|
|
|
+
|
|
|
+ BufferedReader reader;
|
|
|
+ File file;
|
|
|
+ String line;
|
|
|
+ boolean closed = false;
|
|
|
+
|
|
|
+ private Reader(boolean skipPrevFile) throws IOException {
|
|
|
+ synchronized (LogFileHandler.this) {
|
|
|
+ numReaders++;
|
|
|
+ }
|
|
|
+ reader = null;
|
|
|
+ file = (skipPrevFile) ? curFile : prevFile;
|
|
|
+ readNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean openFile() throws IOException {
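+ // Move to the next file (prev -> cur -> none) until an existing file
+ // is found; return false when there are no more files.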
|
|
|
+
|
|
|
+ for(int i=0; i<2; i++) {
|
|
|
+ if (reader != null || i > 0) {
|
|
|
+ // move to next file
|
|
|
+ file = (file == prevFile) ? curFile : null;
|
|
|
+ }
|
|
|
+ if (file == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (file.exists()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reader != null ) {
|
|
|
+ reader.close();
|
|
|
+ reader = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ reader = new BufferedReader(new FileReader(file));
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ // read next line if possible.
|
|
|
+ private void readNext() throws IOException {
|
|
|
+ line = null;
|
|
|
+ try {
|
|
|
+ if (reader != null && (line = reader.readLine()) != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (line == null) {
|
|
|
+ // move to the next file.
|
|
|
+ if (openFile()) {
|
|
|
+ readNext();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (!hasNext()) {
|
|
|
+ close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean hasNext() {
|
|
|
+ return line != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String next() {
|
|
|
+ String curLine = line;
|
|
|
+ try {
|
|
|
+ readNext();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info("Could not reade next line in LogHandler : " +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
+ return curLine;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void remove() {
|
|
|
+ throw new RuntimeException("remove() is not supported.");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void close() throws IOException {
|
|
|
+ if (!closed) {
|
|
|
+ try {
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ file = null;
|
|
|
+ reader = null;
|
|
|
+ closed = true;
|
|
|
+ synchronized (LogFileHandler.this) {
|
|
|
+ numReaders--;
|
|
|
+ assert(numReaders >= 0);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class Servlet extends HttpServlet {
|
|
|
+
|
|
|
+ public void doGet(HttpServletRequest request,
|
|
|
+ HttpServletResponse response) throws IOException {
|
|
|
+
|
|
|
+ response.setContentType("text/plain");
|
|
|
+
|
|
|
+ DataBlockScanner blockScanner = (DataBlockScanner)
|
|
|
+ getServletContext().getAttribute("datanode.blockScanner");
|
|
|
+
|
|
|
+ boolean summary = (request.getParameter("listblocks") == null);
|
|
|
+
|
|
|
+ StringBuilder buffer = new StringBuilder(8*1024);
|
|
|
+ if (blockScanner == null) {
|
|
|
+ buffer.append("Period block scanner is not running. " +
|
|
|
+ "Please check the datanode log if this is unexpected.");
|
|
|
+ } else {
|
|
|
+ blockScanner.printBlockReport(buffer, summary);
|
|
|
+ }
|
|
|
+ response.getWriter().write(buffer.toString()); // extra copy!
|
|
|
+ }
|
|
|
+ }
|
|
|
+}