/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.commons.logging.*;

import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.mapred.StatusHttpServer;

import java.io.*;
import java.net.*;
import java.util.*;

import org.apache.hadoop.metrics.MetricsRecord;
/**********************************************************
 * DataNode is a class (and program) that stores a set of
 * blocks for a DFS deployment. A single deployment can
 * have one or many DataNodes. Each DataNode communicates
 * regularly with a single NameNode. It also communicates
 * with client code and other DataNodes from time to time.
 *
 * DataNodes store a series of named blocks. The DataNode
 * allows client code to read these blocks, or to write new
 * block data. The DataNode may also, in response to instructions
 * from its NameNode, delete blocks or copy blocks to/from other
 * DataNodes.
 *
 * The DataNode maintains just one critical table:
 *     block -> stream of bytes (of BLOCK_SIZE or less)
 *
 * This info is stored on a local disk. The DataNode
 * reports the table's contents to the NameNode upon startup
 * and every so often afterwards.
 *
 * DataNodes spend their lives in an endless loop of asking
 * the NameNode for something to do. A NameNode cannot connect
 * to a DataNode directly; a NameNode simply returns values from
 * functions invoked by a DataNode.
 *
 * DataNodes maintain an open server socket so that client code
 * or other DataNodes can read/write data. The host/port for
 * this server is reported to the NameNode, which then sends that
 * information to clients or other DataNodes that might be interested.
 *
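 * A minimal startup sketch (the data directory shown is hypothetical;
 * a real deployment takes its directories from dfs.data.dir):
 * <pre>
 *     Configuration conf = new Configuration();
 *     DataNode dn = new DataNode(conf, "/tmp/hadoop/dfs/data");
 *     new Thread(dn, "DataNode").start();
 * </pre>
 *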
 * @author Mike Cafarella
 **********************************************************/
public class DataNode implements FSConstants, Runnable {
    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");

    //
    // REMIND - mjc - I might bring "maxgigs" back so user can place
    // artificial limit on space
    //private static final long GIGABYTE = 1024 * 1024 * 1024;
    //private static long numGigs = Configuration.get().getLong("dfs.datanode.maxgigs", 100);
    //
    /**
     * Utility method to build a socket address from a "host:port" string.
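     * For example, "datanode1.example.com:50010" (a hypothetical name)
     * yields an InetSocketAddress for port 50010 on that host.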
     */
    public static InetSocketAddress createSocketAddr(String s) throws IOException {
        String target = s;
        int colonIndex = target.indexOf(':');
        if (colonIndex < 0) {
            throw new RuntimeException("Not a host:port pair: " + s);
        }
        String host = target.substring(0, colonIndex);
        int port = Integer.parseInt(target.substring(colonIndex + 1));
        return new InetSocketAddress(host, port);
    }
    private static Map subDataNodeList = null;

    DatanodeProtocol namenode;
    FSDataset data;
    DatanodeRegistration dnRegistration;
    boolean shouldRun = true;
    Vector receivedBlockList = new Vector();
    int xmitsInProgress = 0;
    Daemon dataXceiveServer = null;
    long blockReportInterval;
    private DataStorage storage = null;
    private StatusHttpServer infoServer;
    private static int infoPort;
    private static int port;
    private static String localMachine;
    private static InetSocketAddress nameNodeAddr;
    private static DataNode datanodeObject = null;
    static Date startTime = new Date(System.currentTimeMillis());
    private class DataNodeMetrics {
        private MetricsRecord metricsRecord = null;

        private long bytesWritten = 0L;
        private long bytesRead = 0L;
        private long blocksWritten = 0L;
        private long blocksRead = 0L;
        private long blocksReplicated = 0L;
        private long blocksRemoved = 0L;

        DataNodeMetrics() {
            metricsRecord = Metrics.createRecord("dfs", "datanode");
        }

        synchronized void readBytes(int nbytes) {
            bytesRead += nbytes;
            Metrics.report(metricsRecord, "bytes-read", bytesRead);
        }

        synchronized void wroteBytes(int nbytes) {
            bytesWritten += nbytes;
            Metrics.report(metricsRecord, "bytes-written", bytesWritten);
        }

        synchronized void readBlocks(int nblocks) {
            blocksRead += nblocks;
            Metrics.report(metricsRecord, "blocks-read", blocksRead);
        }

        synchronized void wroteBlocks(int nblocks) {
            blocksWritten += nblocks;
            Metrics.report(metricsRecord, "blocks-written", blocksWritten);
        }

        synchronized void replicatedBlocks(int nblocks) {
            blocksReplicated += nblocks;
            Metrics.report(metricsRecord, "blocks-replicated", blocksReplicated);
        }

        synchronized void removedBlocks(int nblocks) {
            blocksRemoved += nblocks;
            Metrics.report(metricsRecord, "blocks-removed", blocksRemoved);
        }
    }

    DataNodeMetrics myMetrics = new DataNodeMetrics();
    /**
     * Create the DataNode given a configuration and a dataDir.
     * 'dataDir' is where the blocks are stored.
     */
    public DataNode(Configuration conf, String datadir) throws IOException {
        this(InetAddress.getLocalHost().getHostName(),
             new File(datadir),
             createSocketAddr(conf.get("fs.default.name", "local")), conf);

        // register datanode
        register();

        infoPort = conf.getInt("dfs.datanode.info.port", 50075);
        this.infoServer = new StatusHttpServer("datanode", infoPort, false);
        // create a servlet to serve full-file content
        try {
            this.infoServer.addServlet(null, "/streamFile/*",
                                       "org.apache.hadoop.dfs.StreamFile", null);
        } catch (Exception e) {
            LOG.warn("addServlet threw exception", e);
        }
        this.infoServer.start();
        datanodeObject = this;
    }
    /**
     * A DataNode can also be created with configuration information
     * explicitly given.
     *
     * @see DataStorage
     */
    private DataNode(String machineName,
                     File datadir,
                     InetSocketAddress nameNodeAddr,
                     Configuration conf) throws IOException {
        // get storage info and lock the data dir
        storage = new DataStorage(datadir);

        // connect to name node
        this.namenode = (DatanodeProtocol)
            RPC.waitForProxy(DatanodeProtocol.class,
                             DatanodeProtocol.versionID,
                             nameNodeAddr,
                             conf);

        // find free port
        ServerSocket ss = null;
        int tmpPort = conf.getInt("dfs.datanode.port", 50010);
        while (ss == null) {
            try {
                ss = new ServerSocket(tmpPort);
                LOG.info("Opened server at " + tmpPort);
            } catch (IOException ie) {
                LOG.info("Could not open server at " + tmpPort + ", trying new port");
                tmpPort++;
            }
        }

        // construct registration
        this.dnRegistration = new DatanodeRegistration(
            DFS_CURRENT_VERSION,
            machineName + ":" + tmpPort,
            storage.getStorageID(),
            "");

        // initialize data node internal structure
        this.data = new FSDataset(datadir, conf);
        this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));

        long blockReportIntervalBasis =
            conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
        this.blockReportInterval =
            blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
        localMachine = machineName;
        this.nameNodeAddr = nameNodeAddr;
        port = tmpPort;
    }
    /** Return the DataNode object. */
    public static DataNode getDataNode() {
        return datanodeObject;
    }

    public String getDataNodeMachine() {
        return localMachine;
    }

    public int getDataNodePort() {
        return port;
    }

    public int getDataNodeInfoPort() {
        return infoPort;
    }

    public InetSocketAddress getNameNodeAddr() {
        return nameNodeAddr;
    }

    public InetSocketAddress getDataNodeAddr() {
        return new InetSocketAddress(localMachine, port);
    }

    public Date getStartTime() {
        return startTime;
    }
    /**
     * Return the namenode's identifier
     */
    public String getNamenode() {
        //return namenode.toString();
        return "<namenode>";
    }
    /**
     * Register the datanode with the namenode.
     * <p>
     * The datanode needs to register with the namenode on startup in order
     * 1) to report which storage it is serving now and
     * 2) to receive a registrationID
     * issued by the namenode to recognize registered datanodes.
     *
     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
     * @throws IOException
     */
    private void register() throws IOException {
        dnRegistration = namenode.register(dnRegistration);
        if (storage.getStorageID().equals("")) {
            storage.setStorageID(dnRegistration.getStorageID());
            storage.write();
        }
    }
    /**
     * Shut down this instance of the datanode.
     * Returns only after shutdown is complete.
     */
    public void shutdown() {
        try {
            infoServer.stop();
        } catch (Exception e) {
            // ignore; we are shutting down anyway
        }
        this.shouldRun = false;
        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
        try {
            this.storage.close();
        } catch (IOException ie) {
            // ignore; we are shutting down anyway
        }
    }
    /**
     * Shut down all datanodes that were started via the run(conf) method.
     * Returns only after shutdown is complete.
     */
    public static void shutdownAll() {
        if (subDataNodeList != null && !subDataNodeList.isEmpty()) {
            for (Iterator iterator = subDataNodeList.keySet().iterator(); iterator.hasNext();) {
                DataNode dataNode = (DataNode) iterator.next();
                dataNode.shutdown();
            }
        }
    }
    void handleDiskError(String errMsgr) {
        LOG.warn("DataNode is shutting down.\n" + errMsgr);
        try {
            namenode.errorReport(
                dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
        } catch (IOException ignored) {
            // the namenode may be unreachable; proceed with shutdown anyway
        }
        shutdown();
    }
    private static class Count {
        int value = 0;
        Count(int init) { value = init; }
        synchronized void incr() { value++; }
        synchronized void decr() { value--; }
        public String toString() { return Integer.toString(value); }
        public int getValue() { return value; }
    }

    Count xceiverCount = new Count(0);
    /**
     * Main loop for the DataNode. Runs until shutdown,
     * forever calling remote NameNode functions.
     */
    public void offerService() throws Exception {
        long lastHeartbeat = 0, lastBlockReport = 0;
        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");

        //
        // Now loop for a long time....
        //
        try {
            while (shouldRun) {
                long now = System.currentTimeMillis();

                //
                // Every so often, send heartbeat or block-report
                //
                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
                    //
                    // All heartbeat messages include following info:
                    // -- Datanode name
                    // -- data transfer port
                    // -- Total capacity
                    // -- Bytes remaining
                    //
                    BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
                                                              data.getCapacity(),
                                                              data.getRemaining(),
                                                              xmitsInProgress,
                                                              xceiverCount.getValue());
                    //LOG.info("Just sent heartbeat, with name " + localName);
                    lastHeartbeat = now;

                    if (cmd != null) {
                        data.checkDataDir();
                        if (cmd.transferBlocks()) {
                            //
                            // Send a copy of a block to another datanode
                            //
                            Block blocks[] = cmd.getBlocks();
                            DatanodeInfo xferTargets[][] = cmd.getTargets();

                            for (int i = 0; i < blocks.length; i++) {
                                if (!data.isValidBlock(blocks[i])) {
                                    String errStr = "Can't send invalid block " + blocks[i];
                                    LOG.info(errStr);
                                    namenode.errorReport(dnRegistration,
                                                         DatanodeProtocol.INVALID_BLOCK,
                                                         errStr);
                                    break;
                                } else {
                                    if (xferTargets[i].length > 0) {
                                        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
                                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
                                    }
                                }
                            }
                        } else if (cmd.invalidateBlocks()) {
                            //
                            // Some local block(s) are obsolete and can be
                            // safely garbage-collected.
                            //
                            Block toDelete[] = cmd.getBlocks();
                            data.invalidate(toDelete);
                            myMetrics.removedBlocks(toDelete.length);
                        } else if (cmd.shutdownNode()) {
                            // shut down the data node
                            this.shutdown();
                            continue;
                        }
                    }
                }

                // send block report
                if (now - lastBlockReport > blockReportInterval) {
                    // before sending the block report, check if the data directory is healthy
                    data.checkDataDir();

                    //
                    // Send latest blockinfo report if timer has expired.
                    // Get back a list of local block(s) that are obsolete
                    // and can be safely GC'ed.
                    //
                    Block toDelete[] = namenode.blockReport(dnRegistration,
                                                            data.getBlockReport());
                    data.invalidate(toDelete);
                    lastBlockReport = now;
                    continue;
                }

                // check if there are newly received blocks
                Block[] blockArray = null;
                synchronized (receivedBlockList) {
                    if (receivedBlockList.size() > 0) {
                        //
                        // Send newly-received blockids to namenode
                        //
                        blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
                        receivedBlockList.removeAllElements();
                    }
                }
                if (blockArray != null) {
                    namenode.blockReceived(dnRegistration, blockArray);
                }

                //
                // There is no work to do; sleep until the heartbeat timer elapses,
                // or work arrives, and then iterate again.
                //
                long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
                synchronized (receivedBlockList) {
                    if (waitTime > 0 && receivedBlockList.size() == 0) {
                        try {
                            receivedBlockList.wait(waitTime);
                        } catch (InterruptedException ie) {
                            // woken up early; just loop again
                        }
                    }
                } // synchronized
            } // while (shouldRun)
        } catch (DiskErrorException e) {
            handleDiskError(e.getLocalizedMessage());
        } catch (RemoteException re) {
            String reClass = re.getClassName();
            if (UnregisteredDatanodeException.class.getName().equals(reClass)) {
                LOG.warn("DataNode is shutting down: " +
                         StringUtils.stringifyException(re));
                shutdown();
                return;
            }
            throw re;
        }
    } // offerService
    /**
     * Server used for receiving/sending a block of data.
     * This is created to listen for requests from clients or
     * other DataNodes. This small server does not use the
     * Hadoop IPC mechanism.
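     * Each accepted connection is handed off to a new DataXceiver
     * daemon thread.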
     */
    class DataXceiveServer implements Runnable {
        boolean shouldListen = true;
        ServerSocket ss;

        public DataXceiveServer(ServerSocket ss) {
            this.ss = ss;
        }

        public void run() {
            try {
                while (shouldListen) {
                    Socket s = ss.accept();
                    //s.setSoTimeout(READ_TIMEOUT);
                    data.checkDataDir();
                    xceiverCount.incr();
                    new Daemon(new DataXceiver(s)).start();
                }
                ss.close();
            } catch (DiskErrorException de) {
                String errMsgr = de.getMessage();
                LOG.warn("Exiting DataXceiveServer due to " + errMsgr);
                handleDiskError(errMsgr);
            } catch (IOException ie) {
                LOG.info("Exiting DataXceiveServer due to " + ie.toString());
            }
        }

        public void kill() {
            this.shouldListen = false;
            try {
                this.ss.close();
            } catch (IOException iex) {
                // ignore; we are shutting the server down
            }
        }
    }
    /**
     * Thread for processing an incoming/outgoing data stream.
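     * Reads a one-byte opcode from the socket and dispatches to
     * readBlock or writeBlock accordingly.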
     */
    class DataXceiver implements Runnable {
        Socket s;

        public DataXceiver(Socket s) {
            this.s = s;
            LOG.debug("Number of active connections is: " + xceiverCount);
        }

        /**
         * Read/write data from/to the DataXceiveServer.
         */
        public void run() {
            try {
                DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream()));
                try {
                    byte op = (byte) in.read();
                    if (op == OP_WRITE_BLOCK) {
                        writeBlock(in);
                    } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
                        readBlock(in, op);
                    } else {
                        while (op >= 0) {
                            System.out.println("Faulty op: " + op);
                            op = (byte) in.read();
                        }
                        throw new IOException("Unknown opcode for incoming data stream");
                    }
                } finally {
                    in.close();
                }
            } catch (IOException ie) {
                LOG.warn("DataXceiver", ie);
            } finally {
                try {
                    xceiverCount.decr();
                    LOG.debug("Number of active connections is: " + xceiverCount);
                    s.close();
                } catch (IOException ie2) {
                    // ignore; the connection is being torn down
                }
            }
        }
        /**
         * Read a block from the disk.
         * @param in The stream to read from
         * @param op OP_READ_BLOCK or OP_READSKIP_BLOCK
         * @throws IOException
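         *
         * The reply stream carries the block length (or -1 if the
         * block is invalid), then, for OP_READSKIP_BLOCK, the number
         * of bytes actually skipped, and finally the raw block bytes.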
         */
        private void readBlock(DataInputStream in, byte op) throws IOException {
            //
            // Read in the header
            //
            Block b = new Block();
            b.readFields(in);

            long toSkip = 0;
            if (op == OP_READSKIP_BLOCK) {
                toSkip = in.readLong();
            }

            //
            // Open reply stream
            //
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            try {
                //
                // Write filelen of -1 if error
                //
                if (!data.isValidBlock(b)) {
                    out.writeLong(-1);
                } else {
                    //
                    // Get blockdata from disk
                    //
                    long len = data.getLength(b);
                    DataInputStream in2 = new DataInputStream(data.getBlockData(b));
                    out.writeLong(len);

                    if (op == OP_READSKIP_BLOCK) {
                        if (toSkip > len) {
                            toSkip = len;
                        }
                        long amtSkipped = 0;
                        try {
                            amtSkipped = in2.skip(toSkip);
                        } catch (IOException iex) {
                            shutdown();
                            throw iex;
                        }
                        out.writeLong(amtSkipped);
                    }

                    byte buf[] = new byte[BUFFER_SIZE];
                    try {
                        int bytesRead = 0;
                        try {
                            bytesRead = in2.read(buf);
                            // read() returns -1 at EOF; only count real bytes
                            if (bytesRead > 0) {
                                myMetrics.readBytes(bytesRead);
                            }
                        } catch (IOException iex) {
                            shutdown();
                            throw iex;
                        }
                        while (bytesRead >= 0) {
                            out.write(buf, 0, bytesRead);
                            len -= bytesRead;
                            try {
                                bytesRead = in2.read(buf);
                                if (bytesRead > 0) {
                                    myMetrics.readBytes(bytesRead);
                                }
                            } catch (IOException iex) {
                                shutdown();
                                throw iex;
                            }
                        }
                    } catch (SocketException se) {
                        // This might be because the reader
                        // closed the stream early
                    } finally {
                        try {
                            in2.close();
                        } catch (IOException iex) {
                            shutdown();
                            throw iex;
                        }
                    }
                }
                myMetrics.readBlocks(1);
                LOG.info("Served block " + b + " to " + s.getInetAddress());
            } finally {
                out.close();
            }
        }
        /**
         * Write a block to disk.
         * @param in The stream to read from
         * @throws IOException
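         *
         * The incoming stream carries, in order: a boolean
         * shouldReportBlock flag, the Block header, an int target
         * count, that many DatanodeInfo entries, a one-byte encoding
         * type (RUNLENGTH_ENCODING or CHUNKED_ENCODING), and a long
         * chunk length, followed by the block data itself.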
         */
        private void writeBlock(DataInputStream in) throws IOException {
            //
            // Read in the header
            //
            DataOutputStream reply =
                new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
            try {
                boolean shouldReportBlock = in.readBoolean();
                Block b = new Block();
                b.readFields(in);
                int numTargets = in.readInt();
                if (numTargets <= 0) {
                    throw new IOException("Mislabelled incoming datastream.");
                }
                DatanodeInfo targets[] = new DatanodeInfo[numTargets];
                for (int i = 0; i < targets.length; i++) {
                    DatanodeInfo tmp = new DatanodeInfo();
                    tmp.readFields(in);
                    targets[i] = tmp;
                }
                byte encodingType = (byte) in.read();
                long len = in.readLong();

                //
                // Make sure curTarget is equal to this machine
                //
                DatanodeInfo curTarget = targets[0];

                //
                // Track all the places we've successfully written the block
                //
                Vector mirrors = new Vector();

                //
                // Open local disk out
                //
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b)));
                InetSocketAddress mirrorTarget = null;
                String mirrorNode = null;
                try {
                    //
                    // Open network conn to backup machine, if
                    // appropriate
                    //
                    DataInputStream in2 = null;
                    DataOutputStream out2 = null;
                    if (targets.length > 1) {
                        // Connect to backup machine
                        mirrorNode = targets[1].getName();
                        mirrorTarget = createSocketAddr(mirrorNode);
                        try {
                            Socket s2 = new Socket();
                            s2.connect(mirrorTarget, READ_TIMEOUT);
                            s2.setSoTimeout(READ_TIMEOUT);
                            out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
                            in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));

                            // Write connection header
                            out2.write(OP_WRITE_BLOCK);
                            out2.writeBoolean(shouldReportBlock);
                            b.write(out2);
                            out2.writeInt(targets.length - 1);
                            for (int i = 1; i < targets.length; i++) {
                                targets[i].write(out2);
                            }
                            out2.write(encodingType);
                            out2.writeLong(len);
                            myMetrics.replicatedBlocks(1);
                        } catch (IOException ie) {
                            if (out2 != null) {
                                LOG.info("Exception connecting to mirror " + mirrorNode
                                         + "\n" + StringUtils.stringifyException(ie));
                                try {
                                    out2.close();
                                    in2.close();
                                } catch (IOException out2close) {
                                    // ignore; the mirror connection is already broken
                                } finally {
                                    out2 = null;
                                    in2 = null;
                                }
                            }
                        }
                    }

                    //
                    // Process incoming data, copy to disk and
                    // maybe to network.
                    //
                    boolean anotherChunk = len != 0;
                    byte buf[] = new byte[BUFFER_SIZE];

                    while (anotherChunk) {
                        while (len > 0) {
                            int bytesRead = in.read(buf, 0, (int) Math.min(buf.length, len));
                            if (bytesRead < 0) {
                                throw new EOFException("EOF reading from " + s.toString());
                            }
                            if (bytesRead > 0) {
                                try {
                                    out.write(buf, 0, bytesRead);
                                    myMetrics.wroteBytes(bytesRead);
                                } catch (IOException iex) {
                                    shutdown();
                                    throw iex;
                                }
                                if (out2 != null) {
                                    try {
                                        out2.write(buf, 0, bytesRead);
                                    } catch (IOException out2e) {
                                        LOG.info("Exception writing to mirror " + mirrorNode
                                                 + "\n" + StringUtils.stringifyException(out2e));
                                        //
                                        // If stream-copy fails, continue
                                        // writing to disk. We shouldn't
                                        // interrupt client write.
                                        //
                                        try {
                                            out2.close();
                                            in2.close();
                                        } catch (IOException out2close) {
                                            // ignore; the mirror connection is already broken
                                        } finally {
                                            out2 = null;
                                            in2 = null;
                                        }
                                    }
                                }
                                len -= bytesRead;
                            }
                        }

                        if (encodingType == RUNLENGTH_ENCODING) {
                            anotherChunk = false;
                        } else if (encodingType == CHUNKED_ENCODING) {
                            len = in.readLong();
                            if (out2 != null) {
                                try {
                                    out2.writeLong(len);
                                } catch (IOException ie) {
                                    LOG.info("Exception writing to mirror " + mirrorNode
                                             + "\n" + StringUtils.stringifyException(ie));
                                    try {
                                        out2.close();
                                        in2.close();
                                    } catch (IOException ie2) {
                                        // NOTHING
                                    } finally {
                                        out2 = null;
                                        in2 = null;
                                    }
                                }
                            }
                            if (len == 0) {
                                anotherChunk = false;
                            }
                        }
                    }

                    if (out2 != null) {
                        try {
                            out2.flush();
                            long complete = in2.readLong();
                            if (complete != WRITE_COMPLETE) {
                                LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
                            }
                            LocatedBlock newLB = new LocatedBlock();
                            newLB.readFields(in2);
                            in2.close();
                            out2.close();
                            DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
                            for (int k = 0; k < mirrorsSoFar.length; k++) {
                                mirrors.add(mirrorsSoFar[k]);
                            }
                        } catch (IOException ie) {
                            LOG.info("Exception writing to mirror " + mirrorNode
                                     + "\n" + StringUtils.stringifyException(ie));
                            try {
                                out2.close();
                                in2.close();
                            } catch (IOException ie2) {
                                // NOTHING
                            } finally {
                                out2 = null;
                                in2 = null;
                            }
                        }
                    }
                    if (out2 == null) {
                        LOG.info("Received block " + b + " from " +
                                 s.getInetAddress());
                    } else {
                        LOG.info("Received block " + b + " from " +
                                 s.getInetAddress() +
                                 " and mirrored to " + mirrorTarget);
                    }
                } finally {
                    try {
                        out.close();
                    } catch (IOException iex) {
                        shutdown();
                        throw iex;
                    }
                }
                data.finalizeBlock(b);
                myMetrics.wroteBlocks(1);

                //
                // Tell the namenode that we've received this block
                // in full, if we've been asked to. This is done
                // during NameNode-directed block transfers, but not
                // client writes.
                //
                if (shouldReportBlock) {
                    synchronized (receivedBlockList) {
                        receivedBlockList.add(b);
                        receivedBlockList.notifyAll();
                    }
                }

                //
                // Tell client job is done, and reply with
                // the new LocatedBlock.
                //
                reply.writeLong(WRITE_COMPLETE);
                mirrors.add(curTarget);
                LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
                newLB.write(reply);
            } finally {
                reply.close();
            }
        }
    }
    /**
     * Used for transferring a block of data. This class
     * sends a piece of data to another DataNode.
     */
    class DataTransfer implements Runnable {
        InetSocketAddress curTarget;
        DatanodeInfo targets[];
        Block b;
        byte buf[];

        /**
         * Connect to the first item in the target list. Pass along the
         * entire target list, the block, and the data.
         */
        public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
            this.curTarget = createSocketAddr(targets[0].getName());
            this.targets = targets;
            this.b = b;
            this.buf = new byte[BUFFER_SIZE];
        }
        /**
         * Do the deed, write the bytes
         */
        public void run() {
            xmitsInProgress++;
            try {
                Socket s = new Socket();
                s.connect(curTarget, READ_TIMEOUT);
                s.setSoTimeout(READ_TIMEOUT);
                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                try {
                    long filelen = data.getLength(b);
                    DataInputStream in = new DataInputStream(new BufferedInputStream(data.getBlockData(b)));
                    try {
                        //
                        // Header info
                        //
                        out.write(OP_WRITE_BLOCK);
                        out.writeBoolean(true);
                        b.write(out);
                        out.writeInt(targets.length);
                        for (int i = 0; i < targets.length; i++) {
                            targets[i].write(out);
                        }
                        out.write(RUNLENGTH_ENCODING);
                        out.writeLong(filelen);

                        //
                        // Write the data
                        //
                        while (filelen > 0) {
                            int bytesRead = in.read(buf, 0, (int) Math.min(filelen, buf.length));
                            // guard against the block file being shorter than expected
                            if (bytesRead < 0) {
                                throw new EOFException("Unexpected EOF reading block " + b);
                            }
                            out.write(buf, 0, bytesRead);
                            filelen -= bytesRead;
                        }
                    } finally {
                        in.close();
                    }
                } finally {
                    out.close();
                }
                LOG.info("Transmitted block " + b + " to " + curTarget);
            } catch (IOException ie) {
                LOG.warn("Failed to transfer " + b + " to " + curTarget, ie);
            } finally {
                xmitsInProgress--;
            }
        }
    }
    /**
     * No matter what kind of exception we get, keep retrying offerService().
     * That's the loop that connects to the NameNode and provides basic DataNode
     * functionality.
     *
     * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
     */
    public void run() {
        LOG.info("Starting DataNode in: " + data.data);

        // start dataXceiveServer
        dataXceiveServer.start();

        while (shouldRun) {
            try {
                offerService();
            } catch (Exception ex) {
                LOG.info("Exception: " + ex);
                if (shouldRun) {
                    LOG.info("Lost connection to namenode. Retrying...");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException ie) {
                        // fall through and retry
                    }
                }
            }
        }

        // wait for dataXceiveServer to terminate
        try {
            this.dataXceiveServer.join();
        } catch (InterruptedException ie) {
            // give up waiting and finish
        }

        LOG.info("Finishing DataNode in: " + data.data);
    }
    /** Start datanode daemons.
     * Starts a datanode daemon for each comma-separated data directory
     * specified in the property dfs.data.dir.
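     * For example (hypothetical paths), setting dfs.data.dir to
     * "/data/1/dfs,/data/2/dfs" starts two DataNode daemons.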
     */
    public static void run(Configuration conf) throws IOException {
        String[] dataDirs = conf.getStrings("dfs.data.dir");
        subDataNodeList = new HashMap(dataDirs.length);
        for (int i = 0; i < dataDirs.length; i++) {
            DataNode dn = makeInstanceForDir(dataDirs[i], conf);
            if (dn != null) {
                Thread t = new Thread(dn, "DataNode: " + dataDirs[i]);
                t.setDaemon(true); // needed for JUnit testing
                t.start();
                subDataNodeList.put(dn, t);
            }
        }
    }
    /** Start datanode daemons.
     * Starts a datanode daemon for each comma-separated data directory
     * specified in the property dfs.data.dir and waits for them to finish.
     * If this thread is specifically interrupted, it will stop waiting.
     */
    private static void runAndWait(Configuration conf) throws IOException {
        run(conf);

        // Wait for sub threads to exit
        for (Iterator iterator = subDataNodeList.entrySet().iterator(); iterator.hasNext();) {
            Thread threadDataNode = (Thread) ((Map.Entry) iterator.next()).getValue();
            try {
                threadDataNode.join();
            } catch (InterruptedException e) {
                // did someone knock? stop waiting
                return;
            }
        }
    }
    /**
     * Make an instance of DataNode after ensuring that the given data directory
     * (and parent directories, if necessary) can be created.
     * @param dataDir where the new DataNode instance should keep its files.
     * @param conf Configuration instance to use.
     * @return DataNode instance for the given data dir and conf, or null if
     * the directory cannot be created.
     * @throws IOException
     */
    static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
        DataNode dn = null;
        File data = new File(dataDir);
        try {
            DiskChecker.checkDir(data);
            dn = new DataNode(conf, dataDir);
            return dn;
        } catch (DiskErrorException e) {
            LOG.warn("Can't start DataNode because " + e.getMessage());
            return null;
        }
    }
    public String toString() {
        return "DataNode{" +
            "data=" + data +
            ", localName='" + dnRegistration.getName() + "'" +
            ", storageID='" + dnRegistration.getStorageID() + "'" +
            ", xmitsInProgress=" + xmitsInProgress +
            "}";
    }

    /**
     * Start a DataNode daemon for each directory named in dfs.data.dir
     * and wait for them to finish.
     */
    public static void main(String args[]) throws IOException {
        Configuration conf = new Configuration();
        runAndWait(conf);
    }
}