|
@@ -30,6 +30,7 @@ import org.apache.ambari.view.hive.resources.uploads.parsers.DataParser;
|
|
|
import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions;
|
|
|
import org.apache.ambari.view.hive.resources.uploads.parsers.PreviewData;
|
|
|
import org.apache.ambari.view.hive.resources.uploads.query.*;
|
|
|
+import org.apache.ambari.view.hive.utils.HiveClientFormattedException;
|
|
|
import org.apache.ambari.view.hive.utils.ServiceFormattedException;
|
|
|
import org.apache.ambari.view.hive.utils.SharedObjectsFactory;
|
|
|
import org.apache.ambari.view.utils.ambari.AmbariApi;
|
|
@@ -37,6 +38,8 @@ import org.apache.commons.io.input.ReaderInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.json.simple.JSONObject;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import javax.ws.rs.*;
|
|
|
import javax.ws.rs.core.MediaType;
|
|
@@ -61,13 +64,16 @@ import java.util.*;
|
|
|
*/
|
|
|
public class UploadService extends BaseService {
|
|
|
|
|
|
+ private final static Logger LOG =
|
|
|
+ LoggerFactory.getLogger(UploadService.class);
|
|
|
+
|
|
|
private AmbariApi ambariApi;
|
|
|
protected JobResourceManager resourceManager;
|
|
|
|
|
|
final private static String HIVE_METASTORE_LOCATION_KEY = "hive.metastore.warehouse.dir";
|
|
|
final private static String HIVE_SITE = "hive-site";
|
|
|
final private static String HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY = HIVE_METASTORE_LOCATION_KEY;
|
|
|
- private static final String HIVE_DEFAULT_METASTORE_LOCATION = "/apps/hive/warehouse" ;
|
|
|
+ private static final String HIVE_DEFAULT_METASTORE_LOCATION = "/apps/hive/warehouse";
|
|
|
final private static String HIVE_DEFAULT_DB = "default";
|
|
|
|
|
|
public void validateForUploadFile(UploadFromHdfsInput input){
|
|
@@ -107,17 +113,19 @@ public class UploadService extends BaseService {
|
|
|
@Consumes(MediaType.APPLICATION_JSON)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
public Response uploadForPreviewFromHDFS(UploadFromHdfsInput input) {
|
|
|
-
|
|
|
InputStream uploadedInputStream = null;
|
|
|
try {
|
|
|
uploadedInputStream = getHDFSFileStream(input.getHdfsPath());
|
|
|
this.validateForPreview(input);
|
|
|
PreviewData pd = generatePreview(input.getIsFirstRowHeader(), input.getInputFileType(), uploadedInputStream);
|
|
|
String tableName = getBasenameFromPath(input.getHdfsPath());
|
|
|
- return createPreviewResponse(pd, input.getIsFirstRowHeader(),tableName);
|
|
|
+ return createPreviewResponse(pd, input.getIsFirstRowHeader(), tableName);
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Exception occurred while generating preview for hdfs file : " + input.getHdfsPath(), e);
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
} finally {
|
|
|
if (null != uploadedInputStream) {
|
|
|
try {
|
|
@@ -133,10 +141,10 @@ public class UploadService extends BaseService {
|
|
|
@Path("/preview")
|
|
|
@Consumes(MediaType.MULTIPART_FORM_DATA)
|
|
|
public Response uploadForPreview(
|
|
|
- @FormDataParam("file") InputStream uploadedInputStream,
|
|
|
- @FormDataParam("file") FormDataContentDisposition fileDetail,
|
|
|
- @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
|
|
|
- @FormDataParam("inputFileType") String inputFileType
|
|
|
+ @FormDataParam("file") InputStream uploadedInputStream,
|
|
|
+ @FormDataParam("file") FormDataContentDisposition fileDetail,
|
|
|
+ @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
|
|
|
+ @FormDataParam("inputFileType") String inputFileType
|
|
|
) {
|
|
|
try {
|
|
|
if( null == inputFileType)
|
|
@@ -146,10 +154,13 @@ public class UploadService extends BaseService {
|
|
|
isFirstRowHeader = false;
|
|
|
|
|
|
PreviewData pd = generatePreview(isFirstRowHeader, inputFileType, uploadedInputStream);
|
|
|
- return createPreviewResponse(pd, isFirstRowHeader,getBasename(fileDetail.getFileName()));
|
|
|
+ return createPreviewResponse(pd, isFirstRowHeader, getBasename(fileDetail.getFileName()));
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Exception occurred while generating preview for local file", e);
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -158,7 +169,7 @@ public class UploadService extends BaseService {
|
|
|
@POST
|
|
|
@Consumes(MediaType.APPLICATION_JSON)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
- public Response createTable(TableInput tableInput) {
|
|
|
+ public Job createTable(TableInput tableInput) {
|
|
|
try {
|
|
|
tableInput.validate();
|
|
|
List<ColumnDescriptionImpl> header = tableInput.getHeader();
|
|
@@ -170,20 +181,17 @@ public class UploadService extends BaseService {
|
|
|
|
|
|
TableInfo ti = new TableInfo(databaseName, tableName, header, hiveFileType);
|
|
|
String tableCreationQuery = generateCreateQuery(ti);
|
|
|
-
|
|
|
LOG.info("tableCreationQuery : {}", tableCreationQuery);
|
|
|
|
|
|
- Job actualTableJob = createJob(tableCreationQuery, databaseName);
|
|
|
- String actualTableJobId = actualTableJob.getId();
|
|
|
-
|
|
|
- JSONObject jobObject = new JSONObject();
|
|
|
- jobObject.put("jobId", actualTableJobId);
|
|
|
-
|
|
|
- LOG.info("table creation jobId {}", actualTableJobId);
|
|
|
- return Response.ok(jobObject).status(201).build();
|
|
|
+ Job job = createJob(tableCreationQuery, databaseName);
|
|
|
+ LOG.info("job created for table creation {}", job);
|
|
|
+ return job;
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Exception occurred while creating table with input : " + tableInput, e);
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -191,40 +199,41 @@ public class UploadService extends BaseService {
|
|
|
@POST
|
|
|
@Consumes(MediaType.APPLICATION_JSON)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
- public Response uploadFileFromHdfs(UploadFromHdfsInput input ) {
|
|
|
- this.validateForUploadFile(input);
|
|
|
-
|
|
|
- if (ParseOptions.InputFileType.CSV.toString().equals(input.getInputFileType()) && Boolean.FALSE.equals(input.getIsFirstRowHeader())) {
|
|
|
- // upload using the LOAD query
|
|
|
- LoadQueryInput loadQueryInput = new LoadQueryInput(input.getHdfsPath(), input.getDatabaseName(), input.getTableName());
|
|
|
- String loadQuery = new QueryGenerator().generateLoadQuery(loadQueryInput);
|
|
|
-
|
|
|
+ public Response uploadFileFromHdfs(UploadFromHdfsInput input) {
|
|
|
+ if (ParseOptions.InputFileType.CSV.toString().equals(input.getInputFileType()) && input.getIsFirstRowHeader().equals(Boolean.FALSE)) {
|
|
|
try {
|
|
|
- Job job = createJob(loadQuery, input.getDatabaseName());
|
|
|
+ // upload using the LOAD query
|
|
|
+ LoadQueryInput loadQueryInput = new LoadQueryInput(input.getHdfsPath(), input.getDatabaseName(), input.getTableName());
|
|
|
+ String loadQuery = new QueryGenerator().generateLoadQuery(loadQueryInput);
|
|
|
+ Job job = createJob(loadQuery, input.getDatabaseName());
|
|
|
|
|
|
JSONObject jo = new JSONObject();
|
|
|
jo.put("jobId", job.getId());
|
|
|
-
|
|
|
return Response.ok(jo).build();
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Exception occurred while creating job for Load From HDFS query : " + loadQuery, e);
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
-
|
|
|
} else {
|
|
|
// create stream and upload
|
|
|
InputStream hdfsStream = null;
|
|
|
try {
|
|
|
hdfsStream = getHDFSFileStream(input.getHdfsPath());
|
|
|
- String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(),input.getInputFileType(),input.getTableName(), input.getDatabaseName());
|
|
|
+ String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(), input.getInputFileType(), input.getTableName(), input.getDatabaseName());
|
|
|
|
|
|
JSONObject jo = new JSONObject();
|
|
|
jo.put("uploadedPath", path);
|
|
|
|
|
|
return Response.ok(jo).build();
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- LOG.error("Exception occurred while uploading the file from HDFS with path : " + input.getHdfsPath(), e);
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
} finally {
|
|
|
if (null != hdfsStream)
|
|
|
try {
|
|
@@ -241,22 +250,25 @@ public class UploadService extends BaseService {
|
|
|
@Consumes(MediaType.MULTIPART_FORM_DATA)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
public Response uploadFile(
|
|
|
- @FormDataParam("file") InputStream uploadedInputStream,
|
|
|
- @FormDataParam("file") FormDataContentDisposition fileDetail,
|
|
|
- @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
|
|
|
- @FormDataParam("inputFileType") String inputFileType, // the format of the file uploaded. CSV/JSON etc.
|
|
|
- @FormDataParam("tableName") String tableName,
|
|
|
- @FormDataParam("databaseName") String databaseName
|
|
|
+ @FormDataParam("file") InputStream uploadedInputStream,
|
|
|
+ @FormDataParam("file") FormDataContentDisposition fileDetail,
|
|
|
+ @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
|
|
|
+ @FormDataParam("inputFileType") String inputFileType, // the format of the file uploaded. CSV/JSON etc.
|
|
|
+ @FormDataParam("tableName") String tableName,
|
|
|
+ @FormDataParam("databaseName") String databaseName
|
|
|
) {
|
|
|
try {
|
|
|
-
|
|
|
- String path = uploadFileFromStream(uploadedInputStream,isFirstRowHeader,inputFileType,tableName,databaseName);
|
|
|
+ String path = uploadFileFromStream(uploadedInputStream, isFirstRowHeader, inputFileType, tableName, databaseName);
|
|
|
|
|
|
JSONObject jo = new JSONObject();
|
|
|
jo.put("uploadedPath", path);
|
|
|
return Response.ok(jo).build();
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -264,19 +276,20 @@ public class UploadService extends BaseService {
|
|
|
@POST
|
|
|
@Consumes(MediaType.APPLICATION_JSON)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
- public Response insertFromTempTable(InsertFromQueryInput input) {
|
|
|
+ public Job insertFromTempTable(InsertFromQueryInput input) {
|
|
|
try {
|
|
|
String insertQuery = generateInsertFromQuery(input);
|
|
|
LOG.info("insertQuery : {}", insertQuery);
|
|
|
|
|
|
Job job = createJob(insertQuery, "default");
|
|
|
-
|
|
|
- JSONObject jo = new JSONObject();
|
|
|
- jo.put("jobId", job.getId());
|
|
|
-
|
|
|
- return Response.ok(jo).build();
|
|
|
+ LOG.info("Job created for insert from temp table : {}", job);
|
|
|
+ return job;
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -284,19 +297,20 @@ public class UploadService extends BaseService {
|
|
|
@POST
|
|
|
@Consumes(MediaType.APPLICATION_JSON)
|
|
|
@Produces(MediaType.APPLICATION_JSON)
|
|
|
- public Response deleteTable(DeleteQueryInput input) {
|
|
|
+ public Job deleteTable(DeleteQueryInput input) {
|
|
|
try {
|
|
|
String deleteQuery = generateDeleteQuery(input);
|
|
|
LOG.info("deleteQuery : {}", deleteQuery);
|
|
|
|
|
|
Job job = createJob(deleteQuery, "default");
|
|
|
-
|
|
|
- JSONObject jo = new JSONObject();
|
|
|
- jo.put("jobId", job.getId());
|
|
|
-
|
|
|
- return Response.ok(jo).build();
|
|
|
+ LOG.info("Job created for delete temp table : {} ", job);
|
|
|
+ return job;
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -319,8 +333,12 @@ public class UploadService extends BaseService {
|
|
|
uploadFile(fullPath, new ReaderInputStream(reader));
|
|
|
|
|
|
return fullPath;
|
|
|
+ } catch (WebApplicationException e) {
|
|
|
+ LOG.error(getErrorMessage(e), e);
|
|
|
+ throw e;
|
|
|
} catch (Exception e) {
|
|
|
- throw new ServiceFormattedException(e.getMessage(), e);
|
|
|
+ LOG.error(e.getMessage(), e);
|
|
|
+ throw new ServiceFormattedException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -371,16 +389,16 @@ public class UploadService extends BaseService {
|
|
|
|
|
|
private String getHiveMetaStoreLocation() {
|
|
|
String dir = context.getProperties().get(HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY);
|
|
|
- if(dir != null && !dir.trim().isEmpty()){
|
|
|
+ if (dir != null && !dir.trim().isEmpty()) {
|
|
|
return dir;
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
LOG.debug("Neither found associated cluster nor found the view property {}. Returning default location : {}", HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY, HIVE_DEFAULT_METASTORE_LOCATION);
|
|
|
return HIVE_DEFAULT_METASTORE_LOCATION;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void uploadFile(final String filePath, InputStream uploadedInputStream)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
byte[] chunk = new byte[1024];
|
|
|
FSDataOutputStream out = getSharedObjectsFactory().getHdfsApi().create(filePath, false);
|
|
|
int n = -1;
|
|
@@ -390,6 +408,12 @@ public class UploadService extends BaseService {
|
|
|
out.close();
|
|
|
}
|
|
|
|
|
|
+ private static String getErrorMessage(WebApplicationException e) {
|
|
|
+ if (null != e.getResponse() && null != e.getResponse().getEntity())
|
|
|
+ return e.getResponse().getEntity().toString();
|
|
|
+ else return e.getMessage();
|
|
|
+ }
|
|
|
+
|
|
|
private PreviewData generatePreview(Boolean isFirstRowHeader, String inputFileType, InputStream uploadedInputStream) throws Exception {
|
|
|
ParseOptions parseOptions = new ParseOptions();
|
|
|
parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType);
|
|
@@ -426,11 +450,11 @@ public class UploadService extends BaseService {
|
|
|
}
|
|
|
|
|
|
private String uploadFileFromStream(
|
|
|
- InputStream uploadedInputStream,
|
|
|
- Boolean isFirstRowHeader,
|
|
|
- String inputFileType, // the format of the file uploaded. CSV/JSON etc.
|
|
|
- String tableName,
|
|
|
- String databaseName
|
|
|
+ InputStream uploadedInputStream,
|
|
|
+ Boolean isFirstRowHeader,
|
|
|
+ String inputFileType, // the format of the file uploaded. CSV/JSON etc.
|
|
|
+ String tableName,
|
|
|
+ String databaseName
|
|
|
|
|
|
) throws Exception {
|
|
|
LOG.info(" uploading file into databaseName {}, tableName {}", databaseName, tableName);
|
|
@@ -452,10 +476,10 @@ public class UploadService extends BaseService {
|
|
|
return getBasename(fileName);
|
|
|
}
|
|
|
|
|
|
- private String getBasename(String fileName){
|
|
|
+ private String getBasename(String fileName) {
|
|
|
int index = fileName.indexOf(".");
|
|
|
- if(index != -1){
|
|
|
- return fileName.substring(0,index);
|
|
|
+ if (index != -1) {
|
|
|
+ return fileName.substring(0, index);
|
|
|
}
|
|
|
|
|
|
return fileName;
|