|
@@ -20,8 +20,12 @@ package org.apache.ambari.logsearch.dao;
|
|
|
|
|
|
import org.apache.ambari.logsearch.conf.SolrPropsConfig;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.http.HttpResponse;
|
|
|
+import org.apache.http.client.methods.HttpGet;
|
|
|
+import org.apache.http.impl.client.CloseableHttpClient;
|
|
|
import org.apache.solr.client.solrj.SolrServerException;
|
|
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
|
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
|
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
|
|
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
|
|
import org.apache.solr.common.SolrException;
|
|
@@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import static org.apache.ambari.logsearch.solr.SolrConstants.CommonLogConstants.ROUTER_FIELD;
|
|
|
|
|
|
import javax.inject.Named;
|
|
|
+import javax.ws.rs.core.Response;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
@@ -45,6 +50,8 @@ class SolrCollectionDao {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(SolrCollectionDao.class);
|
|
|
|
|
|
private static final int SETUP_RETRY_SECOND = 30;
|
|
|
+ private static final String MODIFY_COLLECTION_QUERY = "/admin/collections?action=MODIFYCOLLECTION&collection=%s&%s=%d";
|
|
|
+ private static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
|
|
|
|
|
|
/**
|
|
|
* This will try to get the collections from the Solr. Ping doesn't work if
|
|
@@ -189,7 +196,16 @@ class SolrCollectionDao {
|
|
|
}
|
|
|
} else {
|
|
|
LOG.info("Collection " + solrPropsConfig.getCollection() + " is already there. Will check whether it has the required shards");
|
|
|
- Collection<String> existingShards = getShards(solrClient, solrPropsConfig);
|
|
|
+ Collection<Slice> slices = getSlices(solrClient, solrPropsConfig);
|
|
|
+ Collection<String> existingShards = getShards(slices, solrPropsConfig);
|
|
|
+ if (existingShards.size() < shardsList.size()) {
|
|
|
+ try {
|
|
|
+ updateMaximumNumberOfShardsPerCore(slices, solrPropsConfig);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ returnValue = false;
|
|
|
+ LOG.error(String.format("Exception during updating collection (%s)", t));
|
|
|
+ }
|
|
|
+ }
|
|
|
for (String shard : shardsList) {
|
|
|
if (!existingShards.contains(shard)) {
|
|
|
try {
|
|
@@ -216,10 +232,44 @@ class SolrCollectionDao {
|
|
|
return returnValue;
|
|
|
}
|
|
|
|
|
|
- private Collection<String> getShards(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) {
|
|
|
- Collection<String> list = new HashSet<>();
|
|
|
+ private String getRandomBaseUrl(Collection<Slice> slices) {
|
|
|
+ String coreUrl = null;
|
|
|
+ if (slices != null) {
|
|
|
+ for (Slice slice : slices) {
|
|
|
+ if (!slice.getReplicas().isEmpty()) {
|
|
|
+ Replica replica = slice.getReplicas().iterator().next();
|
|
|
+ coreUrl = replica.getStr("base_url");
|
|
|
+ if (coreUrl != null) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return coreUrl;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateMaximumNumberOfShardsPerCore(Collection<Slice> slices, SolrPropsConfig solrPropsConfig) throws IOException {
|
|
|
+ String baseUrl = getRandomBaseUrl(slices);
|
|
|
+ if (baseUrl != null) {
|
|
|
+ CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
|
|
|
+ HttpGet request = new HttpGet(baseUrl + String.format(MODIFY_COLLECTION_QUERY,
|
|
|
+ solrPropsConfig.getCollection(), MAX_SHARDS_PER_NODE, calculateMaxShardsPerNode(solrPropsConfig)));
|
|
|
+ HttpResponse response = httpClient.execute(request);
|
|
|
+ if (response.getStatusLine().getStatusCode() != Response.Status.OK.getStatusCode()) {
|
|
|
+ throw new IllegalStateException(String.format("Cannot update collection (%s) - increase max number of nodes per core", solrPropsConfig.getCollection()));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException(String.format("Cannot get any core url for updating collection (%s)", solrPropsConfig.getCollection()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Collection<Slice> getSlices(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) {
|
|
|
ZkStateReader reader = solrClient.getZkStateReader();
|
|
|
- Collection<Slice> slices = reader.getClusterState().getSlices(solrPropsConfig.getCollection());
|
|
|
+ return reader.getClusterState().getSlices(solrPropsConfig.getCollection());
|
|
|
+ }
|
|
|
+
|
|
|
+ private Collection<String> getShards(Collection<Slice> slices, SolrPropsConfig solrPropsConfig) {
|
|
|
+ Collection<String> list = new HashSet<>();
|
|
|
for (Slice slice : slices) {
|
|
|
for (Replica replica : slice.getReplicas()) {
|
|
|
LOG.info("colName=" + solrPropsConfig.getCollection() + ", slice.name=" + slice.getName() + ", slice.state=" + slice.getState() +
|
|
@@ -245,7 +295,7 @@ class SolrCollectionDao {
|
|
|
collectionCreateRequest.setNumShards(solrPropsConfig.getNumberOfShards());
|
|
|
collectionCreateRequest.setReplicationFactor(solrPropsConfig.getReplicationFactor());
|
|
|
collectionCreateRequest.setConfigName(solrPropsConfig.getConfigName());
|
|
|
- collectionCreateRequest.setMaxShardsPerNode(solrPropsConfig.getReplicationFactor() * solrPropsConfig.getNumberOfShards());
|
|
|
+ collectionCreateRequest.setMaxShardsPerNode(calculateMaxShardsPerNode(solrPropsConfig));
|
|
|
CollectionAdminResponse createResponse = collectionCreateRequest.process(solrClient);
|
|
|
if (createResponse.getStatus() != 0) {
|
|
|
LOG.error("Error creating collection. collectionName=" + solrPropsConfig.getCollection() + ", response=" + createResponse);
|
|
@@ -256,4 +306,8 @@ class SolrCollectionDao {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private Integer calculateMaxShardsPerNode(SolrPropsConfig solrPropsConfig) {
|
|
|
+ return solrPropsConfig.getReplicationFactor() * solrPropsConfig.getNumberOfShards();
|
|
|
+ }
|
|
|
}
|