/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.SubscriberWrapper;
import io.smallrye.reactive.messaging.helpers.ClassUtils;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.i18n.ProviderLogging;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Mediator for methods whose shape is {@link Shape#SUBSCRIBER}: the method only consumes
 * messages from an upstream channel and does not produce a stream.
 *
 * <p>Depending on the configured consumption type, {@link #initialize(Object)} builds the
 * subscriber pipeline from one of the following method styles:
 * <ul>
 * <li>{@code STREAM_OF_MESSAGE} / {@code STREAM_OF_PAYLOAD}: the method is invoked once without
 * arguments and must return a {@link Subscriber} or a {@link SubscriberBuilder};</li>
 * <li>{@code MESSAGE} / {@code PAYLOAD}: the method is invoked for each item and returns a
 * {@link CompletionStage}, a {@link Uni}, or anything else (treated as "void").</li>
 * </ul>
 */
public class SubscriberMediator extends AbstractMediator {

    /** Upstream publisher; {@code null} until {@link #connectToUpstream(PublisherBuilder)} is called. */
    private PublisherBuilder<? extends Message<?>> source;

    /** Subscriber pipeline assembled by {@link #initialize(Object)}. */
    private SubscriberBuilder<Message<?>, Void> subscriber;

    /** Holds the {@link Subscription} received by the subscriber created in {@link #run()}. */
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();

    /**
     * Creates the mediator.
     *
     * @param configuration the mediator configuration; its shape must be {@link Shape#SUBSCRIBER}
     * @throws RuntimeException the exception built by
     *         {@code ProviderExceptions.ex.illegalArgumentForSubscriberShape} when the shape differs
     */
    public SubscriberMediator(MediatorConfiguration configuration) {
        super(configuration);
        if (configuration.shape() != Shape.SUBSCRIBER) {
            throw ProviderExceptions.ex.illegalArgumentForSubscriberShape(configuration.shape());
        }
    }

    /**
     * Initializes the mediator and builds the subscriber pipeline matching the configured
     * consumption type and the return type of the method.
     *
     * @param bean the bean instance, forwarded to the parent initialization
     */
    @Override
    public void initialize(Object bean) {
        super.initialize(bean);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE:
            case STREAM_OF_PAYLOAD:
                processMethodReturningASubscriber();
                break;
            case MESSAGE:
            case PAYLOAD:
                if (ClassUtils.isAssignable(this.configuration.getReturnType(), CompletionStage.class)) {
                    processMethodReturningACompletionStage();
                } else if (ClassUtils.isAssignable(this.configuration.getReturnType(), Uni.class)) {
                    processMethodReturningAUni();
                } else {
                    processMethodReturningVoid();
                }
                break;
            default:
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedConsumption(this.configuration.consumption());
        }
        assert this.subscriber != null;
    }

    /**
     * @return the subscriber pipeline built during {@link #initialize(Object)}
     */
    @Override
    public SubscriberBuilder<Message<?>, Void> getComputedSubscriber() {
        return this.subscriber;
    }

    /**
     * @return {@code true} once an upstream publisher has been attached
     */
    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    /**
     * Attaches the upstream publisher, after passing it through {@code convert}.
     *
     * @param publisher the upstream publisher
     */
    @Override
    public void connectToUpstream(PublisherBuilder<? extends Message<?>> publisher) {
        this.source = convert(publisher);
    }

    /**
     * Subscribes the computed subscriber to the upstream publisher.
     *
     * <p>The actual subscriber is wrapped so that the subscription is recorded and any error
     * signalled through {@code onError} is logged and remembered. If such an error has already
     * been recorded when the subscription call returns, it is rethrown wrapped by
     * {@code ProviderExceptions.ex.weavingForIncoming}.
     */
    @Override
    public void run() {
        assert this.source != null;
        assert this.subscriber != null;

        final AtomicReference<Throwable> syncErrorCatcher = new AtomicReference<>();
        final Subscriber<Message<?>> delegate = this.subscriber.build();
        final Subscriber<Message<?>> delegating = new Subscriber<Message<?>>() {
            @Override
            public void onSubscribe(Subscription s) {
                SubscriberMediator.this.subscription.set(s);
                delegate.onSubscribe(s);
            }

            @Override
            public void onNext(Message<?> o) {
                delegate.onNext(o);
            }

            @Override
            public void onError(Throwable t) {
                ProviderLogging.log.streamProcessingException(t);
                syncErrorCatcher.set(t);
                delegate.onError(t);
            }

            @Override
            public void onComplete() {
                delegate.onComplete();
            }
        };

        this.source.to(delegating).run();

        // An error recorded at this point was signalled while subscribing.
        Throwable throwable = syncErrorCatcher.get();
        if (throwable != null) {
            throw ProviderExceptions.ex.weavingForIncoming(this.configuration.getIncoming(), throwable);
        }
    }

    /** Reports a stream failure to the health reporter, tagged with the method description. */
    private void reportFailure(Throwable failure) {
        this.health.reportApplicationFailure(this.configuration.methodAsString(), failure);
    }

    /**
     * Builds the pipeline for a method invoked with the payload of each message and whose result
     * is not awaited. The invocation goes through {@code invokeBlocking} when the configuration
     * is marked as blocking, and through {@code invoke} otherwise.
     */
    private void processMethodReturningVoid() {
        if (this.configuration.isBlocking()) {
            this.subscriber = ReactiveStreams.<Message<?>> builder()
                    .flatMapCompletionStage(m -> Uni.createFrom().completionStage(handlePreProcessingAck(m))
                            .onItem().transformToUni(msg -> invokeBlocking(msg.getPayload()))
                            .onItemOrFailure().transformToUni(handleInvocationResult(m))
                            .subscribeAsCompletionStage())
                    .onError(this::reportFailure)
                    .ignore();
        } else {
            this.subscriber = ReactiveStreams.<Message<?>> builder()
                    .flatMapCompletionStage(m -> Uni.createFrom().completionStage(handlePreProcessingAck(m))
                            .onItem().transform(msg -> invoke(msg.getPayload()))
                            .onItemOrFailure().transformToUni(handleInvocationResult(m))
                            .subscribeAsCompletionStage())
                    .onError(this::reportFailure)
                    .ignore();
        }
    }

    /**
     * Creates the function applied once the user method has completed for the given message.
     *
     * <p>With the {@code POST_PROCESSING} acknowledgment strategy the message is nacked on failure
     * and acked on success, and the returned {@link Uni} emits the message once that completes.
     * With any other strategy the failure is propagated, or the message is emitted directly.
     *
     * @param m the message being processed
     * @return the function mapping the invocation outcome to a {@link Uni} of the message
     */
    private BiFunction<Object, Throwable, Uni<? extends Message<?>>> handleInvocationResult(Message<?> m) {
        return (success, failure) -> {
            boolean postProcessing = this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING;
            if (failure != null) {
                if (postProcessing) {
                    return Uni.createFrom().completionStage(m.nack(failure).thenApply(x -> m));
                }
                // Not in charge of the acknowledgment: just propagate the failure.
                return Uni.createFrom().failure(failure);
            }
            if (postProcessing) {
                return Uni.createFrom().completionStage(m.ack().thenApply(x -> m));
            }
            return Uni.createFrom().item(m);
        };
    }

    /**
     * Builds the pipeline for a method returning a {@link CompletionStage}. The method receives
     * the payload when the consumption type is {@code PAYLOAD}, and the message otherwise.
     */
    private void processMethodReturningACompletionStage() {
        boolean invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        this.subscriber = ReactiveStreams.<Message<?>> builder()
                .flatMapCompletionStage(message -> Uni.createFrom().completionStage(handlePreProcessingAck(message))
                        .onItem().transformToUni(m -> {
                            CompletionStage<?> stage = invokeWithPayload
                                    ? (CompletionStage<?>) invoke(message.getPayload())
                                    : (CompletionStage<?>) invoke(message);
                            return Uni.createFrom().completionStage(stage.thenApply(x -> message));
                        })
                        .onItemOrFailure().transformToUni(handleInvocationResult(message))
                        .subscribeAsCompletionStage())
                .onError(this::reportFailure)
                .ignore();
    }

    /**
     * Builds the pipeline for a method returning a {@link Uni}. The method receives the payload
     * when the consumption type is {@code PAYLOAD}, and the message otherwise.
     */
    private void processMethodReturningAUni() {
        boolean invokeWithPayload = MediatorConfiguration.Consumption.PAYLOAD == this.configuration.consumption();
        this.subscriber = ReactiveStreams.<Message<?>> builder()
                .flatMapCompletionStage(message -> Uni.createFrom().completionStage(handlePreProcessingAck(message))
                        .onItem().transformToUni(x -> {
                            if (invokeWithPayload) {
                                return (Uni<?>) invoke(message.getPayload());
                            }
                            return (Uni<?>) invoke(message);
                        })
                        .onItemOrFailure().transformToUni(handleInvocationResult(message))
                        .subscribeAsCompletionStage())
                .onError(this::reportFailure)
                .ignore();
    }

    /**
     * Builds the pipeline for a method returning a {@link Subscriber} or a
     * {@link SubscriberBuilder}. The method is invoked once, without arguments.
     *
     * <p>For {@code STREAM_OF_PAYLOAD} the returned subscriber receives payloads, and the
     * acknowledgment is handled by {@link #acknowledgeAfterProcessing(Message, Throwable)}.
     * Otherwise the returned subscriber receives the messages themselves and no
     * post-processing callback is installed.
     */
    // The element type of the user subscriber is erased at runtime, so these casts cannot be checked.
    @SuppressWarnings("unchecked")
    private void processMethodReturningASubscriber() {
        Object result = invoke();
        if (!(result instanceof Subscriber) && !(result instanceof SubscriberBuilder)) {
            // A null result also lands here: report it instead of failing on result.getClass().
            String actualType = result == null ? "null" : result.getClass().getName();
            throw ProviderExceptions.ex.illegalStateExceptionForSubscriberOrSubscriberBuilder(actualType);
        }

        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            Subscriber<Object> userSubscriber = result instanceof Subscriber
                    ? (Subscriber<Object>) result
                    : ((SubscriberBuilder<Object, Void>) result).build();
            SubscriberWrapper<Object, Message<?>> wrapper = new SubscriberWrapper<Object, Message<?>>(
                    userSubscriber, Message::getPayload, this::acknowledgeAfterProcessing);
            this.subscriber = ReactiveStreams.<Message<?>> builder()
                    .flatMapCompletionStage(this::handlePreProcessingAck)
                    .via(wrapper)
                    .onError(this::reportFailure)
                    .ignore();
        } else {
            Subscriber<Message<?>> userSubscriber = result instanceof Subscriber
                    ? (Subscriber<Message<?>>) result
                    : ((SubscriberBuilder<Message<?>, Void>) result).build();
            this.subscriber = ReactiveStreams.<Message<?>> builder()
                    .flatMapCompletionStage(this::handlePreProcessingAck)
                    .via(new SubscriberWrapper<Message<?>, Message<?>>(userSubscriber, Function.identity(), null))
                    .onError(this::reportFailure)
                    .ignore();
        }
    }

    /**
     * Callback used for payload subscribers once an item has been handled.
     *
     * <p>With the {@code POST_PROCESSING} strategy the message is nacked when a failure is given
     * and acked otherwise. With any other strategy no acknowledgment is performed and the returned
     * stage simply reflects the outcome.
     *
     * @param m the message whose payload was handed to the user subscriber
     * @param t the failure, or {@code null} on success
     * @return a stage completing once the (optional) acknowledgment is done
     */
    private CompletionStage<Void> acknowledgeAfterProcessing(Message<?> m, Throwable t) {
        if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
            return t != null ? m.nack(t) : m.ack();
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (t != null) {
            future.completeExceptionally(t);
        } else {
            future.complete(null);
        }
        return future;
    }
}

