/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeer;

public class QuorumCnxManager {
    private static final Logger LOG = Logger.getLogger(QuorumCnxManager.class);
    static final int CAPACITY = 100;
    static final int PACKETMAXSIZE = 0x100000;
    static final int MAX_CONNECTION_ATTEMPTS = 2;
    private long observerCounter = -1L;
    final QuorumPeer self;
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    public final ArrayBlockingQueue<Message> recvQueue = new ArrayBlockingQueue(100);
    boolean shutdown = false;
    public final Listener listener;

    public QuorumCnxManager(QuorumPeer self) {
        this.queueSendMap = new ConcurrentHashMap();
        this.senderWorkerMap = new ConcurrentHashMap();
        this.lastMessageSent = new ConcurrentHashMap();
        this.self = self;
        this.listener = new Listener();
    }

    public void testInitiateConnection(long sid) throws Exception {
        LOG.debug((Object)("Opening channel to server " + sid));
        SocketChannel channel = SocketChannel.open(this.self.getVotingView().get((Object)Long.valueOf((long)sid)).electionAddr);
        channel.socket().setTcpNoDelay(true);
        this.initiateConnection(channel, sid);
    }

    public boolean initiateConnection(SocketChannel s, Long sid) {
        try {
            byte[] msgBytes = new byte[8];
            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
            msgBuffer.putLong(this.self.getId());
            msgBuffer.position(0);
            s.write(msgBuffer);
        }
        catch (IOException e) {
            LOG.warn((Object)"Exception reading or writing challenge: ", (Throwable)e);
            return false;
        }
        if (sid > this.self.getId()) {
            try {
                LOG.info((Object)("Have smaller server identifier, so dropping the connection: (" + sid + ", " + this.self.getId() + ")"));
                s.socket().close();
            }
            catch (IOException e) {
                LOG.warn((Object)"Ignoring exception when closing socket or trying to reopen connection: ", (Throwable)e);
            }
        } else {
            SendWorker sw = new SendWorker(s, sid);
            RecvWorker rw = new RecvWorker(s, sid);
            sw.setRecv(rw);
            SendWorker vsw = this.senderWorkerMap.get(sid);
            this.senderWorkerMap.put(sid, sw);
            if (vsw != null) {
                vsw.finish();
            }
            if (!this.queueSendMap.containsKey(sid)) {
                this.queueSendMap.put(sid, new ArrayBlockingQueue(100));
            }
            sw.start();
            rw.start();
            return true;
        }
        return false;
    }

    boolean receiveConnection(SocketChannel s) {
        SendWorker sw;
        Long sid = null;
        try {
            byte[] msgBytes = new byte[8];
            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
            s.read(msgBuffer);
            msgBuffer.position(0);
            sid = msgBuffer.getLong();
            if (sid == Long.MAX_VALUE) {
                sid = this.observerCounter--;
                LOG.info((Object)("Setting arbitrary identifier to observer: " + sid));
            }
        }
        catch (IOException e) {
            LOG.warn((Object)("Exception reading or writing challenge: " + e.toString()));
            return false;
        }
        if (sid < this.self.getId()) {
            try {
                sw = this.senderWorkerMap.get(sid);
                if (sw != null) {
                    sw.finish();
                }
                LOG.debug((Object)("Create new connection to server: " + sid));
                s.socket().close();
                this.connectOne(sid);
            }
            catch (IOException e) {
                LOG.info((Object)("Error when closing socket or trying to reopen connection: " + e.toString()));
            }
        } else {
            sw = new SendWorker(s, sid);
            RecvWorker rw = new RecvWorker(s, sid);
            sw.setRecv(rw);
            SendWorker vsw = this.senderWorkerMap.get(sid);
            this.senderWorkerMap.put(sid, sw);
            if (vsw != null) {
                vsw.finish();
            }
            if (!this.queueSendMap.containsKey(sid)) {
                this.queueSendMap.put(sid, new ArrayBlockingQueue(100));
            }
            sw.start();
            rw.start();
            return true;
        }
        return false;
    }

