/*
 * Decompiled with CFR 0.152.
 */
package org.dataone.tidy.merge;

import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.ConnectTimeoutException;
import org.dataone.cn.dao.exceptions.DataAccessException;
import org.dataone.configuration.Settings;
import org.dataone.service.types.v1.Identifier;
import org.dataone.service.util.DateTimeMarshaller;
import org.dataone.tidy.concurrent.TidyFutureTask;
import org.dataone.tidy.concurrent.TidyJob;
import org.dataone.tidy.concurrent.TidyJobExecutorCompletionService;
import org.dataone.tidy.concurrent.TidyJobFactory;
import org.dataone.tidy.concurrent.TidyJobThreadPoolExecutor;
import org.dataone.tidy.dao.MergeResult;
import org.dataone.tidy.dao.MergeResultRepository;
import org.dataone.tidy.dao.MergeStatus;
import org.dataone.tidy.dao.SystemMetadataTidyDao;

/**
 * Drives a pool of {@link TidyJob}s, one per identifier, and tracks their outcome.
 *
 * <p>Identifiers come from {@link SystemMetadataTidyDao#listIdentifiers()}, minus those that
 * already have a {@code SUCCESS} {@link MergeResult} for the configured run id. Jobs are
 * submitted through a completion service; the calling thread of {@link #executeJobs()} polls
 * for finished jobs, re-queues those that failed with a transient (timeout / SSL peer)
 * throwable up to a configured limit, and records the rest as failed.
 *
 * <p>All pool sizes, timeouts and limits are read from {@link Settings} when the instance is
 * created.
 */
public class MergeExecutorService {
    private static final Log logger = LogFactory.getLog(MergeExecutorService.class);
    private final int corePoolSize = Settings.getConfiguration().getInt("tidy.merge.thread.pool.core.size", 10);
    private final int maximumPoolSize = Settings.getConfiguration().getInt("tidy.merge.thread.pool.max.size", 10);
    // NOTE(review): read from configuration but never handed to the executor, which is built
    // with Long.MAX_VALUE nanoseconds instead - confirm whether that is intended.
    private final long keepAliveSeconds = Settings.getConfiguration().getLong("tidy.merge.thread.pool.keep.alive.seconds", 600L);
    private static final Long RUN_ID = Settings.getConfiguration().getLong("tidy.run.id", 999L);
    /** Upper bound on the retry counter kept per pid in {@link #retryJobPids}. */
    private final int maxPidRetry = Settings.getConfiguration().getInt("tidy.merge.executor.job.retry.max", 3);
    /** Number of consecutive empty polls tolerated before the run is abandoned. */
    private final long maxFutureTimeouts = Settings.getConfiguration().getInt("tidy.merge.executor.future.timeout.count.max", 6);
    /** How long a single poll of the completion service waits. */
    private final long futureTimeoutSeconds = Settings.getConfiguration().getLong("tidy.merge.executor.future.timeout.seconds", 10L);
    /** The work queue is topped up only once it has drained to this size or below. */
    private final int minWorkQueueSize = Settings.getConfiguration().getInt("tidy.merge.executor.queue.min.size", 100);
    /** A top-up stops submitting once the submit counter reaches this size. */
    private final int maxWorkQueueSize = Settings.getConfiguration().getInt("tidy.merge.executor.queue.max.size", 1000);
    private CompletionService<Boolean> mergeJobCompletionService;
    private TidyJobThreadPoolExecutor mergeJobExecutor;
    // Left as a raw type: the element type expected by TidyJobThreadPoolExecutor is not visible here.
    private BlockingQueue workQueue = new LinkedBlockingQueue();
    private BlockingQueue<Future<Boolean>> completionQueue = new LinkedBlockingQueue<Future<Boolean>>();
    private TidyJobFactory tidyJobFactory;
    private SystemMetadataTidyDao systemMetadataDao;
    private MergeResultRepository mergeResultRepository;
    private int totalCompletedJobs = 0;
    private int totalUnrecoverableExceptions = 0;
    private int totalToProcess = 0;
    /** Pids whose job failed and will not be retried. */
    private ConcurrentSkipListSet<Identifier> failedPidSet = new ConcurrentSkipListSet<Identifier>();
    /** Retry counter per pid, maintained by {@link #canHazRetry(TidyJob)}. */
    private Map<Identifier, Integer> retryJobPids = new HashMap<Identifier, Integer>();

