/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.smallrye.reactive.messaging.pulsar.ConfigResolver;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarFailureHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.SchemaResolver;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapGetter;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;

/**
 * Incoming channel backed by a Pulsar {@link Consumer}.
 *
 * <p>The constructor subscribes the consumer and builds a publisher that repeatedly
 * receives messages (one at a time, or in batches when batch receive is enabled),
 * wraps them into connector messages and hands them downstream. The channel also
 * keeps a bounded list of recent failures that is exposed through the health checks.
 *
 * @param <T> payload type produced by the configured {@link Schema}
 */
public class PulsarIncomingChannel<T> {
    /** Maximum number of recent failures retained for the liveness report. */
    private static final int MAX_REPORTED_FAILURES = 10;

    private final Consumer<T> consumer;
    private final Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> publisher;
    private final String channel;
    private final PulsarAckHandler ackHandler;
    private final PulsarFailureHandler failureHandler;
    private final EventLoopContext context;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Guarded by the monitor of this instance (see reportFailure / isAlive).
    private final List<Throwable> failures = new ArrayList<>();
    private final boolean healthEnabled;
    private final boolean tracingEnabled;
    private final Instrumenter<PulsarTrace, Void> instrumenter;

    /**
     * Creates the channel: resolves the consumer configuration, subscribes the consumer
     * and assembles the publisher.
     *
     * @param client Pulsar client used to create the consumer
     * @param vertx Vert.x instance from which a dedicated event-loop context is created
     * @param schema schema used by the consumer
     * @param ackHandlerFactory factory for the acknowledgement handler
     * @param failureHandlerFactory factory for the negative-acknowledgement handler
     * @param ic incoming channel configuration
     * @param configResolver resolver producing the base consumer configuration
     * @throws PulsarClientException if the subscription fails
     */
    public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
            PulsarAckHandler.Factory ackHandlerFactory,
            PulsarFailureHandler.Factory failureHandlerFactory,
            PulsarConnectorIncomingConfiguration ic,
            ConfigResolver configResolver) throws PulsarClientException {
        this.channel = ic.getChannel();
        this.healthEnabled = ic.getHealthEnabled();
        this.tracingEnabled = ic.getTracingEnabled();

        ConsumerBuilder<T> builder = client.newConsumer(schema);
        ConsumerConfigurationData<?> conf = configResolver.getConsumerConf(ic);
        if (conf.getSubscriptionName() == null) {
            String generatedName = UUID.randomUUID().toString();
            PulsarLogging.log.noSubscriptionName(generatedName);
            conf.setSubscriptionName(generatedName);
        }
        // Only fall back to the "topic" attribute / channel name when the resolved
        // configuration does not already say what to subscribe to.
        // NOTE(review): the Pulsar client is understood to reject a configuration that
        // carries both topic names and a topics pattern - confirm for the version in use.
        if (!hasTopicConfig(conf)) {
            conf.setTopicNames(Arrays.stream(ic.getTopic().orElse(this.channel).split(","))
                    .collect(Collectors.toSet()));
        }
        if (conf.getConsumerName() == null) {
            conf.setConsumerName(this.channel);
        }
        builder.loadConf(configResolver.configToMap(conf));

        ic.getDeadLetterPolicyMaxRedeliverCount()
                .ifPresent(count -> builder.deadLetterPolicy(getDeadLetterPolicy(ic, count)));
        // parseBackoff returns null when the attribute is malformed; Optional.map turns
        // that into an empty Optional so the builder is never handed a null backoff.
        ic.getNegativeAckRedeliveryBackoff()
                .map(this::parseBackoff)
                .ifPresent(backoff -> builder.negativeAckRedeliveryBackoff(backoff));
        ic.getAckTimeoutRedeliveryBackoff()
                .map(this::parseBackoff)
                .ifPresent(backoff -> builder.ackTimeoutRedeliveryBackoff(backoff));

        // These settings are re-applied on the builder explicitly after loadConf.
        // NOTE(review): presumably because they do not survive the map round-trip - confirm.
        if (conf.getConsumerEventListener() != null) {
            builder.consumerEventListener(conf.getConsumerEventListener());
        }
        if (conf.getPayloadProcessor() != null) {
            builder.messagePayloadProcessor(conf.getPayloadProcessor());
        }
        if (conf.getKeySharedPolicy() != null) {
            builder.keySharedPolicy(conf.getKeySharedPolicy());
        } else if (conf.getSubscriptionType() == SubscriptionType.Key_Shared) {
            builder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
        }
        if (conf.getCryptoKeyReader() != null) {
            builder.cryptoKeyReader(conf.getCryptoKeyReader());
        }
        if (conf.getMessageCrypto() != null) {
            builder.messageCrypto(conf.getMessageCrypto());
        }
        boolean batchReceive = ic.getBatchReceive();
        if (batchReceive && conf.getBatchReceivePolicy() == null) {
            builder.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY);
        }

        this.consumer = builder.subscribe();
        PulsarLogging.log.createdConsumerWithConfig(this.channel, SchemaResolver.getSchemaName(schema), conf);
        this.ackHandler = ackHandlerFactory.create(this.consumer, ic);
        this.failureHandler = failureHandlerFactory.create(this.consumer, ic, this::reportFailure);
        this.context = ((VertxInternal) vertx.getDelegate()).createEventLoopContext();
        // The helpers below read the fields assigned above, so they must run after them.
        if (batchReceive) {
            this.publisher = createBatchStream(client, schema);
        } else {
            this.publisher = createMessageStream(client, schema);
        }
        this.instrumenter = createInstrumenter();
    }

    /** Builds the publisher that receives messages one at a time. */
    private Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> createMessageStream(
            PulsarClient client, Schema<T> schema) {
        Multi<PulsarIncomingMessage<T>> stream = Multi.createBy().repeating()
                .completionStage(() -> this.consumer.receiveAsync())
                .until(received -> this.closed.get())
                .plug(upstream -> {
                    if (needsEagerValueAccess(schema)) {
                        // getValue() is invoked before the hop to the Vert.x context.
                        // NOTE(review): presumably to resolve the schema/decode on the
                        // receiving thread - confirm.
                        return upstream.onItem().call(msg -> Uni.createFrom().item(() -> msg.getValue()));
                    }
                    return upstream;
                })
                .emitOn(command -> this.context.runOnContext(event -> command.run()))
                .onItem().transform(msg -> new PulsarIncomingMessage<>(msg, this.ackHandler, this.failureHandler))
                .onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion()
                .onFailure().invoke(failure -> {
                    PulsarLogging.log.failedToReceiveFromConsumer(this.channel, failure);
                    reportFailure(failure, false);
                });
        if (this.tracingEnabled) {
            stream = stream.onItem().invoke(this::incomingTrace);
        }
        return stream.emitOn(this.context.nettyEventLoop()).plug(ContextOperator::apply);
    }

    /** Builds the publisher that receives messages in batches; empty batches are dropped. */
    private Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> createBatchStream(
            PulsarClient client, Schema<T> schema) {
        Multi<PulsarIncomingBatchMessage<T>> stream = Multi.createBy().repeating()
                .completionStage(() -> this.consumer.batchReceiveAsync())
                .until(received -> this.closed.get())
                .filter(received -> received.size() > 0)
                .plug(upstream -> {
                    if (needsEagerValueAccess(schema)) {
                        // Same eager getValue() as the single-message path, applied to
                        // every message of the batch.
                        return upstream.onItem().call(batch -> Uni.createFrom().item(() -> {
                            batch.forEach(msg -> msg.getValue());
                            return null;
                        }));
                    }
                    return upstream;
                })
                .emitOn(command -> this.context.runOnContext(event -> command.run()))
                .onItem().transform(batch -> new PulsarIncomingBatchMessage<>(batch, this.ackHandler, this.failureHandler))
                .onFailure(throwable -> isEndOfStream(client, throwable)).recoverWithCompletion()
                .onFailure().invoke(failure -> {
                    PulsarLogging.log.failedToReceiveFromConsumer(this.channel, failure);
                    reportFailure(failure, false);
                });
        if (this.tracingEnabled) {
            stream = stream.onItem().invoke(this::incomingBatchTrace);
        }
        return stream.emitOn(this.context.nettyEventLoop()).plug(ContextOperator::apply);
    }

    /** Tells whether message values are accessed eagerly for the given schema type. */
    private static boolean needsEagerValueAccess(Schema<?> schema) {
        return schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema;
    }

    /** Builds the consumer-side instrumenter used to trace received messages. */
    private static Instrumenter<PulsarTrace, Void> createInstrumenter() {
        PulsarAttributesExtractor attributesExtractor = new PulsarAttributesExtractor();
        MessagingAttributesGetter<PulsarTrace, Void> attributesGetter = attributesExtractor.getMessagingAttributesGetter();
        InstrumenterBuilder<PulsarTrace, Void> instrumenterBuilder = Instrumenter.builder(
                GlobalOpenTelemetry.get(),
                "io.smallrye.reactive.messaging",
                MessagingSpanNameExtractor.create(attributesGetter, MessageOperation.RECEIVE));
        return instrumenterBuilder
                .addAttributesExtractor(MessagingAttributesExtractor.create(attributesGetter, MessageOperation.RECEIVE))
                .addAttributesExtractor(attributesExtractor)
                .buildConsumerInstrumenter(PulsarTraceTextMapGetter.INSTANCE);
    }

    /**
     * Records an incoming trace for the given message.
     *
     * <p>Messages that carry no {@link PulsarIncomingMessageMetadata} are skipped.
     *
     * @param pulsarMessage the received message
     */
    public void incomingTrace(PulsarMessage<T> pulsarMessage) {
        pulsarMessage.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata ->
                TracingUtils.traceIncoming(this.instrumenter, pulsarMessage, new PulsarTrace.Builder()
                        .withConsumerName(this.consumer.getConsumerName())
                        .withMessage(metadata.getMessage())
                        .build()));
    }

    /**
     * Records an incoming trace for every message contained in the batch.
     *
     * @param pulsarMessage the received batch
     */
    public void incomingBatchTrace(PulsarIncomingBatchMessage<T> pulsarMessage) {
        pulsarMessage.getMessages().forEach(this::incomingTrace);
    }

    /**
     * Decides whether a receive failure should simply complete the stream: the channel
     * was closed, the consumer reached the end of the topic, or the client is closed.
     */
    private boolean isEndOfStream(PulsarClient client, Throwable throwable) {
        if (this.closed.get()) {
            return true;
        }
        if (this.consumer.hasReachedEndOfTopic()) {
            PulsarLogging.log.consumerReachedEndOfTopic(this.channel);
            return true;
        }
        if (client.isClosed()) {
            PulsarLogging.log.clientClosed(this.channel, throwable);
            return true;
        }
        return false;
    }

    /** Builds the dead letter policy from the channel configuration; unset topics stay null. */
    private static DeadLetterPolicy getDeadLetterPolicy(PulsarConnectorIncomingConfiguration ic, Integer redeliverCount) {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(redeliverCount)
                .deadLetterTopic(ic.getDeadLetterPolicyDeadLetterTopic().orElse(null))
                .retryLetterTopic(ic.getDeadLetterPolicyRetryLetterTopic().orElse(null))
                .initialSubscriptionName(ic.getDeadLetterPolicyInitialSubscriptionName().orElse(null))
                .build();
    }

    /**
     * Parses a {@code "minDelayMs,maxDelayMs,multiplier"} string into a redelivery backoff.
     * Whitespace around each segment is ignored.
     *
     * @return the parsed backoff, or {@code null} (after logging) when the string is malformed
     */
    private RedeliveryBackoff parseBackoff(String backoffString) {
        String[] parts = backoffString.split(",");
        try {
            return MultiplierRedeliveryBackoff.builder()
                    .minDelayMs(Long.parseLong(parts[0].trim()))
                    .maxDelayMs(Long.parseLong(parts[1].trim()))
                    .multiplier(Double.parseDouble(parts[2].trim()))
                    .build();
        } catch (RuntimeException e) {
            // Missing segments and unparsable numbers both end up here; the logger
            // signature used here takes only the raw string and the channel name.
            PulsarLogging.log.unableToParseRedeliveryBackoff(backoffString, this.channel);
            return null;
        }
    }

    /**
     * Tells whether the configuration already designates what to subscribe to, either
     * through a topics pattern or through a non-empty list of topic names.
     */
    private static boolean hasTopicConfig(ConsumerConfigurationData<?> conf) {
        return conf.getTopicsPattern() != null
                || (conf.getTopicNames() != null && !conf.getTopicNames().isEmpty());
    }

    /** Returns the publisher emitting the received messages. */
    public Flow.Publisher<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> getPublisher() {
        return this.publisher;
    }

    /** Returns the channel name. */
    public String getChannel() {
        return this.channel;
    }

    /** Returns the underlying Pulsar consumer. */
    public Consumer<T> getConsumer() {
        return this.consumer;
    }

    /**
     * Marks the channel as closed, which stops the receive loop, and closes the consumer.
     * A failure to close the consumer is logged, not propagated.
     */
    public void close() {
        this.closed.set(true);
        try {
            this.consumer.close();
        } catch (PulsarClientException e) {
            PulsarLogging.log.unableToCloseConsumer(e);
        }
    }

    /**
     * Records a failure for the liveness check, keeping only the most recent
     * {@value #MAX_REPORTED_FAILURES} entries, and closes the channel when the failure is fatal.
     *
     * @param failure the failure to record
     * @param fatal whether the channel must be closed
     */
    public void reportFailure(Throwable failure, boolean fatal) {
        synchronized (this) {
            if (this.failures.size() >= MAX_REPORTED_FAILURES) {
                this.failures.remove(0);
            }
            this.failures.add(failure);
        }
        // Closing the consumer happens outside the monitor so that health checks are
        // not blocked behind it.
        if (fatal) {
            close();
        }
    }

    /** Reports the consumer connection state when health checks are enabled. */
    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.consumer.isConnected());
        }
    }

    /** Readiness uses the same criterion as {@link #isStarted(HealthReport.HealthReportBuilder)}. */
    public void isReady(HealthReport.HealthReportBuilder builder) {
        isStarted(builder);
    }

    /**
     * Reports liveness when health checks are enabled: alive when no failure has been
     * recorded, otherwise not alive with the recorded failure messages.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (!this.healthEnabled) {
            return;
        }
        List<Throwable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.failures);
        }
        if (snapshot.isEmpty()) {
            builder.add(this.channel, true);
        } else {
            String messages = snapshot.stream()
                    .map(Throwable::getMessage)
                    .collect(Collectors.joining(", "));
            builder.add(this.channel, false, messages);
        }
    }
}