    public void toSend(Long sid, ByteBuffer b) {
        if (this.self.getId() == sid.longValue()) {
            try {
                b.position(0);
                this.recvQueue.put(new Message(b.duplicate(), sid));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Exception when loopbacking", (Throwable)e);
            }
        } else {
            try {
                if (!this.queueSendMap.containsKey(sid)) {
                    ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(100);
                    this.queueSendMap.put(sid, bq);
                    bq.put(b);
                } else {
                    ArrayBlockingQueue<ByteBuffer> bq = this.queueSendMap.get(sid);
                    if (bq != null) {
                        if (bq.remainingCapacity() == 0) {
                            bq.take();
                        }
                        bq.put(b);
                    } else {
                        LOG.error((Object)("No queue for server " + sid));
                    }
                }
                this.connectOne(sid);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting to put message in queue.", (Throwable)e);
            }
        }
    }

    synchronized void connectOne(long sid) {
        if (this.senderWorkerMap.get(sid) == null) {
            if (!this.self.quorumPeers.containsKey(sid)) {
                LOG.warn((Object)("Invalid server id: " + sid));
                return;
            }
            InetSocketAddress electionAddr = this.self.quorumPeers.get((Object)Long.valueOf((long)sid)).electionAddr;
            try {
                LOG.debug((Object)("Opening channel to server " + sid));
                SocketChannel channel = SocketChannel.open(this.self.getView().get((Object)Long.valueOf((long)sid)).electionAddr);
                channel.socket().setTcpNoDelay(true);
                this.initiateConnection(channel, sid);
            }
            catch (UnresolvedAddressException e) {
                LOG.warn((Object)("Cannot open channel to " + sid + " at election address " + electionAddr), (Throwable)e);
                throw e;
            }
            catch (IOException e) {
                LOG.warn((Object)("Cannot open channel to " + sid + " at election address " + electionAddr), (Throwable)e);
            }
        } else {
            LOG.debug((Object)("There is a connection already for server " + sid));
        }
    }

    public void connectAll() {
        Enumeration<Long> en = this.queueSendMap.keys();
        while (en.hasMoreElements()) {
            long sid = en.nextElement();
            this.connectOne(sid);
        }
    }

    boolean haveDelivered() {
        for (ArrayBlockingQueue<ByteBuffer> queue : this.queueSendMap.values()) {
            LOG.debug((Object)("Queue size: " + queue.size()));
            if (queue.size() != 0) continue;
            return true;
        }
        return false;
    }

    public void halt() {
        this.shutdown = true;
        LOG.debug((Object)"Halting listener");
        this.listener.halt();
        this.softHalt();
    }

    public void softHalt() {
        for (SendWorker sw : this.senderWorkerMap.values()) {
            LOG.debug((Object)("Halting sender: " + sw));
            sw.finish();
        }
    }

