/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.io.buffer.Buffer;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.http.NettyHttpChannel;
import reactor.io.net.impl.netty.http.NettyHttpWSServerHandler;
import reactor.rx.action.support.DefaultSubscriber;

/**
 * Netty channel handler that bridges decoded HTTP server messages to a
 * {@link NettyHttpChannel}.
 *
 * <p>On the first {@link HttpRequest} seen on the connection it wraps the request in a
 * {@link NettyHttpChannel}, hands it to the user-supplied handler and subscribes to the
 * publisher the handler returns in order to finish (or abort) the response. Every
 * {@link HttpContent} payload is forwarded to the parent bridge, and the inbound
 * subscription is completed once the final content part of the request has been read.
 *
 * @param <IN>  type of the inbound data exposed by the channel stream
 * @param <OUT> type of the outbound data accepted by the channel stream
 */
public class NettyHttpServerHandler<IN, OUT> extends NettyChannelHandlerBridge<IN, OUT> {
    private final NettyChannelStream<IN, OUT> tcpStream;
    /** Channel wrapper for the request currently handled; {@code null} until a request head arrives. */
    protected NettyHttpChannel<IN, OUT> request;

    /**
     * Creates the handler.
     *
     * @param handler   user handler applied to the {@link NettyHttpChannel} built for the request
     * @param tcpStream underlying channel stream the HTTP channel is built on
     */
    public NettyHttpServerHandler(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, NettyChannelStream<IN, OUT> tcpStream) {
        super(handler, tcpStream);
        this.tcpStream = tcpStream;
    }

    /**
     * Propagates the channel-active event and then explicitly requests a read on the context.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        ctx.read();
    }

    /**
     * Dispatches a decoded HTTP message.
     *
     * <p>The first {@link HttpRequest} creates {@link #request}, invokes the user handler and
     * subscribes to the returned publisher. Any {@link HttpContent} (a message may be both a
     * request head and content) has its payload forwarded to the parent bridge. Finally
     * {@link #postRead(ChannelHandlerContext, Object)} is called for every message.
     *
     * @param ctx the handler context; captured by the completion subscriber
     * @param msg the decoded message
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.request == null && msg instanceof HttpRequest) {
            this.request = new NettyHttpChannel<IN, OUT>(this.tcpStream, (HttpRequest) msg);
            // Raw types kept on purpose: the declared return type of handler.apply(..) is not
            // visible from this file.
            Publisher closePublisher = (Publisher) this.handler.apply(this.request);
            DefaultSubscriber<Void> closeSub = new DefaultSubscriber<Void>() {
                Subscription subscription;

                @Override
                public void onSubscribe(Subscription s) {
                    this.subscription = s;
                    // Only terminal signals matter here, so request without bound.
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onError(Throwable t) {
                    log.error("Error processing connection. Closing the channel.", t);
                    // NOTE(review): CHANNEL_REF is inherited from the parent bridge; the channel is
                    // closed here only while it reads 0 - confirm its exact meaning there.
                    if (CHANNEL_REF.get(NettyHttpServerHandler.this) == 0) {
                        ctx.channel().close();
                    }
                }

                @Override
                public void onComplete() {
                    this.subscription.cancel();
                    if (ctx.channel().isOpen()) {
                        if (log.isDebugEnabled()) {
                            log.debug("Close Http Response ");
                        }
                        NettyHttpServerHandler.this.writeLast(ctx);
                    }
                }
            };
            closePublisher.subscribe((Subscriber) closeSub);
        }
        if (msg instanceof HttpContent) {
            super.channelRead(ctx, ((ByteBufHolder) msg).content());
        }
        this.postRead(ctx, msg);
    }

    /**
     * Completes the inbound subscription once the last content part of the request is read.
     *
     * <p>Any {@link LastHttpContent} terminates the request body. This deliberately is not an
     * exact-class comparison against {@code DefaultLastHttpContent}: Netty also signals the end
     * of a message with {@link LastHttpContent#EMPTY_LAST_CONTENT} and with aggregated full
     * requests, and those must complete the inbound stream as well.
     *
     * @param ctx the handler context
     * @param msg the message that has just been processed by {@code channelRead}
     */
    protected void postRead(ChannelHandlerContext ctx, Object msg) {
        if (this.channelSubscription != null && msg instanceof LastHttpContent) {
            this.channelSubscription.onComplete();
            this.channelSubscription = null;
        }
    }

    /**
     * Writes the terminating part of the response, flushes it and closes the channel once the
     * write future completes.
     *
     * <p>When {@code request.checkHeader()} returns {@code true} the Netty response object is
     * written, otherwise {@link LastHttpContent#EMPTY_LAST_CONTENT} is written. Judging by the
     * log message in {@code withWebsocketSupport}, {@code checkHeader()} appears to report that
     * the headers have not been sent yet - confirm against {@link NettyHttpChannel}.
     *
     * @param ctx the handler context to write to
     */
    protected void writeLast(ChannelHandlerContext ctx) {
        Object last;
        if (this.request.checkHeader()) {
            last = this.request.getNettyResponse();
        } else {
            last = LastHttpContent.EMPTY_LAST_CONTENT;
        }
        ctx.writeAndFlush(last).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Writes outbound data. A value whose class is exactly {@link Buffer} is converted to a
     * Netty buffer and wrapped in a {@link DefaultHttpContent}; anything else is written as is.
     *
     * @param data the outbound value
     * @param ctx  the handler context to write to
     * @return the future of the (unflushed) write
     */
    @Override
    protected ChannelFuture doOnWrite(Object data, ChannelHandlerContext ctx) {
        if (data.getClass().equals(Buffer.class)) {
            return ctx.write(new DefaultHttpContent(NettyHttpServerHandler.convertBufferToByteBuff(ctx, (Buffer) data)));
        }
        return ctx.write(data);
    }

    /**
     * Writes the Netty response object when {@code request.checkHeader()} returns {@code true}.
     *
     * @param ctx the handler context to write to
     * @param s   the subscription being established (not used here)
     */
    @Override
    protected void doOnSubscribe(ChannelHandlerContext ctx, Subscription s) {
        if (this.request.checkHeader()) {
            ctx.write(this.request.getNettyResponse());
        }
    }

    /**
     * Returns a websocket-capable handler wrapping this one, or this handler unchanged (after
     * logging an error) when {@code request.checkHeader()} returns {@code false}.
     *
     * @param url       websocket URL passed to the websocket handler
     * @param protocols websocket sub-protocols passed to the websocket handler
     * @return a new {@link NettyHttpWSServerHandler}, or {@code this} if the upgrade is refused
     */
    NettyHttpServerHandler<IN, OUT> withWebsocketSupport(String url, String protocols) {
        if (!this.request.checkHeader()) {
            log.error("Cannot enable websocket if headers have already been sent");
            return this;
        }
        return new NettyHttpWSServerHandler(url, protocols, this);
    }
}

