/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.deployment;

import com.github.dockerjava.api.command.InspectContainerResponse;
import io.quarkus.builder.item.BuildItem;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CuratedApplicationShutdownBuildItem;
import io.quarkus.deployment.builditem.DevServicesConfigResultBuildItem;
import io.quarkus.deployment.builditem.DevServicesSharedNetworkBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.console.ConsoleInstalledBuildItem;
import io.quarkus.deployment.console.StartupLogCompressor;
import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig;
import io.quarkus.deployment.logging.LoggingSetupBuildItem;
import io.quarkus.devservices.common.ConfigureUtil;
import io.quarkus.devservices.common.ContainerLocator;
import io.quarkus.kafka.client.deployment.DevServicesKafkaBrokerBuildItem;
import io.quarkus.kafka.client.deployment.KafkaBuildTimeConfig;
import io.quarkus.kafka.client.deployment.KafkaDevServicesBuildTimeConfig;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

public class DevServicesKafkaProcessor {
    private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class);
    // Kafka port exposed by the dev-service container (see the 9092 literals used below).
    private static final int KAFKA_PORT = 9092;
    // Configuration key that receives the broker address once a broker is available.
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    // Container label applied to brokers started by this processor; also used to locate existing ones.
    private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-kafka";
    // Looks up containers carrying the dev-service label and exposing the Kafka port.
    private static final ContainerLocator kafkaContainerLocator = new ContainerLocator("quarkus-dev-service-kafka", 9092);
    // Handle that stops the broker started by this processor; null when no owned broker is running.
    static volatile Closeable closeable;
    // Configuration recorded by the most recent run of the build step; reset to null on shutdown.
    static volatile KafkaDevServiceCfg cfg;
    // True until the application-shutdown close task has been registered; set back to true by that task.
    static volatile boolean first;
    // Docker availability probe. NOTE(review): the meaning of the 'true' constructor argument is not visible here.
    private final IsDockerWorking isDockerWorking = new IsDockerWorking(true);

    /**
     * Build step that makes a Kafka broker available outside of normal (production) mode.
     *
     * <p>If a broker owned by this processor is already running and the configuration is
     * unchanged, nothing is done and {@code null} is returned. If the configuration changed,
     * the running broker is stopped before a new one is started.
     *
     * <p>When a broker is available, its address is published under
     * {@code kafka.bootstrap.servers} and the configured topics are created. On the first
     * invocation a close task is registered that stops the broker and resets the static state.
     *
     * @return a build item carrying the bootstrap servers, or {@code null} when no broker was
     *         started or an existing one was kept
     * @throws RuntimeException wrapping any failure raised while starting the broker
     */
    @BuildStep(onlyIfNot={IsNormal.class}, onlyIf={GlobalDevServicesConfig.Enabled.class})
    public DevServicesKafkaBrokerBuildItem startKafkaDevService(LaunchModeBuildItem launchMode, KafkaBuildTimeConfig kafkaClientBuildTimeConfig, List<DevServicesSharedNetworkBuildItem> devServicesSharedNetworkBuildItem, BuildProducer<DevServicesConfigResultBuildItem> devServicePropertiesProducer, Optional<ConsoleInstalledBuildItem> consoleInstalledBuildItem, CuratedApplicationShutdownBuildItem closeBuildItem, LoggingSetupBuildItem loggingSetupBuildItem, GlobalDevServicesConfig devServicesConfig) {
        KafkaDevServiceCfg configuration = this.getConfiguration(kafkaClientBuildTimeConfig);

        // A broker we own is already running: keep it unless the configuration changed.
        if (closeable != null) {
            if (configuration.equals(cfg)) {
                return null;
            }
            this.shutdownBroker();
            cfg = null;
        }

        String modePrefix = launchMode.isTest() ? "(test) " : "";
        StartupLogCompressor compressor = new StartupLogCompressor(modePrefix + "Kafka Dev Services Starting:", consoleInstalledBuildItem, loggingSetupBuildItem);

        KafkaBroker kafkaBroker;
        DevServicesKafkaBrokerBuildItem bootstrapServers = null;
        try {
            kafkaBroker = this.startKafka(configuration, launchMode, !devServicesSharedNetworkBuildItem.isEmpty(), devServicesConfig.timeout);
            if (kafkaBroker != null) {
                String servers = kafkaBroker.getBootstrapServers();
                closeable = kafkaBroker.getCloseable();
                devServicePropertiesProducer.produce(new DevServicesConfigResultBuildItem(KAFKA_BOOTSTRAP_SERVERS, servers));
                bootstrapServers = new DevServicesKafkaBrokerBuildItem(servers);
            }
            compressor.close();
        }
        catch (Throwable failure) {
            // Dump the captured startup log so the cause of the failure is visible.
            compressor.closeAndDumpCaptured();
            throw new RuntimeException(failure);
        }

        // Register the shutdown hook only once; the hook re-arms the flag when it runs.
        if (first) {
            first = false;
            closeBuildItem.addCloseTask(() -> {
                if (closeable != null) {
                    this.shutdownBroker();
                }
                first = true;
                closeable = null;
                cfg = null;
            }, true);
        }

        cfg = configuration;
        if (bootstrapServers == null) {
            return null;
        }

        if (kafkaBroker.isOwner()) {
            log.infof("Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=%s", bootstrapServers.getBootstrapServers());
        }
        this.createTopicPartitions(bootstrapServers.getBootstrapServers(), configuration);
        return bootstrapServers;
    }

    /**
     * Creates the topics listed in the dev-services configuration on the given broker.
     *
     * <p>Topics that already exist on the broker are left untouched and reported with a
     * warning. Every other configured topic is created with the requested number of
     * partitions and a replication factor of 1. Nothing is done when no topics are configured.
     *
     * <p>Failures (timeout, execution error, interruption) are logged and not rethrown. When
     * the calling thread is interrupted, its interrupt flag is restored so callers further up
     * the stack can still observe the interruption.
     *
     * @param bootstrapServers address of the broker to connect to
     * @param configuration    dev-services configuration providing the topics and the admin
     *                         client timeout
     */
    public void createTopicPartitions(String bootstrapServers, KafkaDevServiceCfg configuration) {
        Map<String, Integer> topicPartitions = configuration.topicPartitions;
        if (topicPartitions.isEmpty()) {
            return;
        }
        Map<String, Object> props = Map.ofEntries(
                Map.entry("bootstrap.servers", bootstrapServers),
                Map.entry("client.id", "kafka-devservices"));
        try (AdminClient adminClient = KafkaAdminClient.create(props)) {
            // The same timeout applies to each individual admin call.
            long adminClientTimeout = configuration.topicPartitionsTimeout.toMillis();
            Set<String> currentTopics = adminClient.listTopics().names().get(adminClientTimeout, TimeUnit.MILLISECONDS);
            Map<String, TopicDescription> partitions = adminClient.describeTopics(currentTopics).all().get(adminClientTimeout, TimeUnit.MILLISECONDS);

            // Only create topics that are not already present on the broker.
            List<NewTopic> newTopics = topicPartitions.entrySet().stream()
                    .filter(e -> {
                        TopicDescription topicDescription = partitions.get(e.getKey());
                        if (topicDescription == null) {
                            return true;
                        }
                        log.warnf("Topic '%s' already exists with %s partition(s)", e.getKey(), topicDescription.partitions().size());
                        return false;
                    })
                    .map(e -> new NewTopic(e.getKey(), e.getValue(), (short) 1))
                    .collect(Collectors.toList());

            CreateTopicsResult topics = adminClient.createTopics(newTopics);
            topics.all().get(adminClientTimeout, TimeUnit.MILLISECONDS);

            // Summarize existing and newly created topics for the log.
            Map<String, Integer> newTopicPartitions = new HashMap<>();
            partitions.forEach((key, value) -> newTopicPartitions.put(key, value.partitions().size()));
            newTopics.forEach(t -> newTopicPartitions.put(t.name(), t.numPartitions()));
            log.infof("Dev Services for Kafka broker contains following topics with partitions: %s", newTopicPartitions);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            log.errorf(e, "Failed to create topics: %s", topicPartitions);
        }
        catch (ExecutionException | TimeoutException e) {
            log.errorf(e, "Failed to create topics: %s", topicPartitions);
        }
    }

    /**
     * Stops the broker owned by this processor, if there is one.
     *
     * <p>Any failure while closing is logged and not propagated; the static handle is
     * cleared in every case.
     */
    private void shutdownBroker() {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        }
        catch (Throwable failure) {
            log.error("Failed to stop the Kafka broker", failure);
        }
        finally {
            closeable = null;
        }
    }

    /**
     * Decides whether a broker is needed and, if so, locates an existing one or starts a new one.
     *
     * <p>No broker is provided ({@code null} is returned) when dev services are disabled,
     * when {@code kafka.bootstrap.servers} is already configured, when the channel check
     * reports nothing to configure, or when Docker is not working.
     *
     * @param config           dev-services configuration
     * @param launchMode       current launch mode
     * @param useSharedNetwork whether the container should join the shared network
     * @param timeout          optional startup timeout for a newly started container
     * @return a broker handle, or {@code null} when none is provided
     */
    private KafkaBroker startKafka(KafkaDevServiceCfg config, LaunchModeBuildItem launchMode, boolean useSharedNetwork, Optional<Duration> timeout) {
        if (!config.devServicesEnabled) {
            log.debug("Not starting dev services for Kafka, as it has been disabled in the config.");
            return null;
        }
        if (ConfigUtils.isPropertyPresent(KAFKA_BOOTSTRAP_SERVERS)) {
            log.debug("Not starting dev services for Kafka, the kafka.bootstrap.servers is configured.");
            return null;
        }
        if (!this.hasKafkaChannelWithoutBootstrapServers()) {
            log.debug("Not starting dev services for Kafka, all the channels are configured.");
            return null;
        }
        if (!this.isDockerWorking.getAsBoolean()) {
            log.warn("Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers).");
            return null;
        }

        // Prefer a container that is already running; it is not owned, hence no closeable.
        return kafkaContainerLocator.locateContainer(config.serviceName, config.shared, launchMode.getLaunchMode())
                .map(containerAddress -> new KafkaBroker(containerAddress.getUrl(), null))
                .orElseGet(() -> this.launchRedPandaBroker(config, launchMode, useSharedNetwork, timeout));
    }

    /**
     * Starts a new Redpanda container and wraps it in a broker handle that owns it.
     * The service-name label is only applied in development mode.
     */
    private KafkaBroker launchRedPandaBroker(KafkaDevServiceCfg config, LaunchModeBuildItem launchMode, boolean useSharedNetwork, Optional<Duration> timeout) {
        String labelValue = launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null;
        RedPandaKafkaContainer container = new RedPandaKafkaContainer(DockerImageName.parse(config.imageName), config.fixedExposedPort, labelValue, useSharedNetwork);
        timeout.ifPresent(container::withStartupTimeout);
        container.start();
        return new KafkaBroker(container.getBootstrapServers(), container::close);
    }

    /**
     * Walks the configuration property names and reports whether any of them is something
     * other than a Kafka channel connector with its own bootstrap servers.
     *
     * <p>A property counts as configured only when it is an incoming or outgoing
     * {@code .connector} property whose value is {@code smallrye-kafka} and the matching
     * {@code .bootstrap.servers} property is present. The first property that does not meet
     * this returns {@code true}; {@code false} is returned only when every property does.
     *
     * <p>NOTE(review): this returns {@code true} for any unrelated property as well, which is
     * broader than the method name suggests - confirm whether that is intended.
     */
    private boolean hasKafkaChannelWithoutBootstrapServers() {
        Config config = ConfigProvider.getConfig();
        for (String propertyName : config.getPropertyNames()) {
            boolean channelProperty = propertyName.startsWith("mp.messaging.incoming.")
                    || propertyName.startsWith("mp.messaging.outgoing.");
            boolean kafkaConnector = propertyName.endsWith(".connector")
                    && "smallrye-kafka".equals(config.getOptionalValue(propertyName, String.class).orElse("ignored"));
            if (!(channelProperty && kafkaConnector)) {
                return true;
            }
            String bootstrapServersProperty = propertyName.replace(".connector", ".bootstrap.servers");
            if (!ConfigUtils.isPropertyPresent(bootstrapServersProperty)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the dev-services configuration snapshot from the {@code devservices} section
     * of the Kafka build-time configuration.
     */
    private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig cfg) {
        return new KafkaDevServiceCfg(cfg.devservices);
    }

    // Initial state: the shutdown close task has not been registered yet,
    // so the first run of the build step will register it.
    static {
        first = true;
    }

    private static final class RedPandaKafkaContainer
    extends GenericContainer<RedPandaKafkaContainer> {
        private final Integer fixedExposedPort;
        private final boolean useSharedNetwork;
        private String hostName = null;
        private static final String STARTER_SCRIPT = "/var/lib/redpanda/redpanda.sh";

        private RedPandaKafkaContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName, boolean useSharedNetwork) {
            super(dockerImageName);
            this.fixedExposedPort = fixedExposedPort;
            this.useSharedNetwork = useSharedNetwork;
            if (serviceName != null) {
                this.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName);
            }
            if (!dockerImageName.getRepository().equals("vectorized/redpanda")) {
                throw new IllegalArgumentException("Only vectorized/redpanda images are supported");
            }
            this.withCreateContainerCmdModifier(cmd -> cmd.withEntrypoint(new String[]{"sh"}));
            this.withCommand(new String[]{"-c", "while [ ! -f /var/lib/redpanda/redpanda.sh ]; do sleep 0.1; done; /var/lib/redpanda/redpanda.sh"});
            this.waitingFor((WaitStrategy)Wait.forLogMessage((String)".*Started Kafka API server.*", (int)1));
        }

        protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
            super.containerIsStarting(containerInfo, reused);
            Object command = "#!/bin/bash\n";
            command = (String)command + "/usr/bin/rpk redpanda start --check=false --node-id 0 --smp 1 ";
            command = (String)command + "--memory 1G --overprovisioned --reserve-memory 0M ";
            command = (String)command + String.format("--kafka-addr %s ", this.getKafkaAddresses());
            command = (String)command + String.format("--advertise-kafka-addr %s ", this.getKafkaAdvertisedAddresses());
            this.copyFileToContainer(Transferable.of((byte[])((String)command).getBytes(StandardCharsets.UTF_8), (int)511), STARTER_SCRIPT);
        }

        private String getKafkaAddresses() {
            ArrayList<String> addresses = new ArrayList<String>();
            if (this.useSharedNetwork) {
                addresses.add("PLAINTEXT://0.0.0.0:29092");
            }
            addresses.add("OUTSIDE://0.0.0.0:9092");
            return String.join((CharSequence)",", addresses);
        }

        private String getKafkaAdvertisedAddresses() {
            ArrayList<String> addresses = new ArrayList<String>();
            if (this.useSharedNetwork) {
                addresses.add(String.format("PLAINTEXT://%s:29092", this.hostName));
            }
            addresses.add(String.format("OUTSIDE://%s:%d", this.getHost(), this.getMappedPort(9092)));
            return String.join((CharSequence)",", addresses);
        }

        protected void configure() {
            super.configure();
            this.addExposedPort(9092);
            this.hostName = ConfigureUtil.configureSharedNetwork((GenericContainer)this, (String)"kafka");
            if (this.fixedExposedPort != null) {
                this.addFixedExposedPort(this.fixedExposedPort, 9092);
            }
        }

        public String getBootstrapServers() {
            return this.getKafkaAdvertisedAddresses();
        }
    }

    /**
     * Immutable snapshot of the Kafka dev-services build-time configuration.
     *
     * <p>Equality and hash code take only {@code devServicesEnabled}, {@code imageName} and
     * {@code fixedExposedPort} into account; the remaining fields do not take part in the
     * comparison.
     */
    private static final class KafkaDevServiceCfg {
        private final boolean devServicesEnabled;
        private final String imageName;
        private final Integer fixedExposedPort;
        private final boolean shared;
        private final String serviceName;
        private final Map<String, Integer> topicPartitions;
        private final Duration topicPartitionsTimeout;

        /**
         * Copies the values out of the build-time configuration. Dev services default to
         * enabled and the port defaults to 0 when not set.
         */
        public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
            devServicesEnabled = config.enabled.orElse(true);
            imageName = config.imageName;
            fixedExposedPort = config.port.orElse(0);
            shared = config.shared;
            serviceName = config.serviceName;
            topicPartitions = config.topicPartitions;
            topicPartitionsTimeout = config.topicPartitionsTimeout;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // The class is final, so an instanceof check matches exactly this class.
            if (!(o instanceof KafkaDevServiceCfg)) {
                return false;
            }
            KafkaDevServiceCfg other = (KafkaDevServiceCfg) o;
            if (devServicesEnabled != other.devServicesEnabled) {
                return false;
            }
            if (!Objects.equals(imageName, other.imageName)) {
                return false;
            }
            return Objects.equals(fixedExposedPort, other.fixedExposedPort);
        }

        @Override
        public int hashCode() {
            return Objects.hash(devServicesEnabled, imageName, fixedExposedPort);
        }
    }

    /**
     * Handle on a Kafka broker: its bootstrap address plus, when the broker was started by
     * this processor, the resource that stops it.
     */
    private static class KafkaBroker {
        private final String url;
        // Null when the broker was not started by this processor.
        private final Closeable closeable;

        /**
         * @param url       bootstrap servers address of the broker
         * @param closeable resource that stops the broker, or {@code null} if it is not owned
         */
        public KafkaBroker(String url, Closeable closeable) {
            this.url = url;
            this.closeable = closeable;
        }

        /** Returns {@code true} when a closeable was supplied, i.e. this handle owns the broker. */
        public boolean isOwner() {
            return closeable != null;
        }

        /** Returns the bootstrap servers address. */
        public String getBootstrapServers() {
            return url;
        }

        /** Returns the resource that stops the broker, or {@code null} when not owned. */
        public Closeable getCloseable() {
            return closeable;
        }
    }
}

