/*
 * Decompiled with CFR 0.152.
 */
package org.dataone.cn.batch.synchronization.tasks;

import com.hazelcast.client.HazelcastClient;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.log4j.Logger;
import org.dataone.cn.ComponentActivationUtility;
import org.dataone.cn.batch.exceptions.ExecutionDisabledException;
import org.dataone.cn.batch.exceptions.NodeCommUnavailable;
import org.dataone.cn.batch.synchronization.NodeCommFactory;
import org.dataone.cn.batch.synchronization.NodeCommSyncObjectFactory;
import org.dataone.cn.batch.synchronization.tasks.SyncFailedTask;
import org.dataone.cn.batch.synchronization.tasks.V2TransferObjectTask;
import org.dataone.cn.batch.synchronization.type.NodeComm;
import org.dataone.cn.batch.synchronization.type.NodeCommState;
import org.dataone.cn.batch.synchronization.type.SyncObjectState;
import org.dataone.cn.batch.synchronization.type.SyncQueueFacade;
import org.dataone.cn.hazelcast.HazelcastClientFactory;
import org.dataone.cn.synchronization.types.SyncObject;
import org.dataone.configuration.Settings;
import org.dataone.service.exceptions.ServiceFailure;
import org.dataone.service.types.v1.NodeReference;
import org.dataone.service.util.DateTimeMarshaller;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Long-running {@link Callable} that drains the synchronization queue.
 *
 * <p>Each loop iteration polls a {@link SyncQueueFacade} for a {@link SyncObject}, wraps it in a
 * {@link V2TransferObjectTask}, submits it to the configured {@link ThreadPoolTaskExecutor} and
 * tracks the resulting {@link FutureTask}. Tracked futures are reaped on every iteration; a
 * future that has been outstanding longer than the configured thread timeout is cancelled and a
 * {@link SyncFailedTask} is submitted for it.
 *
 * <p>The tracking map is local to {@link #call()} and is only touched from the calling thread.
 */
public class SyncObjectTask implements Callable<String> {
    static final Logger logger = Logger.getLogger(SyncObjectTask.class);

    /** Key of the {@link NodeComm} entry in a per-future tracking map. */
    private static final String nodecommName = "NODECOMM";
    /** Key of the {@link SyncObject} entry in a per-future tracking map. */
    private static final String taskName = "SYNCOBJECT";
    /** Milliseconds a transfer may run (per its NodeComm start date) before it is cancelled. */
    private static final long threadTimeout = Settings.getConfiguration().getLong("Synchronization.SyncObjectTask.threadTimeout", 900000L);
    /** Milliseconds to wait on each tracked future during a reap pass. */
    private static final long FUTURE_REAP_WAIT = Settings.getConfiguration().getLong("Synchronization.SyncObjectTask.reapFutureWait", 250L);
    /** Bounded buffer (capacity 50) of the most recent transfer outcomes; only written in this class. */
    private static final CircularFifoQueue<SyncObjectState> latestResults = new CircularFifoQueue<>(50);
    /** Neither read nor written anywhere in this class. */
    private static long delayUntil = -1L;

    private ThreadPoolTaskExecutor taskExecutor;
    private Integer maxNumberOfClientsPerMemberNode;
    private NodeCommFactory nodeCommunicationsFactory;

    /**
     * Runs the poll / submit / reap loop until the thread is interrupted or synchronization has
     * been deactivated and every tracked future has been reaped.
     *
     * @return the string {@code "Interrupted"} when the loop is ended by an
     *         {@link InterruptedException}
     * @throws ExecutionDisabledException when synchronization is not active and no tracked
     *         futures remain
     */
    @Override
    public String call() throws ExecutionDisabledException {
        // NOTE(review): the returned client is not used here; the call is retained in case it
        // has initialization side effects - confirm before removing.
        HazelcastClientFactory.getProcessingClient();
        logger.info("Starting SyncObjectTask");
        SyncQueueFacade hzSyncObjectQueue = new SyncQueueFacade();
        HashMap<FutureTask<SyncObjectState>, HashMap<String, Object>> futuresMap = new HashMap<>();
        try {
            while (true) {
                SyncObject task = null;
                if (ComponentActivationUtility.synchronizationIsActive()) {
                    task = hzSyncObjectQueue.poll(60L, TimeUnit.SECONDS);
                } else {
                    // Synchronization is off: stop accepting work, but keep looping until the
                    // already-submitted futures have been reaped.
                    if (futuresMap.isEmpty()) {
                        logger.info("All Tasks are complete. Shutting down\n");
                        throw new ExecutionDisabledException();
                    }
                    this.interruptableSleep(1000L);
                }

                this.reapFutures(futuresMap);

                if (task != null && !this.executeTransferObjectTask(task, futuresMap)) {
                    // Could not be submitted right now; put it back on the queue.
                    logger.info(task.taskLabel() + " - requeueing task.");
                    hzSyncObjectQueue.addWithPriority(task);
                }

                if (logger.isDebugEnabled()) {
                    logger.debug("ActiveCount: " + this.taskExecutor.getActiveCount() + "  Pool size: " + this.taskExecutor.getPoolSize() + "  Max Pool Size: " + this.taskExecutor.getMaxPoolSize());
                }
                if (this.taskExecutor.getActiveCount() >= this.taskExecutor.getPoolSize()) {
                    this.relieveSaturatedExecutor(futuresMap);
                }
            }
        }
        catch (InterruptedException ex) {
            logger.error("Interrupted! by something " + ex.getMessage() + "\n", ex);
            // Restore the interrupt status so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
            return "Interrupted";
        }
    }

    /**
     * Called when every pool thread is reported active. If the pool is at its maximum size and
     * no futures are being tracked, all runnables still queued in the executor are removed; in
     * every case cancelled tasks are purged from the executor's queue.
     */
    private void relieveSaturatedExecutor(HashMap<FutureTask<SyncObjectState>, HashMap<String, Object>> futuresMap) {
        if (this.taskExecutor.getPoolSize() == this.taskExecutor.getMaxPoolSize() && futuresMap.isEmpty()) {
            BlockingQueue<Runnable> blockingTaskQueue = this.taskExecutor.getThreadPoolExecutor().getQueue();
            // Snapshot the queue first so removal does not race with iteration over it.
            Runnable[] queued = blockingTaskQueue.toArray(new Runnable[]{});
            for (Runnable runnable : queued) {
                this.taskExecutor.getThreadPoolExecutor().remove(runnable);
            }
        }
        this.taskExecutor.getThreadPoolExecutor().purge();
    }

    /**
     * Shuts the pool down, waiting up to 60 seconds for running tasks, then forces shutdown and
     * waits up to another 60 seconds. Not referenced elsewhere in this class.
     *
     * @param pool the executor service to terminate
     */
    private void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
                    logger.error("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for the given time, returning early if the thread is interrupted. The interrupt
     * status is restored so that the next blocking call in the caller sees the interruption.
     *
     * @param millis number of milliseconds to sleep
     */
    private void interruptableSleep(Long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException iex) {
            logger.debug("sleep interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Obtains a {@link NodeComm} for the task's node, submits a {@link V2TransferObjectTask} to
     * the executor and records the future together with its NodeComm and SyncObject in
     * {@code futuresMap}.
     *
     * @param task the sync object to transfer; {@code null} is treated as trivially successful
     * @param futuresMap tracking map that receives the submitted future
     * @return {@code true} if the task was submitted (or was {@code null}); {@code false} if no
     *         NodeComm could be obtained or the executor rejected the task
     */
    private boolean executeTransferObjectTask(SyncObject task, HashMap<FutureTask<SyncObjectState>, HashMap<String, Object>> futuresMap) {
        if (task == null) {
            return true;
        }
        boolean isSuccess = false;
        try {
            logger.info(task.taskLabel() + " received");
            NodeReference nodeReference = new NodeReference();
            nodeReference.setValue(task.getNodeId());
            NodeComm nodeCommunications = NodeCommSyncObjectFactory.getInstance().getNodeComm(nodeReference);
            try {
                V2TransferObjectTask transferObject = new V2TransferObjectTask(nodeCommunications, task);
                FutureTask<SyncObjectState> futureTask = new FutureTask<SyncObjectState>(transferObject);
                this.taskExecutor.execute(futureTask);

                // The tracking map holds two differently-typed values (a NodeComm and the
                // SyncObject), so its value type has to be Object; reapFutures casts them back.
                HashMap<String, Object> futureHash = new HashMap<>();
                futureHash.put(nodecommName, nodeCommunications);
                futureHash.put(taskName, task);
                futuresMap.put(futureTask, futureHash);
                logger.info(task.taskLabel() + " submitted for execution");
                isSuccess = true;
            }
            catch (TaskRejectedException ex) {
                logger.error(task.taskLabel() + " Executor rejected the task");
                logger.error("ActiveCount: " + this.taskExecutor.getActiveCount() + " Pool size " + this.taskExecutor.getPoolSize() + " Max Pool Size " + this.taskExecutor.getMaxPoolSize());
                // The NodeComm was obtained but never used; mark it available again.
                nodeCommunications.setState(NodeCommState.AVAILABLE);
            }
        }
        catch (NodeCommUnavailable | ServiceFailure ex) {
            logger.warn("No MN communication threads available at this time");
        }
        return isSuccess;
    }

    /**
     * Makes one pass over the tracked futures, waiting up to {@code FUTURE_REAP_WAIT}
     * milliseconds on each. Futures that completed, failed or were cancelled have their NodeComm
     * set to {@link NodeCommState#AVAILABLE} and are removed from the map. A future that is still
     * pending after more than {@code threadTimeout} milliseconds (measured from its NodeComm's
     * running start date) is cancelled and a sync-failed notification is submitted for it.
     *
     * @param futuresMap tracking map of submitted futures; finished entries are removed
     * @throws InterruptedException if the thread is interrupted while waiting on a future
     */
    private void reapFutures(HashMap<FutureTask<SyncObjectState>, HashMap<String, Object>> futuresMap) throws InterruptedException {
        if (futuresMap.isEmpty()) {
            logger.debug("Polling empty hzSyncObjectQueue");
            return;
        }
        logger.info("waiting on " + futuresMap.size() + " futures");

        // Collect finished futures and remove them after the loop; removing while iterating the
        // key set would invalidate the iterator.
        ArrayList<FutureTask<SyncObjectState>> removalList = new ArrayList<>();
        for (FutureTask<SyncObjectState> future : futuresMap.keySet()) {
            HashMap<String, Object> tracking = futuresMap.get(future);
            SyncObject syncObject = (SyncObject) tracking.get(taskName);
            NodeComm nodeComm = (NodeComm) tracking.get(nodecommName);
            logger.debug("trying future " + syncObject.taskLabel());
            try {
                SyncObjectState futureOutcome = future.get(FUTURE_REAP_WAIT, TimeUnit.MILLISECONDS);
                logger.info(syncObject.taskLabel() + " SyncObjectState: " + futureOutcome);
                latestResults.add(futureOutcome);
                if (logger.isDebugEnabled()) {
                    logger.debug("futureMap is done? " + future.isDone());
                    logger.debug(syncObject.taskLabel() + " Returned from the Future :(" + nodeComm.getNumber() + "):");
                }
                nodeComm.setState(NodeCommState.AVAILABLE);
                removalList.add(future);
            }
            catch (CancellationException ex) {
                logger.debug(syncObject.taskLabel() + " The Future has been canceled :(" + nodeComm.getNumber() + "):");
                nodeComm.setState(NodeCommState.AVAILABLE);
                removalList.add(future);
            }
            catch (ExecutionException ex) {
                logger.error(syncObject.taskLabel() + "An Exception is reported FROM the Future :(" + nodeComm.getNumber() + "):");
                logger.error(syncObject.taskLabel() + ex.getMessage(), ex);
                nodeComm.setState(NodeCommState.AVAILABLE);
                removalList.add(future);
            }
            catch (TimeoutException ex) {
                logger.debug(syncObject.taskLabel() + " Waiting for the future :(" + nodeComm.getNumber() + "): since " + DateTimeMarshaller.serializeDateToUTC(nodeComm.getRunningStartDate()));
                long elapsed = new Date().getTime() - nodeComm.getRunningStartDate().getTime();
                if (elapsed > threadTimeout) {
                    logger.warn(syncObject.taskLabel() + " Cancelling. :(" + nodeComm.getNumber() + "): Waiting since " + DateTimeMarshaller.serializeDateToUTC(nodeComm.getRunningStartDate()));
                    if (future.cancel(true)) {
                        NodeReference nodeReference = new NodeReference();
                        nodeReference.setValue(syncObject.getNodeId());
                        // NOTE(review): the future stays in the map, so the next pass hits the
                        // CancellationException branch and sets this NodeComm AVAILABLE a second
                        // time - confirm that cannot clobber a NodeComm that was re-acquired
                        // in between.
                        nodeComm.setState(NodeCommState.AVAILABLE);
                        this.submitSynchronizationFailed(syncObject, nodeReference);
                    } else {
                        logger.warn(syncObject.taskLabel() + " Unable to cancel the task");
                    }
                    // Drop the future from the executor's queue in case it never started.
                    this.taskExecutor.getThreadPoolExecutor().remove(future);
                }
            }
        }
        for (Future<SyncObjectState> finished : removalList) {
            futuresMap.remove(finished);
        }
    }

    /**
     * Submits a {@link SyncFailedTask} for the given sync object, using a {@link NodeComm}
     * obtained from the injected {@link NodeCommFactory}. The resulting future is not tracked;
     * rejections and NodeComm failures are logged and otherwise ignored.
     *
     * @param task the sync object whose transfer was cancelled
     * @param mnNodeId reference to the node the NodeComm is requested for
     */
    private void submitSynchronizationFailed(SyncObject task, NodeReference mnNodeId) {
        try {
            logger.info(task.taskLabel() + " Submit SyncFailed");
            NodeComm nodeCommunications = this.nodeCommunicationsFactory.getNodeComm(mnNodeId);
            SyncFailedTask syncFailedTask = new SyncFailedTask(nodeCommunications, task);
            FutureTask<String> futureTask = new FutureTask<String>(syncFailedTask);
            this.taskExecutor.execute(futureTask);
        }
        catch (TaskRejectedException ex) {
            logger.error(task.taskLabel() + " Submit SyncFailed Rejected from MN");
            logger.error("ActiveCount: " + this.taskExecutor.getActiveCount() + " Pool size " + this.taskExecutor.getPoolSize() + " Max Pool Size " + this.taskExecutor.getMaxPoolSize());
        }
        catch (ServiceFailure ex) {
            // Route the stack trace through the logger instead of stderr.
            logger.error(ex.getDescription(), ex);
        }
        catch (NodeCommUnavailable ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    /** @return the executor that transfer and sync-failed tasks are submitted to */
    public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
        return this.taskExecutor;
    }

    /** @param taskExecutor the executor that transfer and sync-failed tasks are submitted to */
    public void setThreadPoolTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /** @return the configured value; it is stored but not otherwise used in this class */
    public Integer getMaxNumberOfClientsPerMemberNode() {
        return this.maxNumberOfClientsPerMemberNode;
    }

    /** @param maxNumberOfClientsPerMemberNode value to store; not otherwise used in this class */
    public void setMaxNumberOfClientsPerMemberNode(Integer maxNumberOfClientsPerMemberNode) {
        this.maxNumberOfClientsPerMemberNode = maxNumberOfClientsPerMemberNode;
    }

    /** @return the factory used to obtain a NodeComm when submitting a sync-failed task */
    public NodeCommFactory getNodeCommunicationsFactory() {
        return this.nodeCommunicationsFactory;
    }

    /** @param nodeCommunicationsFactory factory used to obtain a NodeComm for sync-failed tasks */
    public void setNodeCommunicationsFactory(NodeCommFactory nodeCommunicationsFactory) {
        this.nodeCommunicationsFactory = nodeCommunicationsFactory;
    }
}

