
AMBARI-1384 Created job history query service that uses the API framework. This is not a replacement for WorkflowJsonService. (billie)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1464989 13f79535-47bb-0310-9956-ffa450edef68
Billie Rinaldi, 12 years ago
parent commit 5db3355dd9
28 changed files with 3058 additions and 4 deletions
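
For reference, the new services nest under the existing cluster resource. Assuming Ambari's usual /api/v1 prefix (the prefix itself is not shown in this diff), the endpoints this commit adds are, with path shapes taken from the @Path annotations below:

    /api/v1/clusters/{clusterName}/workflows
    /api/v1/clusters/{clusterName}/workflows/{workflowId}
    /api/v1/clusters/{clusterName}/workflows/{workflowId}/jobs
    /api/v1/clusters/{clusterName}/workflows/{workflowId}/jobs/{jobId}
    /api/v1/clusters/{clusterName}/workflows/{workflowId}/jobs/{jobId}/taskattempts
    /api/v1/clusters/{clusterName}/workflows/{workflowId}/jobs/{jobId}/taskattempts/{taskAttemptId}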
  1. +3 -0    CHANGES.txt
  2. +1 -0    ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java
  3. +54 -0   ambari-server/src/main/java/org/apache/ambari/server/api/resources/JobResourceDefinition.java
  4. +12 -0   ambari-server/src/main/java/org/apache/ambari/server/api/resources/ResourceInstanceFactoryImpl.java
  5. +44 -0   ambari-server/src/main/java/org/apache/ambari/server/api/resources/TaskAttemptResourceDefinition.java
  6. +54 -0   ambari-server/src/main/java/org/apache/ambari/server/api/resources/WorkflowResourceDefinition.java
  7. +8 -0    ambari-server/src/main/java/org/apache/ambari/server/api/services/ClusterService.java
  8. +121 -0  ambari-server/src/main/java/org/apache/ambari/server/api/services/JobService.java
  9. +129 -0  ambari-server/src/main/java/org/apache/ambari/server/api/services/TaskAttemptService.java
 10. +112 -0  ambari-server/src/main/java/org/apache/ambari/server/api/services/WorkflowService.java
 11. +150 -0  ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractJDBCResourceProvider.java
 12. +17 -2   ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DefaultProviderModule.java
 13. +353 -0  ambari-server/src/main/java/org/apache/ambari/server/controller/internal/JobResourceProvider.java
 14. +360 -0  ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TaskAttemptResourceProvider.java
 15. +331 -0  ambari-server/src/main/java/org/apache/ambari/server/controller/internal/WorkflowResourceProvider.java
 16. +68 -0   ambari-server/src/main/java/org/apache/ambari/server/controller/jdbc/JobHistoryPostgresConnectionFactory.java
 17. +4 -1    ambari-server/src/main/java/org/apache/ambari/server/controller/spi/Resource.java
 18. +48 -1   ambari-server/src/main/resources/properties.json
 19. +52 -0   ambari-server/src/test/java/org/apache/ambari/server/api/resources/JobResourceDefinitionTest.java
 20. +47 -0   ambari-server/src/test/java/org/apache/ambari/server/api/resources/TaskAttemptResourceDefinitionTest.java
 21. +51 -0   ambari-server/src/test/java/org/apache/ambari/server/api/resources/WorkflowResourceDefinitionTest.java
 22. +93 -0   ambari-server/src/test/java/org/apache/ambari/server/api/services/JobServiceTest.java
 23. +98 -0   ambari-server/src/test/java/org/apache/ambari/server/api/services/TaskAttemptServiceTest.java
 24. +90 -0   ambari-server/src/test/java/org/apache/ambari/server/api/services/WorkflowServiceTest.java
 25. +143 -0  ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AbstractJDBCResourceProviderTest.java
 26. +267 -0  ambari-server/src/test/java/org/apache/ambari/server/controller/internal/JobResourceProviderTest.java
 27. +181 -0  ambari-server/src/test/java/org/apache/ambari/server/controller/internal/TaskAttemptResourceProviderTest.java
 28. +167 -0  ambari-server/src/test/java/org/apache/ambari/server/controller/internal/WorkflowResourceProviderTest.java

+ 3 - 0
CHANGES.txt

@@ -553,6 +553,9 @@ Trunk (unreleased changes):
 AMBARI-1797. For global site properties, need property to services affected
  map. (mahadev)
 
+ AMBARI-1384. WorkflowJsonService doesn't use the API framework and is
+ inconsistent with other APIs. (billie)
+
  BUG FIXES
 
  AMBARI-1791. Can not specify request context for smoke test request. (swagle)

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/resources/ClusterResourceDefinition.java

@@ -53,6 +53,7 @@ public class ClusterResourceDefinition extends BaseResourceDefinition {
     setChildren.add(new SubResourceDefinition(Resource.Type.Host));
     setChildren.add(new SubResourceDefinition(Resource.Type.Configuration));
     setChildren.add(new SubResourceDefinition(Resource.Type.Request));
+    setChildren.add(new SubResourceDefinition(Resource.Type.Workflow));
 
     return setChildren;
   }

+ 54 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/resources/JobResourceDefinition.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.resources;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Job resource definition.
+ */
+public class JobResourceDefinition extends BaseResourceDefinition {
+
+  /**
+   * Constructor.
+   */
+  public JobResourceDefinition() {
+    super(Resource.Type.Job);
+  }
+
+  @Override
+  public String getPluralName() {
+    return "jobs";
+  }
+
+  @Override
+  public String getSingularName() {
+    return "job";
+  }
+
+  @Override
+  public Set<SubResourceDefinition> getSubResourceDefinitions() {
+    Set<SubResourceDefinition> setChildren = new HashSet<SubResourceDefinition>();
+    setChildren.add(new SubResourceDefinition(Resource.Type.TaskAttempt));
+    return setChildren;
+  }
+}

+ 12 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/resources/ResourceInstanceFactoryImpl.java

@@ -119,6 +119,18 @@ public class ResourceInstanceFactoryImpl implements ResourceInstanceFactory {
         resourceDefinition = new InstanceResourceDefinition();
         break;
 
+      case Workflow:
+        resourceDefinition = new WorkflowResourceDefinition();
+        break;
+
+      case Job:
+        resourceDefinition = new JobResourceDefinition();
+        break;
+
+      case TaskAttempt:
+        resourceDefinition = new TaskAttemptResourceDefinition();
+        break;
+
       default:
         throw new IllegalArgumentException("Unsupported resource type: " + type);
     }

+ 44 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/resources/TaskAttemptResourceDefinition.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.resources;
+
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Task attempt resource definition.
+ */
+public class TaskAttemptResourceDefinition extends BaseResourceDefinition {
+
+  /**
+   * Constructor.
+   */
+  public TaskAttemptResourceDefinition() {
+    super(Resource.Type.TaskAttempt);
+  }
+
+  @Override
+  public String getPluralName() {
+    return "taskattempts";
+  }
+
+  @Override
+  public String getSingularName() {
+    return "taskattempt";
+  }
+}

+ 54 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/resources/WorkflowResourceDefinition.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.resources;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Workflow resource definition.
+ */
+public class WorkflowResourceDefinition extends BaseResourceDefinition {
+
+  /**
+   * Constructor.
+   */
+  public WorkflowResourceDefinition() {
+    super(Resource.Type.Workflow);
+  }
+
+  @Override
+  public String getPluralName() {
+    return "workflows";
+  }
+
+  @Override
+  public String getSingularName() {
+    return "workflow";
+  }
+
+  @Override
+  public Set<SubResourceDefinition> getSubResourceDefinitions() {
+    Set<SubResourceDefinition> setChildren = new HashSet<SubResourceDefinition>();
+    setChildren.add(new SubResourceDefinition(Resource.Type.Job));
+    return setChildren;
+  }
+}

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/services/ClusterService.java

@@ -171,6 +171,14 @@ public class ClusterService extends BaseService {
     return new HostComponentService(clusterName, null);
   }
 
+  /**
+   * Gets the workflows sub-resource.
+   */
+  @Path("{clusterName}/workflows")
+  public WorkflowService getWorkflowHandler(@PathParam("clusterName") String clusterName) {
+    return new WorkflowService(clusterName);
+  }
+
   /**
    * Create a cluster resource instance.
    *

+ 121 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/services/JobService.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Service responsible for job resource requests.
+ */
+public class JobService extends BaseService {
+  private String workflowId;
+  private String clusterName;
+
+  /**
+   * Constructor.
+   * 
+   * @param clusterName
+   *          cluster name
+   * @param workflowId
+   *          workflow id
+   */
+  public JobService(String clusterName, String workflowId) {
+    this.clusterName = clusterName;
+    this.workflowId = workflowId;
+  }
+
+  /**
+   * Handles: GET /workflows/{workflowId}/jobs/{jobId} Get a specific job.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * @param jobId
+   *          job id
+   * @return job instance representation
+   */
+  @GET
+  @Path("{jobId}")
+  @Produces("text/plain")
+  public Response getJob(@Context HttpHeaders headers, @Context UriInfo ui,
+      @PathParam("jobId") String jobId) {
+    return handleRequest(headers, null, ui, Request.Type.GET,
+        createJobResource(clusterName, workflowId, jobId));
+  }
+
+  /**
+   * Handles: GET /workflows/{workflowId}/jobs Get all jobs.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * @return job collection resource representation
+   */
+  @GET
+  @Produces("text/plain")
+  public Response getJobs(@Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(headers, null, ui, Request.Type.GET,
+        createJobResource(clusterName, workflowId, null));
+  }
+
+  /**
+   * Gets the task attempts sub-resource.
+   */
+  @Path("{jobId}/taskattempts")
+  public TaskAttemptService getTaskAttemptHandler(
+      @PathParam("jobId") String jobId) {
+    return new TaskAttemptService(clusterName, workflowId, jobId);
+  }
+
+  /**
+   * Create a job resource instance.
+   * 
+   * @param clusterName
+   *          cluster name
+   * @param workflowId
+   *          workflow id
+   * @param jobId
+   *          job id
+   * 
+   * @return a job resource instance
+   */
+  ResourceInstance createJobResource(String clusterName, String workflowId,
+      String jobId) {
+    Map<Resource.Type,String> mapIds = new HashMap<Resource.Type,String>();
+    mapIds.put(Resource.Type.Cluster, clusterName);
+    mapIds.put(Resource.Type.Workflow, workflowId);
+    mapIds.put(Resource.Type.Job, jobId);
+    return createResource(Resource.Type.Job, mapIds);
+  }
+}
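
As a usage sketch (not part of this commit), a minimal Java client for the job collection endpoint could look like the following. The host, port, /api/v1 prefix, cluster name, and workflow id are all assumptions; only the path shape comes from the @Path annotations above.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Minimal sketch of a client for the new job endpoints; server details are assumed.
    public class JobHistoryClientSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical base URL and ids; the path mirrors JobService's @Path annotations.
        URL url = new URL("http://localhost:8080/api/v1/clusters/c1"
            + "/workflows/mr_1/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream()));
        try {
          String line;
          while ((line = in.readLine()) != null)
            System.out.println(line); // raw representation returned by the service
        } finally {
          in.close();
          conn.disconnect();
        }
      }
    }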

+ 129 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/services/TaskAttemptService.java

@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Service responsible for task attempt resource requests.
+ */
+public class TaskAttemptService extends BaseService {
+  private String jobId;
+  private String workflowId;
+  private String clusterName;
+
+  /**
+   * Constructor.
+   * 
+   * @param clusterName
+   *          cluster name
+   * @param workflowId
+   *          workflow id
+   * @param jobId
+   *          job id
+   */
+  public TaskAttemptService(String clusterName, String workflowId, String jobId) {
+    this.clusterName = clusterName;
+    this.workflowId = workflowId;
+    this.jobId = jobId;
+  }
+
+  /**
+   * Handles: GET
+   * /workflows/{workflowId}/jobs/{jobId}/taskattempts/{taskAttemptId} Get a
+   * specific task attempt.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * @param taskAttemptId
+   *          task attempt id
+   * 
+   * @return task attempt instance representation
+   */
+  @GET
+  @Path("{taskAttemptId}")
+  @Produces("text/plain")
+  public Response getTaskAttempt(@Context HttpHeaders headers,
+      @Context UriInfo ui, @PathParam("taskAttemptId") String taskAttemptId) {
+    return handleRequest(
+        headers,
+        null,
+        ui,
+        Request.Type.GET,
+        createTaskAttemptResource(clusterName, workflowId, jobId, taskAttemptId));
+  }
+
+  /**
+   * Handles: GET /workflows/{workflowId}/jobs/{jobId}/taskattempts Get all task
+   * attempts.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * 
+   * @return task attempt collection resource representation
+   */
+  @GET
+  @Produces("text/plain")
+  public Response getTaskAttempts(@Context HttpHeaders headers,
+      @Context UriInfo ui) {
+    return handleRequest(headers, null, ui, Request.Type.GET,
+        createTaskAttemptResource(clusterName, workflowId, jobId, null));
+  }
+
+  /**
+   * Create a task attempt resource instance.
+   * 
+   * @param clusterName
+   *          cluster name
+   * @param workflowId
+   *          workflow id
+   * @param jobId
+   *          job id
+   * @param taskAttemptId
+   *          task attempt id
+   * 
+   * @return a task attempt resource instance
+   */
+  ResourceInstance createTaskAttemptResource(String clusterName,
+      String workflowId, String jobId, String taskAttemptId) {
+    Map<Resource.Type,String> mapIds = new HashMap<Resource.Type,String>();
+    mapIds.put(Resource.Type.Cluster, clusterName);
+    mapIds.put(Resource.Type.Workflow, workflowId);
+    mapIds.put(Resource.Type.Job, jobId);
+    mapIds.put(Resource.Type.TaskAttempt, taskAttemptId);
+    return createResource(Resource.Type.TaskAttempt, mapIds);
+  }
+}

+ 112 - 0
ambari-server/src/main/java/org/apache/ambari/server/api/services/WorkflowService.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.api.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.controller.spi.Resource;
+
+/**
+ * Service responsible for workflow resource requests.
+ */
+public class WorkflowService extends BaseService {
+  private String clusterName;
+
+  /**
+   * Constructor.
+   * 
+   * @param clusterName
+   *          cluster name
+   */
+  public WorkflowService(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  /**
+   * Handles: GET /workflows/{workflowId} Get a specific workflow.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * @param workflowId
+   *          workflow id
+   * @return workflow instance representation
+   */
+  @GET
+  @Path("{workflowId}")
+  @Produces("text/plain")
+  public Response getWorkflow(@Context HttpHeaders headers,
+      @Context UriInfo ui, @PathParam("workflowId") String workflowId) {
+    return handleRequest(headers, null, ui, Request.Type.GET,
+        createWorkflowResource(clusterName, workflowId));
+  }
+
+  /**
+   * Handles: GET /workflows Get all workflows.
+   * 
+   * @param headers
+   *          http headers
+   * @param ui
+   *          uri info
+   * @return workflow collection resource representation
+   */
+  @GET
+  @Produces("text/plain")
+  public Response getWorkflows(@Context HttpHeaders headers, @Context UriInfo ui) {
+    return handleRequest(headers, null, ui, Request.Type.GET,
+        createWorkflowResource(clusterName, null));
+  }
+
+  /**
+   * Gets the jobs sub-resource.
+   */
+  @Path("{workflowId}/jobs")
+  public JobService getJobHandler(@PathParam("workflowId") String workflowId) {
+    return new JobService(clusterName, workflowId);
+  }
+
+  /**
+   * Create a workflow resource instance.
+   * 
+   * @param clusterName
+   *          cluster name
+   * @param workflowId
+   *          workflow id
+   * 
+   * @return a workflow resource instance
+   */
+  ResourceInstance createWorkflowResource(String clusterName, String workflowId) {
+    Map<Resource.Type,String> mapIds = new HashMap<Resource.Type,String>();
+    mapIds.put(Resource.Type.Cluster, clusterName);
+    mapIds.put(Resource.Type.Workflow, workflowId);
+    return createResource(Resource.Type.Workflow, mapIds);
+  }
+}

+ 150 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractJDBCResourceProvider.java

@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Abstract resource provider implementation that contains helper methods for
+ * retrieving data from a result set.
+ */
+public abstract class AbstractJDBCResourceProvider<E extends Enum<E>> extends
+    AbstractResourceProvider {
+  private final Map<String,E> dbFields;
+
+  /**
+   * Create a new resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   */
+  protected AbstractJDBCResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds) {
+    super(propertyIds, keyPropertyIds);
+    this.dbFields = getDBFieldMap();
+  }
+
+  /**
+   * Gets a map from property ids to db fields.
+   * 
+   * @return the map from property ids to db fields.
+   */
+  protected abstract Map<String,E> getDBFieldMap();
+
+  /**
+   * Retrieves the db field corresponding to a property id from a result set as
+   * a string and sets the resulting string as a resource property.
+   * 
+   * @param resource
+   *          resource object to set the property on
+   * @param propertyId
+   *          the property id to retrieve from the result set
+   * @param rs
+   *          the result set
+   * @param requestedIds
+   *          the requested ids
+   * @throws SQLException
+   *           if property id cannot be retrieved from the result set
+   */
+  protected void setString(Resource resource, String propertyId, ResultSet rs,
+      Set<String> requestedIds) throws SQLException {
+    if (requestedIds.contains(propertyId))
+      setResourceProperty(resource, propertyId,
+          rs.getString(dbFields.get(propertyId).toString()), requestedIds);
+  }
+
+  /**
+   * Retrieves the db field corresponding to a property id from a result set as
+   * an int and sets the resulting int as a resource property.
+   * 
+   * @param resource
+   *          resource object to set the property on
+   * @param propertyId
+   *          the property id to retrieve from the result set
+   * @param rs
+   *          the result set
+   * @param requestedIds
+   *          the requested ids
+   * @throws SQLException
+   *           if property id cannot be retrieved from the result set
+   */
+  protected void setInt(Resource resource, String propertyId, ResultSet rs,
+      Set<String> requestedIds) throws SQLException {
+    if (requestedIds.contains(propertyId))
+      setResourceProperty(resource, propertyId,
+          rs.getInt(dbFields.get(propertyId).toString()), requestedIds);
+  }
+
+  /**
+   * Retrieves the db field corresponding to a property id from a result set as
+   * a long and sets the resulting long as a resource property.
+   * 
+   * @param resource
+   *          resource object to set the property on
+   * @param propertyId
+   *          the property id to retrieve from the result set
+   * @param rs
+   *          the result set
+   * @param requestedIds
+   *          the requested ids
+   * @throws SQLException
+   *           if property id cannot be retrieved from the result set
+   */
+  protected void setLong(Resource resource, String propertyId, ResultSet rs,
+      Set<String> requestedIds) throws SQLException {
+    if (requestedIds.contains(propertyId))
+      setResourceProperty(resource, propertyId,
+          rs.getLong(dbFields.get(propertyId).toString()), requestedIds);
+  }
+
+  /**
+   * Gets a comma-separated list of db fields corresponding to set of requested
+   * ids.
+   * 
+   * @param requestedIds
+   *          the requested ids
+   * @return a comma-separated list of db fields
+   */
+  protected String getDBFieldString(Set<String> requestedIds) {
+    String[] tmp = new String[requestedIds.size()];
+    int i = 0;
+    for (String s : requestedIds)
+      if (dbFields.containsKey(s))
+        tmp[i++] = dbFields.get(s).toString();
+    return StringUtils.join(tmp, ",", 0, i);
+  }
+
+  /**
+   * Gets a db field corresponding to a property id.
+   * 
+   * @param propertyId
+   *          the property id
+   * @return the db field enum value
+   */
+  protected E getDBField(String propertyId) {
+    return dbFields.get(propertyId);
+  }
+}
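
To make the mapping contract concrete, here is a standalone sketch (not code from this commit) of the pattern AbstractJDBCResourceProvider encapsulates: an enum of db columns keyed by property id, with the requested columns joined into a SELECT list the way getDBFieldString does above. The DemoFields enum and the "Demo/..." property-id strings are hypothetical.

    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    // Standalone illustration of the property-id -> db-field pattern (hypothetical names).
    public class FieldMapSketch {
      enum DemoFields { DEMOID, DEMONAME, DEMOSTATUS }

      public static void main(String[] args) {
        Map<String,DemoFields> dbFields = new LinkedHashMap<String,DemoFields>();
        dbFields.put("Demo/id", DemoFields.DEMOID);
        dbFields.put("Demo/name", DemoFields.DEMONAME);
        dbFields.put("Demo/status", DemoFields.DEMOSTATUS);

        // Property ids requested by a caller, as a provider would receive them.
        Set<String> requestedIds = new LinkedHashSet<String>();
        requestedIds.add("Demo/name");
        requestedIds.add("Demo/status");

        // Join the matching columns, mirroring getDBFieldString.
        StringBuilder fields = new StringBuilder();
        for (String id : requestedIds) {
          if (dbFields.containsKey(id)) {
            if (fields.length() > 0)
              fields.append(',');
            fields.append(dbFields.get(id));
          }
        }
        // Prints: SELECT DEMONAME,DEMOSTATUS FROM demo
        System.out.println("SELECT " + fields + " FROM demo");
      }
    }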

+ 17 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DefaultProviderModule.java

@@ -18,6 +18,9 @@
 
 package org.apache.ambari.server.controller.internal;
 
+import java.util.Map;
+import java.util.Set;
+
 import com.google.inject.Inject;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
@@ -48,7 +51,19 @@ public class DefaultProviderModule extends AbstractProviderModule {
 
   @Override
   protected ResourceProvider createResourceProvider(Resource.Type type) {
-    return AbstractControllerResourceProvider.getResourceProvider(type, PropertyHelper.getPropertyIds(type),
-        PropertyHelper.getKeyPropertyIds(type), managementController);
+    Set<String>               propertyIds    = PropertyHelper.getPropertyIds(type);
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper.getKeyPropertyIds(type);
+
+    switch (type) {
+      case Workflow:
+        return new WorkflowResourceProvider(propertyIds, keyPropertyIds);
+      case Job:
+        return new JobResourceProvider(propertyIds, keyPropertyIds);
+      case TaskAttempt:
+        return new TaskAttemptResourceProvider(propertyIds, keyPropertyIds);
+      default:
+        return AbstractControllerResourceProvider.getResourceProvider(type, propertyIds,
+            keyPropertyIds, managementController);
+    }
   }
 }

+ 353 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/JobResourceProvider.java

@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.jdbc.JobHistoryPostgresConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resource provider for job resources.
+ */
+public class JobResourceProvider extends
+    AbstractJDBCResourceProvider<JobResourceProvider.JobFields> {
+  private static Log LOG = LogFactory.getLog(JobResourceProvider.class);
+
+  protected static final String JOB_CLUSTER_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "cluster_name");
+  protected static final String JOB_WORKFLOW_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "workflow_id");
+  protected static final String JOB_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "job_id");
+  protected static final String JOB_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "name");
+  protected static final String JOB_STATUS_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "status");
+  protected static final String JOB_USER_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "user_name");
+  protected static final String JOB_SUBMIT_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "submit_time");
+  protected static final String JOB_ELAPSED_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "elapsed_time");
+  protected static final String JOB_MAPS_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "maps");
+  protected static final String JOB_REDUCES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "reduces");
+  protected static final String JOB_INPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "input_bytes");
+  protected static final String JOB_OUTPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "output_bytes");
+  protected static final String JOB_CONF_PATH_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "conf_path");
+  protected static final String JOB_WORKFLOW_ENTITY_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Job", "workflow_entity_name");
+
+  private static final Set<String> pkPropertyIds = new HashSet<String>(
+      Arrays.asList(new String[] {JOB_CLUSTER_NAME_PROPERTY_ID,
+          JOB_WORKFLOW_ID_PROPERTY_ID, JOB_ID_PROPERTY_ID}));
+
+  protected JobFetcher jobFetcher;
+
+  /**
+   * Create a new job resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   */
+  protected JobResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds) {
+    super(propertyIds, keyPropertyIds);
+    jobFetcher = new PostgresJobFetcher(
+        new JobHistoryPostgresConnectionFactory());
+  }
+
+  /**
+   * Create a new job resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   * @param jobFetcher
+   *          job fetcher
+   */
+  protected JobResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds, JobFetcher jobFetcher) {
+    super(propertyIds, keyPropertyIds);
+    this.jobFetcher = jobFetcher;
+  }
+
+  @Override
+  public RequestStatus createResources(Request request) throws SystemException,
+      UnsupportedPropertyException, ResourceAlreadyExistsException,
+      NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Resource> getResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+
+    Set<Resource> resourceSet = new HashSet<Resource>();
+    Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+
+    Set<Map<String,Object>> predicatePropertiesSet = getPropertyMaps(predicate);
+    for (Map<String,Object> predicateProperties : predicatePropertiesSet) {
+      String clusterName = (String) predicateProperties
+          .get(JOB_CLUSTER_NAME_PROPERTY_ID);
+      String workflowId = (String) predicateProperties
+          .get(JOB_WORKFLOW_ID_PROPERTY_ID);
+      String jobId = (String) predicateProperties.get(JOB_ID_PROPERTY_ID);
+      resourceSet.addAll(jobFetcher.fetchJobDetails(requestedIds, clusterName,
+          workflowId, jobId));
+    }
+    return resourceSet;
+  }
+
+  @Override
+  public RequestStatus updateResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RequestStatus deleteResources(Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected Set<String> getPKPropertyIds() {
+    return pkPropertyIds;
+  }
+
+  @Override
+  public Map<Type,String> getKeyPropertyIds() {
+    Map<Type,String> keyPropertyIds = new HashMap<Type,String>();
+    keyPropertyIds.put(Type.Cluster, JOB_CLUSTER_NAME_PROPERTY_ID);
+    keyPropertyIds.put(Type.Workflow, JOB_WORKFLOW_ID_PROPERTY_ID);
+    keyPropertyIds.put(Type.Job, JOB_ID_PROPERTY_ID);
+    return keyPropertyIds;
+  }
+
+  /**
+   * Simple interface for fetching jobs from db.
+   */
+  public static interface JobFetcher {
+    /**
+     * Fetch job resources.
+     * 
+     * @param requestedIds
+     *          fields to pull from db
+     * @param clusterName
+     *          the cluster name
+     * @param workflowId
+     *          the workflow id
+     * @param jobId
+     *          the job id
+     * @return a set of job resources
+     */
+    public Set<Resource> fetchJobDetails(Set<String> requestedIds,
+        String clusterName, String workflowId, String jobId);
+  }
+
+  /**
+   * A job fetcher that queries a postgres job table.
+   */
+  protected class PostgresJobFetcher implements JobFetcher {
+    private static final String JOB_TABLE_NAME = "job";
+    private ConnectionFactory connectionFactory;
+    Connection db;
+    PreparedStatement ps;
+
+    /**
+     * Create a postgres job fetcher that uses a given connection factory.
+     * 
+     * @param connectionFactory
+     *          a connection factory
+     */
+    public PostgresJobFetcher(ConnectionFactory connectionFactory) {
+      this.connectionFactory = connectionFactory;
+      this.db = null;
+      this.ps = null;
+    }
+
+    protected ResultSet getResultSet(Set<String> requestedIds,
+        String workflowId, String jobId) throws SQLException {
+      db = null;
+      ps = null;
+      db = connectionFactory.getConnection();
+      String fields = getDBFieldString(requestedIds);
+      if (requestedIds.contains(JOB_ELAPSED_TIME_PROPERTY_ID)
+          && !requestedIds.contains(JOB_SUBMIT_TIME_PROPERTY_ID))
+        fields += "," + getDBField(JOB_SUBMIT_TIME_PROPERTY_ID).toString();
+      if (jobId == null) {
+        ps = db.prepareStatement("SELECT " + fields + " FROM " + JOB_TABLE_NAME
+            + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?");
+        ps.setString(1, workflowId);
+      } else {
+        ps = db.prepareStatement("SELECT " + fields + " FROM " + JOB_TABLE_NAME
+            + " WHERE " + JobFields.JOBID.toString() + " = ?");
+        ps.setString(1, jobId);
+      }
+      return ps.executeQuery();
+    }
+
+    protected void close() {
+      if (ps != null)
+        try {
+          ps.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing statement", e);
+        }
+
+      if (db != null)
+        try {
+          db.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing connection", e);
+        }
+    }
+
+    @Override
+    public Set<Resource> fetchJobDetails(Set<String> requestedIds,
+        String clusterName, String workflowId, String jobId) {
+      Set<Resource> jobs = new HashSet<Resource>();
+      ResultSet rs = null;
+      try {
+        rs = getResultSet(requestedIds, workflowId, jobId);
+        while (rs.next()) {
+          Resource resource = new ResourceImpl(Resource.Type.Job);
+          setResourceProperty(resource, JOB_CLUSTER_NAME_PROPERTY_ID,
+              clusterName, requestedIds);
+          setString(resource, JOB_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_NAME_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_STATUS_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_USER_NAME_PROPERTY_ID, rs, requestedIds);
+          if (requestedIds.contains(JOB_SUBMIT_TIME_PROPERTY_ID)
+              || requestedIds.contains(JOB_ELAPSED_TIME_PROPERTY_ID)) {
+            long submitTime = rs.getLong(JobFields.SUBMITTIME.toString());
+            if (requestedIds.contains(JOB_SUBMIT_TIME_PROPERTY_ID))
+              setResourceProperty(resource, JOB_SUBMIT_TIME_PROPERTY_ID,
+                  submitTime, requestedIds);
+            if (requestedIds.contains(JOB_ELAPSED_TIME_PROPERTY_ID)) {
+              long finishTime = rs.getLong(JobFields.FINISHTIME.toString());
+              if (finishTime > submitTime)
+                setResourceProperty(resource, JOB_ELAPSED_TIME_PROPERTY_ID,
+                    finishTime - submitTime, requestedIds);
+              else
+                setResourceProperty(resource, JOB_ELAPSED_TIME_PROPERTY_ID, 0L,
+                    requestedIds);
+            }
+          }
+          setInt(resource, JOB_MAPS_PROPERTY_ID, rs, requestedIds);
+          setInt(resource, JOB_REDUCES_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, JOB_INPUT_BYTES_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, JOB_OUTPUT_BYTES_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_CONF_PATH_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_WORKFLOW_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, JOB_WORKFLOW_ENTITY_NAME_PROPERTY_ID, rs,
+              requestedIds);
+          jobs.add(resource);
+        }
+      } catch (SQLException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Caught exception getting resource.", e);
+        return Collections.emptySet();
+      } finally {
+        if (rs != null)
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            LOG.error("Exception while closing ResultSet", e);
+          }
+
+        close();
+      }
+      return jobs;
+    }
+  }
+
+  /**
+   * Enumeration of db fields for the job table.
+   */
+  static enum JobFields {
+    JOBID,
+    JOBNAME,
+    STATUS,
+    USERNAME,
+    SUBMITTIME,
+    FINISHTIME,
+    MAPS,
+    REDUCES,
+    INPUTBYTES,
+    OUTPUTBYTES,
+    CONFPATH,
+    WORKFLOWID,
+    WORKFLOWENTITYNAME
+  }
+
+  @Override
+  protected Map<String,JobFields> getDBFieldMap() {
+    Map<String,JobFields> dbFields = new HashMap<String,JobFields>();
+    dbFields.put(JOB_WORKFLOW_ID_PROPERTY_ID, JobFields.WORKFLOWID);
+    dbFields.put(JOB_ID_PROPERTY_ID, JobFields.JOBID);
+    dbFields.put(JOB_NAME_PROPERTY_ID, JobFields.JOBNAME);
+    dbFields.put(JOB_STATUS_PROPERTY_ID, JobFields.STATUS);
+    dbFields.put(JOB_USER_NAME_PROPERTY_ID, JobFields.USERNAME);
+    dbFields.put(JOB_SUBMIT_TIME_PROPERTY_ID, JobFields.SUBMITTIME);
+    dbFields.put(JOB_ELAPSED_TIME_PROPERTY_ID, JobFields.FINISHTIME);
+    dbFields.put(JOB_MAPS_PROPERTY_ID, JobFields.MAPS);
+    dbFields.put(JOB_REDUCES_PROPERTY_ID, JobFields.REDUCES);
+    dbFields.put(JOB_INPUT_BYTES_PROPERTY_ID, JobFields.INPUTBYTES);
+    dbFields.put(JOB_OUTPUT_BYTES_PROPERTY_ID, JobFields.OUTPUTBYTES);
+    dbFields.put(JOB_CONF_PATH_PROPERTY_ID, JobFields.CONFPATH);
+    dbFields.put(JOB_WORKFLOW_ENTITY_NAME_PROPERTY_ID,
+        JobFields.WORKFLOWENTITYNAME);
+    return dbFields;
+  }
+}
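
Because the second constructor accepts a JobFetcher, the provider can be exercised without a Postgres instance. A minimal canned fetcher might look like the sketch below; it assumes Resource.setProperty(String, Object) is available on ResourceImpl, and the property-id strings and job id are illustrative.

    import java.util.Collections;
    import java.util.Set;

    import org.apache.ambari.server.controller.internal.JobResourceProvider;
    import org.apache.ambari.server.controller.internal.ResourceImpl;
    import org.apache.ambari.server.controller.spi.Resource;

    // Sketch of a canned JobFetcher (e.g., for tests); properties set here are illustrative.
    public class StubJobFetcher implements JobResourceProvider.JobFetcher {
      @Override
      public Set<Resource> fetchJobDetails(Set<String> requestedIds,
          String clusterName, String workflowId, String jobId) {
        Resource job = new ResourceImpl(Resource.Type.Job);
        job.setProperty("Job/cluster_name", clusterName); // assumed Resource API
        job.setProperty("Job/workflow_id", workflowId);
        job.setProperty("Job/job_id", jobId == null ? "job_0001" : jobId);
        return Collections.singleton(job);
      }
    }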

+ 360 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TaskAttemptResourceProvider.java

@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.jdbc.JobHistoryPostgresConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resource provider for task attempt resources.
+ */
+public class TaskAttemptResourceProvider extends
+    AbstractJDBCResourceProvider<TaskAttemptResourceProvider.TaskAttemptFields> {
+  private static Log LOG = LogFactory.getLog(TaskAttemptResourceProvider.class);
+
+  protected static final String TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "cluster_name");
+  protected static final String TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "workflow_id");
+  protected static final String TASK_ATTEMPT_JOB_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "job_id");
+  protected static final String TASK_ATTEMPT_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "task_attempt_id");
+  protected static final String TASK_ATTEMPT_TYPE_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "type");
+  protected static final String TASK_ATTEMPT_START_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "start_time");
+  protected static final String TASK_ATTEMPT_FINISH_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "finish_time");
+  protected static final String TASK_ATTEMPT_MAP_FINISH_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "map_finish_time");
+  protected static final String TASK_ATTEMPT_SHUFFLE_FINISH_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "shuffle_finish_time");
+  protected static final String TASK_ATTEMPT_SORT_FINISH_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "sort_finish_fime");
+  protected static final String TASK_ATTEMPT_INPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "input_bytes");
+  protected static final String TASK_ATTEMPT_OUTPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "output_bytes");
+  protected static final String TASK_ATTEMPT_STATUS_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "status");
+  protected static final String TASK_ATTEMPT_LOCALITY_PROPERTY_ID = PropertyHelper
+      .getPropertyId("TaskAttempt", "locality");
+
+  private static final Set<String> pkPropertyIds = new HashSet<String>(
+      Arrays.asList(new String[] {TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID,
+          TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID,
+          TASK_ATTEMPT_JOB_ID_PROPERTY_ID, TASK_ATTEMPT_ID_PROPERTY_ID}));
+
+  protected TaskAttemptFetcher taskAttemptFetcher;
+
+  /**
+   * Create a new task attempt resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   */
+  protected TaskAttemptResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds) {
+    super(propertyIds, keyPropertyIds);
+    taskAttemptFetcher = new PostgresTaskAttemptFetcher(
+        new JobHistoryPostgresConnectionFactory());
+  }
+
+  /**
+   * Create a new task attempt resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   * @param taskAttemptFetcher
+   *          task attempt fetcher
+   */
+  protected TaskAttemptResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds, TaskAttemptFetcher taskAttemptFetcher) {
+    super(propertyIds, keyPropertyIds);
+    this.taskAttemptFetcher = taskAttemptFetcher;
+  }
+
+  @Override
+  public RequestStatus createResources(Request request) throws SystemException,
+      UnsupportedPropertyException, ResourceAlreadyExistsException,
+      NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Resource> getResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+
+    Set<Resource> resourceSet = new HashSet<Resource>();
+    Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+
+    Set<Map<String,Object>> predicatePropertiesSet = getPropertyMaps(predicate);
+    for (Map<String,Object> predicateProperties : predicatePropertiesSet) {
+      String clusterName = (String) predicateProperties
+          .get(TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID);
+      String workflowId = (String) predicateProperties
+          .get(TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID);
+      String jobId = (String) predicateProperties
+          .get(TASK_ATTEMPT_JOB_ID_PROPERTY_ID);
+      String taskAttemptId = (String) predicateProperties
+          .get(TASK_ATTEMPT_ID_PROPERTY_ID);
+      resourceSet.addAll(taskAttemptFetcher.fetchTaskAttemptDetails(
+          requestedIds, clusterName, workflowId, jobId, taskAttemptId));
+    }
+    return resourceSet;
+  }
+
+  @Override
+  public RequestStatus updateResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RequestStatus deleteResources(Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected Set<String> getPKPropertyIds() {
+    return pkPropertyIds;
+  }
+
+  @Override
+  public Map<Type,String> getKeyPropertyIds() {
+    Map<Type,String> keyPropertyIds = new HashMap<Type,String>();
+    keyPropertyIds.put(Type.Cluster, TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID);
+    keyPropertyIds.put(Type.Workflow, TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID);
+    keyPropertyIds.put(Type.Job, TASK_ATTEMPT_JOB_ID_PROPERTY_ID);
+    keyPropertyIds.put(Type.TaskAttempt, TASK_ATTEMPT_ID_PROPERTY_ID);
+    return keyPropertyIds;
+  }
+
+  /**
+   * Simple interface for fetching task attempts from db.
+   */
+  public static interface TaskAttemptFetcher {
+    /**
+     * Fetch task attempt resources
+     * 
+     * @param requestedIds
+     *          fields to pull from db
+     * @param clusterName
+     *          the cluster name
+     * @param workflowId
+     *          the workflow id
+     * @param jobId
+     *          the job id
+     * @param taskAttemptId
+     *          the task attempt id
+     * @return a set of task attempt resources
+     */
+    public Set<Resource> fetchTaskAttemptDetails(Set<String> requestedIds,
+        String clusterName, String workflowId, String jobId,
+        String taskAttemptId);
+  }
+
+  /**
+   * A task attempt fetcher that queries a postgres task attempt table.
+   */
+  protected class PostgresTaskAttemptFetcher implements TaskAttemptFetcher {
+    private static final String TASK_ATTEMPT_TABLE_NAME = "taskattempt";
+    private ConnectionFactory connectionFactory;
+    Connection db;
+    PreparedStatement ps;
+
+    /**
+     * Create a postgres task attempt fetcher that uses a given connection
+     * factory.
+     * 
+     * @param connectionFactory
+     *          a connection factory
+     */
+    public PostgresTaskAttemptFetcher(ConnectionFactory connectionFactory) {
+      this.connectionFactory = connectionFactory;
+      this.db = null;
+      this.ps = null;
+    }
+
+    protected ResultSet getResultSet(Set<String> requestedIds,
+        String workflowId, String jobId, String taskAttemptId)
+        throws SQLException {
+      db = null;
+      ps = null;
+      db = connectionFactory.getConnection();
+      if (taskAttemptId == null) {
+        ps = db.prepareStatement("SELECT " + getDBFieldString(requestedIds)
+            + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE "
+            + TaskAttemptFields.JOBID + " = ? ");
+        ps.setString(1, jobId);
+      } else {
+        ps = db.prepareStatement("SELECT " + getDBFieldString(requestedIds)
+            + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE "
+            + TaskAttemptFields.TASKATTEMPTID + " = ? ");
+        ps.setString(1, taskAttemptId);
+      }
+      return ps.executeQuery();
+    }
+
+    protected void close() {
+      if (ps != null)
+        try {
+          ps.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing statement", e);
+        }
+
+      if (db != null)
+        try {
+          db.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing connection", e);
+        }
+    }
+
+    @Override
+    public Set<Resource> fetchTaskAttemptDetails(Set<String> requestedIds,
+        String clusterName, String workflowId, String jobId,
+        String taskAttemptId) {
+      Set<Resource> taskAttempts = new HashSet<Resource>();
+      ResultSet rs = null;
+      try {
+        rs = getResultSet(requestedIds, workflowId, jobId, taskAttemptId);
+        while (rs.next()) {
+          Resource resource = new ResourceImpl(Resource.Type.TaskAttempt);
+          setResourceProperty(resource, TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID,
+              clusterName, requestedIds);
+          setResourceProperty(resource, TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID,
+              workflowId, requestedIds);
+          setString(resource, TASK_ATTEMPT_JOB_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, TASK_ATTEMPT_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, TASK_ATTEMPT_TYPE_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, TASK_ATTEMPT_START_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_FINISH_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_MAP_FINISH_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_SHUFFLE_FINISH_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_SORT_FINISH_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_INPUT_BYTES_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, TASK_ATTEMPT_OUTPUT_BYTES_PROPERTY_ID, rs,
+              requestedIds);
+          setString(resource, TASK_ATTEMPT_STATUS_PROPERTY_ID, rs, requestedIds);
+          setString(resource, TASK_ATTEMPT_LOCALITY_PROPERTY_ID, rs,
+              requestedIds);
+          taskAttempts.add(resource);
+        }
+      } catch (SQLException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Caught exception getting resource.", e);
+        return Collections.emptySet();
+      } finally {
+        if (rs != null)
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            LOG.error("Exception while closing ResultSet", e);
+          }
+
+        close();
+      }
+      return taskAttempts;
+    }
+  }
+
+  /**
+   * Enumeration of db fields for the task attempt table.
+   */
+  static enum TaskAttemptFields {
+    JOBID,
+    TASKATTEMPTID,
+    TASKTYPE,
+    STARTTIME,
+    FINISHTIME,
+    MAPFINISHTIME,
+    SHUFFLEFINISHTIME,
+    SORTFINISHTIME,
+    INPUTBYTES,
+    OUTPUTBYTES,
+    STATUS,
+    LOCALITY
+  }
+
+  @Override
+  protected Map<String,TaskAttemptFields> getDBFieldMap() {
+    Map<String,TaskAttemptFields> dbFields = new HashMap<String,TaskAttemptFields>();
+    dbFields.put(TASK_ATTEMPT_JOB_ID_PROPERTY_ID, TaskAttemptFields.JOBID);
+    dbFields.put(TASK_ATTEMPT_ID_PROPERTY_ID, TaskAttemptFields.TASKATTEMPTID);
+    dbFields.put(TASK_ATTEMPT_TYPE_PROPERTY_ID, TaskAttemptFields.TASKTYPE);
+    dbFields.put(TASK_ATTEMPT_START_TIME_PROPERTY_ID,
+        TaskAttemptFields.STARTTIME);
+    dbFields.put(TASK_ATTEMPT_FINISH_TIME_PROPERTY_ID,
+        TaskAttemptFields.FINISHTIME);
+    dbFields.put(TASK_ATTEMPT_MAP_FINISH_TIME_PROPERTY_ID,
+        TaskAttemptFields.MAPFINISHTIME);
+    dbFields.put(TASK_ATTEMPT_SHUFFLE_FINISH_TIME_PROPERTY_ID,
+        TaskAttemptFields.SHUFFLEFINISHTIME);
+    dbFields.put(TASK_ATTEMPT_SORT_FINISH_TIME_PROPERTY_ID,
+        TaskAttemptFields.SORTFINISHTIME);
+    dbFields.put(TASK_ATTEMPT_INPUT_BYTES_PROPERTY_ID,
+        TaskAttemptFields.INPUTBYTES);
+    dbFields.put(TASK_ATTEMPT_OUTPUT_BYTES_PROPERTY_ID,
+        TaskAttemptFields.OUTPUTBYTES);
+    dbFields.put(TASK_ATTEMPT_STATUS_PROPERTY_ID, TaskAttemptFields.STATUS);
+    dbFields.put(TASK_ATTEMPT_LOCALITY_PROPERTY_ID, TaskAttemptFields.LOCALITY);
+    return dbFields;
+  }
+}

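The getDBFieldMap() above pairs with the getDBFieldString() helper inherited from AbstractJDBCResourceProvider to turn the caller's requested property ids into the SELECT column list used in getResultSet(). A self-contained sketch of that translation, assuming the comma-joined output that AbstractJDBCResourceProviderTest below verifies; the class name and the two-field map are illustrative only:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;

    public class FieldMappingSketch {
      // Illustrative subset of the TaskAttemptFields enum above.
      enum TaskAttemptFields { TASKATTEMPTID, STARTTIME }

      public static void main(String[] args) {
        // Mirrors getDBFieldMap(): property id -> db column.
        Map<String,TaskAttemptFields> dbFields =
            new LinkedHashMap<String,TaskAttemptFields>();
        dbFields.put("TaskAttempt/task_attempt_id",
            TaskAttemptFields.TASKATTEMPTID);
        dbFields.put("TaskAttempt/start_time", TaskAttemptFields.STARTTIME);

        // Ids the caller asked for; cluster_name has no backing column.
        Set<String> requestedIds = new TreeSet<String>(Arrays.asList(
            "TaskAttempt/task_attempt_id", "TaskAttempt/cluster_name"));

        // Mirrors getDBFieldString(): join the mapped columns with commas.
        StringBuilder columns = new StringBuilder();
        for (String id : requestedIds) {
          TaskAttemptFields field = dbFields.get(id);
          if (field == null)
            continue; // unmapped ids are filled in by the provider itself
          if (columns.length() > 0)
            columns.append(",");
          columns.append(field);
        }
        // Prints: SELECT TASKATTEMPTID FROM taskattempt
        System.out.println("SELECT " + columns + " FROM taskattempt");
      }
    }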
+ 331 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/WorkflowResourceProvider.java

@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.internal;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.jdbc.JobHistoryPostgresConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resource provider for workflow resources.
+ */
+public class WorkflowResourceProvider extends
+    AbstractJDBCResourceProvider<WorkflowResourceProvider.WorkflowFields> {
+  private static final Log LOG = LogFactory
+      .getLog(WorkflowResourceProvider.class);
+
+  protected static final String WORKFLOW_CLUSTER_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "cluster_name");
+  protected static final String WORKFLOW_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "workflow_id");
+  protected static final String WORKFLOW_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "name");
+  protected static final String WORKFLOW_USER_NAME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "user_name");
+  protected static final String WORKFLOW_START_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "start_time");
+  protected static final String WORKFLOW_LAST_UPDATE_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "last_update_time");
+  protected static final String WORKFLOW_ELAPSED_TIME_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "elapsed_time");
+  protected static final String WORKFLOW_INPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "input_bytes");
+  protected static final String WORKFLOW_OUTPUT_BYTES_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "output_bytes");
+  protected static final String WORKFLOW_NUM_JOBS_TOTAL_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "num_jobs_total");
+  protected static final String WORKFLOW_NUM_JOBS_COMPLETED_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "num_jobs_completed");
+  protected static final String WORKFLOW_PARENT_ID_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "parent_id");
+  protected static final String WORKFLOW_CONTEXT_PROPERTY_ID = PropertyHelper
+      .getPropertyId("Workflow", "context");
+
+  private static final Set<String> pkPropertyIds = new HashSet<String>(
+      Arrays.asList(new String[] {WORKFLOW_CLUSTER_NAME_PROPERTY_ID,
+          WORKFLOW_ID_PROPERTY_ID}));
+
+  protected WorkflowFetcher workflowFetcher;
+
+  /**
+   * Create a new workflow resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   */
+  protected WorkflowResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds) {
+    super(propertyIds, keyPropertyIds);
+    this.workflowFetcher = new PostgresWorkflowFetcher(
+        new JobHistoryPostgresConnectionFactory());
+  }
+
+  /**
+   * Create a new workflow resource provider.
+   * 
+   * @param propertyIds
+   *          the property ids
+   * @param keyPropertyIds
+   *          the key property ids
+   * @param workflowFetcher
+   *          workflow fetcher
+   */
+  protected WorkflowResourceProvider(Set<String> propertyIds,
+      Map<Type,String> keyPropertyIds, WorkflowFetcher workflowFetcher) {
+    super(propertyIds, keyPropertyIds);
+    this.workflowFetcher = workflowFetcher;
+  }
+
+  @Override
+  public RequestStatus createResources(Request request) throws SystemException,
+      UnsupportedPropertyException, ResourceAlreadyExistsException,
+      NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Resource> getResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+
+    Set<Resource> resourceSet = new HashSet<Resource>();
+    Set<String> requestedIds = getRequestPropertyIds(request, predicate);
+
+    Set<Map<String,Object>> predicatePropertieSet = getPropertyMaps(predicate);
+    for (Map<String,Object> predicateProperties : predicatePropertieSet) {
+      String clusterName = (String) predicateProperties
+          .get(WORKFLOW_CLUSTER_NAME_PROPERTY_ID);
+      String workflowId = (String) predicateProperties
+          .get(WORKFLOW_ID_PROPERTY_ID);
+      resourceSet.addAll(workflowFetcher.fetchWorkflows(requestedIds,
+          clusterName, workflowId));
+    }
+    return resourceSet;
+  }
+
+  @Override
+  public RequestStatus updateResources(Request request, Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RequestStatus deleteResources(Predicate predicate)
+      throws SystemException, UnsupportedPropertyException,
+      NoSuchResourceException, NoSuchParentResourceException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected Set<String> getPKPropertyIds() {
+    return pkPropertyIds;
+  }
+
+  @Override
+  public Map<Type,String> getKeyPropertyIds() {
+    Map<Type,String> keyPropertyIds = new HashMap<Type,String>();
+    keyPropertyIds.put(Type.Cluster, WORKFLOW_CLUSTER_NAME_PROPERTY_ID);
+    keyPropertyIds.put(Type.Workflow, WORKFLOW_ID_PROPERTY_ID);
+    return keyPropertyIds;
+  }
+
+  /**
+   * Simple interface for fetching workflows from db.
+   */
+  public static interface WorkflowFetcher {
+    /**
+     * Fetch workflow resources.
+     * 
+     * @param requestedIds
+     *          fields to pull from db
+     * @param clusterName
+     *          the cluster name
+     * @param workflowId
+     *          the workflow id
+     * @return a set of workflow resources
+     */
+    public Set<Resource> fetchWorkflows(Set<String> requestedIds,
+        String clusterName, String workflowId);
+  }
+
+  /**
+   * A workflow fetcher that queries a postgres workflow table.
+   */
+  protected class PostgresWorkflowFetcher implements WorkflowFetcher {
+    private static final String WORKFLOW_TABLE_NAME = "workflow";
+    private ConnectionFactory connectionFactory;
+    private Connection db;
+    private PreparedStatement ps;
+
+    /**
+     * Create a postgres workflow fetcher that uses a given connection factory.
+     * 
+     * @param connectionFactory
+     *          a connection factory
+     */
+    public PostgresWorkflowFetcher(ConnectionFactory connectionFactory) {
+      this.connectionFactory = connectionFactory;
+      this.db = null;
+      this.ps = null;
+    }
+
+    protected ResultSet getResultSet(Set<String> requestedIds, String workflowId)
+        throws SQLException {
+      db = null;
+      ps = null;
+      db = connectionFactory.getConnection();
+      if (workflowId == null) {
+        ps = db.prepareStatement("SELECT " + getDBFieldString(requestedIds)
+            + " FROM " + WORKFLOW_TABLE_NAME);
+      } else {
+        ps = db.prepareStatement("SELECT " + getDBFieldString(requestedIds)
+            + " FROM " + WORKFLOW_TABLE_NAME + " WHERE "
+            + WorkflowFields.WORKFLOWID + " = ?");
+        ps.setString(1, workflowId);
+      }
+      return ps.executeQuery();
+    }
+
+    protected void close() {
+      if (ps != null)
+        try {
+          ps.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing statment", e);
+        }
+
+      if (db != null)
+        try {
+          db.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing connection", e);
+        }
+    }
+
+    @Override
+    public Set<Resource> fetchWorkflows(Set<String> requestedIds,
+        String clusterName, String workflowId) {
+      Set<Resource> workflows = new HashSet<Resource>();
+      ResultSet rs = null;
+      try {
+        rs = getResultSet(requestedIds, workflowId);
+        while (rs.next()) {
+          Resource resource = new ResourceImpl(Resource.Type.Workflow);
+          setResourceProperty(resource, WORKFLOW_CLUSTER_NAME_PROPERTY_ID,
+              clusterName, requestedIds);
+          setString(resource, WORKFLOW_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, WORKFLOW_NAME_PROPERTY_ID, rs, requestedIds);
+          setString(resource, WORKFLOW_USER_NAME_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, WORKFLOW_START_TIME_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, WORKFLOW_LAST_UPDATE_TIME_PROPERTY_ID, rs,
+              requestedIds);
+          setLong(resource, WORKFLOW_ELAPSED_TIME_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, WORKFLOW_INPUT_BYTES_PROPERTY_ID, rs, requestedIds);
+          setLong(resource, WORKFLOW_OUTPUT_BYTES_PROPERTY_ID, rs, requestedIds);
+          setInt(resource, WORKFLOW_NUM_JOBS_TOTAL_PROPERTY_ID, rs,
+              requestedIds);
+          setInt(resource, WORKFLOW_NUM_JOBS_COMPLETED_PROPERTY_ID, rs,
+              requestedIds);
+          setString(resource, WORKFLOW_PARENT_ID_PROPERTY_ID, rs, requestedIds);
+          setString(resource, WORKFLOW_CONTEXT_PROPERTY_ID, rs, requestedIds);
+          workflows.add(resource);
+        }
+      } catch (SQLException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Caught exception getting resource.", e);
+        return Collections.emptySet();
+      } finally {
+        if (rs != null)
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            LOG.error("Exception while closing ResultSet", e);
+          }
+
+        close();
+      }
+      return workflows;
+    }
+  }
+
+  /**
+   * Enumeration of db fields for the workflow table.
+   */
+  static enum WorkflowFields {
+    WORKFLOWID,
+    WORKFLOWNAME,
+    USERNAME,
+    STARTTIME,
+    LASTUPDATETIME,
+    DURATION,
+    NUMJOBSTOTAL,
+    NUMJOBSCOMPLETED,
+    INPUTBYTES,
+    OUTPUTBYTES,
+    PARENTWORKFLOWID,
+    WORKFLOWCONTEXT
+  }
+
+  @Override
+  protected Map<String,WorkflowFields> getDBFieldMap() {
+    Map<String,WorkflowFields> dbFields = new HashMap<String,WorkflowFields>();
+    dbFields.put(WORKFLOW_ID_PROPERTY_ID, WorkflowFields.WORKFLOWID);
+    dbFields.put(WORKFLOW_NAME_PROPERTY_ID, WorkflowFields.WORKFLOWNAME);
+    dbFields.put(WORKFLOW_USER_NAME_PROPERTY_ID, WorkflowFields.USERNAME);
+    dbFields.put(WORKFLOW_START_TIME_PROPERTY_ID, WorkflowFields.STARTTIME);
+    dbFields.put(WORKFLOW_LAST_UPDATE_TIME_PROPERTY_ID,
+        WorkflowFields.LASTUPDATETIME);
+    dbFields.put(WORKFLOW_ELAPSED_TIME_PROPERTY_ID, WorkflowFields.DURATION);
+    dbFields.put(WORKFLOW_INPUT_BYTES_PROPERTY_ID, WorkflowFields.INPUTBYTES);
+    dbFields.put(WORKFLOW_OUTPUT_BYTES_PROPERTY_ID, WorkflowFields.OUTPUTBYTES);
+    dbFields.put(WORKFLOW_NUM_JOBS_TOTAL_PROPERTY_ID,
+        WorkflowFields.NUMJOBSTOTAL);
+    dbFields.put(WORKFLOW_NUM_JOBS_COMPLETED_PROPERTY_ID,
+        WorkflowFields.NUMJOBSCOMPLETED);
+    dbFields.put(WORKFLOW_PARENT_ID_PROPERTY_ID,
+        WorkflowFields.PARENTWORKFLOWID);
+    dbFields.put(WORKFLOW_CONTEXT_PROPERTY_ID, WorkflowFields.WORKFLOWCONTEXT);
+    return dbFields;
+  }
+}

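The WorkflowFetcher interface above is the seam that keeps the provider testable against something other than postgres. A minimal stub, assuming the classes added in this patch are on the classpath; since the fetcher-taking constructor is protected, real wiring happens from a subclass or the same package, as WorkflowResourceProviderTest below does:

    import java.util.Collections;
    import java.util.Set;

    import org.apache.ambari.server.controller.internal.WorkflowResourceProvider.WorkflowFetcher;
    import org.apache.ambari.server.controller.spi.Resource;

    // Stub fetcher that returns no workflows; a real implementation would
    // query its own backing store and build Resource objects as above.
    public class EmptyWorkflowFetcher implements WorkflowFetcher {
      @Override
      public Set<Resource> fetchWorkflows(Set<String> requestedIds,
          String clusterName, String workflowId) {
        return Collections.emptySet();
      }
    }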
+ 68 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/jdbc/JobHistoryPostgresConnectionFactory.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * Connection factory implementation for job history postgres db.
+ */
+public class JobHistoryPostgresConnectionFactory implements ConnectionFactory {
+  private static final String DEFAULT_HOSTNAME = "localhost";
+  private static final String DEFAULT_DBNAME = "ambarirca";
+  private static final String DEFAULT_USERNAME = "mapred";
+  private static final String DEFAULT_PASSWORD = "mapred";
+
+  private String url;
+  private String username;
+  private String password;
+
+  /**
+   * Create a connection factory with default parameters.
+   */
+  public JobHistoryPostgresConnectionFactory() {
+    this(DEFAULT_HOSTNAME, DEFAULT_DBNAME, DEFAULT_USERNAME, DEFAULT_PASSWORD);
+  }
+
+  /**
+   * Create a connection factory with given parameters.
+   * 
+   * @param hostname host running postgres
+   * @param dbname name of the postgres db
+   * @param username username for postgres db
+   * @param password password for postgres db
+   */
+  public JobHistoryPostgresConnectionFactory(String hostname, String dbname, String username, String password) {
+    url = "jdbc:postgresql://" + hostname + "/" + dbname;
+    this.username = username;
+    this.password = password;
+    try {
+      Class.forName("org.postgresql.Driver");
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Can't load postgresql", e);
+    }
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    return DriverManager.getConnection(url, username, password);
+  }
+}

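A short usage sketch for the factory above; it relies only on the defaults baked into the class (localhost, db ambarirca, user/password mapred), and the count query against the workflow table is illustrative:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
    import org.apache.ambari.server.controller.jdbc.JobHistoryPostgresConnectionFactory;

    public class ConnectionFactorySketch {
      public static void main(String[] args) throws SQLException {
        ConnectionFactory factory = new JobHistoryPostgresConnectionFactory();
        Connection db = factory.getConnection();
        try {
          Statement st = db.createStatement();
          // "workflow" is the table PostgresWorkflowFetcher reads from.
          ResultSet rs = st.executeQuery("SELECT count(*) FROM workflow");
          if (rs.next()) {
            System.out.println("workflows: " + rs.getLong(1));
          }
          rs.close();
          st.close();
        } finally {
          db.close();
        }
      }
    }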
+ 4 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/spi/Resource.java

@@ -99,6 +99,9 @@ public interface Resource {
     StackServiceComponent,
     DRFeed,
     DRTargetCluster,
-    DRInstance
+    DRInstance,
+    Workflow,
+    Job,
+    TaskAttempt
   }
 }

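The three new Resource.Type values are what the providers in this patch tag their resources with. A trivial sketch, assuming ResourceImpl is accessible from the calling code (the provider tests below construct it the same way); the workflow id value is made up:

    import org.apache.ambari.server.controller.internal.ResourceImpl;
    import org.apache.ambari.server.controller.spi.Resource;

    public class NewTypesSketch {
      public static void main(String[] args) {
        // Providers create one resource per db row and tag it with its type.
        Resource workflow = new ResourceImpl(Resource.Type.Workflow);
        workflow.setProperty("Workflow/workflow_id", "wf_1"); // made-up id
        System.out.println(workflow.getPropertyValue("Workflow/workflow_id"));
      }
    }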
+ 48 - 1
ambari-server/src/main/resources/properties.json

@@ -188,5 +188,52 @@
         "Instance/endTime",
         "Instance/details",
         "Instance/log"
-    ]
+    ],
+    "Workflow":[
+        "Workflow/cluster_name",
+        "Workflow/workflow_id",
+        "Workflow/name",
+        "Workflow/user_name",
+        "Workflow/start_time",
+        "Workflow/last_update_time",
+        "Workflow/elapsed_time",
+        "Workflow/input_bytes",
+        "Workflow/output_bytes",
+        "Workflow/num_jobs_total",
+        "Workflow/num_jobs_completed",
+        "Workflow/parent_id",
+        "Workflow/context"
+    ],
+    "Job":[
+        "Job/cluster_name",
+        "Job/workflow_id",
+        "Job/job_id",
+        "Job/name",
+        "Job/status",
+        "Job/user_name",
+        "Job/submit_time",
+        "Job/elapsed_time",
+        "Job/maps",
+        "Job/reduces",
+        "Job/input_bytes",
+        "Job/output_bytes",
+        "Job/conf_path",
+        "Job/workflow_entity_name"
+    ],
+    "TaskAttempt":[
+        "TaskAttempt/cluster_name",
+        "TaskAttempt/workflow_id",
+        "TaskAttempt/job_id",
+        "TaskAttempt/task_attempt_id",
+        "TaskAttempt/type",
+        "TaskAttempt/start_time",
+        "TaskAttempt/finish_time",
+        "TaskAttempt/map_finish_time",
+        "TaskAttempt/shuffle_finish_time",
+        "TaskAttempt/sort_finish_fime",
+        "TaskAttempt/input_bytes",
+        "TaskAttempt/output_bytes",
+        "TaskAttempt/status",
+        "TaskAttempt/locality"
+    ]
 }

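These property ids are what API consumers reference through the partial-response fields parameter. Assuming the new services are mounted under the cluster resource with the plural names the definition tests below check ("workflows", "jobs", "taskattempts"), requests would look something like the following; the cluster, workflow, and job ids are made up:

    GET /api/v1/clusters/c1/workflows?fields=Workflow/user_name,Workflow/num_jobs_completed
    GET /api/v1/clusters/c1/workflows/wf_1/jobs/job_1/taskattempts?fields=TaskAttempt/status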
+ 52 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/resources/JobResourceDefinitionTest.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.resources;
+
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for JobResourceDefinition.
+ */
+public class JobResourceDefinitionTest {
+  @Test
+  public void testGetPluralName() throws Exception {
+    JobResourceDefinition definition = new JobResourceDefinition();
+    Assert.assertEquals("jobs", definition.getPluralName());
+  }
+
+  @Test
+  public void testGetSingularName() throws Exception {
+    JobResourceDefinition definition = new JobResourceDefinition();
+    Assert.assertEquals("job", definition.getSingularName());
+  }
+
+  @Test
+  public void testGetSubResourceDefinitions() throws Exception {
+    JobResourceDefinition definition = new JobResourceDefinition();
+    Set<SubResourceDefinition> subResourceDefinitions = definition
+        .getSubResourceDefinitions();
+    Assert.assertEquals(1, subResourceDefinitions.size());
+    SubResourceDefinition subResourceDefinition = subResourceDefinitions
+        .iterator().next();
+    Assert.assertEquals(Resource.Type.TaskAttempt,
+        subResourceDefinition.getType());
+  }
+}

+ 47 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/resources/TaskAttemptResourceDefinitionTest.java

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.resources;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for TaskAttemptResourceDefinition.
+ */
+public class TaskAttemptResourceDefinitionTest {
+  @Test
+  public void testGetPluralName() throws Exception {
+    TaskAttemptResourceDefinition definition = new TaskAttemptResourceDefinition();
+    Assert.assertEquals("taskattempts", definition.getPluralName());
+  }
+
+  @Test
+  public void testGetSingularName() throws Exception {
+    TaskAttemptResourceDefinition definition = new TaskAttemptResourceDefinition();
+    Assert.assertEquals("taskattempt", definition.getSingularName());
+  }
+
+  @Test
+  public void testGetSubResourceDefinitions() throws Exception {
+    TaskAttemptResourceDefinition definition = new TaskAttemptResourceDefinition();
+    Set<SubResourceDefinition> subResourceDefinitions = definition
+        .getSubResourceDefinitions();
+    Assert.assertEquals(0, subResourceDefinitions.size());
+  }
+}

+ 51 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/resources/WorkflowResourceDefinitionTest.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.resources;
+
+import java.util.Set;
+
+import org.apache.ambari.server.controller.spi.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for WorkflowResourceDefinition.
+ */
+public class WorkflowResourceDefinitionTest {
+  @Test
+  public void testGetPluralName() throws Exception {
+    WorkflowResourceDefinition definition = new WorkflowResourceDefinition();
+    Assert.assertEquals("workflows", definition.getPluralName());
+  }
+
+  @Test
+  public void testGetSingularName() throws Exception {
+    WorkflowResourceDefinition definition = new WorkflowResourceDefinition();
+    Assert.assertEquals("workflow", definition.getSingularName());
+  }
+
+  @Test
+  public void testGetSubResourceDefinitions() throws Exception {
+    WorkflowResourceDefinition definition = new WorkflowResourceDefinition();
+    Set<SubResourceDefinition> subResourceDefinitions = definition
+        .getSubResourceDefinitions();
+    Assert.assertEquals(1, subResourceDefinitions.size());
+    SubResourceDefinition subResourceDefinition = subResourceDefinitions
+        .iterator().next();
+    Assert.assertEquals(Resource.Type.Job, subResourceDefinition.getType());
+  }
+}

+ 93 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/services/JobServiceTest.java

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.services;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.api.services.parsers.RequestBodyParser;
+import org.apache.ambari.server.api.services.serializers.ResultSerializer;
+
+/**
+ * Unit tests for JobService.
+ */
+public class JobServiceTest extends BaseServiceTest {
+
+  @Override
+  public List<ServiceTestInvocation> getTestInvocations() throws Exception {
+    List<ServiceTestInvocation> listInvocations = new ArrayList<ServiceTestInvocation>();
+
+    // getJob
+    JobService service = new TestJobService("clusterName", "jobId");
+    Method m = service.getClass().getMethod("getJob", HttpHeaders.class,
+        UriInfo.class, String.class);
+    Object[] args = new Object[] {getHttpHeaders(), getUriInfo(), "jobId"};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    // getJobs
+    service = new TestJobService("clusterName", null);
+    m = service.getClass().getMethod("getJobs", HttpHeaders.class,
+        UriInfo.class);
+    args = new Object[] {getHttpHeaders(), getUriInfo()};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    return listInvocations;
+  }
+
+  private class TestJobService extends JobService {
+    private String workflowId;
+    private String clusterName;
+
+    public TestJobService(String clusterName, String workflowId) {
+      super(clusterName, workflowId);
+      this.clusterName = clusterName;
+      this.workflowId = workflowId;
+    }
+
+    @Override
+    ResourceInstance createJobResource(String clusterName, String workflowId,
+        String jobId) {
+      assertEquals(this.clusterName, clusterName);
+      assertEquals(this.workflowId, workflowId);
+      return getTestResource();
+    }
+
+    @Override
+    RequestFactory getRequestFactory() {
+      return getTestRequestFactory();
+    }
+
+    @Override
+    protected RequestBodyParser getBodyParser() {
+      return getTestBodyParser();
+    }
+
+    @Override
+    protected ResultSerializer getResultSerializer() {
+      return getTestResultSerializer();
+    }
+  }
+}

+ 98 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/services/TaskAttemptServiceTest.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.services;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.api.services.parsers.RequestBodyParser;
+import org.apache.ambari.server.api.services.serializers.ResultSerializer;
+
+/**
+ * Unit tests for TaskAttemptService.
+ */
+public class TaskAttemptServiceTest extends BaseServiceTest {
+
+  @Override
+  public List<ServiceTestInvocation> getTestInvocations() throws Exception {
+    List<ServiceTestInvocation> listInvocations = new ArrayList<ServiceTestInvocation>();
+
+    // getTaskAttempt
+    TestTaskAttemptService service = new TestTaskAttemptService("clusterName",
+        "workflowId", "jobId");
+    Method m = service.getClass().getMethod("getTaskAttempt",
+        HttpHeaders.class, UriInfo.class, String.class);
+    Object[] args = new Object[] {getHttpHeaders(), getUriInfo(), "jobId"};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    // getTaskAttempts
+    service = new TestTaskAttemptService("clusterName", "workflowId", "jobId");
+    m = service.getClass().getMethod("getTaskAttempts", HttpHeaders.class,
+        UriInfo.class);
+    args = new Object[] {getHttpHeaders(), getUriInfo()};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    return listInvocations;
+  }
+
+  private class TestTaskAttemptService extends TaskAttemptService {
+    private String clusterName;
+    private String workflowId;
+    private String jobId;
+
+    public TestTaskAttemptService(String clusterName, String workflowId,
+        String jobId) {
+      super(clusterName, workflowId, jobId);
+      this.clusterName = clusterName;
+      this.workflowId = workflowId;
+      this.jobId = jobId;
+    }
+
+    @Override
+    ResourceInstance createTaskAttemptResource(String clusterName,
+        String workflowId, String jobId, String taskAttemptId) {
+      assertEquals(this.clusterName, clusterName);
+      assertEquals(this.workflowId, workflowId);
+      assertEquals(this.jobId, jobId);
+      return getTestResource();
+    }
+
+    @Override
+    RequestFactory getRequestFactory() {
+      return getTestRequestFactory();
+    }
+
+    @Override
+    protected RequestBodyParser getBodyParser() {
+      return getTestBodyParser();
+    }
+
+    @Override
+    protected ResultSerializer getResultSerializer() {
+      return getTestResultSerializer();
+    }
+  }
+}

+ 90 - 0
ambari-server/src/test/java/org/apache/ambari/server/api/services/WorkflowServiceTest.java

@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.api.services;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.ambari.server.api.resources.ResourceInstance;
+import org.apache.ambari.server.api.services.parsers.RequestBodyParser;
+import org.apache.ambari.server.api.services.serializers.ResultSerializer;
+
+/**
+ * Unit tests for WorkflowService.
+ */
+public class WorkflowServiceTest extends BaseServiceTest {
+
+  @Override
+  public List<ServiceTestInvocation> getTestInvocations() throws Exception {
+    List<ServiceTestInvocation> listInvocations = new ArrayList<ServiceTestInvocation>();
+
+    // getWorkflow
+    WorkflowService service = new TestWorkflowService("clusterName");
+    Method m = service.getClass().getMethod("getWorkflow", HttpHeaders.class,
+        UriInfo.class, String.class);
+    Object[] args = new Object[] {getHttpHeaders(), getUriInfo(), "jobId"};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    // getWorkflows
+    service = new TestWorkflowService("clusterName");
+    m = service.getClass().getMethod("getWorkflows", HttpHeaders.class,
+        UriInfo.class);
+    args = new Object[] {getHttpHeaders(), getUriInfo()};
+    listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m,
+        args, null));
+
+    return listInvocations;
+  }
+
+  private class TestWorkflowService extends WorkflowService {
+    private String clusterName;
+
+    public TestWorkflowService(String clusterName) {
+      super(clusterName);
+      this.clusterName = clusterName;
+    }
+
+    @Override
+    ResourceInstance createWorkflowResource(String clusterName,
+        String workflowId) {
+      assertEquals(this.clusterName, clusterName);
+      return getTestResource();
+    }
+
+    @Override
+    RequestFactory getRequestFactory() {
+      return getTestRequestFactory();
+    }
+
+    @Override
+    protected RequestBodyParser getBodyParser() {
+      return getTestBodyParser();
+    }
+
+    @Override
+    protected ResultSerializer getResultSerializer() {
+      return getTestResultSerializer();
+    }
+  }
+}

+ 143 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AbstractJDBCResourceProviderTest.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * AbstractJDBCResourceProvider tests.
+ */
+public class AbstractJDBCResourceProviderTest {
+  private static final String property1 = "property1";
+  private static final String property2 = "property2";
+
+  @Test
+  public void test() throws SQLException {
+    Set<String> requestedIds = new TreeSet<String>();
+    requestedIds.add(property1);
+    requestedIds.add("none1");
+    requestedIds.add(property2);
+
+    AbstractJDBCResourceProvider<TestFields> provider = new TestAbstractJDBCResourceProviderImpl(
+        requestedIds, null);
+    Assert.assertEquals(
+        TestFields.field1.toString() + "," + TestFields.field2.toString(),
+        provider.getDBFieldString(requestedIds));
+    Assert.assertEquals(TestFields.field1.toString(),
+        provider.getDBFieldString(Collections.singleton(property1)));
+    Assert.assertEquals("",
+        provider.getDBFieldString(Collections.singleton("none1")));
+    Assert.assertEquals(TestFields.field1, provider.getDBField(property1));
+    Assert.assertEquals(TestFields.field2, provider.getDBField(property2));
+
+    ResultSet rs = createMock(ResultSet.class);
+    expect(rs.getString(TestFields.field1.toString())).andReturn("1").once();
+    expect(rs.getLong(TestFields.field2.toString())).andReturn(2L).once();
+    expect(rs.getInt(TestFields.field1.toString())).andReturn(3).once();
+    replay(rs);
+    Resource r = new ResourceImpl((Resource.Type) null);
+    provider.setString(r, property1, rs, requestedIds);
+    provider.setString(r, "none2", rs, requestedIds);
+    Assert.assertEquals("1", r.getPropertyValue(property1));
+    r = new ResourceImpl((Resource.Type) null);
+    provider.setLong(r, property2, rs, requestedIds);
+    provider.setLong(r, "none2", rs, requestedIds);
+    Assert.assertEquals(2L, r.getPropertyValue(property2));
+    r = new ResourceImpl((Resource.Type) null);
+    provider.setInt(r, property1, rs, requestedIds);
+    provider.setInt(r, "none2", rs, requestedIds);
+    Assert.assertEquals(3, r.getPropertyValue(property1));
+    verify(rs);
+  }
+
+  private static enum TestFields {
+    field1, field2
+  }
+
+  private static class TestAbstractJDBCResourceProviderImpl extends
+      AbstractJDBCResourceProvider<TestFields> {
+    protected TestAbstractJDBCResourceProviderImpl(Set<String> propertyIds,
+        Map<Type,String> keyPropertyIds) {
+      super(propertyIds, keyPropertyIds);
+    }
+
+    @Override
+    public RequestStatus createResources(Request request)
+        throws SystemException, UnsupportedPropertyException,
+        ResourceAlreadyExistsException, NoSuchParentResourceException {
+      return null;
+    }
+
+    @Override
+    public Set<Resource> getResources(Request request, Predicate predicate)
+        throws SystemException, UnsupportedPropertyException,
+        NoSuchResourceException, NoSuchParentResourceException {
+      return null;
+    }
+
+    @Override
+    public RequestStatus updateResources(Request request, Predicate predicate)
+        throws SystemException, UnsupportedPropertyException,
+        NoSuchResourceException, NoSuchParentResourceException {
+      return null;
+    }
+
+    @Override
+    public RequestStatus deleteResources(Predicate predicate)
+        throws SystemException, UnsupportedPropertyException,
+        NoSuchResourceException, NoSuchParentResourceException {
+      return null;
+    }
+
+    @Override
+    protected Map<String,TestFields> getDBFieldMap() {
+      Map<String,TestFields> fields = new HashMap<String,TestFields>();
+      fields.put(property1, TestFields.field1);
+      fields.put(property2, TestFields.field2);
+      return fields;
+    }
+
+    @Override
+    protected Set<String> getPKPropertyIds() {
+      return null;
+    }
+  }
+}

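The test above pins down the contract of the inherited helpers: a setter only populates a property when the id was both requested and mapped to a db column, and silently skips otherwise. A sketch of that behavior inferred from those expectations, with a plain Map standing in for Resource (this is not the actual AbstractJDBCResourceProvider body, which appears earlier in the patch):

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Map;
    import java.util.Set;

    public class SetterSketch {
      // Inferred shape of setLong: look up the db column for the property id,
      // and only read from the ResultSet when the id is mapped and requested.
      static <E extends Enum<E>> void setLong(Map<String,Object> resource,
          String propertyId, ResultSet rs, Set<String> requestedIds,
          Map<String,E> dbFields) throws SQLException {
        E field = dbFields.get(propertyId);
        if (field != null && requestedIds.contains(propertyId)) {
          resource.put(propertyId, rs.getLong(field.toString()));
        }
      }
    }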
+ 267 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/JobResourceProviderTest.java

@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.internal.JobResourceProvider.JobFetcher;
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * JobResourceProvider tests.
+ */
+public class JobResourceProviderTest {
+  @Test
+  public void testGetResources() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<Resource> expected = new HashSet<Resource>();
+    expected.add(createJobResponse("Cluster100", "workflow1", "job1"));
+    expected.add(createJobResponse("Cluster100", "workflow2", "job2"));
+    expected.add(createJobResponse("Cluster100", "workflow2", "job3"));
+
+    Resource.Type type = Resource.Type.Job;
+    Set<String> propertyIds = PropertyHelper.getPropertyIds(type);
+
+    JobFetcher jobFetcher = createMock(JobFetcher.class);
+    expect(jobFetcher.fetchJobDetails(propertyIds, null, "workflow2", null))
+        .andReturn(expected).once();
+    replay(jobFetcher);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(type);
+    ResourceProvider provider = new JobResourceProvider(propertyIds,
+        keyPropertyIds, jobFetcher);
+
+    Request request = PropertyHelper.getReadRequest(propertyIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(JobResourceProvider.JOB_WORKFLOW_ID_PROPERTY_ID)
+        .equals("workflow2").toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(3, resources.size());
+    Set<String> names = new HashSet<String>();
+    for (Resource resource : resources) {
+      String clusterName = (String) resource
+          .getPropertyValue(JobResourceProvider.JOB_CLUSTER_NAME_PROPERTY_ID);
+      Assert.assertEquals("Cluster100", clusterName);
+      names.add((String) resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID));
+    }
+    // Make sure that all of the response objects got moved into resources
+    for (Resource resource : expected) {
+      Assert.assertTrue(names.contains(resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID)));
+    }
+
+    verify(jobFetcher);
+  }
+
+  @Test
+  public void testJobFetcher1() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(JobResourceProvider.JOB_ID_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.Job);
+    ResourceProvider provider = new TestJobResourceProvider(requestedIds,
+        keyPropertyIds, 1);
+
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(JobResourceProvider.JOB_ID_PROPERTY_ID).equals("job1")
+        .toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String jobId = (String) resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID);
+      Assert.assertEquals("job1", jobId);
+    }
+  }
+
+  @Test
+  public void testJobFetcher2() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(JobResourceProvider.JOB_ID_PROPERTY_ID);
+    requestedIds.add(JobResourceProvider.JOB_SUBMIT_TIME_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.Job);
+    ResourceProvider provider = new TestJobResourceProvider(requestedIds,
+        keyPropertyIds, 2);
+
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(JobResourceProvider.JOB_ID_PROPERTY_ID).equals("job1")
+        .toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String jobId = (String) resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID);
+      Assert.assertEquals("job1", jobId);
+      Assert.assertEquals(42L, resource
+          .getPropertyValue(JobResourceProvider.JOB_SUBMIT_TIME_PROPERTY_ID));
+    }
+  }
+
+  @Test
+  public void testJobFetcher3() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(JobResourceProvider.JOB_ID_PROPERTY_ID);
+    requestedIds.add(JobResourceProvider.JOB_ELAPSED_TIME_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.Job);
+    ResourceProvider provider = new TestJobResourceProvider(requestedIds,
+        keyPropertyIds, 3);
+
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(JobResourceProvider.JOB_ID_PROPERTY_ID).equals("job1")
+        .toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String jobId = (String) resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID);
+      Assert.assertEquals("job1", jobId);
+      Assert.assertEquals(1L, resource
+          .getPropertyValue(JobResourceProvider.JOB_ELAPSED_TIME_PROPERTY_ID));
+    }
+  }
+
+  @Test
+  public void testJobFetcher4() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(JobResourceProvider.JOB_ID_PROPERTY_ID);
+    requestedIds.add(JobResourceProvider.JOB_SUBMIT_TIME_PROPERTY_ID);
+    requestedIds.add(JobResourceProvider.JOB_ELAPSED_TIME_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.Job);
+    ResourceProvider provider = new TestJobResourceProvider(requestedIds,
+        keyPropertyIds, 4);
+
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(JobResourceProvider.JOB_ID_PROPERTY_ID).equals("job1")
+        .toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String jobId = (String) resource
+          .getPropertyValue(JobResourceProvider.JOB_ID_PROPERTY_ID);
+      Assert.assertEquals("job1", jobId);
+      Assert.assertEquals(42L, resource
+          .getPropertyValue(JobResourceProvider.JOB_SUBMIT_TIME_PROPERTY_ID));
+      Assert.assertEquals(0L, resource
+          .getPropertyValue(JobResourceProvider.JOB_ELAPSED_TIME_PROPERTY_ID));
+    }
+  }
+
+  private static Resource createJobResponse(String clusterName,
+      String workflowId, String jobId) {
+    Resource r = new ResourceImpl(Resource.Type.Job);
+    r.setProperty(JobResourceProvider.JOB_CLUSTER_NAME_PROPERTY_ID, clusterName);
+    r.setProperty(JobResourceProvider.JOB_WORKFLOW_ID_PROPERTY_ID, workflowId);
+    r.setProperty(JobResourceProvider.JOB_ID_PROPERTY_ID, jobId);
+    return r;
+  }
+
+  private static class TestJobResourceProvider extends JobResourceProvider {
+    protected TestJobResourceProvider(Set<String> propertyIds,
+        Map<Type,String> keyPropertyIds, int type) {
+      super(propertyIds, keyPropertyIds);
+      this.jobFetcher = new TestJobFetcher(type);
+    }
+
+    private class TestJobFetcher extends PostgresJobFetcher {
+      ResultSet rs = null;
+      int type;
+
+      public TestJobFetcher(int type) {
+        super((ConnectionFactory) null);
+        this.type = type;
+      }
+
+      @Override
+      protected ResultSet getResultSet(Set<String> requestedIds,
+          String workflowId, String jobId) throws SQLException {
+        rs = createMock(ResultSet.class);
+        expect(rs.next()).andReturn(true).once();
+        expect(rs.getString(getDBField(JOB_ID_PROPERTY_ID).toString()))
+            .andReturn("job1").once();
+        if (type > 1)
+          expect(rs.getLong(getDBField(JOB_SUBMIT_TIME_PROPERTY_ID).toString()))
+              .andReturn(42L).once();
+        if (type == 3)
+          expect(rs.getLong(JobFields.FINISHTIME.toString())).andReturn(43L)
+              .once();
+        if (type == 4)
+          expect(rs.getLong(JobFields.FINISHTIME.toString())).andReturn(41L)
+              .once();
+        expect(rs.next()).andReturn(false).once();
+        rs.close();
+        expectLastCall().once();
+        replay(rs);
+        return rs;
+      }
+
+      @Override
+      protected void close() {
+        verify(rs);
+      }
+    }
+  }
+}

+ 181 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/TaskAttemptResourceProviderTest.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.internal.TaskAttemptResourceProvider.TaskAttemptFetcher;
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * TaskAttemptResourceProvider tests
+ */
+public class TaskAttemptResourceProviderTest {
+  @Test
+  public void testGetResources() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<Resource> expected = new HashSet<Resource>();
+    expected.add(createTaskAttemptResponse("Cluster100", "workflow1", "job1",
+        "taskAttempt1"));
+    expected.add(createTaskAttemptResponse("Cluster100", "workflow2", "job2",
+        "taskAttempt2"));
+    expected.add(createTaskAttemptResponse("Cluster100", "workflow2", "job2",
+        "taskAttempt3"));
+
+    Resource.Type type = Resource.Type.TaskAttempt;
+    Set<String> propertyIds = PropertyHelper.getPropertyIds(type);
+
+    TaskAttemptFetcher taskAttemptFetcher = createMock(TaskAttemptFetcher.class);
+    expect(
+        taskAttemptFetcher.fetchTaskAttemptDetails(propertyIds, null, null,
+            "job2", null)).andReturn(expected).once();
+    replay(taskAttemptFetcher);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(type);
+    ResourceProvider provider = new TaskAttemptResourceProvider(propertyIds,
+        keyPropertyIds, taskAttemptFetcher);
+
+    Request request = PropertyHelper.getReadRequest(propertyIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(TaskAttemptResourceProvider.TASK_ATTEMPT_JOB_ID_PROPERTY_ID)
+        .equals("job2").toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(3, resources.size());
+    Set<String> names = new HashSet<String>();
+    for (Resource resource : resources) {
+      String clusterName = (String) resource
+          .getPropertyValue(TaskAttemptResourceProvider.TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID);
+      Assert.assertEquals("Cluster100", clusterName);
+      names.add((String) resource
+          .getPropertyValue(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID));
+    }
+    // Make sure that all of the response objects got moved into resources
+    for (Resource resource : expected) {
+      Assert.assertTrue(names.contains(resource
+          .getPropertyValue(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID)));
+    }
+
+    verify(taskAttemptFetcher);
+  }
+
+  @Test
+  public void testTaskAttemptFetcher() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.TaskAttempt);
+    ResourceProvider provider = new TestTaskAttemptResourceProvider(
+        requestedIds, keyPropertyIds);
+
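+    // The stub fetcher below serves a single-row ResultSet for "taskattempt1".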
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID)
+        .equals("taskattempt1").toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String taskAttemptId = (String) resource
+          .getPropertyValue(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID);
+      Assert.assertEquals("taskattempt1", taskAttemptId);
+    }
+  }
+
+  private static Resource createTaskAttemptResponse(String clusterName,
+      String workflowId, String jobId, String taskAttemptId) {
+    Resource r = new ResourceImpl(Resource.Type.TaskAttempt);
+    r.setProperty(
+        TaskAttemptResourceProvider.TASK_ATTEMPT_CLUSTER_NAME_PROPERTY_ID,
+        clusterName);
+    r.setProperty(
+        TaskAttemptResourceProvider.TASK_ATTEMPT_WORKFLOW_ID_PROPERTY_ID,
+        workflowId);
+    r.setProperty(TaskAttemptResourceProvider.TASK_ATTEMPT_JOB_ID_PROPERTY_ID,
+        jobId);
+    r.setProperty(TaskAttemptResourceProvider.TASK_ATTEMPT_ID_PROPERTY_ID,
+        taskAttemptId);
+    return r;
+  }
+
+  private static class TestTaskAttemptResourceProvider extends
+      TaskAttemptResourceProvider {
+    protected TestTaskAttemptResourceProvider(Set<String> propertyIds,
+        Map<Type,String> keyPropertyIds) {
+      super(propertyIds, keyPropertyIds, null);
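+      // Swap in a stub fetcher so no real ConnectionFactory is needed.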
+      this.taskAttemptFetcher = new TestTaskAttemptFetcher();
+    }
+
+    private class TestTaskAttemptFetcher extends PostgresTaskAttemptFetcher {
+      ResultSet rs = null;
+
+      public TestTaskAttemptFetcher() {
+        super((ConnectionFactory) null);
+      }
+
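+      // Hand back a mock ResultSet that yields exactly one row containing
+      // the task attempt id, then reports exhaustion and expects close().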
+      @Override
+      protected ResultSet getResultSet(Set<String> requestedIds,
+          String workflowId, String jobId, String taskAttemptId)
+          throws SQLException {
+        rs = createMock(ResultSet.class);
+        expect(rs.next()).andReturn(true).once();
+        expect(rs.getString(getDBField(TASK_ATTEMPT_ID_PROPERTY_ID).toString()))
+            .andReturn("taskattempt1").once();
+        expect(rs.next()).andReturn(false).once();
+        rs.close();
+        expectLastCall().once();
+        replay(rs);
+        return rs;
+      }
+
+      @Override
+      protected void close() {
+        verify(rs);
+      }
+    }
+  }
+}

+ 167 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/WorkflowResourceProviderTest.java

@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.controller.internal.WorkflowResourceProvider.WorkflowFetcher;
+import org.apache.ambari.server.controller.jdbc.ConnectionFactory;
+import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
+import org.apache.ambari.server.controller.spi.NoSuchResourceException;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.Resource.Type;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * WorkflowResourceProvider tests.
+ */
+public class WorkflowResourceProviderTest {
+  @Test
+  public void testGetResources() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<Resource> expected = new HashSet<Resource>();
+    expected.add(createWorkflowResponse("Cluster100", "workflow1"));
+    expected.add(createWorkflowResponse("Cluster100", "workflow2"));
+    expected.add(createWorkflowResponse("Cluster100", "workflow3"));
+
+    Resource.Type type = Resource.Type.Workflow;
+    Set<String> propertyIds = PropertyHelper.getPropertyIds(type);
+
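+    // Mock the fetcher so the provider can be exercised without a database;
+    // it returns the canned workflow set for the "Cluster100" query.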
+    WorkflowFetcher workflowFetcher = createMock(WorkflowFetcher.class);
+    expect(workflowFetcher.fetchWorkflows(propertyIds, "Cluster100", null))
+        .andReturn(expected).once();
+    replay(workflowFetcher);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(type);
+    ResourceProvider provider = new WorkflowResourceProvider(propertyIds,
+        keyPropertyIds, workflowFetcher);
+
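+    // Ask for every workflow property, filtered to Cluster100 by predicate.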
+    Request request = PropertyHelper.getReadRequest(propertyIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(WorkflowResourceProvider.WORKFLOW_CLUSTER_NAME_PROPERTY_ID)
+        .equals("Cluster100").toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(3, resources.size());
+    Set<String> names = new HashSet<String>();
+    for (Resource resource : resources) {
+      String clusterName = (String) resource
+          .getPropertyValue(WorkflowResourceProvider.WORKFLOW_CLUSTER_NAME_PROPERTY_ID);
+      Assert.assertEquals("Cluster100", clusterName);
+      names.add((String) resource
+          .getPropertyValue(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID));
+    }
+    // Make sure that all of the response objects got moved into resources
+    for (Resource resource : expected) {
+      Assert.assertTrue(names.contains(resource
+          .getPropertyValue(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID)));
+    }
+
+    verify(workflowFetcher);
+  }
+
+  @Test
+  public void testWorkflowFetcher() throws SystemException,
+      UnsupportedPropertyException, NoSuchResourceException,
+      NoSuchParentResourceException {
+    Set<String> requestedIds = new HashSet<String>();
+    requestedIds.add(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID);
+
+    Map<Resource.Type,String> keyPropertyIds = PropertyHelper
+        .getKeyPropertyIds(Resource.Type.Workflow);
+    ResourceProvider provider = new TestWorkflowResourceProvider(requestedIds,
+        keyPropertyIds);
+
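+    // The stub fetcher below serves a single-row ResultSet for "workflow1".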
+    Request request = PropertyHelper.getReadRequest(requestedIds);
+    Predicate predicate = new PredicateBuilder()
+        .property(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID)
+        .equals("workflow1").toPredicate();
+    Set<Resource> resources = provider.getResources(request, predicate);
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String workflowId = (String) resource
+          .getPropertyValue(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID);
+      Assert.assertEquals("workflow1", workflowId);
+    }
+  }
+
+  private static Resource createWorkflowResponse(String clusterName,
+      String workflowId) {
+    Resource r = new ResourceImpl(Resource.Type.Workflow);
+    r.setProperty(WorkflowResourceProvider.WORKFLOW_CLUSTER_NAME_PROPERTY_ID,
+        clusterName);
+    r.setProperty(WorkflowResourceProvider.WORKFLOW_ID_PROPERTY_ID, workflowId);
+    return r;
+  }
+
+  private static class TestWorkflowResourceProvider extends
+      WorkflowResourceProvider {
+    protected TestWorkflowResourceProvider(Set<String> propertyIds,
+        Map<Type,String> keyPropertyIds) {
+      super(propertyIds, keyPropertyIds, null);
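+      // Swap in a stub fetcher so no real ConnectionFactory is needed.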
+      this.workflowFetcher = new TestWorkflowFetcher();
+    }
+
+    private class TestWorkflowFetcher extends PostgresWorkflowFetcher {
+      ResultSet rs = null;
+
+      public TestWorkflowFetcher() {
+        super((ConnectionFactory) null);
+      }
+
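+      // Hand back a mock ResultSet that yields exactly one row containing
+      // the workflow id, then reports exhaustion and expects close().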
+      @Override
+      protected ResultSet getResultSet(Set<String> requestedIds,
+          String workflowId) throws SQLException {
+        rs = createMock(ResultSet.class);
+        expect(rs.next()).andReturn(true).once();
+        expect(rs.getString(getDBField(WORKFLOW_ID_PROPERTY_ID).toString()))
+            .andReturn("workflow1").once();
+        expect(rs.next()).andReturn(false).once();
+        rs.close();
+        expectLastCall().once();
+        replay(rs);
+        return rs;
+      }
+
+      @Override
+      protected void close() {
+        verify(rs);
+      }
+    }
+  }
+}