|
@@ -22,26 +22,81 @@ import com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.ozone.OzoneAcl;
|
|
import org.apache.hadoop.ozone.OzoneAcl;
|
|
|
|
+import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
|
+import org.apache.hadoop.ozone.OzoneConsts;
|
|
import org.apache.hadoop.ozone.client.BucketArgs;
|
|
import org.apache.hadoop.ozone.client.BucketArgs;
|
|
import org.apache.hadoop.ozone.client.OzoneBucket;
|
|
import org.apache.hadoop.ozone.client.OzoneBucket;
|
|
|
|
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
|
import org.apache.hadoop.ozone.client.OzoneKey;
|
|
import org.apache.hadoop.ozone.client.OzoneKey;
|
|
import org.apache.hadoop.ozone.client.OzoneQuota;
|
|
import org.apache.hadoop.ozone.client.OzoneQuota;
|
|
import org.apache.hadoop.ozone.client.OzoneVolume;
|
|
import org.apache.hadoop.ozone.client.OzoneVolume;
|
|
import org.apache.hadoop.ozone.client.ReplicationFactor;
|
|
import org.apache.hadoop.ozone.client.ReplicationFactor;
|
|
import org.apache.hadoop.ozone.client.ReplicationType;
|
|
import org.apache.hadoop.ozone.client.ReplicationType;
|
|
|
|
+import org.apache.hadoop.ozone.client.VolumeArgs;
|
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import org.apache.hadoop.ozone.client.rest.headers.Header;
|
|
|
|
+import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
|
|
|
|
+import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
|
|
|
|
+import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
|
|
|
|
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
|
|
|
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
+import org.apache.http.HttpEntity;
|
|
|
|
+import org.apache.http.HttpHeaders;
|
|
|
|
+import org.apache.http.HttpResponse;
|
|
|
|
+import org.apache.http.client.config.RequestConfig;
|
|
|
|
+import org.apache.http.client.methods.HttpDelete;
|
|
|
|
+import org.apache.http.client.methods.HttpGet;
|
|
|
|
+import org.apache.http.client.methods.HttpPost;
|
|
|
|
+import org.apache.http.client.methods.HttpPut;
|
|
|
|
+import org.apache.http.client.methods.HttpUriRequest;
|
|
|
|
+import org.apache.http.client.utils.URIBuilder;
|
|
|
|
+import org.apache.http.entity.InputStreamEntity;
|
|
|
|
+import org.apache.http.impl.client.CloseableHttpClient;
|
|
|
|
+import org.apache.http.impl.client.HttpClients;
|
|
|
|
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
|
|
|
+import org.apache.http.util.EntityUtils;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+
|
|
|
|
+import java.io.InputStream;
|
|
|
|
+import java.io.OutputStream;
|
|
|
|
+import java.io.PipedInputStream;
|
|
|
|
+import java.io.PipedOutputStream;
|
|
|
|
+import java.net.URI;
|
|
|
|
+import java.net.URISyntaxException;
|
|
|
|
+import java.text.ParseException;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Random;
|
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
|
+import java.util.concurrent.FutureTask;
|
|
|
|
+
|
|
|
|
+import static java.net.HttpURLConnection.HTTP_CREATED;
|
|
|
|
+import static java.net.HttpURLConnection.HTTP_OK;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Ozone Client REST protocol implementation. It uses REST protocol to
|
|
* Ozone Client REST protocol implementation. It uses REST protocol to
|
|
- * connect to Ozone Handler that executes client calls
|
|
|
|
|
|
+ * connect to Ozone Handler that executes client calls. RestClient uses
|
|
|
|
+ * <code>ozone.rest.servers</code> and <code>ozone.rest.client.port</code>
|
|
|
|
+ * to discover Ozone Rest Server.
|
|
*/
|
|
*/
|
|
public class RestClient implements ClientProtocol {
|
|
public class RestClient implements ClientProtocol {
|
|
|
|
|
|
|
|
+ private static final String PATH_SEPARATOR = "/";
|
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(RpcClient.class);
|
|
|
|
+
|
|
|
|
+ private final Configuration conf;
|
|
|
|
+ private final URI ozoneRestUri;
|
|
|
|
+ private final CloseableHttpClient httpClient;
|
|
|
|
+ private final UserGroupInformation ugi;
|
|
|
|
+ private final OzoneAcl.OzoneACLRights userRights;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Creates RestClient instance with the given configuration.
|
|
* Creates RestClient instance with the given configuration.
|
|
* @param conf Configuration
|
|
* @param conf Configuration
|
|
@@ -49,37 +104,186 @@ public class RestClient implements ClientProtocol {
|
|
*/
|
|
*/
|
|
public RestClient(Configuration conf)
|
|
public RestClient(Configuration conf)
|
|
throws IOException {
|
|
throws IOException {
|
|
- Preconditions.checkNotNull(conf);
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(conf);
|
|
|
|
+ this.conf = conf;
|
|
|
|
+ int port = conf.getInt(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_CLIENT_PORT_DEFAULT);
|
|
|
|
+ URIBuilder uriBuilder = new URIBuilder()
|
|
|
|
+ .setScheme("http")
|
|
|
|
+ .setHost(getOzoneRestHandlerHost())
|
|
|
|
+ .setPort(port);
|
|
|
|
+ this.ozoneRestUri = uriBuilder.build();
|
|
|
|
+ int socketTimeout = conf.getInt(
|
|
|
|
+ OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS,
|
|
|
|
+ OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT);
|
|
|
|
+ int connectionTimeout = conf.getInt(
|
|
|
|
+ OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS,
|
|
|
|
+ OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT);
|
|
|
|
+ int maxConnection = conf.getInt(
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_MAX,
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_DEFAULT);
|
|
|
|
+
|
|
|
|
+ int maxConnectionPerRoute = conf.getInt(
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ To make RestClient Thread safe, creating the HttpClient with
|
|
|
|
+ ThreadSafeClientConnManager.
|
|
|
|
+ */
|
|
|
|
+ PoolingHttpClientConnectionManager connManager =
|
|
|
|
+ new PoolingHttpClientConnectionManager();
|
|
|
|
+ connManager.setMaxTotal(maxConnection);
|
|
|
|
+ connManager.setDefaultMaxPerRoute(maxConnectionPerRoute);
|
|
|
|
+
|
|
|
|
+ this.httpClient = HttpClients.custom()
|
|
|
|
+ .setConnectionManager(connManager)
|
|
|
|
+ .setDefaultRequestConfig(
|
|
|
|
+ RequestConfig.custom()
|
|
|
|
+ .setSocketTimeout(socketTimeout)
|
|
|
|
+ .setConnectTimeout(connectionTimeout)
|
|
|
|
+ .build())
|
|
|
|
+ .build();
|
|
|
|
+ this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
+ this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
|
|
|
|
+ KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns the REST server host to connect to.
|
|
|
|
+ *
|
|
|
|
+ * @return hostname of REST server
|
|
|
|
+ */
|
|
|
|
+ private String getOzoneRestHandlerHost() {
|
|
|
|
+ List<String> servers = new ArrayList<>(conf.getTrimmedStringCollection(
|
|
|
|
+ OzoneConfigKeys.OZONE_REST_SERVERS));
|
|
|
|
+ if(servers.isEmpty()) {
|
|
|
|
+ throw new IllegalArgumentException(OzoneConfigKeys.OZONE_REST_SERVERS +
|
|
|
|
+ " must be defined. See" +
|
|
|
|
+ " https://wiki.apache.org/hadoop/Ozone#Configuration for" +
|
|
|
|
+ " details on configuring Ozone.");
|
|
|
|
+ }
|
|
|
|
+ return servers.get(new Random().nextInt(servers.size()));
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void createVolume(String volumeName) throws IOException {
|
|
public void createVolume(String volumeName) throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ createVolume(volumeName, VolumeArgs.newBuilder().build());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void createVolume(
|
|
|
|
- String volumeName, org.apache.hadoop.ozone.client.VolumeArgs args)
|
|
|
|
|
|
+ public void createVolume(String volumeName, VolumeArgs volArgs)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ String owner = volArgs.getOwner() == null ?
|
|
|
|
+ ugi.getUserName() : volArgs.getOwner();
|
|
|
|
+ //TODO: support for ACLs has to be done in OzoneHandler (rest server)
|
|
|
|
+ /**
|
|
|
|
+ List<OzoneAcl> listOfAcls = new ArrayList<>();
|
|
|
|
+ //User ACL
|
|
|
|
+ listOfAcls.add(new OzoneAcl(OzoneAcl.OzoneACLType.USER,
|
|
|
|
+ owner, userRights));
|
|
|
|
+ //ACLs from VolumeArgs
|
|
|
|
+ if(volArgs.getAcls() != null) {
|
|
|
|
+ listOfAcls.addAll(volArgs.getAcls());
|
|
|
|
+ }
|
|
|
|
+ */
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName);
|
|
|
|
+
|
|
|
|
+ String quota = volArgs.getQuota();
|
|
|
|
+ if(quota != null) {
|
|
|
|
+ builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ HttpPost httpPost = new HttpPost(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPost);
|
|
|
|
+ //use admin from VolumeArgs, if it's present
|
|
|
|
+ if(volArgs.getAdmin() != null) {
|
|
|
|
+ httpPost.removeHeaders(HttpHeaders.AUTHORIZATION);
|
|
|
|
+ httpPost.addHeader(HttpHeaders.AUTHORIZATION,
|
|
|
|
+ Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
|
|
|
|
+ volArgs.getAdmin());
|
|
|
|
+ }
|
|
|
|
+ httpPost.addHeader(Header.OZONE_USER, owner);
|
|
|
|
+ LOG.info("Creating Volume: {}, with {} as owner and quota set to {}.",
|
|
|
|
+ volumeName, owner, quota == null ? "default" : quota);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPost));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void setVolumeOwner(String volumeName, String owner)
|
|
public void setVolumeOwner(String volumeName, String owner)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(owner);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+ httpPut.addHeader(Header.OZONE_USER, owner);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void setVolumeQuota(String volumeName, OzoneQuota quota)
|
|
public void setVolumeQuota(String volumeName, OzoneQuota quota)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(quota);
|
|
|
|
+ String quotaString = quota.toString();
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName);
|
|
|
|
+ builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quotaString);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public OzoneVolume getVolumeDetails(String volumeName)
|
|
public OzoneVolume getVolumeDetails(String volumeName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName);
|
|
|
|
+ builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
|
|
|
|
+ Header.OZONE_INFO_QUERY_VOLUME);
|
|
|
|
+ HttpGet httpGet = new HttpGet(builder.build());
|
|
|
|
+ addOzoneHeaders(httpGet);
|
|
|
|
+ HttpEntity response = executeHttpRequest(httpGet);
|
|
|
|
+ VolumeInfo volInfo =
|
|
|
|
+ VolumeInfo.parse(EntityUtils.toString(response));
|
|
|
|
+ //TODO: OzoneHandler in datanode has to be modified to send ACLs
|
|
|
|
+ OzoneVolume volume = new OzoneVolume(conf,
|
|
|
|
+ this,
|
|
|
|
+ volInfo.getVolumeName(),
|
|
|
|
+ volInfo.getCreatedBy(),
|
|
|
|
+ volInfo.getOwner().getName(),
|
|
|
|
+ volInfo.getQuota().sizeInBytes(),
|
|
|
|
+ OzoneClientUtils.formatDateTime(volInfo.getCreatedOn()),
|
|
|
|
+ null);
|
|
|
|
+ EntityUtils.consume(response);
|
|
|
|
+ return volume;
|
|
|
|
+ } catch (URISyntaxException | ParseException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -90,7 +294,16 @@ public class RestClient implements ClientProtocol {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void deleteVolume(String volumeName) throws IOException {
|
|
public void deleteVolume(String volumeName) throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName);
|
|
|
|
+ HttpDelete httpDelete = new HttpDelete(builder.build());
|
|
|
|
+ addOzoneHeaders(httpDelete);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpDelete));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -110,48 +323,161 @@ public class RestClient implements ClientProtocol {
|
|
@Override
|
|
@Override
|
|
public void createBucket(String volumeName, String bucketName)
|
|
public void createBucket(String volumeName, String bucketName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ createBucket(volumeName, bucketName, BucketArgs.newBuilder().build());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void createBucket(
|
|
public void createBucket(
|
|
String volumeName, String bucketName, BucketArgs bucketArgs)
|
|
String volumeName, String bucketName, BucketArgs bucketArgs)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(bucketArgs);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ OzoneConsts.Versioning versioning = OzoneConsts.Versioning.DISABLED;
|
|
|
|
+ if(bucketArgs.getVersioning() != null &&
|
|
|
|
+ bucketArgs.getVersioning()) {
|
|
|
|
+ versioning = OzoneConsts.Versioning.ENABLED;
|
|
|
|
+ }
|
|
|
|
+ StorageType storageType = bucketArgs.getStorageType() == null ?
|
|
|
|
+ StorageType.DEFAULT : bucketArgs.getStorageType();
|
|
|
|
+
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpPost httpPost = new HttpPost(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPost);
|
|
|
|
+
|
|
|
|
+ //ACLs from BucketArgs
|
|
|
|
+ if(bucketArgs.getAcls() != null) {
|
|
|
|
+ for (OzoneAcl acl : bucketArgs.getAcls()) {
|
|
|
|
+ httpPost.addHeader(
|
|
|
|
+ Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ httpPost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
|
|
|
|
+ httpPost.addHeader(Header.OZONE_BUCKET_VERSIONING,
|
|
|
|
+ versioning.toString());
|
|
|
|
+ LOG.info("Creating Bucket: {}/{}, with Versioning {} and Storage Type" +
|
|
|
|
+ " set to {}", volumeName, bucketName, versioning,
|
|
|
|
+ storageType);
|
|
|
|
+
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPost));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void addBucketAcls(
|
|
public void addBucketAcls(
|
|
String volumeName, String bucketName, List<OzoneAcl> addAcls)
|
|
String volumeName, String bucketName, List<OzoneAcl> addAcls)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(addAcls);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+
|
|
|
|
+ for (OzoneAcl acl : addAcls) {
|
|
|
|
+ httpPut.addHeader(
|
|
|
|
+ Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl.toString());
|
|
|
|
+ }
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void removeBucketAcls(
|
|
public void removeBucketAcls(
|
|
String volumeName, String bucketName, List<OzoneAcl> removeAcls)
|
|
String volumeName, String bucketName, List<OzoneAcl> removeAcls)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(removeAcls);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+
|
|
|
|
+ for (OzoneAcl acl : removeAcls) {
|
|
|
|
+ httpPut.addHeader(
|
|
|
|
+ Header.OZONE_ACLS, Header.OZONE_ACL_REMOVE + " " + acl.toString());
|
|
|
|
+ }
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void setBucketVersioning(
|
|
public void setBucketVersioning(
|
|
String volumeName, String bucketName, Boolean versioning)
|
|
String volumeName, String bucketName, Boolean versioning)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(versioning);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+
|
|
|
|
+ httpPut.addHeader(Header.OZONE_BUCKET_VERSIONING,
|
|
|
|
+ getBucketVersioning(versioning).toString());
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void setBucketStorageType(
|
|
public void setBucketStorageType(
|
|
String volumeName, String bucketName, StorageType storageType)
|
|
String volumeName, String bucketName, StorageType storageType)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(storageType);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpPut httpPut = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(httpPut);
|
|
|
|
+
|
|
|
|
+ httpPut.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpPut));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void deleteBucket(String volumeName, String bucketName)
|
|
public void deleteBucket(String volumeName, String bucketName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ HttpDelete httpDelete = new HttpDelete(builder.build());
|
|
|
|
+ addOzoneHeaders(httpDelete);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpDelete));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -163,7 +489,32 @@ public class RestClient implements ClientProtocol {
|
|
@Override
|
|
@Override
|
|
public OzoneBucket getBucketDetails(String volumeName, String bucketName)
|
|
public OzoneBucket getBucketDetails(String volumeName, String bucketName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName);
|
|
|
|
+ builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
|
|
|
|
+ Header.OZONE_INFO_QUERY_BUCKET);
|
|
|
|
+ HttpGet httpGet = new HttpGet(builder.build());
|
|
|
|
+ addOzoneHeaders(httpGet);
|
|
|
|
+ HttpEntity response = executeHttpRequest(httpGet);
|
|
|
|
+ BucketInfo bucketInfo =
|
|
|
|
+ BucketInfo.parse(EntityUtils.toString(response));
|
|
|
|
+ OzoneBucket bucket = new OzoneBucket(conf,
|
|
|
|
+ this,
|
|
|
|
+ bucketInfo.getVolumeName(),
|
|
|
|
+ bucketInfo.getBucketName(),
|
|
|
|
+ bucketInfo.getAcls(),
|
|
|
|
+ bucketInfo.getStorageType(),
|
|
|
|
+ getBucketVersioningFlag(bucketInfo.getVersioning()),
|
|
|
|
+ OzoneClientUtils.formatDateTime(bucketInfo.getCreatedOn()));
|
|
|
|
+ EntityUtils.consume(response);
|
|
|
|
+ return bucket;
|
|
|
|
+ } catch (URISyntaxException | ParseException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -188,20 +539,109 @@ public class RestClient implements ClientProtocol {
|
|
String volumeName, String bucketName, String keyName, long size,
|
|
String volumeName, String bucketName, String keyName, long size,
|
|
ReplicationType type, ReplicationFactor factor)
|
|
ReplicationType type, ReplicationFactor factor)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ // TODO: Once ReplicationType and ReplicationFactor are supported in
|
|
|
|
+ // OzoneHandler (in Datanode), set them in header.
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(keyName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName +
|
|
|
|
+ PATH_SEPARATOR + keyName);
|
|
|
|
+ HttpPut putRequest = new HttpPut(builder.build());
|
|
|
|
+ addOzoneHeaders(putRequest);
|
|
|
|
+ PipedInputStream in = new PipedInputStream();
|
|
|
|
+ OutputStream out = new PipedOutputStream(in);
|
|
|
|
+ putRequest.setEntity(new InputStreamEntity(in, size));
|
|
|
|
+ FutureTask<HttpEntity> futureTask =
|
|
|
|
+ new FutureTask<>(() -> executeHttpRequest(putRequest));
|
|
|
|
+ new Thread(futureTask).start();
|
|
|
|
+ OzoneOutputStream outputStream = new OzoneOutputStream(
|
|
|
|
+ new OutputStream() {
|
|
|
|
+ @Override
|
|
|
|
+ public void write(int b) throws IOException {
|
|
|
|
+ out.write(b);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void close() throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ out.close();
|
|
|
|
+ EntityUtils.consume(futureTask.get());
|
|
|
|
+ } catch (ExecutionException | InterruptedException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ return outputStream;
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public OzoneInputStream getKey(
|
|
public OzoneInputStream getKey(
|
|
String volumeName, String bucketName, String keyName)
|
|
String volumeName, String bucketName, String keyName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(keyName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName +
|
|
|
|
+ PATH_SEPARATOR + keyName);
|
|
|
|
+ HttpGet getRequest = new HttpGet(builder.build());
|
|
|
|
+ addOzoneHeaders(getRequest);
|
|
|
|
+ HttpEntity entity = executeHttpRequest(getRequest);
|
|
|
|
+ PipedInputStream in = new PipedInputStream();
|
|
|
|
+ OutputStream out = new PipedOutputStream(in);
|
|
|
|
+ FutureTask<Void> futureTask =
|
|
|
|
+ new FutureTask<>(() -> {
|
|
|
|
+ entity.writeTo(out);
|
|
|
|
+ out.close();
|
|
|
|
+ return null;
|
|
|
|
+ });
|
|
|
|
+ new Thread(futureTask).start();
|
|
|
|
+ OzoneInputStream inputStream = new OzoneInputStream(
|
|
|
|
+ new InputStream() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int read() throws IOException {
|
|
|
|
+ return in.read();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void close() throws IOException {
|
|
|
|
+ in.close();
|
|
|
|
+ EntityUtils.consume(entity);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ return inputStream;
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void deleteKey(String volumeName, String bucketName, String keyName)
|
|
public void deleteKey(String volumeName, String bucketName, String keyName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(keyName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName + PATH_SEPARATOR + keyName);
|
|
|
|
+ HttpDelete httpDelete = new HttpDelete(builder.build());
|
|
|
|
+ addOzoneHeaders(httpDelete);
|
|
|
|
+ EntityUtils.consume(executeHttpRequest(httpDelete));
|
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -216,10 +656,113 @@ public class RestClient implements ClientProtocol {
|
|
public OzoneKey getKeyDetails(
|
|
public OzoneKey getKeyDetails(
|
|
String volumeName, String bucketName, String keyName)
|
|
String volumeName, String bucketName, String keyName)
|
|
throws IOException {
|
|
throws IOException {
|
|
- throw new UnsupportedOperationException("Not yet implemented.");
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(volumeName);
|
|
|
|
+ Preconditions.checkNotNull(bucketName);
|
|
|
|
+ Preconditions.checkNotNull(keyName);
|
|
|
|
+ URIBuilder builder = new URIBuilder(ozoneRestUri);
|
|
|
|
+ builder.setPath(PATH_SEPARATOR + volumeName +
|
|
|
|
+ PATH_SEPARATOR + bucketName + PATH_SEPARATOR + keyName);
|
|
|
|
+ builder.setParameter(Header.OZONE_INFO_QUERY_TAG,
|
|
|
|
+ Header.OZONE_INFO_QUERY_KEY);
|
|
|
|
+ HttpGet httpGet = new HttpGet(builder.build());
|
|
|
|
+ addOzoneHeaders(httpGet);
|
|
|
|
+ HttpEntity response = executeHttpRequest(httpGet);
|
|
|
|
+ KeyInfo keyInfo =
|
|
|
|
+ KeyInfo.parse(EntityUtils.toString(response));
|
|
|
|
+ OzoneKey key = new OzoneKey(volumeName,
|
|
|
|
+ bucketName,
|
|
|
|
+ keyInfo.getKeyName(),
|
|
|
|
+ keyInfo.getSize(),
|
|
|
|
+ OzoneClientUtils.formatDateTime(keyInfo.getCreatedOn()),
|
|
|
|
+ OzoneClientUtils.formatDateTime(keyInfo.getModifiedOn()));
|
|
|
|
+ EntityUtils.consume(response);
|
|
|
|
+ return key;
|
|
|
|
+ } catch (URISyntaxException | ParseException e) {
|
|
|
|
+ throw new IOException(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Adds Ozone headers to http request.
|
|
|
|
+ *
|
|
|
|
+ * @param httpRequest Http Request
|
|
|
|
+ */
|
|
|
|
+ private void addOzoneHeaders(HttpUriRequest httpRequest) {
|
|
|
|
+ httpRequest.addHeader(HttpHeaders.AUTHORIZATION,
|
|
|
|
+ Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
|
|
|
|
+ ugi.getUserName());
|
|
|
|
+ httpRequest.addHeader(HttpHeaders.DATE,
|
|
|
|
+ OzoneClientUtils.formatDateTime(Time.monotonicNow()));
|
|
|
|
+ httpRequest.addHeader(Header.OZONE_VERSION_HEADER,
|
|
|
|
+ Header.OZONE_V1_VERSION_HEADER);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sends the http request to server and returns the response HttpEntity.
|
|
|
|
+ * It's responsibility of the caller to consume and close response HttpEntity
|
|
|
|
+ * by calling {@code EntityUtils.consume}
|
|
|
|
+ *
|
|
|
|
+ * @param httpUriRequest http request
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private HttpEntity executeHttpRequest(HttpUriRequest httpUriRequest)
|
|
|
|
+ throws IOException {
|
|
|
|
+ HttpResponse response = httpClient.execute(httpUriRequest);
|
|
|
|
+ int errorCode = response.getStatusLine().getStatusCode();
|
|
|
|
+ HttpEntity entity = response.getEntity();
|
|
|
|
+ if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
|
|
|
|
+ return entity;
|
|
|
|
+ }
|
|
|
|
+ if (entity != null) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ OzoneException.parse(EntityUtils.toString(entity)));
|
|
|
|
+ } else {
|
|
|
|
+ throw new IOException("Unexpected null in http payload," +
|
|
|
|
+ " while processing request");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Converts OzoneConts.Versioning to boolean.
|
|
|
|
+ *
|
|
|
|
+ * @param version
|
|
|
|
+ * @return corresponding boolean value
|
|
|
|
+ */
|
|
|
|
+ private Boolean getBucketVersioningFlag(
|
|
|
|
+ OzoneConsts.Versioning version) {
|
|
|
|
+ if(version != null) {
|
|
|
|
+ switch(version) {
|
|
|
|
+ case ENABLED:
|
|
|
|
+ return true;
|
|
|
|
+ case NOT_DEFINED:
|
|
|
|
+ case DISABLED:
|
|
|
|
+ default:
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Converts Bucket versioning flag into OzoneConts.Versioning.
|
|
|
|
+ *
|
|
|
|
+ * @param flag versioning flag
|
|
|
|
+ * @return corresponding OzoneConts.Versionin
|
|
|
|
+ */
|
|
|
|
+ private OzoneConsts.Versioning getBucketVersioning(Boolean flag) {
|
|
|
|
+ if(flag != null) {
|
|
|
|
+ if(flag) {
|
|
|
|
+ return OzoneConsts.Versioning.ENABLED;
|
|
|
|
+ } else {
|
|
|
|
+ return OzoneConsts.Versioning.DISABLED;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return OzoneConsts.Versioning.NOT_DEFINED;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void close() throws IOException {
|
|
public void close() throws IOException {
|
|
|
|
+ httpClient.close();
|
|
}
|
|
}
|
|
}
|
|
}
|