/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.auth.AuthDescriptor;
import net.spy.memcached.auth.AuthThreadMonitor;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.ops.CASOperation;
import net.spy.memcached.ops.CASOperationStatus;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.ConcatenationOperation;
import net.spy.memcached.ops.ConcatenationType;
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.FlushOperation;
import net.spy.memcached.ops.GetAndTouchOperation;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.ops.TimedOutOperationStatus;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.transcoders.TranscodeService;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.util.StringUtils;

public class MemcachedClient
extends SpyObject
implements MemcachedClientIF,
ConnectionObserver {
    protected volatile boolean shuttingDown;
    protected final long operationTimeout;
    protected final MemcachedConnection mconn;
    protected final OperationFactory opFact;
    protected final Transcoder<Object> transcoder;
    protected final TranscodeService tcService;
    protected final AuthDescriptor authDescriptor;
    protected final ConnectionFactory connFactory;
    protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor();
    protected final ExecutorService executorService;

    public MemcachedClient(InetSocketAddress ... ia) throws IOException {
        this(new DefaultConnectionFactory(), Arrays.asList(ia));
    }

    public MemcachedClient(List<InetSocketAddress> addrs) throws IOException {
        this(new DefaultConnectionFactory(), addrs);
    }

    public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs) throws IOException {
        if (cf == null) {
            throw new NullPointerException("Connection factory required");
        }
        if (addrs == null) {
            throw new NullPointerException("Server list required");
        }
        if (addrs.isEmpty()) {
            throw new IllegalArgumentException("You must have at least one server to connect to");
        }
        if (cf.getOperationTimeout() <= 0L) {
            throw new IllegalArgumentException("Operation timeout must be positive.");
        }
        this.connFactory = cf;
        this.tcService = new TranscodeService(cf.isDaemon());
        this.transcoder = cf.getDefaultTranscoder();
        this.opFact = cf.getOperationFactory();
        assert (this.opFact != null) : "Connection factory failed to make op factory";
        this.mconn = cf.createConnection(addrs);
        assert (this.mconn != null) : "Connection factory failed to make a connection";
        this.operationTimeout = cf.getOperationTimeout();
        this.authDescriptor = cf.getAuthDescriptor();
        this.executorService = cf.getListenerExecutorService();
        if (this.authDescriptor != null) {
            this.addObserver(this);
        }
    }

    @Override
    public Collection<SocketAddress> getAvailableServers() {
        ArrayList<SocketAddress> rv = new ArrayList<SocketAddress>();
        for (MemcachedNode node : this.mconn.getLocator().getAll()) {
            if (!node.isActive()) continue;
            rv.add(node.getSocketAddress());
        }
        return rv;
    }

    @Override
    public Collection<SocketAddress> getUnavailableServers() {
        ArrayList<SocketAddress> rv = new ArrayList<SocketAddress>();
        for (MemcachedNode node : this.mconn.getLocator().getAll()) {
            if (node.isActive()) continue;
            rv.add(node.getSocketAddress());
        }
        return rv;
    }

    @Override
    public NodeLocator getNodeLocator() {
        return this.mconn.getLocator().getReadonlyCopy();
    }

    @Override
    public Transcoder<Object> getTranscoder() {
        return this.transcoder;
    }

    @Override
    public CountDownLatch broadcastOp(BroadcastOpFactory of) {
        return this.broadcastOp(of, this.mconn.getLocator().getAll(), true);
    }

    @Override
    public CountDownLatch broadcastOp(BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
        return this.broadcastOp(of, nodes, true);
    }

    private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection<MemcachedNode> nodes, boolean checkShuttingDown) {
        if (checkShuttingDown && this.shuttingDown) {
            throw new IllegalStateException("Shutting down");
        }
        return this.mconn.broadcastOperation(of, nodes);
    }

    private <T> OperationFuture<Boolean> asyncStore(StoreType storeType, String key, int exp, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, this.operationTimeout, this.executorService);
        StoreOperation op = this.opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                rv.set(val.isSuccess(), val);
            }

            @Override
            public void gotData(String key, long cas) {
                rv.setCas(cas);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    private OperationFuture<Boolean> asyncStore(StoreType storeType, String key, int exp, Object value) {
        return this.asyncStore(storeType, key, exp, value, this.transcoder);
    }

    private <T> OperationFuture<Boolean> asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, this.operationTimeout, this.executorService);
        ConcatenationOperation op = this.opFact.cat(catType, cas, key, co.getData(), new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                rv.set(val.isSuccess(), val);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public <T> OperationFuture<Boolean> touch(String key, int exp) {
        return this.touch(key, exp, (Transcoder)this.transcoder);
    }

    public <T> OperationFuture<Boolean> touch(String key, int exp, Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, this.operationTimeout, this.executorService);
        KeyedOperation op = this.opFact.touch(key, exp, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(status.isSuccess(), status);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public OperationFuture<Boolean> append(long cas, String key, Object val) {
        return this.append(cas, key, val, (Transcoder)this.transcoder);
    }

    public OperationFuture<Boolean> append(String key, Object val) {
        return this.append(0L, key, val, (Transcoder)this.transcoder);
    }

    public <T> OperationFuture<Boolean> append(long cas, String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.append, cas, key, val, tc);
    }

    public <T> OperationFuture<Boolean> append(String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.append, 0L, key, val, tc);
    }

    public OperationFuture<Boolean> prepend(long cas, String key, Object val) {
        return this.prepend(cas, key, val, (Transcoder)this.transcoder);
    }

    public OperationFuture<Boolean> prepend(String key, Object val) {
        return this.prepend(0L, key, val, (Transcoder)this.transcoder);
    }

    public <T> OperationFuture<Boolean> prepend(long cas, String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.prepend, cas, key, val, tc);
    }

    public <T> OperationFuture<Boolean> prepend(String key, T val, Transcoder<T> tc) {
        return this.asyncCat(ConcatenationType.prepend, 0L, key, val, tc);
    }

    public <T> OperationFuture<CASResponse> asyncCAS(String key, long casId, T value, Transcoder<T> tc) {
        return this.asyncCAS(key, casId, 0, value, tc);
    }

    @Override
    public <T> OperationFuture<CASResponse> asyncCAS(String key, long casId, int exp, T value, Transcoder<T> tc) {
        CachedData co = tc.encode(value);
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<CASResponse> rv = new OperationFuture<CASResponse>(key, latch, this.operationTimeout, this.executorService);
        CASOperation op = this.opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new StoreOperation.Callback(){

            @Override
            public void receivedStatus(OperationStatus val) {
                if (val instanceof CASOperationStatus) {
                    rv.set(((CASOperationStatus)val).getCASResponse(), val);
                } else if (val instanceof CancelledOperationStatus) {
                    MemcachedClient.this.getLogger().debug("CAS operation cancelled");
                } else if (val instanceof TimedOutOperationStatus) {
                    MemcachedClient.this.getLogger().debug("CAS operation timed out");
                } else {
                    throw new RuntimeException("Unhandled state: " + val);
                }
            }

            @Override
            public void gotData(String key, long cas) {
                rv.setCas(cas);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public OperationFuture<CASResponse> asyncCAS(String key, long casId, Object value) {
        return this.asyncCAS(key, casId, value, (Transcoder)this.transcoder);
    }

    public OperationFuture<CASResponse> asyncCAS(String key, long casId, int exp, Object value) {
        return this.asyncCAS(key, casId, exp, value, this.transcoder);
    }

    @Override
    public <T> CASResponse cas(String key, long casId, T value, Transcoder<T> tc) {
        return this.cas(key, casId, 0, value, tc);
    }

    @Override
    public <T> CASResponse cas(String key, long casId, int exp, T value, Transcoder<T> tc) {
        try {
            OperationFuture<CASResponse> casOp = this.asyncCAS(key, casId, exp, value, tc);
            CASResponse casr = casOp.get(this.operationTimeout, TimeUnit.MILLISECONDS);
            return casr;
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value: " + this.buildTimeoutMessage(this.operationTimeout, TimeUnit.MILLISECONDS), e);
        }
    }

    @Override
    public CASResponse cas(String key, long casId, Object value) {
        return this.cas(key, casId, value, this.transcoder);
    }

    @Override
    public CASResponse cas(String key, long casId, int exp, Object value) {
        return this.cas(key, casId, exp, value, this.transcoder);
    }

    public <T> OperationFuture<Boolean> add(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.add, key, exp, o, tc);
    }

    public OperationFuture<Boolean> add(String key, int exp, Object o) {
        return this.asyncStore(StoreType.add, key, exp, o, this.transcoder);
    }

    public <T> OperationFuture<Boolean> set(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.set, key, exp, o, tc);
    }

    public OperationFuture<Boolean> set(String key, int exp, Object o) {
        return this.asyncStore(StoreType.set, key, exp, o, this.transcoder);
    }

    public <T> OperationFuture<Boolean> replace(String key, int exp, T o, Transcoder<T> tc) {
        return this.asyncStore(StoreType.replace, key, exp, o, tc);
    }

    public OperationFuture<Boolean> replace(String key, int exp, Object o) {
        return this.asyncStore(StoreType.replace, key, exp, o, this.transcoder);
    }

    public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final GetFuture rv = new GetFuture(latch, this.operationTimeout, key, this.executorService);
        GetOperation op = this.opFact.get(key, new GetOperation.Callback(){
            private Future<T> val;

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(this.val, status);
            }

            @Override
            public void gotData(String k, int flags, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                this.val = MemcachedClient.this.tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public GetFuture<Object> asyncGet(String key) {
        return this.asyncGet(key, (Transcoder)this.transcoder);
    }

    public <T> OperationFuture<CASValue<T>> asyncGets(final String key, final Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<CASValue<T>> rv = new OperationFuture<CASValue<T>>(key, latch, this.operationTimeout, this.executorService);
        GetsOperation op = this.opFact.gets(key, new GetsOperation.Callback(){
            private CASValue<T> val;

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(this.val, status);
            }

            @Override
            public void gotData(String k, int flags, long cas, byte[] data) {
                assert (key.equals(k)) : "Wrong key returned";
                this.val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public OperationFuture<CASValue<Object>> asyncGets(String key) {
        return this.asyncGets(key, (Transcoder)this.transcoder);
    }

    @Override
    public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
        try {
            return (CASValue)((OperationFuture)this.asyncGets(key, (Transcoder)tc)).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", e);
        }
    }

    @Override
    public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc) {
        try {
            return (CASValue)((OperationFuture)this.asyncGetAndTouch(key, exp, (Transcoder)tc)).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value", e);
        }
    }

    @Override
    public CASValue<Object> getAndTouch(String key, int exp) {
        return this.getAndTouch(key, exp, this.transcoder);
    }

    @Override
    public CASValue<Object> gets(String key) {
        return this.gets(key, this.transcoder);
    }

    @Override
    public <T> T get(String key, Transcoder<T> tc) {
        try {
            return ((GetFuture)this.asyncGet(key, (Transcoder)tc)).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for value", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for value", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for value: " + this.buildTimeoutMessage(this.operationTimeout, TimeUnit.MILLISECONDS), e);
        }
    }

    @Override
    public Object get(String key) {
        return this.get(key, this.transcoder);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keyIter, Iterator<Transcoder<T>> tcIter) {
        final ConcurrentHashMap m = new ConcurrentHashMap();
        final HashMap<String, Transcoder<T>> tcMap = new HashMap<String, Transcoder<T>>();
        HashMap<MemcachedNode, ArrayList<String>> chunks = new HashMap<MemcachedNode, ArrayList<String>>();
        NodeLocator locator = this.mconn.getLocator();
        while (keyIter.hasNext() && tcIter.hasNext()) {
            String key = keyIter.next();
            tcMap.put(key, tcIter.next());
            StringUtils.validateKey(key, this.opFact instanceof BinaryOperationFactory);
            MemcachedNode primaryNode = locator.getPrimary(key);
            MemcachedNode node = null;
            if (primaryNode.isActive()) {
                node = primaryNode;
            } else {
                Iterator<MemcachedNode> i = locator.getSequence(key);
                while (node == null && i.hasNext()) {
                    MemcachedNode n = i.next();
                    if (!n.isActive()) continue;
                    node = n;
                }
                if (node == null) {
                    node = primaryNode;
                }
            }
            assert (node != null) : "Didn't find a node for " + key;
            ArrayList<String> ks = (ArrayList<String>)chunks.get(node);
            if (ks == null) {
                ks = new ArrayList<String>();
                chunks.put(node, ks);
            }
            ks.add(key);
        }
        final AtomicInteger pendingChunks = new AtomicInteger(chunks.size());
        int initialLatchCount = chunks.isEmpty() ? 0 : 1;
        final CountDownLatch latch = new CountDownLatch(initialLatchCount);
        ArrayList<Operation> ops = new ArrayList<Operation>(chunks.size());
        final BulkGetFuture rv = new BulkGetFuture(m, ops, latch, this.executorService);
        GetOperation.Callback cb = new GetOperation.Callback(){

            @Override
            public void receivedStatus(OperationStatus status) {
                if (status.getStatusCode() == StatusCode.ERR_NOT_MY_VBUCKET) {
                    pendingChunks.addAndGet(Integer.parseInt(status.getMessage()));
                }
                rv.setStatus(status);
            }

            @Override
            public void gotData(String k, int flags, byte[] data) {
                Transcoder tc = (Transcoder)tcMap.get(k);
                m.put(k, MemcachedClient.this.tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
            }

            @Override
            public void complete() {
                if (pendingChunks.decrementAndGet() <= 0) {
                    latch.countDown();
                    rv.signalComplete();
                }
            }
        };
        HashMap<MemcachedNode, Operation> mops = new HashMap<MemcachedNode, Operation>();
        for (Map.Entry me : chunks.entrySet()) {
            GetOperation op = this.opFact.get((Collection)me.getValue(), cb);
            mops.put((MemcachedNode)me.getKey(), op);
            ops.add(op);
        }
        assert (mops.size() == chunks.size());
        this.mconn.checkState();
        this.mconn.addOperations(mops);
        return rv;
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Iterator<Transcoder<T>> tcIter) {
        return this.asyncGetBulk(keys.iterator(), tcIter);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keyIter, Transcoder<T> tc) {
        return this.asyncGetBulk(keyIter, new SingleElementInfiniteIterator<Transcoder<T>>(tc));
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, Transcoder<T> tc) {
        return this.asyncGetBulk(keys, new SingleElementInfiniteIterator<Transcoder<T>>(tc));
    }

    @Override
    public BulkFuture<Map<String, Object>> asyncGetBulk(Iterator<String> keyIter) {
        return this.asyncGetBulk(keyIter, this.transcoder);
    }

    @Override
    public BulkFuture<Map<String, Object>> asyncGetBulk(Collection<String> keys) {
        return this.asyncGetBulk(keys, this.transcoder);
    }

    @Override
    public <T> BulkFuture<Map<String, T>> asyncGetBulk(Transcoder<T> tc, String ... keys) {
        return this.asyncGetBulk(Arrays.asList(keys), tc);
    }

    @Override
    public BulkFuture<Map<String, Object>> asyncGetBulk(String ... keys) {
        return this.asyncGetBulk(Arrays.asList(keys), this.transcoder);
    }

    public OperationFuture<CASValue<Object>> asyncGetAndTouch(String key, int exp) {
        return this.asyncGetAndTouch(key, exp, (Transcoder)this.transcoder);
    }

    public <T> OperationFuture<CASValue<T>> asyncGetAndTouch(final String key, int exp, final Transcoder<T> tc) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<CASValue<T>> rv = new OperationFuture<CASValue<T>>(key, latch, this.operationTimeout, this.executorService);
        GetAndTouchOperation op = this.opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback(){
            private CASValue<T> val;

            @Override
            public void receivedStatus(OperationStatus status) {
                rv.set(this.val, status);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }

            @Override
            public void gotData(String k, int flags, long cas, byte[] data) {
                assert (k.equals(key)) : "Wrong key returned";
                this.val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
            }
        });
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    @Override
    public <T> Map<String, T> getBulk(Iterator<String> keyIter, Transcoder<T> tc) {
        try {
            return (Map)this.asyncGetBulk(keyIter, tc).get(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted getting bulk values", e);
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof CancellationException) {
                throw (CancellationException)e.getCause();
            }
            throw new RuntimeException("Exception waiting for bulk values", e);
        }
        catch (TimeoutException e) {
            throw new OperationTimeoutException("Timeout waiting for bulk values: " + this.buildTimeoutMessage(this.operationTimeout, TimeUnit.MILLISECONDS), e);
        }
    }

    @Override
    public Map<String, Object> getBulk(Iterator<String> keyIter) {
        return this.getBulk(keyIter, this.transcoder);
    }

    @Override
    public <T> Map<String, T> getBulk(Collection<String> keys, Transcoder<T> tc) {
        return this.getBulk(keys.iterator(), tc);
    }

    @Override
    public Map<String, Object> getBulk(Collection<String> keys) {
        return this.getBulk(keys, this.transcoder);
    }

    @Override
    public <T> Map<String, T> getBulk(Transcoder<T> tc, String ... keys) {
        return this.getBulk(Arrays.asList(keys), tc);
    }

    @Override
    public Map<String, Object> getBulk(String ... keys) {
        return this.getBulk(Arrays.asList(keys), this.transcoder);
    }

    @Override
    public Map<SocketAddress, String> getVersions() {
        final ConcurrentHashMap<SocketAddress, String> rv = new ConcurrentHashMap<SocketAddress, String>();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                final SocketAddress sa = n.getSocketAddress();
                return MemcachedClient.this.opFact.version(new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus s) {
                        rv.put(sa, s.getMessage());
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for versions", e);
        }
        return rv;
    }

    @Override
    public Map<SocketAddress, Map<String, String>> getStats() {
        return this.getStats(null);
    }

    @Override
    public Map<SocketAddress, Map<String, String>> getStats(final String arg) {
        final HashMap<SocketAddress, Map<String, String>> rv = new HashMap<SocketAddress, Map<String, String>>();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                final SocketAddress sa = n.getSocketAddress();
                rv.put(sa, new HashMap());
                return MemcachedClient.this.opFact.stats(arg, new StatsOperation.Callback(){

                    @Override
                    public void gotStat(String name, String val) {
                        ((Map)rv.get(sa)).put(name, val);
                    }

                    @Override
                    public void receivedStatus(OperationStatus status) {
                        if (!status.isSuccess()) {
                            MemcachedClient.this.getLogger().warn("Unsuccessful stat fetch: %s", status);
                        }
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await(this.operationTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for stats", e);
        }
        return rv;
    }

    private long mutate(Mutator m, String key, long by, long def, int exp) {
        final AtomicLong rv = new AtomicLong();
        final CountDownLatch latch = new CountDownLatch(1);
        this.mconn.enqueueOperation(key, this.opFact.mutate(m, key, by, def, exp, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"));
            }

            @Override
            public void complete() {
                latch.countDown();
            }
        }));
        try {
            if (!latch.await(this.operationTimeout, TimeUnit.MILLISECONDS)) {
                throw new OperationTimeoutException("Mutate operation timed out,unable to modify counter [" + key + ']');
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
        this.getLogger().debug("Mutation returned %s", rv);
        return rv.get();
    }

    @Override
    public long incr(String key, long by) {
        return this.mutate(Mutator.incr, key, by, 0L, -1);
    }

    @Override
    public long incr(String key, int by) {
        return this.mutate(Mutator.incr, key, by, 0L, -1);
    }

    @Override
    public long decr(String key, long by) {
        return this.mutate(Mutator.decr, key, by, 0L, -1);
    }

    @Override
    public long decr(String key, int by) {
        return this.mutate(Mutator.decr, key, by, 0L, -1);
    }

    @Override
    public long incr(String key, long by, long def, int exp) {
        return this.mutateWithDefault(Mutator.incr, key, by, def, exp);
    }

    @Override
    public long incr(String key, int by, long def, int exp) {
        return this.mutateWithDefault(Mutator.incr, key, by, def, exp);
    }

    @Override
    public long decr(String key, long by, long def, int exp) {
        return this.mutateWithDefault(Mutator.decr, key, by, def, exp);
    }

    @Override
    public long decr(String key, int by, long def, int exp) {
        return this.mutateWithDefault(Mutator.decr, key, by, def, exp);
    }

    private long mutateWithDefault(Mutator t, String key, long by, long def, int exp) {
        long rv = this.mutate(t, key, by, def, exp);
        if (rv == -1L) {
            OperationFuture<Boolean> f = this.asyncStore(StoreType.add, key, exp, String.valueOf(def));
            try {
                if (((Boolean)f.get(this.operationTimeout, TimeUnit.MILLISECONDS)).booleanValue()) {
                    rv = def;
                } else {
                    rv = this.mutate(t, key, by, 0L, exp);
                    assert (rv != -1L) : "Failed to mutate or init value";
                }
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted waiting for store", e);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof CancellationException) {
                    throw (CancellationException)e.getCause();
                }
                throw new RuntimeException("Failed waiting for store", e);
            }
            catch (TimeoutException e) {
                throw new OperationTimeoutException("Timeout waiting to mutate or init value" + this.buildTimeoutMessage(this.operationTimeout, TimeUnit.MILLISECONDS), e);
            }
        }
        return rv;
    }

    private OperationFuture<Long> asyncMutate(Mutator m, String key, long by, long def, int exp) {
        if (!(this.opFact instanceof BinaryOperationFactory || def == 0L && exp == -1)) {
            throw new UnsupportedOperationException("Default value or expiration time are not supported on the async mutate methods. Use either the binary protocol or the sync variant.");
        }
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Long> rv = new OperationFuture<Long>(key, latch, this.operationTimeout, this.executorService);
        MutatorOperation op = this.opFact.mutate(m, key, by, def, exp, new OperationCallback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"), s);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        });
        this.mconn.enqueueOperation(key, op);
        rv.setOperation(op);
        return rv;
    }

    public OperationFuture<Long> asyncIncr(String key, long by) {
        return this.asyncMutate(Mutator.incr, key, by, 0L, -1);
    }

    public OperationFuture<Long> asyncIncr(String key, int by) {
        return this.asyncMutate(Mutator.incr, key, by, 0L, -1);
    }

    public OperationFuture<Long> asyncDecr(String key, long by) {
        return this.asyncMutate(Mutator.decr, key, by, 0L, -1);
    }

    public OperationFuture<Long> asyncDecr(String key, int by) {
        return this.asyncMutate(Mutator.decr, key, by, 0L, -1);
    }

    public OperationFuture<Long> asyncIncr(String key, long by, long def, int exp) {
        return this.asyncMutate(Mutator.incr, key, by, def, exp);
    }

    public OperationFuture<Long> asyncIncr(String key, int by, long def, int exp) {
        return this.asyncMutate(Mutator.incr, key, by, def, exp);
    }

    public OperationFuture<Long> asyncDecr(String key, long by, long def, int exp) {
        return this.asyncMutate(Mutator.decr, key, by, def, exp);
    }

    public OperationFuture<Long> asyncDecr(String key, int by, long def, int exp) {
        return this.asyncMutate(Mutator.decr, key, by, def, exp);
    }

    public OperationFuture<Long> asyncIncr(String key, long by, long def) {
        return this.asyncMutate(Mutator.incr, key, by, def, 0);
    }

    public OperationFuture<Long> asyncIncr(String key, int by, long def) {
        return this.asyncMutate(Mutator.incr, key, by, def, 0);
    }

    public OperationFuture<Long> asyncDecr(String key, long by, long def) {
        return this.asyncMutate(Mutator.decr, key, by, def, 0);
    }

    public OperationFuture<Long> asyncDecr(String key, int by, long def) {
        return this.asyncMutate(Mutator.decr, key, by, def, 0);
    }

    @Override
    public long incr(String key, long by, long def) {
        return this.mutateWithDefault(Mutator.incr, key, by, def, 0);
    }

    @Override
    public long incr(String key, int by, long def) {
        return this.mutateWithDefault(Mutator.incr, key, by, def, 0);
    }

    @Override
    public long decr(String key, long by, long def) {
        return this.mutateWithDefault(Mutator.decr, key, by, def, 0);
    }

    @Override
    public long decr(String key, int by, long def) {
        return this.mutateWithDefault(Mutator.decr, key, by, def, 0);
    }

    @Deprecated
    public OperationFuture<Boolean> delete(String key, int hold) {
        return this.delete(key);
    }

    public OperationFuture<Boolean> delete(String key) {
        return this.delete(key, 0L);
    }

    public OperationFuture<Boolean> delete(String key, long cas) {
        final CountDownLatch latch = new CountDownLatch(1);
        final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, this.operationTimeout, this.executorService);
        DeleteOperation.Callback callback = new DeleteOperation.Callback(){

            @Override
            public void receivedStatus(OperationStatus s) {
                rv.set(s.isSuccess(), s);
            }

            @Override
            public void gotData(long cas) {
                rv.setCas(cas);
            }

            @Override
            public void complete() {
                latch.countDown();
                rv.signalComplete();
            }
        };
        DeleteOperation op = cas == 0L ? this.opFact.delete(key, callback) : this.opFact.delete(key, cas, callback);
        rv.setOperation(op);
        this.mconn.enqueueOperation(key, op);
        return rv;
    }

    public OperationFuture<Boolean> flush(final int delay) {
        final AtomicReference<Object> flushResult = new AtomicReference<Object>(null);
        final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                FlushOperation op = MemcachedClient.this.opFact.flush(delay, new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus s) {
                        flushResult.set(s.isSuccess());
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
                ops.add(op);
                return op;
            }
        });
        return new OperationFuture<Boolean>(null, blatch, flushResult, this.operationTimeout, this.executorService){

            @Override
            public void set(Boolean o, OperationStatus s) {
                super.set(o, s);
                this.notifyListeners();
            }

            @Override
            public boolean cancel(boolean ign) {
                boolean rv = false;
                for (Operation op : ops) {
                    op.cancel();
                    rv |= op.getState() == OperationState.WRITE_QUEUED;
                }
                this.notifyListeners();
                return rv;
            }

            @Override
            public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException {
                this.status = new OperationStatus(true, "OK", StatusCode.SUCCESS);
                return (Boolean)super.get(duration, units);
            }

            @Override
            public boolean isCancelled() {
                boolean rv = false;
                for (Operation op : ops) {
                    rv |= op.isCancelled();
                }
                return rv;
            }

            @Override
            public boolean isDone() {
                boolean rv = true;
                for (Operation op : ops) {
                    rv &= op.getState() == OperationState.COMPLETE;
                }
                return rv || this.isCancelled();
            }
        };
    }

    public OperationFuture<Boolean> flush() {
        return this.flush(-1);
    }

    @Override
    public Set<String> listSaslMechanisms() {
        final ConcurrentHashMap rv = new ConcurrentHashMap();
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                return MemcachedClient.this.opFact.saslMechs(new OperationCallback(){

                    @Override
                    public void receivedStatus(OperationStatus status) {
                        for (String s : status.getMessage().split(" ")) {
                            rv.put(s, s);
                        }
                    }

                    @Override
                    public void complete() {
                        latch.countDown();
                    }
                });
            }
        });
        try {
            blatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rv.keySet();
    }

    @Override
    public void shutdown() {
        this.shutdown(-1L, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit unit) {
        if (this.shuttingDown) {
            this.getLogger().info("Suppressing duplicate attempt to shut down");
            return false;
        }
        this.shuttingDown = true;
        String baseName = this.mconn.getName();
        this.mconn.setName(baseName + " - SHUTTING DOWN");
        boolean rv = true;
        if (this.connFactory.isDefaultExecutorService()) {
            try {
                this.executorService.shutdown();
            }
            catch (Exception ex) {
                this.getLogger().warn((Object)"Failed shutting down the ExecutorService: ", ex);
            }
        }
        try {
            if (timeout > 0L) {
                this.mconn.setName(baseName + " - SHUTTING DOWN (waiting)");
                rv = this.waitForQueues(timeout, unit);
            }
        }
        finally {
            try {
                this.mconn.setName(baseName + " - SHUTTING DOWN (telling client)");
                this.mconn.shutdown();
                this.mconn.setName(baseName + " - SHUTTING DOWN (informed client)");
                this.tcService.shutdown();
                this.authMonitor.interruptAllPendingAuth();
            }
            catch (IOException e) {
                this.getLogger().warn((Object)"exception while shutting down", e);
            }
        }
        return rv;
    }

    @Override
    public boolean waitForQueues(long timeout, TimeUnit unit) {
        CountDownLatch blatch = this.broadcastOp(new BroadcastOpFactory(){

            @Override
            public Operation newOp(MemcachedNode n, final CountDownLatch latch) {
                return MemcachedClient.this.opFact.noop(new OperationCallback(){

                    @Override
                    public void complete() {
                        latch.countDown();
                    }

                    @Override
                    public void receivedStatus(OperationStatus s) {
                    }
                });
            }
        }, this.mconn.getLocator().getAll(), false);
        try {
            return blatch.await(timeout, unit);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for queues", e);
        }
    }

    @Override
    public boolean addObserver(ConnectionObserver obs) {
        boolean rv = this.mconn.addObserver(obs);
        if (rv) {
            for (MemcachedNode node : this.mconn.getLocator().getAll()) {
                if (!node.isActive()) continue;
                obs.connectionEstablished(node.getSocketAddress(), -1);
            }
        }
        return rv;
    }

    @Override
    public boolean removeObserver(ConnectionObserver obs) {
        return this.mconn.removeObserver(obs);
    }

    @Override
    public void connectionEstablished(SocketAddress sa, int reconnectCount) {
        if (this.authDescriptor != null) {
            if (this.authDescriptor.authThresholdReached()) {
                this.shutdown();
            }
            this.authMonitor.authConnection(this.mconn, this.opFact, this.authDescriptor, this.findNode(sa));
        }
    }

    private MemcachedNode findNode(SocketAddress sa) {
        MemcachedNode node = null;
        for (MemcachedNode n : this.mconn.getLocator().getAll()) {
            if (!n.getSocketAddress().equals(sa)) continue;
            node = n;
        }
        assert (node != null) : "Couldn't find node connected to " + sa;
        return node;
    }

    private String buildTimeoutMessage(long timeWaited, TimeUnit unit) {
        StringBuilder message = new StringBuilder();
        message.append(MessageFormat.format("waited {0} ms.", unit.convert(timeWaited, TimeUnit.MILLISECONDS)));
        message.append(" Node status: ").append(this.mconn.connectionsStatus());
        return message.toString();
    }

    @Override
    public void connectionLost(SocketAddress sa) {
    }

    public String toString() {
        return this.connFactory.toString();
    }
}

