/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar.ack;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Optional;
import org.apache.pulsar.client.api.Consumer;

/**
 * {@link PulsarAckHandler} that acknowledges each incoming message individually
 * through the wrapped Pulsar {@link Consumer}.
 *
 * <p>When the message carries {@link PulsarTransactionMetadata}, the acknowledgement
 * is issued with that metadata's transaction; otherwise a plain acknowledgement of
 * the message id is sent.
 */
public class PulsarMessageAck
implements PulsarAckHandler {
    /** Identifier under which this strategy's {@link Factory} is registered. */
    public static final String STRATEGY_NAME = "ack";
    private final Consumer<?> consumer;

    /**
     * Creates the handler.
     *
     * @param consumer the Pulsar consumer used to send acknowledgements
     */
    public PulsarMessageAck(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    /**
     * Acknowledges the given message.
     *
     * <p>The acknowledgement is only triggered when the returned {@link Uni} is
     * subscribed to, because the call is wrapped in a supplier. A failure is reported
     * through {@code PulsarLogging.log.unableToAcknowledgeMessage} and still
     * propagated downstream.
     *
     * @param message the incoming message to acknowledge
     * @return a {@link Uni} completing once the asynchronous acknowledgement finishes
     */
    @Override
    public Uni<Void> handle(PulsarIncomingMessage<?> message) {
        return Uni.createFrom().completionStage(() -> {
            // Typed Optional: avoids the raw type and the unchecked downcast on get().
            Optional<PulsarTransactionMetadata> txnMetadata = message.getMetadata(PulsarTransactionMetadata.class);
            if (txnMetadata.isPresent()) {
                // Acknowledge with the transaction carried by the message metadata.
                return this.consumer.acknowledgeAsync(message.getMessageId(), txnMetadata.get().getTransaction());
            }
            return this.consumer.acknowledgeAsync(message.getMessageId());
        })
                .onFailure().invoke(PulsarLogging.log::unableToAcknowledgeMessage)
                // Emit the outcome through the message's own runOnMessageContext executor hook.
                .emitOn(command -> message.runOnMessageContext(command));
    }

    /**
     * Bean factory producing {@link PulsarMessageAck} handlers, registered under
     * {@link #STRATEGY_NAME}.
     */
    @ApplicationScoped
    @Identifier(STRATEGY_NAME)
    public static class Factory
    implements PulsarAckHandler.Factory {
        /**
         * Builds a handler bound to the given consumer.
         *
         * @param consumer the Pulsar consumer the handler acknowledges on
         * @param config the incoming channel configuration; not used by this strategy
         * @return a new {@link PulsarMessageAck}
         */
        @Override
        public PulsarMessageAck create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration config) {
            return new PulsarMessageAck(consumer);
        }
    }
}

