|
@@ -19,123 +19,178 @@
|
|
|
package org.apache.ambari.logsearch.dao;
|
|
|
|
|
|
import org.apache.ambari.logsearch.common.LogSearchConstants;
|
|
|
+import org.apache.ambari.logsearch.common.LogType;
|
|
|
import org.apache.ambari.logsearch.common.MessageEnums;
|
|
|
-import org.apache.ambari.logsearch.conf.SolrPropsConfig;
|
|
|
import org.apache.ambari.logsearch.conf.SolrUserPropsConfig;
|
|
|
+import org.apache.http.HttpResponse;
|
|
|
+import org.apache.http.client.methods.HttpGet;
|
|
|
+import org.apache.http.impl.client.CloseableHttpClient;
|
|
|
import org.apache.solr.client.solrj.SolrRequest;
|
|
|
import org.apache.solr.client.solrj.SolrServerException;
|
|
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
|
-import org.apache.solr.client.solrj.request.LukeRequest;
|
|
|
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
|
|
|
import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
|
|
|
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
|
|
|
import org.apache.solr.client.solrj.response.LukeResponse;
|
|
|
import org.apache.solr.client.solrj.response.LukeResponse.FieldInfo;
|
|
|
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
|
|
|
import org.apache.solr.common.SolrException;
|
|
|
+import org.apache.solr.common.cloud.Replica;
|
|
|
+import org.apache.solr.common.cloud.Slice;
|
|
|
+import org.apache.solr.common.cloud.ZkStateReader;
|
|
|
+import org.apache.solr.common.util.JavaBinCodec;
|
|
|
+import org.apache.solr.common.util.NamedList;
|
|
|
import org.codehaus.jettison.json.JSONObject;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.data.solr.core.SolrTemplate;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
|
|
-import javax.inject.Inject;
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.inject.Inject;
|
|
|
+import javax.inject.Named;
|
|
|
+
|
|
|
public class SolrSchemaFieldDao {
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(SolrSchemaFieldDao.class);
|
|
|
|
|
|
- private static final int SETUP_RETRY_SECOND = 30;
|
|
|
- private static final int SETUP_UPDATE_SECOND = 1 * 60; // 1 min
|
|
|
+ private static final int RETRY_SECOND = 30;
|
|
|
|
|
|
- private boolean populateFieldsThreadActive = false;
|
|
|
-
|
|
|
- private Map<String, String> schemaFieldNameMap = new HashMap<>();
|
|
|
- private Map<String, String> schemaFieldTypeMap = new HashMap<>();
|
|
|
+ @Inject
|
|
|
+ @Named("serviceSolrTemplate")
|
|
|
+ private SolrTemplate serviceSolrTemplate;
|
|
|
|
|
|
@Inject
|
|
|
- private SolrUserPropsConfig solrUserPropsConfig;
|
|
|
+ @Named("auditSolrTemplate")
|
|
|
+ private SolrTemplate auditSolrTemplate;
|
|
|
+
|
|
|
+ @Inject
|
|
|
+ private SolrUserPropsConfig solrUserConfigPropsConfig;
|
|
|
+
|
|
|
+ private CloudSolrClient serviceSolrClient;
|
|
|
+ private CloudSolrClient auditSolrClient;
|
|
|
+
|
|
|
+ private int retryCount;
|
|
|
+ private int skipCount;
|
|
|
|
|
|
- public void populateSchemaFields(final CloudSolrClient solrClient, final SolrPropsConfig solrPropsConfig) {
|
|
|
- if (!populateFieldsThreadActive) {
|
|
|
- populateFieldsThreadActive = true;
|
|
|
- LOG.info("Creating thread to populated fields for collection=" + solrPropsConfig.getCollection());
|
|
|
- Thread fieldPopulationThread = new Thread("populated_fields_" + solrPropsConfig.getCollection()) {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- LOG.info("Started thread to get fields for collection=" + solrPropsConfig.getCollection());
|
|
|
- int retryCount = 0;
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- Thread.sleep(SETUP_RETRY_SECOND * 1000);
|
|
|
- retryCount++;
|
|
|
- boolean _result = _populateSchemaFields(solrClient, solrPropsConfig);
|
|
|
- if (_result) {
|
|
|
- LOG.info("Populate fields for collection " + solrPropsConfig.getCollection() + " is success, Update it after " +
|
|
|
- SETUP_UPDATE_SECOND + " sec");
|
|
|
- Thread.sleep(SETUP_UPDATE_SECOND * 1000);
|
|
|
- }
|
|
|
- } catch (InterruptedException sleepInterrupted) {
|
|
|
- LOG.info("Sleep interrupted while populating fields for collection " + solrPropsConfig.getCollection());
|
|
|
- break;
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Error while populating fields for collection " + solrPropsConfig.getCollection() + ", retryCount=" + retryCount);
|
|
|
- }
|
|
|
- }
|
|
|
- populateFieldsThreadActive = false;
|
|
|
- LOG.info("Exiting thread for populating fields. collection=" + solrPropsConfig.getCollection());
|
|
|
- }
|
|
|
- };
|
|
|
- fieldPopulationThread.setDaemon(true);
|
|
|
- fieldPopulationThread.start();
|
|
|
+ private boolean serviceCollectionSetUp = false;
|
|
|
+ private boolean auditCollectionSetUp = false;
|
|
|
+
|
|
|
+ private Map<String, String> serviceSchemaFieldNameMap = new HashMap<>();
|
|
|
+ private Map<String, String> serviceSchemaFieldTypeMap = new HashMap<>();
|
|
|
+ private Map<String, String> auditSchemaFieldNameMap = new HashMap<>();
|
|
|
+ private Map<String, String> auditSchemaFieldTypeMap = new HashMap<>();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ this.serviceSolrClient = (CloudSolrClient) serviceSolrTemplate.getSolrClient();
|
|
|
+ this.auditSolrClient = (CloudSolrClient) auditSolrTemplate.getSolrClient();
|
|
|
+ }
|
|
|
+
|
|
|
+ void serviceCollectionSetUp() {
|
|
|
+ this.serviceCollectionSetUp = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ void auditCollectionSetUp() {
|
|
|
+ this.auditCollectionSetUp = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Scheduled(fixedDelay = RETRY_SECOND * 1000)
|
|
|
+ public void populateAllSchemaFields() {
|
|
|
+ if (skipCount > 0) {
|
|
|
+ skipCount--;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (serviceCollectionSetUp) {
|
|
|
+ populateSchemaFields(serviceSolrClient, serviceSchemaFieldNameMap, serviceSchemaFieldTypeMap);
|
|
|
+ }
|
|
|
+ if (auditCollectionSetUp) {
|
|
|
+ populateSchemaFields(auditSolrClient, auditSchemaFieldNameMap, auditSchemaFieldTypeMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Called from the thread. Don't call this directly
|
|
|
- */
|
|
|
- private boolean _populateSchemaFields(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) {
|
|
|
- String historyCollection = solrUserPropsConfig.getCollection();
|
|
|
- if (solrClient != null && !solrPropsConfig.getCollection().equals(historyCollection)) {
|
|
|
- LukeResponse lukeResponse = null;
|
|
|
+ private void populateSchemaFields(CloudSolrClient solrClient, Map<String, String> schemaFieldNameMap,
|
|
|
+ Map<String, String> schemaFieldTypeMap) {
|
|
|
+ if (solrClient != null) {
|
|
|
+ LOG.debug("Started thread to get fields for collection=" + solrClient.getDefaultCollection());
|
|
|
+ List<LukeResponse> lukeResponses = null;
|
|
|
SchemaResponse schemaResponse = null;
|
|
|
try {
|
|
|
- LukeRequest lukeRequest = new LukeRequest();
|
|
|
- lukeRequest.setNumTerms(0);
|
|
|
- lukeResponse = lukeRequest.process(solrClient);
|
|
|
-
|
|
|
+ lukeResponses = getLukeResponsesForCores(solrClient);
|
|
|
+
|
|
|
SolrRequest<SchemaResponse> schemaRequest = new SchemaRequest();
|
|
|
schemaRequest.setMethod(SolrRequest.METHOD.GET);
|
|
|
schemaRequest.setPath("/schema");
|
|
|
schemaResponse = schemaRequest.process(solrClient);
|
|
|
|
|
|
- LOG.debug("populateSchemaFields() collection=" + solrPropsConfig.getCollection() + ", luke=" + lukeResponse +
|
|
|
+ LOG.debug("populateSchemaFields() collection=" + solrClient.getDefaultCollection() + ", luke=" + lukeResponses +
|
|
|
", schema= " + schemaResponse);
|
|
|
} catch (SolrException | SolrServerException | IOException e) {
|
|
|
- LOG.error("Error occured while popuplating field. collection=" + solrPropsConfig.getCollection(), e);
|
|
|
+ LOG.error("Error occured while popuplating field. collection=" + solrClient.getDefaultCollection(), e);
|
|
|
}
|
|
|
|
|
|
- if (lukeResponse != null && schemaResponse != null) {
|
|
|
- extractSchemaFieldsName(lukeResponse, schemaResponse);
|
|
|
- return true;
|
|
|
+ if (schemaResponse != null) {
|
|
|
+ extractSchemaFieldsName(lukeResponses, schemaResponse, schemaFieldNameMap, schemaFieldTypeMap);
|
|
|
+ LOG.debug("Populate fields for collection " + solrClient.getDefaultCollection()+ " was successful, next update it after " +
|
|
|
+ solrUserConfigPropsConfig.getPopulateIntervalMins() + " minutes");
|
|
|
+ retryCount = 0;
|
|
|
+ skipCount = (solrUserConfigPropsConfig.getPopulateIntervalMins() * 60) / RETRY_SECOND - 1;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ retryCount++;
|
|
|
+ LOG.error("Error while populating fields for collection " + solrClient.getDefaultCollection() + ", retryCount=" + retryCount);
|
|
|
}
|
|
|
}
|
|
|
- return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final String LUKE_REQUEST_URL_SUFFIX = "admin/luke?numTerms=0&wt=javabin&version=2";
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private List<LukeResponse> getLukeResponsesForCores(CloudSolrClient solrClient) {
|
|
|
+ ZkStateReader zkStateReader = solrClient.getZkStateReader();
|
|
|
+ Collection<Slice> activeSlices = zkStateReader.getClusterState().getActiveSlices(solrClient.getDefaultCollection());
|
|
|
+
|
|
|
+ List<LukeResponse> lukeResponses = new ArrayList<>();
|
|
|
+ for (Slice slice : activeSlices) {
|
|
|
+ for (Replica replica : slice.getReplicas()) {
|
|
|
+ try (CloseableHttpClient httpClient = HttpClientUtil.createClient(null)) {
|
|
|
+ HttpGet request = new HttpGet(replica.getCoreUrl() + LUKE_REQUEST_URL_SUFFIX);
|
|
|
+ HttpResponse response = httpClient.execute(request);
|
|
|
+ NamedList<Object> lukeData = (NamedList<Object>) new JavaBinCodec(null, null).unmarshal(response.getEntity().getContent());
|
|
|
+ LukeResponse lukeResponse = new LukeResponse();
|
|
|
+ lukeResponse.setResponse(lukeData);
|
|
|
+ lukeResponses.add(lukeResponse);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Exception during getting luke responses", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return lukeResponses;
|
|
|
}
|
|
|
|
|
|
- private void extractSchemaFieldsName(LukeResponse lukeResponse, SchemaResponse schemaResponse) {
|
|
|
+ private void extractSchemaFieldsName(List<LukeResponse> lukeResponses, SchemaResponse schemaResponse,
|
|
|
+ Map<String, String> schemaFieldNameMap, Map<String, String> schemaFieldTypeMap) {
|
|
|
try {
|
|
|
HashMap<String, String> _schemaFieldNameMap = new HashMap<>();
|
|
|
HashMap<String, String> _schemaFieldTypeMap = new HashMap<>();
|
|
|
|
|
|
- for (Entry<String, FieldInfo> e : lukeResponse.getFieldInfo().entrySet()) {
|
|
|
- String name = e.getKey();
|
|
|
- String type = e.getValue().getType();
|
|
|
- if (!name.contains("@") && !name.startsWith("_") && !name.contains("_md5") && !name.contains("_ms") &&
|
|
|
- !name.contains(LogSearchConstants.NGRAM_PREFIX) && !name.contains("tags") && !name.contains("_str")) {
|
|
|
- _schemaFieldNameMap.put(name, type);
|
|
|
+ for (LukeResponse lukeResponse : lukeResponses) {
|
|
|
+ for (Entry<String, FieldInfo> e : lukeResponse.getFieldInfo().entrySet()) {
|
|
|
+ String name = e.getKey();
|
|
|
+ String type = e.getValue().getType();
|
|
|
+ if (!name.contains("@") && !name.startsWith("_") && !name.contains("_md5") && !name.contains("_ms") &&
|
|
|
+ !name.contains(LogSearchConstants.NGRAM_PREFIX) && !name.contains("tags") && !name.contains("_str")) {
|
|
|
+ _schemaFieldNameMap.put(name, type);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -162,19 +217,21 @@ public class SolrSchemaFieldDao {
|
|
|
}
|
|
|
|
|
|
synchronized (this) {
|
|
|
- schemaFieldNameMap = _schemaFieldNameMap;
|
|
|
- schemaFieldTypeMap = _schemaFieldTypeMap;
|
|
|
+ schemaFieldNameMap.clear();
|
|
|
+ schemaFieldNameMap.putAll(_schemaFieldNameMap);
|
|
|
+ schemaFieldTypeMap.clear();
|
|
|
+ schemaFieldTypeMap.putAll(_schemaFieldTypeMap);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
LOG.error(e + "Credentials not specified in logsearch.properties " + MessageEnums.ERROR_SYSTEM);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized Map<String, String> getSchemaFieldNameMap() {
|
|
|
- return schemaFieldNameMap;
|
|
|
+ public Map<String, String> getSchemaFieldNameMap(LogType logType) {
|
|
|
+ return LogType.AUDIT == logType ? auditSchemaFieldNameMap : serviceSchemaFieldNameMap;
|
|
|
}
|
|
|
|
|
|
- public synchronized Map<String, String> getSchemaFieldTypeMap() {
|
|
|
- return schemaFieldTypeMap;
|
|
|
+ public Map<String, String> getSchemaFieldTypeMap(LogType logType) {
|
|
|
+ return LogType.AUDIT == logType ? auditSchemaFieldTypeMap : serviceSchemaFieldTypeMap;
|
|
|
}
|
|
|
}
|