|
@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
final long latestGenerationStamp,
|
|
|
DataChecksum requestedChecksum,
|
|
|
CachingStrategy cachingStrategy,
|
|
|
- final boolean allowLazyPersist) throws IOException {
|
|
|
+ final boolean allowLazyPersist,
|
|
|
+ final boolean pinning,
|
|
|
+ final boolean[] targetPinnings) throws IOException {
|
|
|
previousOpClientName = clientname;
|
|
|
updateCurrentThreadName("Receiving block " + block);
|
|
|
final boolean isDatanode = clientname.length() == 0;
|
|
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
throw new IOException(stage + " does not support multiple targets "
|
|
|
+ Arrays.asList(targets));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
|
|
|
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
|
|
|
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
|
|
|
+ "\n targets=" + Arrays.asList(targets)
|
|
|
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
|
|
|
- );
|
|
|
+ + ", pinning=" + pinning);
|
|
|
LOG.debug("isDatanode=" + isDatanode
|
|
|
+ ", isClient=" + isClient
|
|
|
+ ", isTransfer=" + isTransfer);
|
|
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
peer.getLocalAddressString(),
|
|
|
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
|
|
|
clientname, srcDataNode, datanode, requestedChecksum,
|
|
|
- cachingStrategy, allowLazyPersist);
|
|
|
+ cachingStrategy, allowLazyPersist, pinning);
|
|
|
|
|
|
storageUuid = blockReceiver.getStorageUuid();
|
|
|
} else {
|
|
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
mirrorIn = new DataInputStream(unbufMirrorIn);
|
|
|
|
|
|
// Do not propagate allowLazyPersist to downstream DataNodes.
|
|
|
- new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
|
|
+ if (targetPinnings != null && targetPinnings.length > 0) {
|
|
|
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
|
|
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
|
|
|
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
|
|
|
- latestGenerationStamp, requestedChecksum, cachingStrategy, false);
|
|
|
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
|
|
|
+ false, targetPinnings[0], targetPinnings);
|
|
|
+ } else {
|
|
|
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
|
|
|
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
|
|
|
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
|
|
|
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
|
|
|
+ false, false, targetPinnings);
|
|
|
+ }
|
|
|
|
|
|
mirrorOut.flush();
|
|
|
|
|
@@ -949,7 +960,14 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ if (datanode.data.getPinning(block)) {
|
|
|
+ String msg = "Not able to copy block " + block.getBlockId() + " " +
|
|
|
+ "to " + peer.getRemoteAddressString() + " because it's pinned ";
|
|
|
+ LOG.info(msg);
|
|
|
+ sendResponse(ERROR, msg);
|
|
|
+ }
|
|
|
+
|
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
|
String msg = "Not able to copy block " + block.getBlockId() + " " +
|
|
|
"to " + peer.getRemoteAddressString() + " because threads " +
|
|
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
|
|
proxySock.getLocalSocketAddress().toString(),
|
|
|
null, 0, 0, 0, "", null, datanode, remoteChecksum,
|
|
|
- CachingStrategy.newDropBehind(), false);
|
|
|
+ CachingStrategy.newDropBehind(), false, false);
|
|
|
|
|
|
// receive a block
|
|
|
blockReceiver.receiveBlock(null, null, replyOut, null,
|