    class RecvWorker
    extends Thread {
        Long sid;
        SocketChannel channel;
        volatile boolean running = true;

        RecvWorker(SocketChannel channel, Long sid) {
            this.sid = sid;
            this.channel = channel;
        }

        synchronized boolean finish() {
            this.running = false;
            this.interrupt();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                byte[] size = new byte[4];
                ByteBuffer msgLength = ByteBuffer.wrap(size);
                while (this.running && !QuorumCnxManager.this.shutdown && this.channel != null) {
                    while (msgLength.hasRemaining()) {
                        if (this.channel.read(msgLength) >= 0) continue;
                        throw new IOException("Channel eof");
                    }
                    msgLength.position(0);
                    int length = msgLength.getInt();
                    if (length <= 0) continue;
                    if (length > 0x100000) {
                        throw new IOException("Invalid packet of length " + length);
                    }
                    byte[] msgArray = new byte[length];
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    int numbytes = 0;
                    while (message.hasRemaining()) {
                        numbytes += this.channel.read(message);
                    }
                    message.position(0);
                    ArrayBlockingQueue<Message> arrayBlockingQueue = QuorumCnxManager.this.recvQueue;
                    synchronized (arrayBlockingQueue) {
                        QuorumCnxManager.this.recvQueue.put(new Message(message.duplicate(), this.sid));
                    }
                    msgLength.position(0);
                }
            }
            catch (Exception e) {
                LOG.warn((Object)"Connection broken: ", (Throwable)e);
            }
            finally {
                try {
                    this.channel.socket().close();
                }
                catch (IOException e) {
                    LOG.warn((Object)"Exception while trying to close channel");
                }
            }
        }
    }

    class SendWorker
    extends Thread {
        Long sid;
        SocketChannel channel;
        RecvWorker recvWorker;
        volatile boolean running = true;

        SendWorker(SocketChannel channel, Long sid) {
            this.sid = sid;
            this.channel = channel;
            this.recvWorker = null;
            LOG.debug((Object)("Address of remote peer: " + this.sid));
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        synchronized boolean finish() {
            this.running = false;
            LOG.debug((Object)"Calling finish");
            this.interrupt();
            try {
                this.channel.close();
            }
            catch (IOException e) {
                LOG.warn((Object)"Exception while closing socket");
            }
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid);
            return this.running;
        }

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity() + 4];
            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
            msgBuffer.putInt(b.capacity());
            msgBuffer.put(b.array(), 0, b.capacity());
            msgBuffer.position(0);
            if (this.channel == null) {
                throw new IOException("SocketChannel is null");
            }
            this.channel.write(msgBuffer);
        }

        public void run() {
            ByteBuffer b;
            try {
                b = QuorumCnxManager.this.lastMessageSent.get(this.sid);
                if (b != null) {
                    this.send(b);
                }
            }
            catch (IOException e) {
                LOG.error((Object)"Failed to send last message. Shutting down thread.", (Throwable)e);
                this.finish();
            }
            block6: while (true) {
                try {
                    while (this.running && !QuorumCnxManager.this.shutdown && this.channel != null) {
                        b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                            if (bq == null) {
                                LOG.error((Object)("No queue of incoming messages for server " + this.sid));
                                break block6;
                            }
                            b = bq.poll(1000L, TimeUnit.MILLISECONDS);
                            if (b == null) continue block6;
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, b);
                            this.send(b);
                            continue block6;
                        }
                        catch (InterruptedException e) {
                            LOG.warn((Object)"Interrupted while waiting for message on queue", (Throwable)e);
                        }
                    }
                    break;
                }
                catch (Exception e) {
                    LOG.warn((Object)("Exception when using channel: " + this.sid), (Throwable)e);
                    break;
                }
            }
            this.finish();
            LOG.warn((Object)"Send worker leaving thread");
        }
    }

    public class Listener
    extends Thread {
        volatile ServerSocketChannel ss = null;

        public void run() {
            int numRetries = 0;
            while (!QuorumCnxManager.this.shutdown && numRetries < 3) {
                try {
                    this.ss = ServerSocketChannel.open();
                    int port = QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr.getPort();
                    LOG.info((Object)("My election bind port: " + port));
                    this.ss.socket().setReuseAddress(true);
                    this.ss.socket().bind(new InetSocketAddress(port));
                    while (!QuorumCnxManager.this.shutdown) {
                        SocketChannel client = this.ss.accept();
                        Socket sock = client.socket();
                        sock.setTcpNoDelay(true);
                        LOG.debug((Object)("Connection request " + sock.getRemoteSocketAddress()));
                        LOG.debug((Object)("Connection request: " + QuorumCnxManager.this.self.getId()));
                        QuorumCnxManager.this.receiveConnection(client);
                        numRetries = 0;
                    }
                }
                catch (IOException e) {
                    LOG.error((Object)"Exception while listening", (Throwable)e);
                    ++numRetries;
                }
            }
            LOG.info((Object)"Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.fatal((Object)("As I'm leaving the listener thread, I won't be able to participate in leader election any longer: " + QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr));
            }
        }

        void halt() {
            try {
                LOG.debug((Object)("Trying to close listener: " + this.ss));
                if (this.ss != null) {
                    LOG.debug((Object)("Closing listener: " + QuorumCnxManager.this.self.getId()));
                    this.ss.close();
                }
            }
            catch (IOException e) {
                LOG.warn((Object)("Exception when shutting down listener: " + e));
            }
        }
    }

    public static class Message {
        ByteBuffer buffer;
        long sid;

        Message(ByteBuffer buffer, long sid) {
            this.buffer = buffer;
            this.sid = sid;
        }
    }
}

