|
@@ -23,6 +23,10 @@ import java.io.IOException;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
|
|
|
+import org.apache.hadoop.fs.InvalidRequestException;
|
|
|
|
+import org.apache.hadoop.ipc.RemoteException;
|
|
|
|
+
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
|
|
|
/**
|
|
/**
|
|
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
|
|
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
|
|
@@ -33,7 +37,7 @@ import org.apache.hadoop.fs.BatchedRemoteIterator;
|
|
public class CacheDirectiveIterator
|
|
public class CacheDirectiveIterator
|
|
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
|
|
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
|
|
|
|
|
|
- private final CacheDirectiveInfo filter;
|
|
|
|
|
|
+ private CacheDirectiveInfo filter;
|
|
private final ClientProtocol namenode;
|
|
private final ClientProtocol namenode;
|
|
|
|
|
|
public CacheDirectiveIterator(ClientProtocol namenode,
|
|
public CacheDirectiveIterator(ClientProtocol namenode,
|
|
@@ -43,10 +47,72 @@ public class CacheDirectiveIterator
|
|
this.filter = filter;
|
|
this.filter = filter;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
|
|
|
|
+ CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter);
|
|
|
|
+ builder.setId(null);
|
|
|
|
+ return builder.build();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Used for compatibility when communicating with a server version that
|
|
|
|
+ * does not support filtering directives by ID.
|
|
|
|
+ */
|
|
|
|
+ private static class SingleEntry implements
|
|
|
|
+ BatchedEntries<CacheDirectiveEntry> {
|
|
|
|
+
|
|
|
|
+ private final CacheDirectiveEntry entry;
|
|
|
|
+
|
|
|
|
+ public SingleEntry(final CacheDirectiveEntry entry) {
|
|
|
|
+ this.entry = entry;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public CacheDirectiveEntry get(int i) {
|
|
|
|
+ if (i > 0) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ return entry;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int size() {
|
|
|
|
+ return 1;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean hasMore() {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
|
|
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
|
|
throws IOException {
|
|
throws IOException {
|
|
- return namenode.listCacheDirectives(prevKey, filter);
|
|
|
|
|
|
+ BatchedEntries<CacheDirectiveEntry> entries = null;
|
|
|
|
+ try {
|
|
|
|
+ entries = namenode.listCacheDirectives(prevKey, filter);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ if (e.getMessage().contains("Filtering by ID is unsupported")) {
|
|
|
|
+ // Retry case for old servers, do the filtering client-side
|
|
|
|
+ long id = filter.getId();
|
|
|
|
+ filter = removeIdFromFilter(filter);
|
|
|
|
+ // Using id - 1 as prevId should get us a window containing the id
|
|
|
|
+ // This is somewhat brittle, since it depends on directives being
|
|
|
|
+ // returned in order of ascending ID.
|
|
|
|
+ entries = namenode.listCacheDirectives(id - 1, filter);
|
|
|
|
+ for (int i=0; i<entries.size(); i++) {
|
|
|
|
+ CacheDirectiveEntry entry = entries.get(i);
|
|
|
|
+ if (entry.getInfo().getId().equals((Long)id)) {
|
|
|
|
+ return new SingleEntry(entry);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ throw new RemoteException(InvalidRequestException.class.getName(),
|
|
|
|
+ "Did not find requested id " + id);
|
|
|
|
+ }
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
|
|
+ Preconditions.checkNotNull(entries);
|
|
|
|
+ return entries;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|