|
@@ -349,17 +349,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
|
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
|
reader = getLegacyBlockReaderLocal();
|
|
reader = getLegacyBlockReaderLocal();
|
|
if (reader != null) {
|
|
if (reader != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": returning new legacy block reader local.");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: returning new legacy block reader local.", this);
|
|
return reader;
|
|
return reader;
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
reader = getBlockReaderLocal();
|
|
reader = getBlockReaderLocal();
|
|
if (reader != null) {
|
|
if (reader != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": returning new block reader local.");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: returning new block reader local.", this);
|
|
return reader;
|
|
return reader;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -367,10 +363,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
if (scConf.isDomainSocketDataTraffic()) {
|
|
if (scConf.isDomainSocketDataTraffic()) {
|
|
reader = getRemoteBlockReaderFromDomain();
|
|
reader = getRemoteBlockReaderFromDomain();
|
|
if (reader != null) {
|
|
if (reader != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": returning new remote block reader using " +
|
|
|
|
- "UNIX domain socket on " + pathInfo.getPath());
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: returning new remote block reader using UNIX domain "
|
|
|
|
+ + "socket on {}", this, pathInfo.getPath());
|
|
return reader;
|
|
return reader;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -405,10 +399,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
setVisibleLength(visibleLength).
|
|
setVisibleLength(visibleLength).
|
|
build();
|
|
build();
|
|
if (accessor == null) {
|
|
if (accessor == null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": No ReplicaAccessor created by " +
|
|
|
|
- cls.getName());
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: No ReplicaAccessor created by {}",
|
|
|
|
+ this, cls.getName());
|
|
} else {
|
|
} else {
|
|
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
|
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
|
}
|
|
}
|
|
@@ -427,14 +419,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
* first introduced in HDFS-2246.
|
|
* first introduced in HDFS-2246.
|
|
*/
|
|
*/
|
|
private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
|
private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
|
|
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
|
|
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
|
|
|
|
- "the address " + inetSocketAddress + " is not local");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
|
|
|
|
+ + "{} is not local", this, inetSocketAddress);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
if (clientContext.getDisableLegacyBlockReaderLocal()) {
|
|
if (clientContext.getDisableLegacyBlockReaderLocal()) {
|
|
@@ -470,10 +458,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
}
|
|
}
|
|
|
|
|
|
private BlockReader getBlockReaderLocal() throws InvalidToken {
|
|
private BlockReader getBlockReaderLocal() throws InvalidToken {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": trying to construct a BlockReaderLocal " +
|
|
|
|
- "for short-circuit reads.");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
|
|
|
|
+ + " reads.", this);
|
|
if (pathInfo == null) {
|
|
if (pathInfo == null) {
|
|
pathInfo = clientContext.getDomainSocketFactory()
|
|
pathInfo = clientContext.getDomainSocketFactory()
|
|
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
|
|
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
|
|
@@ -488,10 +474,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
|
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
|
InvalidToken exc = info.getInvalidTokenException();
|
|
InvalidToken exc = info.getInvalidTokenException();
|
|
if (exc != null) {
|
|
if (exc != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": got InvalidToken exception while trying to " +
|
|
|
|
- "construct BlockReaderLocal via " + pathInfo.getPath());
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: got InvalidToken exception while trying to construct "
|
|
|
|
+ + "BlockReaderLocal via {}", this, pathInfo.getPath());
|
|
throw exc;
|
|
throw exc;
|
|
}
|
|
}
|
|
if (info.getReplica() == null) {
|
|
if (info.getReplica() == null) {
|
|
@@ -527,9 +511,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
|
|
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
|
|
if (info != null) return info;
|
|
if (info != null) return info;
|
|
}
|
|
}
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
|
|
BlockReaderPeer curPeer;
|
|
BlockReaderPeer curPeer;
|
|
while (true) {
|
|
while (true) {
|
|
curPeer = nextDomainPeer();
|
|
curPeer = nextDomainPeer();
|
|
@@ -544,10 +526,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
|
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
|
|
clientName);
|
|
clientName);
|
|
if (usedPeer.booleanValue()) {
|
|
if (usedPeer.booleanValue()) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": allocShmSlot used up our previous socket " +
|
|
|
|
- peer.getDomainSocket() + ". Allocating a new one...");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: allocShmSlot used up our previous socket {}. "
|
|
|
|
+ + "Allocating a new one...", this, peer.getDomainSocket());
|
|
curPeer = nextDomainPeer();
|
|
curPeer = nextDomainPeer();
|
|
if (curPeer == null) break;
|
|
if (curPeer == null) break;
|
|
peer = (DomainPeer)curPeer.peer;
|
|
peer = (DomainPeer)curPeer.peer;
|
|
@@ -562,9 +542,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
if (curPeer.fromCache) {
|
|
if (curPeer.fromCache) {
|
|
// Handle an I/O error we got when using a cached socket.
|
|
// Handle an I/O error we got when using a cached socket.
|
|
// These are considered less serious, because the socket may be stale.
|
|
// These are considered less serious, because the socket may be stale.
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug(this + ": closing stale domain peer " + peer, e);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("{}: closing stale domain peer {}", this, peer, e);
|
|
IOUtilsClient.cleanup(LOG, peer);
|
|
IOUtilsClient.cleanup(LOG, peer);
|
|
} else {
|
|
} else {
|
|
// Handle an I/O error we got when using a newly created socket.
|
|
// Handle an I/O error we got when using a newly created socket.
|
|
@@ -617,7 +595,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
ExtendedBlockId key =
|
|
ExtendedBlockId key =
|
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
|
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
|
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
|
- LOG.trace("Sending receipt verification byte for slot " + slot);
|
|
|
|
|
|
+ LOG.trace("Sending receipt verification byte for slot {}", slot);
|
|
sock.getOutputStream().write(0);
|
|
sock.getOutputStream().write(0);
|
|
}
|
|
}
|
|
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
|
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
|
@@ -650,9 +628,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
String msg = "access control error while " +
|
|
String msg = "access control error while " +
|
|
"attempting to set up short-circuit access to " +
|
|
"attempting to set up short-circuit access to " +
|
|
fileName + resp.getMessage();
|
|
fileName + resp.getMessage();
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug(this + ":" + msg);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("{}:{}", this, msg);
|
|
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
|
|
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
|
|
default:
|
|
default:
|
|
LOG.warn(this + ": unknown response code " + resp.getStatus() +
|
|
LOG.warn(this + ": unknown response code " + resp.getStatus() +
|
|
@@ -684,10 +660,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
" is not usable.", this, pathInfo);
|
|
" is not usable.", this, pathInfo);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": trying to create a remote block reader from the " +
|
|
|
|
- "UNIX domain socket at " + pathInfo.getPath());
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
|
|
|
|
+ + "socket at {}", this, pathInfo.getPath());
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
BlockReaderPeer curPeer = nextDomainPeer();
|
|
BlockReaderPeer curPeer = nextDomainPeer();
|
|
@@ -701,19 +675,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
IOUtilsClient.cleanup(LOG, peer);
|
|
IOUtilsClient.cleanup(LOG, peer);
|
|
if (isSecurityException(ioe)) {
|
|
if (isSecurityException(ioe)) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": got security exception while constructing " +
|
|
|
|
- "a remote block reader from the unix domain socket at " +
|
|
|
|
- pathInfo.getPath(), ioe);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: got security exception while constructing a remote "
|
|
|
|
+ + " block reader from the unix domain socket at {}",
|
|
|
|
+ this, pathInfo.getPath(), ioe);
|
|
throw ioe;
|
|
throw ioe;
|
|
}
|
|
}
|
|
if (curPeer.fromCache) {
|
|
if (curPeer.fromCache) {
|
|
// Handle an I/O error we got when using a cached peer. These are
|
|
// Handle an I/O error we got when using a cached peer. These are
|
|
// considered less serious, because the underlying socket may be stale.
|
|
// considered less serious, because the underlying socket may be stale.
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Closed potentially stale domain peer " + peer, ioe);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
|
|
} else {
|
|
} else {
|
|
// Handle an I/O error we got when using a newly created domain peer.
|
|
// Handle an I/O error we got when using a newly created domain peer.
|
|
// We temporarily disable the domain socket path for a few minutes in
|
|
// We temporarily disable the domain socket path for a few minutes in
|
|
@@ -747,10 +717,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
* If there was another problem.
|
|
* If there was another problem.
|
|
*/
|
|
*/
|
|
private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
|
|
private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": trying to create a remote block reader from a " +
|
|
|
|
- "TCP socket");
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: trying to create a remote block reader from a TCP socket",
|
|
|
|
+ this);
|
|
BlockReader blockReader = null;
|
|
BlockReader blockReader = null;
|
|
while (true) {
|
|
while (true) {
|
|
BlockReaderPeer curPeer = null;
|
|
BlockReaderPeer curPeer = null;
|
|
@@ -763,19 +731,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
return blockReader;
|
|
return blockReader;
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
if (isSecurityException(ioe)) {
|
|
if (isSecurityException(ioe)) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace(this + ": got security exception while constructing " +
|
|
|
|
- "a remote block reader from " + peer, ioe);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("{}: got security exception while constructing a remote "
|
|
|
|
+ + "block reader from {}", this, peer, ioe);
|
|
throw ioe;
|
|
throw ioe;
|
|
}
|
|
}
|
|
if ((curPeer != null) && curPeer.fromCache) {
|
|
if ((curPeer != null) && curPeer.fromCache) {
|
|
// Handle an I/O error we got when using a cached peer. These are
|
|
// Handle an I/O error we got when using a cached peer. These are
|
|
// considered less serious, because the underlying socket may be
|
|
// considered less serious, because the underlying socket may be
|
|
// stale.
|
|
// stale.
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Closed potentially stale remote peer " + peer, ioe);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
|
|
} else {
|
|
} else {
|
|
// Handle an I/O error we got when using a newly created peer.
|
|
// Handle an I/O error we got when using a newly created peer.
|
|
LOG.warn("I/O error constructing remote block reader.", ioe);
|
|
LOG.warn("I/O error constructing remote block reader.", ioe);
|
|
@@ -808,9 +772,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
if (remainingCacheTries > 0) {
|
|
if (remainingCacheTries > 0) {
|
|
Peer peer = clientContext.getPeerCache().get(datanode, true);
|
|
Peer peer = clientContext.getPeerCache().get(datanode, true);
|
|
if (peer != null) {
|
|
if (peer != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("nextDomainPeer: reusing existing peer " + peer);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
|
|
return new BlockReaderPeer(peer, true);
|
|
return new BlockReaderPeer(peer, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -832,24 +794,18 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|
if (remainingCacheTries > 0) {
|
|
if (remainingCacheTries > 0) {
|
|
Peer peer = clientContext.getPeerCache().get(datanode, false);
|
|
Peer peer = clientContext.getPeerCache().get(datanode, false);
|
|
if (peer != null) {
|
|
if (peer != null) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("nextTcpPeer: reusing existing peer " + peer);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
|
|
return new BlockReaderPeer(peer, true);
|
|
return new BlockReaderPeer(peer, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
|
|
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
|
|
datanode);
|
|
datanode);
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
|
|
return new BlockReaderPeer(peer, false);
|
|
return new BlockReaderPeer(peer, false);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
|
|
|
|
- "connected to " + datanode);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
|
|
|
|
+ + "{}", datanode);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
}
|