/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.compat.log.Logger;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.NoopOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.ops.VBucketAware;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.protocol.binary.TapAckOperationImpl;
import net.spy.memcached.util.StringUtils;

public class MemcachedConnection
extends SpyThread {
    // Thresholds on the number of consecutive empty selects. handleEmptySelects()
    // compares emptySelects against these same values (inlined as literals there).
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 0x1000000;
    // Names under which connection-wide metrics are registered with the MetricCollector.
    private static final String RECON_QUEUE_METRIC = "[MEM] Reconnecting Nodes (ReconnectQueue)";
    private static final String SHUTD_QUEUE_METRIC = "[MEM] Shutting Down Nodes (NodesToShutdown)";
    private static final String OVERALL_REQUEST_METRIC = "[MEM] Request Rate: All";
    private static final String OVERALL_AVG_BYTES_WRITE_METRIC = "[MEM] Average Bytes written to OS per write";
    private static final String OVERALL_AVG_BYTES_READ_METRIC = "[MEM] Average Bytes read from OS per read";
    // NOTE(review): "\u00c2\u00b5s" renders as "Âµs", which looks like a mis-decoded
    // "µs" (microseconds). Left untouched because this string is the runtime metric
    // name - confirm against existing consumers before correcting it.
    private static final String OVERALL_AVG_TIME_ON_WIRE_METRIC = "[MEM] Average Time on wire for operations (\u00c2\u00b5s)";
    private static final String OVERALL_RESPONSE_METRIC = "[MEM] Response Rate: All (Failure + Success + Retry)";
    private static final String OVERALL_RESPONSE_RETRY_METRIC = "[MEM] Response Rate: Retry";
    private static final String OVERALL_RESPONSE_FAIL_METRIC = "[MEM] Response Rate: Failure";
    private static final String OVERALL_RESPONSE_SUCC_METRIC = "[MEM] Response Rate: Success";
    // Set to true by shutdown(); handleIO() refuses to run and queueReconnect() is a no-op once set.
    protected volatile boolean shutDown = false;
    // Passed to MemcachedNode.fillWriteBuffer(); taken from ConnectionFactory.shouldOptimize().
    private final boolean shouldOptimize;
    // Opened in the constructor and closed by shutdown().
    protected Selector selector = null;
    protected final NodeLocator locator;
    protected final FailureMode failureMode;
    // Upper bound for the reconnect back-off computed in queueReconnect().
    private final long maxDelay;
    // Number of consecutive selects that returned no keys; reset by handleIO() when keys are selected.
    private int emptySelects = 0;
    private final int bufSize;
    private final ConnectionFactory connectionFactory;
    // Nodes that had work queued on them; drained by handleInputQueue().
    protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    // Nodes awaiting reconnection, keyed by the System.currentTimeMillis() value at which to retry.
    private final SortedMap<Long, MemcachedNode> reconnectQueue;
    // Set to false by shutdown().
    protected volatile boolean running = true;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue<ConnectionObserver>();
    private final OperationFactory opFact;
    // A node whose continuous-timeout count exceeds this is treated as a lost connection.
    private final int timeoutExceptionThreshold;
    // Operations that finished in the RETRY state; redistributed by handleOperationalTasks().
    private final Collection<Operation> retryOps;
    // Nodes waiting to be closed by handleShutdownQueue().
    protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;
    // True only when the system property net.spy.verifyAliveOnConnect equals "true".
    private final boolean verifyAliveOnConnect;
    private final ExecutorService listenerExecutorService;
    protected final MetricCollector metrics;
    protected final MetricType metricType;

    /**
     * Builds the connection: opens the selector, creates one node per address,
     * builds the locator, registers metrics and finally starts this IO thread.
     *
     * If anything fails after the selector has been opened (for example an
     * unresolved address while connecting), the selector is closed before the
     * exception propagates so that it does not leak.
     *
     * @param bufSize   buffer size handed to every created node
     * @param f         factory supplying configuration, nodes, locator and metrics
     * @param a         addresses to connect to
     * @param obs       observers to notify about connection state changes
     * @param fm        failure mode applied when a node is unavailable
     * @param opfactory factory used to clone operations and build the noop check
     * @throws IOException if the selector or a channel cannot be opened
     */
    public MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException {
        this.connObservers.addAll(obs);
        this.reconnectQueue = new TreeMap<Long, MemcachedNode>();
        this.addedQueue = new ConcurrentLinkedQueue<MemcachedNode>();
        this.failureMode = fm;
        this.shouldOptimize = f.shouldOptimize();
        this.maxDelay = f.getMaxReconnectDelay();
        this.opFact = opfactory;
        this.timeoutExceptionThreshold = f.getTimeoutExceptionThreshold();
        this.selector = Selector.open();
        this.retryOps = new ArrayList<Operation>();
        this.nodesToShutdown = new ConcurrentLinkedQueue<MemcachedNode>();
        this.listenerExecutorService = f.getListenerExecutorService();
        this.bufSize = bufSize;
        this.connectionFactory = f;
        String verifyAlive = System.getProperty("net.spy.verifyAliveOnConnect");
        this.verifyAliveOnConnect = "true".equals(verifyAlive);
        boolean initialized = false;
        try {
            List<MemcachedNode> connections = this.createConnections(a);
            this.locator = f.createLocator(connections);
            this.metrics = f.getMetricCollector();
            this.metricType = f.enableMetrics();
            this.registerMetrics();
            initialized = true;
        } finally {
            if (!initialized) {
                // Construction is being aborted: release the selector opened above.
                try {
                    this.selector.close();
                } catch (IOException closeFailure) {
                    // Log only, so the original failure is the one that propagates.
                    this.getLogger().warn((Object)"IOException closing selector after failed construction", closeFailure);
                }
            }
        }
        this.setName("Memcached IO over " + this);
        this.setDaemon(f.isDaemon());
        this.start();
    }

    /**
     * Registers the connection-wide metrics with the collector.
     *
     * Nothing is registered unless the metric type is DEBUG or PERFORMANCE. The
     * queue counters and the retry/success/failure meters are added for DEBUG only.
     */
    protected void registerMetrics() {
        final boolean debugMetrics = this.metricType.equals((Object)MetricType.DEBUG);
        if (!debugMetrics && !this.metricType.equals((Object)MetricType.PERFORMANCE)) {
            return;
        }

        // Registered for both PERFORMANCE and DEBUG.
        this.metrics.addHistogram(OVERALL_AVG_BYTES_READ_METRIC);
        this.metrics.addHistogram(OVERALL_AVG_BYTES_WRITE_METRIC);
        this.metrics.addHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_METRIC);
        this.metrics.addMeter(OVERALL_REQUEST_METRIC);

        if (!debugMetrics) {
            return;
        }

        // DEBUG-only extras.
        this.metrics.addCounter(RECON_QUEUE_METRIC);
        this.metrics.addCounter(SHUTD_QUEUE_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_RETRY_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_SUCC_METRIC);
        this.metrics.addMeter(OVERALL_RESPONSE_FAIL_METRIC);
    }

    /**
     * Opens a non-blocking channel for every address, wraps it in a node and
     * registers it with the selector.
     *
     * A channel that connects immediately is reported through connected(); one
     * that does not is registered with interest in OP_CONNECT. A SocketException
     * during connect or registration queues the node for reconnection instead of
     * failing the whole call.
     *
     * @param addrs addresses to connect to
     * @return one node per address, in iteration order of {@code addrs}
     * @throws IOException if a channel cannot be opened or configured
     */
    protected List<MemcachedNode> createConnections(Collection<InetSocketAddress> addrs) throws IOException {
        final ArrayList<MemcachedNode> nodes = new ArrayList<MemcachedNode>(addrs.size());
        for (SocketAddress address : addrs) {
            final SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            final MemcachedNode node = this.connectionFactory.createMemcachedNode(address, channel, this.bufSize);
            channel.socket().setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
            try {
                final int interestOps;
                if (channel.connect(address)) {
                    this.getLogger().info("Connected to %s immediately", node);
                    this.connected(node);
                    interestOps = 0;
                } else {
                    this.getLogger().info("Added %s to connect queue", node);
                    interestOps = SelectionKey.OP_CONNECT;
                }
                this.selector.wakeup();
                node.setSk(channel.register(this.selector, interestOps, node));
                assert (channel.isConnected() || node.getSk().interestOps() == SelectionKey.OP_CONNECT) : "Not connected, and not wanting to connect";
            }
            catch (SocketException e) {
                this.getLogger().warn((Object)"Socket error on initial connect", e);
                this.queueReconnect(node);
            }
            nodes.add(node);
        }
        return nodes;
    }

    /**
     * Sanity check used from an assertion: verifies that every node with a valid
     * selection key is interested in exactly the operations its state calls for.
     *
     * A connected node must be watching for reads when it has a read op and for
     * writes when it has a write op or unwritten bytes; an unconnected node must
     * be watching for OP_CONNECT only.
     *
     * @return always {@code true}; violations surface as assertion errors
     */
    private boolean selectorsMakeSense() {
        for (MemcachedNode node : this.locator.getAll()) {
            if (node.getSk() == null || !node.getSk().isValid()) {
                continue;
            }
            if (!node.getChannel().isConnected()) {
                final int connectOps = node.getSk().interestOps();
                assert (connectOps == SelectionKey.OP_CONNECT) : "Not connected, and not watching for connect: " + connectOps;
                continue;
            }
            final int actualOps = node.getSk().interestOps();
            int expectedOps = 0;
            if (node.hasReadOp()) {
                expectedOps |= SelectionKey.OP_READ;
            }
            if (node.hasWriteOp()) {
                expectedOps |= SelectionKey.OP_WRITE;
            }
            if (node.getBytesRemainingToWrite() > 0) {
                expectedOps |= SelectionKey.OP_WRITE;
            }
            assert (actualOps == expectedOps) : "Invalid ops:  " + node + ", expected " + expectedOps + ", got " + actualOps;
        }
        this.getLogger().debug("Checked the selectors.");
        return true;
    }

    /**
     * Runs one iteration of the IO loop: drains the input queue, selects, handles
     * every ready key (or counts an empty select) and then runs the housekeeping
     * tasks.
     *
     * The select blocks indefinitely (delay 0) unless a reconnect is pending, in
     * which case it waits until the earliest reconnect time, but at least 1 ms.
     *
     * @throws IOException if the connection is shut down or the select fails
     */
    public void handleIO() throws IOException {
        if (this.shutDown) {
            throw new IOException("No IO while shut down");
        }
        this.handleInputQueue();
        this.getLogger().debug("Done dealing with queue.");

        long delay = 0L;
        if (!this.reconnectQueue.isEmpty()) {
            final long now = System.currentTimeMillis();
            final long earliestReconnect = this.reconnectQueue.firstKey();
            delay = Math.max(earliestReconnect - now, 1L);
        }
        this.getLogger().debug("Selecting with delay of %sms", delay);
        assert (this.selectorsMakeSense()) : "Selectors don't make sense.";

        final int selected = this.selector.select(delay);
        final Set<SelectionKey> readyKeys = this.selector.selectedKeys();
        final boolean nothingToDo = readyKeys.isEmpty() && !this.shutDown;
        if (!nothingToDo) {
            this.getLogger().debug("Selected %d, selected %d keys", selected, readyKeys.size());
            this.emptySelects = 0;
            final Iterator<SelectionKey> keys = readyKeys.iterator();
            while (keys.hasNext()) {
                this.handleIO(keys.next());
            }
            readyKeys.clear();
        } else {
            this.handleEmptySelects();
        }
        this.handleOperationalTasks();
    }

    /**
     * Housekeeping performed at the end of every IO iteration, in this order:
     * timeout check, pending reconnects (skipped once shut down), redistribution
     * of operations collected for retry, and processing of the shutdown queue.
     *
     * @throws IOException if closing a node in the shutdown queue fails
     */
    private void handleOperationalTasks() throws IOException {
        this.checkPotentiallyTimedOutConnection();

        if (!(this.shutDown || this.reconnectQueue.isEmpty())) {
            this.attemptReconnects();
        }

        // Hand the collected retry operations back out, then forget them.
        this.redistributeOperations(this.retryOps);
        this.retryOps.clear();

        this.handleShutdownQueue();
    }

    /**
     * Called when a select returned no keys. Counts the empty select and, once
     * more than 256 have happened in a row (the value of DOUBLE_CHECK_EMPTY),
     * inspects every registered key: keys with ready ops are handled, all others
     * are treated as lost connections.
     *
     * Note that building the debug message calls Thread.interrupted(), which
     * clears the thread's interrupt flag.
     */
    private void handleEmptySelects() {
        this.getLogger().debug("No selectors ready, interrupted: " + Thread.interrupted());
        this.emptySelects++;
        if (this.emptySelects <= 256) {
            return;
        }
        for (SelectionKey key : this.selector.keys()) {
            this.getLogger().debug("%s has %s, interested in %s", key, key.readyOps(), key.interestOps());
            if (key.readyOps() == 0) {
                this.lostConnection((MemcachedNode)key.attachment());
            } else {
                this.getLogger().debug("%s has a ready op, handling IO", key);
                this.handleIO(key);
            }
        }
        // 0x1000000 is the value of EXCESSIVE_EMPTY.
        assert (this.emptySelects < 0x1000000) : "Too many empty selects";
    }

    /**
     * Closes every node waiting in the shutdown queue, except nodes that still
     * appear in the added queue (those are left for a later pass).
     *
     * For each node taken: it is removed from the queue, the shutdown counter is
     * decremented, its input queue is destroyed, its channel (if any) is closed,
     * and the operations that had not completed are redistributed.
     *
     * @throws IOException if closing a channel fails
     */
    private void handleShutdownQueue() throws IOException {
        final Iterator<MemcachedNode> pending = this.nodesToShutdown.iterator();
        while (pending.hasNext()) {
            final MemcachedNode node = pending.next();
            if (this.addedQueue.contains(node)) {
                continue;
            }
            this.nodesToShutdown.remove(node);
            this.metrics.decrementCounter(SHUTD_QUEUE_METRIC);
            final Collection<Operation> unfinished = node.destroyInputQueue();
            if (node.getChannel() != null) {
                node.getChannel().close();
                node.setSk(null);
                if (node.getBytesRemainingToWrite() > 0) {
                    this.getLogger().warn("Shut down with %d bytes remaining to write", node.getBytesRemainingToWrite());
                }
                this.getLogger().debug("Shut down channel %s", node.getChannel());
            }
            this.redistributeOperations(unfinished);
        }
    }

    /**
     * Treats every node whose continuous-timeout count exceeds the configured
     * threshold as a lost connection.
     *
     * The scan over the selector's keys is restarted from the beginning whenever
     * a ConcurrentModificationException interrupts it.
     */
    private void checkPotentiallyTimedOutConnection() {
        while (true) {
            try {
                for (SelectionKey key : this.selector.keys()) {
                    final MemcachedNode node = (MemcachedNode)key.attachment();
                    if (node.getContinuousTimeout() > this.timeoutExceptionThreshold) {
                        this.getLogger().warn("%s exceeded continuous timeout threshold", key);
                        this.lostConnection(node);
                    }
                }
                // A full pass completed without interference.
                return;
            }
            catch (ConcurrentModificationException e) {
                this.getLogger().warn((Object)"Retrying selector keys after ConcurrentModificationException caught", e);
            }
        }
    }

    /**
     * Drains the added queue and, for every distinct node found in it, copies
     * its input queue and fixes up its selector interest.
     *
     * An active node that already had a current write op before the copy gets an
     * immediate write attempt when its write buffer has bytes remaining; an IO
     * error there marks the connection lost. Inactive nodes are put back on the
     * added queue afterwards.
     */
    private void handleInputQueue() {
        if (this.addedQueue.isEmpty()) {
            return;
        }
        this.getLogger().debug("Handling queue");
        final HashSet<MemcachedNode> requeue = new HashSet<MemcachedNode>();
        final HashSet<MemcachedNode> drained = new HashSet<MemcachedNode>();
        for (MemcachedNode next = this.addedQueue.poll(); next != null; next = this.addedQueue.poll()) {
            drained.add(next);
        }
        for (MemcachedNode node : drained) {
            boolean writePending = false;
            if (!node.isActive()) {
                requeue.add(node);
            } else if (node.getCurrentWriteOp() != null) {
                writePending = true;
                this.getLogger().debug("Handling queued write %s", node);
            }
            node.copyInputQueue();
            if (writePending) {
                try {
                    if (node.getWbuf().hasRemaining()) {
                        this.handleWrites(node);
                    }
                }
                catch (IOException e) {
                    this.getLogger().warn((Object)"Exception handling write", e);
                    this.lostConnection(node);
                }
            }
            node.fixupOps();
        }
        this.addedQueue.addAll(requeue);
    }

    /**
     * Adds an observer that is notified about connection state changes.
     *
     * @param obs the observer to add
     * @return the result of adding it to the observer collection
     */
    public boolean addObserver(ConnectionObserver obs) {
        final boolean added = this.connObservers.add(obs);
        return added;
    }

    /**
     * Removes a previously added observer.
     *
     * @param obs the observer to remove
     * @return the result of removing it from the observer collection
     */
    public boolean removeObserver(ConnectionObserver obs) {
        final boolean removed = this.connObservers.remove(obs);
        return removed;
    }

    /**
     * Marks a node as connected and tells every observer about it.
     *
     * The reconnect count is read before node.connected() is called, and that
     * earlier value is what the observers receive.
     *
     * @param node the node whose channel has just connected
     */
    private void connected(MemcachedNode node) {
        assert (node.getChannel().isConnected()) : "Not connected.";
        final int reconnectsSoFar = node.getReconnectCount();
        node.connected();
        final Iterator<ConnectionObserver> observers = this.connObservers.iterator();
        while (observers.hasNext()) {
            observers.next().connectionEstablished(node.getSocketAddress(), reconnectsSoFar);
        }
    }

    /**
     * Queues the node for reconnection and then notifies every observer that the
     * connection was lost.
     *
     * @param node the node whose connection is considered lost
     */
    private void lostConnection(MemcachedNode node) {
        this.queueReconnect(node);
        final Iterator<ConnectionObserver> observers = this.connObservers.iterator();
        while (observers.hasNext()) {
            observers.next().connectionLost(node.getSocketAddress());
        }
    }

    /**
     * Tells whether the locator currently knows a node with the same socket
     * address as the given one.
     *
     * @param node the node to look for
     * @return {@code true} if some node of the locator has an equal socket address
     */
    boolean belongsToCluster(MemcachedNode node) {
        final Iterator<MemcachedNode> members = this.locator.getAll().iterator();
        boolean found = false;
        while (!found && members.hasNext()) {
            found = members.next().getSocketAddress().equals(node.getSocketAddress());
        }
        return found;
    }

    /**
     * Handles IO for a single selection key whose attachment is the node.
     *
     * A connectable key of a node that still belongs to the cluster gets its
     * connect finished; anything else is dispatched to read/write handling.
     * Failures are turned into reconnects: a closed channel (unless shutting
     * down) and any other exception mark the connection lost, while a
     * ConnectException only queues the reconnect without notifying observers.
     * The node's ops are fixed up afterwards in every case.
     *
     * @param sk the selection key to service
     */
    private void handleIO(SelectionKey sk) {
        final MemcachedNode node = (MemcachedNode)sk.attachment();
        try {
            this.getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment());
            if (!(sk.isConnectable() && this.belongsToCluster(node))) {
                this.handleReadsAndWrites(sk, node);
            } else {
                this.getLogger().info("Connection state changed for %s", sk);
                final SocketChannel channel = node.getChannel();
                final boolean established = channel.finishConnect();
                if (established) {
                    this.finishConnect(sk, node);
                } else {
                    assert (!channel.isConnected()) : "connected";
                }
            }
        }
        catch (ClosedChannelException e) {
            if (!this.shutDown) {
                this.getLogger().info("Closed channel and not shutting down. Queueing reconnect on %s", node, e);
                this.lostConnection(node);
            }
        }
        catch (ConnectException e) {
            this.getLogger().info("Reconnecting due to failure to connect to %s", node, e);
            this.queueReconnect(node);
        }
        catch (OperationException e) {
            node.setupForAuth();
            this.getLogger().info("Reconnection due to exception handling a memcached operation on %s. This may be due to an authentication failure.", node, e);
            this.lostConnection(node);
        }
        catch (Exception e) {
            node.setupForAuth();
            this.getLogger().info("Reconnecting due to exception on %s", node, e);
            this.lostConnection(node);
        }
        node.fixupOps();
    }

    /**
     * Performs the reads and then the writes the key is ready for. Does nothing
     * when the key is no longer valid.
     *
     * @param sk   the selection key being serviced
     * @param node the node attached to the key
     * @throws IOException if reading or writing fails
     */
    private void handleReadsAndWrites(SelectionKey sk, MemcachedNode node) throws IOException {
        if (!sk.isValid()) {
            return;
        }
        if (sk.isReadable()) {
            this.handleReads(node);
        }
        if (sk.isWritable()) {
            this.handleWrites(node);
        }
    }

    /**
     * Completes the connection of a node whose channel has just finished
     * connecting.
     *
     * When verify-alive-on-connect is enabled, a noop operation is inserted and
     * driven synchronously (write, then read) until it completes or the
     * factory's operation timeout elapses. The loop stops as soon as the noop
     * completes, so the IO thread does not keep spinning for the rest of the
     * timeout window. If the noop does not complete, or ends cancelled, errored
     * or timed out, the connect is reported as failed.
     *
     * On success the node is marked connected, queued for input handling, and
     * any bytes left in its write buffer are written.
     *
     * @param sk   the selection key of the node
     * @param node the node that has just connected
     * @throws ConnectException if the verify-alive noop did not succeed
     * @throws IOException      if reading or writing fails
     */
    private void finishConnect(SelectionKey sk, MemcachedNode node) throws IOException {
        if (this.verifyAliveOnConnect) {
            final CountDownLatch latch = new CountDownLatch(1);
            final OperationFuture rv = new OperationFuture("noop", latch, 2500L, this.listenerExecutorService);
            NoopOperation testOp = this.opFact.noop(new OperationCallback(){

                @Override
                public void receivedStatus(OperationStatus status) {
                    rv.set(status.isSuccess(), status);
                }

                @Override
                public void complete() {
                    latch.countDown();
                }
            });
            testOp.setHandlingNode(node);
            testOp.initialize();
            this.checkState();
            this.insertOperation(node, testOp);
            node.copyInputQueue();
            boolean done = false;
            if (sk.isValid()) {
                long timeout = TimeUnit.MILLISECONDS.toNanos(this.connectionFactory.getOperationTimeout());
                long stop = System.nanoTime() + timeout;
                // Leave the loop as soon as the noop has completed; only an
                // unanswered noop should consume the whole timeout.
                while (!done && stop > System.nanoTime()) {
                    this.handleWrites(node);
                    this.handleReads(node);
                    done = latch.getCount() == 0L;
                }
            }
            if (!done || testOp.isCancelled() || testOp.hasErrored() || testOp.isTimedOut()) {
                throw new ConnectException("Could not send noop upon connect! This may indicate a running, but not responding memcached instance.");
            }
        }
        this.connected(node);
        this.addedQueue.offer(node);
        if (node.getWbuf().hasRemaining()) {
            this.handleWrites(node);
        }
    }

    /**
     * Writes as much of the node's pending output as the channel accepts.
     *
     * The write buffer is refilled before the first write and after every
     * write. Writing stops when nothing remains to be written or when a write
     * transfers no bytes. Every write's byte count is recorded in the
     * bytes-written histogram.
     *
     * @param node the node to write to
     * @throws IOException if the write fails
     */
    private void handleWrites(MemcachedNode node) throws IOException {
        node.fillWriteBuffer(this.shouldOptimize);
        while (node.getBytesRemainingToWrite() > 0) {
            final int written = node.writeSome();
            this.metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, written);
            node.fillWriteBuffer(this.shouldOptimize);
            if (written <= 0) {
                // The channel took nothing; try again on the next write event.
                break;
            }
        }
    }

    /**
     * Reads whatever the node's channel has available and feeds it to the
     * node's current read operation(s).
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour
     * change." The code below is what the decompiler produced.
     *
     * @param node the node to read from
     * @throws IOException if the channel read fails or the stream ended
     *                     unexpectedly
     * @throws IllegalStateException if bytes arrive while no read operation is
     *                               current
     */
    private void handleReads(MemcachedNode node) throws IOException {
        Operation currentOp = node.getCurrentReadOp();
        // A TAP ack is simply dropped from the read queue; nothing is read for it.
        if (currentOp instanceof TapAckOperationImpl) {
            node.removeCurrentReadOp();
            return;
        }
        ByteBuffer rbuf = node.getRbuf();
        SocketChannel channel = node.getChannel();
        int read = channel.read(rbuf);
        // Recorded even when read is 0 or negative.
        this.metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);
        if (read < 0) {
            // End of stream: either completes a TAP operation or throws.
            currentOp = this.handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);
        }
        while (read > 0) {
            this.getLogger().debug("Read %d bytes", read);
            rbuf.flip();
            // Consume the buffer, moving on to the next read op whenever one finishes.
            while (rbuf.remaining() > 0) {
                if (currentOp == null) {
                    throw new IllegalStateException("No read operation.");
                }
                // Time since the op's write completed, reported in microseconds.
                long timeOnWire = System.nanoTime() - currentOp.getWriteCompleteTimestamp();
                this.metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, (int)(timeOnWire / 1000L));
                this.metrics.markMeter(OVERALL_RESPONSE_METRIC);
                Operation operation = currentOp;
                synchronized (operation) {
                    this.readBufferAndLogMetrics(currentOp, rbuf, node);
                }
                currentOp = node.getCurrentReadOp();
            }
            rbuf.clear();
            read = channel.read(rbuf);
            node.completedRead();
        }
    }

    /**
     * Lets the operation consume bytes from the buffer and reacts to the state
     * it ends up in.
     *
     * COMPLETE: the op is removed from the node's read queue and the failure or
     * success meter is marked. RETRY: the retry message is passed to
     * handleRetryInformation, the handling node is recorded on the op as
     * "not my vbucket", the op is removed and queued for retry, and the retry
     * meter is marked. Any other state leaves the op in place.
     *
     * @param currentOp the operation currently being read
     * @param rbuf      the buffer holding the received bytes
     * @param node      the node the bytes came from
     * @throws IOException if the operation fails to read from the buffer
     */
    private void readBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) throws IOException {
        currentOp.readFromBuffer(rbuf);

        if (currentOp.getState() == OperationState.COMPLETE) {
            this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
            final Operation removed = node.removeCurrentReadOp();
            assert (removed == currentOp) : "Expected to pop " + currentOp + " got " + removed;
            this.metrics.markMeter(removed.hasErrored() ? OVERALL_RESPONSE_FAIL_METRIC : OVERALL_RESPONSE_SUCC_METRIC);
            return;
        }

        if (currentOp.getState() != OperationState.RETRY) {
            return;
        }

        this.handleRetryInformation(currentOp.getErrorMsg());
        this.getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: %s ", currentOp);
        ((VBucketAware)((Object)currentOp)).addNotMyVbucketNode(currentOp.getHandlingNode());
        final Operation removed = node.removeCurrentReadOp();
        assert (removed == currentOp) : "Expected to pop " + currentOp + " got " + removed;
        this.retryOps.add(currentOp);
        this.metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
    }

    /**
     * Handles a read that hit end-of-stream.
     *
     * For a TAP operation the end of the stream is its normal completion: the
     * callback is completed, the stream is marked closed, and the op is removed
     * from the node's read queue. For anything else the disconnect is
     * unexpected and reported as an IOException.
     *
     * @param currentOp the operation that was being read
     * @param node      the node whose channel reached end-of-stream
     * @param rbuf      the node's read buffer (used for logging only)
     * @return the node's next current read operation
     * @throws IOException if the current operation is not a TAP operation
     */
    private Operation handleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) throws IOException {
        if (!(currentOp instanceof TapOperation)) {
            throw new IOException("Disconnected unexpected, will reconnect.");
        }
        currentOp.getCallback().complete();
        ((TapOperation)currentOp).streamClosed(OperationState.COMPLETE);
        this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
        final Operation removed = node.removeCurrentReadOp();
        assert (removed == currentOp) : "Expected to pop " + currentOp + " got " + removed;
        return node.getCurrentReadOp();
    }

    /**
     * Renders the first {@code size} bytes of a buffer's content for debugging.
     *
     * ASCII letters, digits and whitespace are emitted as-is; every other byte
     * is emitted as {@code \x} followed by its unsigned hex value (no padding).
     * Bytes are treated as unsigned, so values of 0x80 and above are always
     * hex-escaped instead of being sign-extended into unrelated characters.
     * The buffer's array offset is honoured, so sliced buffers are dumped from
     * their own first byte.
     *
     * @param b    an array-backed buffer
     * @param size number of bytes to render, starting at the buffer's index 0
     * @return the rendered text
     * @throws UnsupportedOperationException if the buffer is not backed by an
     *         accessible array
     */
    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        byte[] bytes = b.array();
        int base = b.arrayOffset();
        for (int i = 0; i < size; ++i) {
            // Mask to an unsigned value; a plain (char) cast would sign-extend.
            int value = bytes[base + i] & 0xFF;
            char ch = (char)value;
            if (value < 0x80 && (Character.isWhitespace(ch) || Character.isLetterOrDigit(ch))) {
                sb.append(ch);
                continue;
            }
            sb.append("\\x");
            sb.append(Integer.toHexString(value));
        }
        return sb.toString();
    }

    /**
     * Hook invoked with the error message of an operation that ended in the
     * RETRY state. This implementation only logs the message at debug level.
     *
     * @param retryMessage raw bytes of the retry message, decoded with the
     *                     platform default charset for logging
     */
    protected void handleRetryInformation(byte[] retryMessage) {
        this.getLogger().debug(new StringBuilder("Got RETRY message: ")
            .append(new String(retryMessage))
            .append(", but not handled.")
            .toString());
    }

    /**
     * Tears down a node's connection and schedules it for reconnection. Does
     * nothing once the connection has been shut down.
     *
     * Steps, in order: cancel the selection key, tell the node it is
     * reconnecting, close the socket, clear the node's channel, put the node in
     * the reconnect queue, and deal with its pending operations according to
     * the failure mode (redistribute or cancel; otherwise they stay on the
     * node).
     *
     * The reconnect time is now plus min(maxDelay, 2^reconnectCount) seconds;
     * it is bumped by one millisecond at a time until the key is free.
     *
     * @param node the node to reconnect
     */
    protected void queueReconnect(MemcachedNode node) {
        if (this.shutDown) {
            return;
        }
        this.getLogger().warn("Closing, and reopening %s, attempt %d.", node, node.getReconnectCount());

        if (node.getSk() != null) {
            node.getSk().cancel();
            assert (!node.getSk().isValid()) : "Cancelled selection key is valid";
        }
        node.reconnecting();

        try {
            final boolean hasSocket = node.getChannel() != null && node.getChannel().socket() != null;
            if (!hasSocket) {
                this.getLogger().info("The channel or socket was null for %s", node);
            } else {
                node.getChannel().socket().close();
            }
        }
        catch (IOException e) {
            this.getLogger().warn((Object)"IOException trying to close a socket", e);
        }
        node.setChannel(null);

        final long backoffSeconds = (long)Math.min((double)this.maxDelay, Math.pow(2.0, node.getReconnectCount()));
        long slot = System.currentTimeMillis() + backoffSeconds * 1000L;
        // Keys must be unique; move forward until a free millisecond is found.
        while (this.reconnectQueue.containsKey(slot)) {
            slot++;
        }
        this.reconnectQueue.put(slot, node);
        this.metrics.incrementCounter(RECON_QUEUE_METRIC);
        node.setupResend();

        if (this.failureMode == FailureMode.Redistribute) {
            this.redistributeOperations(node.destroyInputQueue());
            return;
        }
        if (this.failureMode == FailureMode.Cancel) {
            this.cancelOperations(node.destroyInputQueue());
        }
    }

    /**
     * Cancels every operation in the collection, in iteration order.
     *
     * @param ops the operations to cancel
     */
    private void cancelOperations(Collection<Operation> ops) {
        final Iterator<Operation> remaining = ops.iterator();
        while (remaining.hasNext()) {
            remaining.next().cancel();
        }
    }

    /**
     * Re-submits operations that could not be completed on their node.
     *
     * Cancelled and timed-out operations are skipped. A keyed operation is
     * cloned through the operation factory and the clones are added again for
     * each of its keys; any other operation is cancelled.
     *
     * NOTE(review): the factory's clone() is invoked once per key and every
     * returned clone is added under that key - confirm this is intended for
     * operations carrying more than one key.
     *
     * @param ops the operations to redistribute
     */
    private void redistributeOperations(Collection<Operation> ops) {
        for (Operation op : ops) {
            if (op.isCancelled() || op.isTimedOut()) {
                continue;
            }
            if (!(op instanceof KeyedOperation)) {
                op.cancel();
                continue;
            }
            final KeyedOperation keyed = (KeyedOperation)op;
            int resubmitted = 0;
            for (String key : keyed.getKeys()) {
                for (Operation copy : this.opFact.clone(keyed)) {
                    this.addOperation(key, copy);
                    resubmitted++;
                }
            }
            assert (resubmitted > 0) : "Didn't add any new operations when redistributing";
        }
    }

    /**
     * Tries to reconnect every node whose scheduled reconnect time lies before
     * the current time.
     *
     * Each due entry is removed from the reconnect queue. Nodes that no longer
     * belong to the cluster and nodes already handled in this pass are skipped.
     * A SocketException re-queues the node; any other exception is only logged.
     *
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour
     * change." The code below is what the decompiler produced.
     */
    private void attemptReconnects() {
        long now = System.currentTimeMillis();
        // Identity-based set of nodes already processed during this pass.
        IdentityHashMap<MemcachedNode, Boolean> seen = new IdentityHashMap<MemcachedNode, Boolean>();
        // Nodes whose reconnect attempt failed with a SocketException.
        ArrayList<MemcachedNode> rereQueue = new ArrayList<MemcachedNode>();
        // Declared outside the loop, so on a skipped iteration the finally block
        // sees the channel of an earlier iteration.
        SocketChannel ch = null;
        // headMap(now) covers keys strictly less than now; removing through the
        // iterator removes the entry from reconnectQueue itself.
        Iterator<MemcachedNode> i = this.reconnectQueue.headMap(now).values().iterator();
        while (i.hasNext()) {
            MemcachedNode node = i.next();
            i.remove();
            this.metrics.decrementCounter(RECON_QUEUE_METRIC);
            try {
                if (!this.belongsToCluster(node)) {
                    this.getLogger().debug("Node does not belong to cluster anymore, skipping reconnect: %s", node);
                    continue;
                }
                if (!seen.containsKey(node)) {
                    seen.put(node, Boolean.TRUE);
                    this.getLogger().info("Reconnecting %s", node);
                    ch = SocketChannel.open();
                    ch.configureBlocking(false);
                    int ops = 0;
                    if (ch.connect(node.getSocketAddress())) {
                        // NOTE(review): connected() asserts on node.getChannel(), but the
                        // new channel is only registered on the node further below -
                        // confirm this ordering is safe when assertions are enabled.
                        this.connected(node);
                        this.addedQueue.offer(node);
                        this.getLogger().info("Immediately reconnected to %s", node);
                        assert (ch.isConnected());
                    } else {
                        // 8 is the value of SelectionKey.OP_CONNECT.
                        ops = 8;
                    }
                    node.registerChannel(ch, ch.register(this.selector, ops, node));
                    assert (node.getChannel() == ch) : "Channel was lost.";
                    continue;
                }
                this.getLogger().debug("Skipping duplicate reconnect request for %s", node);
            }
            catch (SocketException e) {
                this.getLogger().warn((Object)"Error on reconnect", e);
                rereQueue.add(node);
            }
            catch (Exception e) {
                // The node is not re-queued on this path.
                this.getLogger().error("Exception on reconnect, lost node %s", node, e);
            }
            finally {
                // Runs on every path, including the continue statements above.
                this.potentiallyCloseLeakingChannel(ch, node);
            }
        }
        // Re-queue after the iteration so the reconnect queue is not modified while it is being walked.
        for (MemcachedNode n : rereQueue) {
            this.queueReconnect(n);
        }
    }

    /**
     * Closes a channel that is neither connected nor in the middle of
     * connecting. A null channel, or one that is connected or pending, is left
     * alone. A failure to close is logged, not propagated.
     *
     * @param ch   the channel to inspect, may be null
     * @param node the node the channel was opened for (used for logging only)
     */
    private void potentiallyCloseLeakingChannel(SocketChannel ch, MemcachedNode node) {
        final boolean abandoned = ch != null && !ch.isConnected() && !ch.isConnectionPending();
        if (!abandoned) {
            return;
        }
        try {
            ch.close();
        }
        catch (IOException e) {
            this.getLogger().error("Exception closing channel: %s", node, e);
        }
    }

    /**
     * Returns the node locator created by the connection factory in the
     * constructor.
     *
     * @return the locator used by this connection
     */
    public NodeLocator getLocator() {
        return locator;
    }

    /**
     * Validates the key and queues the operation on the node responsible for
     * it. The state check runs first; the key is validated with the binary flag
     * set when the operation factory is a BinaryOperationFactory.
     *
     * @param key the key the operation works on
     * @param o   the operation to queue
     */
    public void enqueueOperation(String key, Operation o) {
        this.checkState();
        final boolean binaryProtocol = this.opFact instanceof BinaryOperationFactory;
        StringUtils.validateKey(key, binaryProtocol);
        this.addOperation(key, o);
    }

    /**
     * Picks the node for a key and queues the operation on it.
     *
     * The primary node is used when it is active or when the failure mode is
     * Retry. Otherwise, in Cancel mode the operation is cancelled and not
     * queued; in any other mode the first active node of the key's sequence is
     * used, falling back to the primary when none is active.
     *
     * @param key the key the operation works on
     * @param o   the operation to queue
     */
    protected void addOperation(String key, Operation o) {
        final MemcachedNode primary = this.locator.getPrimary(key);
        MemcachedNode target = null;
        if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            target = primary;
        } else if (this.failureMode == FailureMode.Cancel) {
            o.cancel();
        } else {
            final Iterator<MemcachedNode> candidates = this.locator.getSequence(key);
            while (target == null && candidates.hasNext()) {
                final MemcachedNode candidate = candidates.next();
                if (candidate.isActive()) {
                    target = candidate;
                }
            }
            if (target == null) {
                target = primary;
                this.getLogger().warn("Could not redistribute to another node, retrying primary node for %s.", key);
            }
        }
        assert (o.isCancelled() || target != null) : "No node found for key " + key;
        if (target == null) {
            assert (o.isCancelled()) : "No node found for " + key + " (and not " + "immediately cancelled)";
            return;
        }
        this.addOperation(target, o);
    }

    /**
     * Binds the operation to the node, initializes it and hands it to the node
     * through insertOp(). The node is then put on the added queue, the request
     * meter is marked and the selector is woken up.
     *
     * @param node the node that should handle the operation
     * @param o    the operation to insert
     */
    public void insertOperation(MemcachedNode node, Operation o) {
        o.setHandlingNode(node);
        o.initialize();
        node.insertOp(o);

        this.addedQueue.offer(node);
        this.metrics.markMeter(OVERALL_REQUEST_METRIC);

        final Selector woken = this.selector.wakeup();
        assert (woken == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    /**
     * Binds the operation to the node, initializes it and hands it to the node
     * through addOp(). The node is then put on the added queue, the request
     * meter is marked and the selector is woken up.
     *
     * @param node the node that should handle the operation
     * @param o    the operation to add
     */
    protected void addOperation(MemcachedNode node, Operation o) {
        o.setHandlingNode(node);
        o.initialize();
        node.addOp(o);

        this.addedQueue.offer(node);
        this.metrics.markMeter(OVERALL_REQUEST_METRIC);

        final Selector woken = this.selector.wakeup();
        assert (woken == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    /**
     * Queues each operation of the map on the node it is mapped to, in the
     * iteration order of the map's entry set.
     *
     * @param ops operations keyed by the node that should handle them
     */
    public void addOperations(Map<MemcachedNode, Operation> ops) {
        final Iterator<Map.Entry<MemcachedNode, Operation>> entries = ops.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<MemcachedNode, Operation> entry = entries.next();
            this.addOperation(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Broadcasts an operation to every node known to the locator.
     *
     * @param of factory creating the operation for each node
     * @return the latch handed to the operations, sized to the number of nodes
     */
    public CountDownLatch broadcastOperation(BroadcastOpFactory of) {
        final Collection<MemcachedNode> everyNode = this.locator.getAll();
        return this.broadcastOperation(of, everyNode);
    }

    /**
     * Creates one operation per node through the factory and queues it on that
     * node. The selector is woken up once, after all operations are queued.
     *
     * @param of    factory creating the operation for each node
     * @param nodes the nodes to send the operation to
     * @return the latch passed to the factory for every operation, initialised
     *         with the number of nodes
     */
    public CountDownLatch broadcastOperation(BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
        final CountDownLatch allDone = new CountDownLatch(nodes.size());
        final Iterator<MemcachedNode> targets = nodes.iterator();
        while (targets.hasNext()) {
            final MemcachedNode target = targets.next();
            this.getLogger().debug("broadcast Operation: node = " + target);
            final Operation created = of.newOp(target, allDone);
            created.initialize();
            target.addOp(created);
            created.setHandlingNode(target);
            this.addedQueue.offer(target);
            this.metrics.markMeter(OVERALL_REQUEST_METRIC);
        }
        final Selector woken = this.selector.wakeup();
        assert (woken == this.selector) : "Wakeup returned the wrong selector.";
        return allDone;
    }

    /**
     * Shuts the connection down: marks it as shutting down, wakes the
     * selector, closes the channel of every node that has one, then clears
     * the {@code running} flag and closes the selector.
     *
     * <p>Clearing {@code running} and closing the selector happen in a
     * {@code finally} block, so they are still performed when closing a
     * node's channel throws. Otherwise the {@code run()} loop, which only
     * ends once {@code running} is false, would never terminate.
     *
     * @throws IOException if closing a channel or the selector fails
     */
    public void shutdown() throws IOException {
        this.shutDown = true;
        try {
            Selector s = this.selector.wakeup();
            assert (s == this.selector) : "Wakeup returned the wrong selector.";
            for (MemcachedNode node : this.locator.getAll()) {
                if (node.getChannel() == null) continue;
                node.getChannel().close();
                node.setSk(null);
                if (node.getBytesRemainingToWrite() > 0) {
                    this.getLogger().warn("Shut down with %d bytes remaining to write", node.getBytesRemainingToWrite());
                }
                this.getLogger().debug("Shut down channel %s", node.getChannel());
            }
        } finally {
            // Always stop the IO loop and release the selector, even after a
            // failure above.
            this.running = false;
            this.selector.close();
            this.getLogger().debug("Shut down selector %s", this.selector);
        }
    }

    /**
     * Describes this connection as
     * <code>{MemcachedConnection to addr1 addr2 ...}</code>, listing the socket
     * address of every node returned by the locator's {@code getAll()}.
     *
     * @return the textual description
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("{MemcachedConnection to");
        for (MemcachedNode member : locator.getAll()) {
            text.append(" ");
            text.append(member.getSocketAddress());
        }
        return text.append("}").toString();
    }

    /**
     * Builds a one-line status report covering every node returned by the
     * locator's {@code getAll()}: its socket address, the results of
     * {@code isActive()} and {@code isAuthenticated()}, and the value of
     * {@code lastReadDelta()} reported as milliseconds.
     *
     * @return the status text, wrapped in {@code "Connection Status {"} and
     *         {@code " }"}
     */
    public String connectionsStatus() {
        final StringBuilder report = new StringBuilder("Connection Status {");
        for (MemcachedNode node : locator.getAll()) {
            report.append(" ").append(node.getSocketAddress());
            report.append(" active: ").append(node.isActive());
            report.append(", authed: ").append(node.isAuthenticated());
            report.append(MessageFormat.format(", last read: {0} ms ago", node.lastReadDelta()));
        }
        return report.append(" }").toString();
    }

    /**
     * Records that the given operation timed out by delegating to
     * {@code setTimeout} with {@code isTimeout == true}.
     *
     * @param op the operation that timed out
     */
    public static void opTimedOut(Operation op) {
        final boolean timedOut = true;
        setTimeout(op, timedOut);
    }

    /**
     * Records that the given operation succeeded by delegating to
     * {@code setTimeout} with {@code isTimeout == false}.
     *
     * @param op the operation that succeeded
     */
    public static void opSucceeded(Operation op) {
        final boolean timedOut = false;
        setTimeout(op, timedOut);
    }

    /**
     * Forwards the timeout outcome of an operation to its handling node via
     * {@code setContinuousTimeout}.
     *
     * <p>Nothing happens when {@code op} is {@code null} or reports
     * {@code isTimedOutUnsent()}. An operation without a handling node only
     * produces a warning. Any exception is logged and not rethrown, so this
     * bookkeeping never fails the caller.
     *
     * @param op the operation whose outcome is being recorded; may be {@code null}
     * @param isTimeout {@code true} if the operation timed out, {@code false} if it succeeded
     */
    private static void setTimeout(Operation op, boolean isTimeout) {
        Logger logger = LoggerFactory.getLogger(MemcachedConnection.class);
        try {
            if (op == null || op.isTimedOutUnsent()) {
                return;
            }
            MemcachedNode node = op.getHandlingNode();
            if (node == null) {
                logger.warn("handling node for operation is not set");
            } else {
                node.setContinuousTimeout(isTimeout);
            }
        }
        catch (Exception e) {
            // Pass the throwable itself: logging only e.getMessage() loses the
            // stack trace and prints "null" for exceptions without a message.
            logger.error((Object)"Failed to update timeout state of operation", e);
        }
    }

    /**
     * Verifies that this connection can still be used.
     *
     * <p>When assertions are enabled, it also asserts that {@code isAlive()}
     * is true.
     *
     * @throws IllegalStateException if the connection has been shut down
     */
    protected void checkState() {
        if (!shutDown) {
            assert (isAlive()) : "IO Thread is not running.";
            return;
        }
        throw new IllegalStateException("Shutting down");
    }

    /**
     * Main loop: calls {@code handleIO()} repeatedly for as long as the
     * {@code running} flag is set.
     *
     * <p>The exception types caught below are passed to
     * {@code logRunException} and the loop continues. Any other exception
     * propagates out of this method. An informational message is logged once
     * the loop ends.
     */
    @Override
    public void run() {
        while (running) {
            try {
                handleIO();
            }
            catch (IOException ioFailure) {
                logRunException(ioFailure);
            }
            catch (CancelledKeyException cancelledKey) {
                logRunException(cancelledKey);
            }
            catch (ClosedSelectorException closedSelector) {
                logRunException(closedSelector);
            }
            catch (IllegalStateException illegalState) {
                logRunException(illegalState);
            }
            catch (ConcurrentModificationException concurrentChange) {
                logRunException(concurrentChange);
            }
        }
        getLogger().info("Shut down memcached client");
    }

    /**
     * Logs an exception caught by the {@code run()} loop: at debug level when
     * the connection is shutting down, otherwise at warn level.
     *
     * @param e the exception to log
     */
    private void logRunException(Exception e) {
        if (!shutDown) {
            getLogger().warn((Object)"Problem handling memcached IO", e);
            return;
        }
        getLogger().debug((Object)"Exception occurred during shutdown", e);
    }

    /**
     * Reports whether the {@code shutDown} flag has been set, which
     * {@code shutdown()} does as its first step.
     *
     * @return the current value of the {@code shutDown} flag
     */
    public boolean isShutDown() {
        return shutDown;
    }
}

