/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.processor.CancelException;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannel;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

public class ZeroMQChannelStream<IN, OUT>
extends ChannelStream<IN, OUT> {
    private final ZeroMQConsumerSpec eventSpec = new ZeroMQConsumerSpec();
    private final InetSocketAddress remoteAddress;
    private volatile String connectionId;
    private volatile ZMQ.Socket socket;
    private Subscriber<? super IN> inputSub;

    public ZeroMQChannelStream(Environment env, long prefetch, Dispatcher eventsDispatcher, InetSocketAddress remoteAddress, Codec<Buffer, IN, OUT> codec) {
        super(env, codec, prefetch, eventsDispatcher);
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void doSubscribeWriter(Publisher<? extends OUT> writer, final Subscriber<? super Void> postWriter) {
        writer.subscribe((Subscriber)new DefaultSubscriber<OUT>(){
            ZMsg currentMsg;

            public void onSubscribe(final Subscription subscription) {
                ZeroMQChannelStream.this.eventSpec.close(new Consumer<Void>(){

                    public void accept(Void aVoid) {
                        subscription.cancel();
                    }
                });
                subscription.request(Long.MAX_VALUE);
                postWriter.onSubscribe(Broadcaster.HOT_SUBSCRIPTION);
            }

            public void onNext(OUT out) {
                boolean isNewMsg;
                ByteBuffer data;
                if (Buffer.class.isAssignableFrom(out.getClass())) {
                    data = ((Buffer)out).byteBuffer();
                } else if (ZeroMQChannelStream.this.getEncoder() != null) {
                    data = ((Buffer)ZeroMQChannelStream.this.getEncoder().apply(out)).byteBuffer();
                } else {
                    postWriter.onError(Exceptions.addValueAsLastCause((Throwable)new IllegalArgumentException("Data cannot be encoded"), out));
                    return;
                }
                byte[] bytes = new byte[data.remaining()];
                data.get(bytes);
                ZMsg msg = this.currentMsg;
                this.currentMsg = new ZMsg();
                if (msg == null) {
                    msg = this.currentMsg;
                    isNewMsg = true;
                } else {
                    isNewMsg = false;
                }
                if (isNewMsg) {
                    switch (ZeroMQChannelStream.this.socket.getType()) {
                        case 6: {
                            msg.add(new ZFrame(ZeroMQChannelStream.this.connectionId));
                            break;
                        }
                    }
                }
                msg.add(new ZFrame(bytes));
            }

            public void onError(Throwable throwable) {
                ZeroMQChannelStream.this.doFlush(this.currentMsg, (Subscriber<Void>)null);
                postWriter.onError(throwable);
            }

            public void onComplete() {
                ZeroMQChannelStream.this.doFlush(this.currentMsg, (Subscriber<Void>)postWriter);
            }
        });
    }

    @Override
    public void doDecoded(IN in) {
        try {
            if (this.inputSub != null) {
                this.inputSub.onNext(in);
            }
        }
        catch (CancelException cancelException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(Subscriber<? super IN> subscriber) {
        if (subscriber == null) {
            throw new IllegalStateException("Input Subscriber cannot be null");
        }
        ZeroMQChannelStream zeroMQChannelStream = this;
        synchronized (zeroMQChannelStream) {
            if (this.inputSub != null) {
                return;
            }
            this.inputSub = subscriber;
        }
        this.inputSub.onSubscribe((Subscription)new PushSubscription(null, this.inputSub));
    }

    public ZeroMQChannelStream<IN, OUT> setConnectionId(String connectionId) {
        this.connectionId = connectionId;
        return this;
    }

    public ZeroMQChannelStream<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    private void doFlush(ZMsg msg, Subscriber<? super Void> onComplete) {
        if (null != msg) {
            boolean success = msg.send(this.socket);
            if (null != onComplete) {
                if (success) {
                    onComplete.onComplete();
                } else {
                    onComplete.onError((Throwable)new RuntimeException("ZeroMQ Message could not be sent"));
                }
            }
        }
    }

    public void close() {
        this.getDispatcher().dispatch(null, (Consumer)new Consumer<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void accept(Void v) {
                block6: {
                    try {
                        ArrayList<Consumer<Void>> closeHandlers;
                        List<Consumer<Void>> list = ((ZeroMQChannelStream)ZeroMQChannelStream.this).eventSpec.closeHandlers;
                        synchronized (list) {
                            closeHandlers = new ArrayList<Consumer<Void>>(((ZeroMQChannelStream)ZeroMQChannelStream.this).eventSpec.closeHandlers);
                        }
                        for (Consumer consumer : closeHandlers) {
                            consumer.accept(null);
                        }
                    }
                    catch (Throwable t) {
                        if (ZeroMQChannelStream.this.inputSub == null) break block6;
                        ZeroMQChannelStream.this.inputSub.onError(t);
                    }
                }
            }
        }, null);
    }

    @Override
    public ReactorChannel.ConsumerSpec on() {
        return this.eventSpec;
    }

    public ZMQ.Socket delegate() {
        return this.socket;
    }

    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.eventSpec.closeHandlers + ", connectionId='" + this.connectionId + '\'' + ", socket=" + this.socket + '}';
    }

    private static class ZeroMQConsumerSpec
    implements ReactorChannel.ConsumerSpec {
        final List<Consumer<Void>> closeHandlers = new ArrayList<Consumer<Void>>();

        private ZeroMQConsumerSpec() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public ReactorChannel.ConsumerSpec close(Consumer<Void> onClose) {
            List<Consumer<Void>> list = this.closeHandlers;
            synchronized (list) {
                this.closeHandlers.add(onClose);
            }
            return this;
        }

        @Override
        public ReactorChannel.ConsumerSpec readIdle(long idleTimeout, Consumer<Void> onReadIdle) {
            return this;
        }

        @Override
        public ReactorChannel.ConsumerSpec writeIdle(long idleTimeout, Consumer<Void> onWriteIdle) {
            return this;
        }
    }
}

