/** * This work was created by participants in the DataONE project, and is * jointly copyrighted by participating institutions in DataONE. For * more information on DataONE, see our web site at http://dataone.org. * * Copyright ${year} * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * $Id$ */ package org.dataone.cn.batch.logging.listener; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; import org.apache.commons.codec.net.URLCodec; import org.apache.log4j.Logger; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.util.ClientUtils; import org.dataone.cn.batch.logging.type.LogEntrySolrItem; import org.dataone.cn.hazelcast.HazelcastClientFactory; import org.dataone.configuration.Settings; import org.dataone.service.types.v1.Identifier; import org.dataone.service.types.v2.SystemMetadata; import org.dataone.solr.client.solrj.impl.CommonsHttpClientProtocolRegistry; import com.hazelcast.client.HazelcastClient; import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryListener; import com.hazelcast.core.IMap; import org.dataone.cn.batch.logging.SolrClientManager; /** * Access to Objects may change * Listen to the systemMetadata maps and if the accessibility of the object * has changed, then all log records associated with the object * must also change * * All log records will need to be periodically swept for inconsistency * should this listener go down * * @author waltz */ public class SystemMetadataEntryListener implements EntryListener { private static Logger logger = Logger.getLogger(SystemMetadataEntryListener.class.getName()); private static HazelcastClient hzclient; private static final String HZ_SYSTEM_METADATA = Settings.getConfiguration().getString( "dataone.hazelcast.systemMetadata"); private static final String HZ_LOGENTRY_TOPICNAME = Settings.getConfiguration().getString( "dataone.hazelcast.logEntryTopic"); private IMap systemMetadata; private BlockingQueue> indexLogEntryQueue; private HttpSolrClient localhostSolrServer; //private LogAccessRestriction logAccessRestriction; private URLCodec urlCodec = new URLCodec("UTF-8"); SolrClientManager solrClientManager = null; public SystemMetadataEntryListener() { //String cnURL = Settings.getConfiguration().getString("D1Client.CN_URL"); //String localhostCNURL = cnURL.substring(0, cnURL.lastIndexOf("/cn")); //localhostCNURL += Settings.getConfiguration().getString("LogAggregator.solrUrlPath"); String localhostCNURL = Settings.getConfiguration().getString("LogAggregator.solrUrl"); localhostSolrServer = new HttpSolrClient(localhostCNURL); try { CommonsHttpClientProtocolRegistry.createInstance(); } catch (Exception ex) { ex.printStackTrace(); logger.error(ex.getMessage()); throw new RuntimeException(); } solrClientManager = SolrClientManager.getInstance(); } public void start() { logger.info("starting systemMetadata entry listener..."); logger.info("System Metadata value: " + HZ_SYSTEM_METADATA); hzclient = HazelcastClientFactory.getStorageClient(); this.systemMetadata = hzclient.getMap(HZ_SYSTEM_METADATA); this.systemMetadata.addEntryListener(this, true); logger.info("System Metadata size: " + this.systemMetadata.size()); } public void stop() { logger.info("stopping index task generator entry listener..."); this.systemMetadata.removeEntryListener(this); } @Override public void entryUpdated(EntryEvent event) { boolean activateJob = Boolean.parseBoolean(Settings.getConfiguration().getString( "LogAggregator.active")); if (event.getKey() != null && event.getValue() != null && activateJob) { SystemMetadata systemMetadata = event.getValue(); logger.trace("UPDATE EVENT - index task generator - system metadata callback invoked on pid: " + event.getKey().getValue()); List publishLogEntryList = retrieveLogEntries(event.getKey() .getValue()); if (!publishLogEntryList.isEmpty()) { processLogEntries(publishLogEntryList, systemMetadata); } } } @Override public void entryAdded(EntryEvent event) { boolean activateJob = Boolean.parseBoolean(Settings.getConfiguration().getString( "LogAggregator.active")); if (event.getKey() != null && event.getValue() != null & activateJob) { SystemMetadata systemMetadata = event.getValue(); if (systemMetadata.getSerialVersion().longValue() == 1) { logger.trace("ADD EVENT - index task generator - system metadata callback invoked on pid: " + event.getKey().getValue()); List publishLogEntryList = retrieveLogEntries(event.getKey() .getValue()); if (!publishLogEntryList.isEmpty()) { processLogEntries(publishLogEntryList, systemMetadata); } } } } private List retrieveLogEntries(String pid) { List completeLogEntrySolrItemList = new ArrayList(); String escapedPID = ClientUtils.escapeQueryChars(pid); SolrQuery queryParams = new SolrQuery(); queryParams.setQuery("pid:" + escapedPID); queryParams.setStart(0); queryParams.setRows(1000); QueryResponse queryResponse; try { queryResponse = localhostSolrServer.query(queryParams); List logEntrySolrItemList = queryResponse .getBeans(LogEntrySolrItem.class); completeLogEntrySolrItemList.addAll(logEntrySolrItemList); int currentTotal = logEntrySolrItemList.size(); long totalResults = queryResponse.getResults().getNumFound(); if (currentTotal < totalResults) { do { queryParams.setStart(currentTotal); queryResponse = localhostSolrServer.query(queryParams); logEntrySolrItemList = queryResponse.getBeans(LogEntrySolrItem.class); completeLogEntrySolrItemList.addAll(logEntrySolrItemList); currentTotal += logEntrySolrItemList.size(); } while (currentTotal < totalResults); } } catch (IOException | SolrServerException ex) { logger.error(ex.getMessage(), ex); } // logger.debug("returning # log entries to modify: " + completeLogEntrySolrItemList.size()); return completeLogEntrySolrItemList; } private void processLogEntries(List logEntrySolrItemList, SystemMetadata systemMetadata) { String dbFilename = Settings.getConfiguration().getString("LogAggregator.geoIPdbName"); Date now = new Date(); //List subjectsAllowedRead = logAccessRestriction.subjectsAllowedRead(systemMetadata); for (LogEntrySolrItem solrItem : logEntrySolrItemList) { /* * Fill in the solrItem fields for fields that are either obtained * from systemMetadata. */ solrItem.updateSysmetaFields(systemMetadata); solrItem.setDateUpdated(now); } // publish 100 at a time, do not overwhelm the // network with massive packets, or too many small packets int startIndex = 0; int endIndex = 0; do { endIndex += 100; if (logEntrySolrItemList.size() < endIndex) { endIndex = logEntrySolrItemList.size(); } List publishEntrySolrItemList = new ArrayList(100); publishEntrySolrItemList.addAll(logEntrySolrItemList.subList(startIndex, endIndex)); try { localhostSolrServer.addBeans(publishEntrySolrItemList); localhostSolrServer.commit(); } catch (IOException | SolrServerException ex) { logger.error("SystemMetadataEntryListener- " + ex.getMessage(), ex); } startIndex = endIndex; } while (endIndex < logEntrySolrItemList.size()); } @Override public void entryEvicted(EntryEvent arg0) { } @Override public void entryRemoved(EntryEvent arg0) { } }