    private MergeExecutorService() {
    }

    /**
     * Creates the service together with its thread pool and completion service.
     *
     * @param systemMetadataDao     source of the identifiers to process
     * @param mergeResultRepository source of the results of earlier runs
     * @param jobFactory            creates one {@link TidyJob} per identifier
     */
    public MergeExecutorService(SystemMetadataTidyDao systemMetadataDao, MergeResultRepository mergeResultRepository, TidyJobFactory jobFactory) {
        this.systemMetadataDao = systemMetadataDao;
        this.mergeResultRepository = mergeResultRepository;
        this.tidyJobFactory = jobFactory;
        this.mergeJobExecutor = new TidyJobThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, Long.MAX_VALUE, TimeUnit.NANOSECONDS, this.workQueue);
        this.mergeJobCompletionService = new TidyJobExecutorCompletionService<Boolean>(this.mergeJobExecutor, this.completionQueue);
    }

    /**
     * Submits a job for every pending identifier and waits for all of them to finish.
     *
     * <p>The loop ends when the executor reports no active jobs and every pid is accounted
     * for (completed, failed or unrecoverable), or when more than the configured number of
     * consecutive polls return nothing. In every case the totals are logged and the executor
     * is shut down with {@code shutdownNow()}.
     *
     * <p>Exceptions raised while running, including {@link DataAccessException}, are reported
     * and not propagated; the {@code throws} clause is kept for interface compatibility.
     *
     * @throws DataAccessException declared for callers; not thrown by the current body
     */
    public void executeJobs() throws DataAccessException {
        try {
            TreeSet<Identifier> retryJobPidSet = new TreeSet<Identifier>();
            List<Identifier> processPidList = this.systemMetadataDao.listIdentifiers();
            TreeSet<Identifier> processPidSet = new TreeSet<Identifier>(processPidList);

            // Drop everything that an earlier pass of this run id already merged successfully.
            TreeSet<Identifier> priorSuccessPidSet = new TreeSet<Identifier>();
            List<MergeResult> mergeResultsList = this.mergeResultRepository.findByMergeStatusStrAndRunId(MergeStatus.SUCCESS.toString(), RUN_ID);
            for (MergeResult mergeResult : mergeResultsList) {
                Identifier mergedPid = new Identifier();
                mergedPid.setValue(mergeResult.getPid());
                priorSuccessPidSet.add(mergedPid);
            }
            processPidSet.removeAll(priorSuccessPidSet);

            this.totalToProcess = processPidSet.size();
            logger.info("Total pids to process = " + this.totalToProcess);

            int futureTimeouts = 0;
            do {
                try {
                    boolean queueLow = this.workQueue.size() <= this.minWorkQueueSize;
                    boolean pidsWaiting = !processPidSet.isEmpty() || !retryJobPidSet.isEmpty();
                    if (queueLow && pidsWaiting) {
                        int submitCount = this.workQueue.size();
                        // Retries are submitted ahead of pids that have not been tried yet.
                        Iterator<Identifier> retryIdentifierIterator = retryJobPidSet.iterator();
                        while (retryIdentifierIterator.hasNext() && submitCount < this.maxWorkQueueSize) {
                            Identifier pid = retryIdentifierIterator.next();
                            retryIdentifierIterator.remove();
                            this.mergeJobCompletionService.submit(this.tidyJobFactory.getTidyJob(pid));
                            ++submitCount;
                        }
                        Iterator<Identifier> identifierIterator = processPidSet.iterator();
                        while (identifierIterator.hasNext() && submitCount < this.maxWorkQueueSize) {
                            Identifier pid = identifierIterator.next();
                            identifierIterator.remove();
                            this.mergeJobCompletionService.submit(this.tidyJobFactory.getTidyJob(pid));
                            ++submitCount;
                        }
                    }

                    Future<Boolean> futureSuccess = this.mergeJobCompletionService.poll(this.futureTimeoutSeconds, TimeUnit.SECONDS);
                    logger.debug("after submit " + this.completionQueue.size());
                    if (futureSuccess == null) {
                        ++futureTimeouts;
                    } else {
                        futureTimeouts = 0;
                        // A null result is treated as "not successful" rather than raising an NPE.
                        boolean succeeded = Boolean.TRUE.equals(futureSuccess.get());
                        if (futureSuccess instanceof TidyFutureTask) {
                            TidyJob tidyJob = ((TidyFutureTask)futureSuccess).getTidyJob();
                            if (succeeded) {
                                ++this.totalCompletedJobs;
                                logger.info("Job Success: " + tidyJob.getPid().getValue());
                            } else if (this.canHazRetry(tidyJob).booleanValue()) {
                                logger.info("Job Retry: " + tidyJob.getPid().getValue());
                                retryJobPidSet.add(tidyJob.getPid());
                            } else {
                                this.failedPidSet.add(tidyJob.getPid());
                                // A job may report failure without recording a throwable.
                                Throwable failure = tidyJob.getFailure();
                                String failureMessage = failure == null ? "no failure recorded" : failure.getMessage();
                                logger.error("Job Failed: " + tidyJob.getPid().getValue() + " :" + failureMessage);
                                System.err.print("[" + DateTimeMarshaller.serializeDateToUTC((Date)new Date()) + "] Failed executeJobs: " + tidyJob.getPid().getValue() + " ");
                                if (failure != null) {
                                    failure.printStackTrace();
                                } else {
                                    System.err.println(failureMessage);
                                }
                            }
                        } else if (succeeded) {
                            logger.info("Job Success but No Tidy Job returned");
                        } else {
                            logger.error("Job Failed and No Tidy Job returned");
                        }
                    }

                    if ((long)futureTimeouts > this.maxFutureTimeouts) {
                        logger.fatal("waited over " + this.maxFutureTimeouts * this.futureTimeoutSeconds + " seconds for any mergeJob to return, must be in an error state!");
                        break;
                    }
                }
                catch (InterruptedException e) {
                    // NOTE(review): the interrupt is logged and polling continues; the interrupt
                    // flag is not restored. Confirm that ignoring interruption is intended.
                    logger.info("polling interrupted");
                }
                catch (ExecutionException ex) {
                    futureTimeouts = 0;
                    logger.error(ex);
                    if (this.hasTransientThrowable(ex).booleanValue()) {
                        // Drain the jobs the executor recorded as failed and sort them into
                        // "retry" and "give up".
                        TidyJob retryJob;
                        while ((retryJob = this.mergeJobExecutor.getFailure()) != null) {
                            logger.info("ExecutionException retryJob has " + retryJob.getPid().getValue());
                            if (this.canHazRetry(retryJob).booleanValue()) {
                                retryJobPidSet.add(retryJob.getPid());
                            } else {
                                this.failedPidSet.add(retryJob.getPid());
                                System.err.print("[" + DateTimeMarshaller.serializeDateToUTC((Date)new Date()) + "]  no retry executeJobs: " + ex.getClass().getCanonicalName() + " ");
                                ex.printStackTrace();
                            }
                        }
                    }
                    // NOTE(review): counted as unrecoverable even when retries were queued
                    // above; this feeds the loop's termination condition - confirm intent.
                    ++this.totalUnrecoverableExceptions;
                    System.err.print("[" + DateTimeMarshaller.serializeDateToUTC((Date)new Date()) + "] unrecoverable executeJob: " + ex.getClass().getCanonicalName() + " ");
                    ex.printStackTrace();
                }

                if (this.mergeJobExecutor.getActiveCount() > this.maximumPoolSize) {
                    logger.error("Job Executor has more jobs that can be handled! " + this.mergeJobExecutor.getActiveCount());
                }
                logger.debug("TotalToProcess = " + this.totalToProcess + " totalCompleted " + this.totalCompletedJobs + " totalFailed " + this.failedPidSet.size() + " total Unrecoverable " + this.totalUnrecoverableExceptions);
            } while (this.mergeJobExecutor.getActiveCount() != 0 || this.totalToProcess > this.totalCompletedJobs + this.totalUnrecoverableExceptions + this.failedPidSet.size());
        }
        catch (DataAccessException dae) {
            logger.error("executeJobs aborted by a data access failure", dae);
            dae.printStackTrace();
        }
        catch (Exception ex) {
            logger.error("executeJobs aborted by an unexpected exception", ex);
            ex.printStackTrace();
        }
        finally {
            logger.info("Number of Jobs Successfuly Completed " + this.totalCompletedJobs);
            logger.info("Number of Jobs on Failed " + this.failedPidSet.size());
            logger.info("Number of Jobs with Unrecoverable Exceptions " + this.totalUnrecoverableExceptions);
            this.mergeJobExecutor.shutdownNow();
        }
    }

    /**
     * @return the number of jobs that reported success
     */
    public Integer getTotalCompletedJobs() {
        return this.totalCompletedJobs;
    }

    /**
     * @return the number of unrecoverable exceptions plus the number of failed pids
     */
    public Integer getTotalFailedJobs() {
        return this.totalUnrecoverableExceptions + this.failedPidSet.size();
    }

    /**
     * @return the number of pids selected for processing by the last {@link #executeJobs()}
     */
    public Integer getTotalToProcess() {
        return this.totalToProcess;
    }

    /**
     * Decides whether a failed job should be submitted again and updates its retry counter.
     *
     * <p>A job qualifies only if its pid is not already in the failed set and its failure
     * has a transient throwable in its cause chain. The first qualifying failure stores a
     * counter of 0; each later one increments it, and once the incremented value reaches
     * {@code maxPidRetry} the counter is removed and the job is refused.
     *
     * @param retryJob the job that failed
     * @return {@code true} if the job may be retried
     */
    private Boolean canHazRetry(TidyJob retryJob) {
        Identifier pid = retryJob.getPid();
        if (this.failedPidSet.contains(pid) || !this.hasTransientThrowable(retryJob.getFailure()).booleanValue()) {
            return false;
        }
        Integer previousCount = this.retryJobPids.get(pid);
        if (previousCount == null) {
            this.retryJobPids.put(pid, 0);
            logger.info("retry " + pid.getValue() + " because of SocketTimeoutException");
            return true;
        }
        int failedCount = previousCount + 1;
        logger.info("retry " + pid.getValue() + " with count " + failedCount);
        if (failedCount >= this.maxPidRetry) {
            logger.warn("fail count exceeded removing job from retry, adding to failedPidSet: " + pid.getValue());
            this.retryJobPids.remove(pid);
            return false;
        }
        this.retryJobPids.put(pid, failedCount);
        return true;
    }

    /**
     * Walks the cause chain of {@code t} looking for a timeout or SSL peer verification
     * failure.
     *
     * <p>The walk stops at the first match, at the end of the chain, or when a throwable
     * that was already visited shows up again (which guards against cyclic cause chains).
     *
     * @param t the throwable to inspect; {@code null} yields {@code false}
     * @return {@code true} if {@code t} or one of its causes is considered transient
     */
    private Boolean hasTransientThrowable(Throwable t) {
        HashSet<Throwable> visited = new HashSet<Throwable>();
        for (Throwable current = t; current != null && visited.add(current); current = current.getCause()) {
            logger.debug(current.getMessage());
            if (current instanceof SocketTimeoutException
                    || current instanceof ConnectTimeoutException
                    || current instanceof org.apache.commons.httpclient.ConnectTimeoutException
                    || current instanceof SSLPeerUnverifiedException) {
                return true;
            }
        }
        return false;
    }
}

