/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.kafka.client.runtime.devui;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.netty.buffer.ByteBufInputStream;
import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.quarkus.kafka.client.runtime.devui.KafkaTopicClient;
import io.quarkus.kafka.client.runtime.devui.model.Order;
import io.quarkus.kafka.client.runtime.devui.model.request.KafkaMessageCreateRequest;
import io.quarkus.kafka.client.runtime.devui.model.request.KafkaMessagesRequest;
import io.quarkus.kafka.client.runtime.devui.model.request.KafkaOffsetRequest;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaAclEntry;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaAclInfo;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaClusterInfo;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaConsumerGroup;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaConsumerGroupMember;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaConsumerGroupMemberPartitionAssignment;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaInfo;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaMessagePage;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaNode;
import io.quarkus.kafka.client.runtime.devui.model.response.KafkaTopic;
import io.quarkus.kafka.client.runtime.devui.util.ConsumerFactory;
import io.smallrye.common.annotation.Identifier;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Bytes;

/**
 * Helper bean used by the Kafka Dev UI: gathers cluster, topic, consumer-group and ACL
 * data through {@link KafkaAdminClient} / {@link KafkaTopicClient}, maps it onto the
 * Dev UI response model, and offers small JSON (de)serialization helpers.
 */
@Singleton
public class KafkaUiUtils {
    private final KafkaAdminClient kafkaAdminClient;
    private final KafkaTopicClient kafkaTopicClient;
    private final ObjectMapper objectMapper;
    private final Map<String, Object> config;

    /**
     * @param kafkaAdminClient admin client used for cluster, topic listing, consumer-group and ACL queries
     * @param kafkaTopicClient client used for partition, offset and message operations
     * @param config the {@code default-kafka-broker} configuration map, passed on when creating consumers
     */
    public KafkaUiUtils(KafkaAdminClient kafkaAdminClient, KafkaTopicClient kafkaTopicClient, @Identifier(value="default-kafka-broker") Map<String, Object> config) {
        this.kafkaAdminClient = kafkaAdminClient;
        this.kafkaTopicClient = kafkaTopicClient;
        this.config = config;
        // Lenient mapper: empty beans serialize without error, unknown JSON properties are ignored.
        this.objectMapper = JsonMapper.builder()
                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                .build();
    }

    /**
     * Collects cluster info, topics and consumer groups into a single {@link KafkaInfo}.
     * The broker label is the full node name of the cluster controller.
     *
     * @return the aggregated Kafka overview
     * @throws ExecutionException if one of the underlying Kafka requests fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public KafkaInfo getKafkaInfo() throws ExecutionException, InterruptedException {
        KafkaClusterInfo clusterInfo = this.getClusterInfo();
        String broker = clusterInfo.getController().asFullNodeName();
        List<KafkaTopic> topics = this.getTopics();
        List<KafkaConsumerGroup> consumerGroups = this.getConsumerGroups();
        return new KafkaInfo(broker, clusterInfo, topics, consumerGroups);
    }

    /**
     * Describes the cluster via the admin client and maps the result to {@link KafkaClusterInfo}.
     *
     * @return id, controller, nodes and authorized operations of the cluster
     * @throws ExecutionException if the describe-cluster request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public KafkaClusterInfo getClusterInfo() throws ExecutionException, InterruptedException {
        return this.clusterInfo(this.kafkaAdminClient.getCluster());
    }

    /** Maps a Kafka {@link Node} to the Dev UI node model (host, port, id string). */
    private KafkaNode kafkaNode(Node node) {
        return new KafkaNode(node.host(), node.port(), node.idString());
    }

    /**
     * Resolves the futures of a {@link DescribeClusterResult} into a {@link KafkaClusterInfo}.
     * The authorized operations are rendered as a {@code ", "}-separated list of operation
     * names, or {@code "NONE"} when the result carries no operation set at all.
     */
    private KafkaClusterInfo clusterInfo(DescribeClusterResult dcr) throws InterruptedException, ExecutionException {
        KafkaNode controller = this.kafkaNode(dcr.controller().get());
        List<KafkaNode> nodes = new ArrayList<>();
        for (Node node : dcr.nodes().get()) {
            nodes.add(this.kafkaNode(node));
        }
        // Resolve the future once and reuse the value for both the null check and the join.
        Set<AclOperation> aclOperations = dcr.authorizedOperations().get();
        String aclOperationsStr = aclOperations == null
                ? "NONE"
                : aclOperations.stream().map(AclOperation::name).collect(Collectors.joining(", "));
        return new KafkaClusterInfo(dcr.clusterId().get(), controller, nodes, aclOperationsStr);
    }

    /**
     * Lists all topics returned by the admin client, enriched with partition and message counts.
     *
     * @return one {@link KafkaTopic} per topic listing
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if an underlying Kafka request fails
     */
    public List<KafkaTopic> getTopics() throws InterruptedException, ExecutionException {
        List<KafkaTopic> res = new ArrayList<>();
        for (TopicListing tl : this.kafkaAdminClient.getTopics()) {
            res.add(this.kafkaTopic(tl));
        }
        return res;
    }

    /** Builds the Dev UI topic model for a single topic listing. */
    private KafkaTopic kafkaTopic(TopicListing tl) throws ExecutionException, InterruptedException {
        Collection<Integer> partitions = this.partitions(tl.name());
        long messageCount = this.getTopicMessageCount(tl.name(), partitions);
        return new KafkaTopic(tl.name(), tl.topicId().toString(), partitions.size(), tl.isInternal(), messageCount);
    }

    /**
     * Sums the per-partition offsets that the topic client reports for {@link Order#NEW_FIRST}.
     *
     * @param topicName topic to inspect
     * @param partitions partitions of the topic to include
     * @return the sum of the reported offsets, {@code 0} when none are reported
     * @throws ExecutionException if the offset lookup fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public long getTopicMessageCount(String topicName, Collection<Integer> partitions) throws ExecutionException, InterruptedException {
        Map<Integer, Long> maxPartitionOffsetMap = this.kafkaTopicClient.getPagePartitionOffset(topicName, partitions, Order.NEW_FIRST);
        return maxPartitionOffsetMap.values().stream().mapToLong(Long::longValue).sum();
    }

    /**
     * @param topicName topic to inspect
     * @return the partition ids of the topic as reported by the topic client
     * @throws ExecutionException if the lookup fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Collection<Integer> partitions(String topicName) throws ExecutionException, InterruptedException {
        return this.kafkaTopicClient.partitions(topicName);
    }

    /**
     * Fetches a page of messages for the topic, order, partition offsets and page size in the request.
     *
     * @param request paging request
     * @return the page returned by the topic client
     * @throws ExecutionException if the read fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public KafkaMessagePage getMessages(KafkaMessagesRequest request) throws ExecutionException, InterruptedException {
        return this.kafkaTopicClient.getTopicMessages(request.getTopicName(), request.getOrder(), request.getPartitionOffset(), request.getPageSize());
    }

    /**
     * Delegates message creation to the topic client.
     *
     * @param request description of the message to create
     */
    public void createMessage(KafkaMessageCreateRequest request) {
        this.kafkaTopicClient.createMessage(request);
    }

    /**
     * Describes every consumer group known to the admin client, including its members,
     * their partition assignments and the total lag over all assignments.
     *
     * @return one {@link KafkaConsumerGroup} per group description
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if an underlying Kafka request fails
     */
    public List<KafkaConsumerGroup> getConsumerGroups() throws InterruptedException, ExecutionException {
        List<KafkaConsumerGroup> res = new ArrayList<>();
        for (ConsumerGroupDescription cgd : this.kafkaAdminClient.getConsumerGroups()) {
            Map<TopicPartition, OffsetAndMetadata> metadata = this.kafkaAdminClient.listConsumerGroupOffsets(cgd.groupId()).partitionsToOffsetAndMetadata().get();
            Set<KafkaConsumerGroupMember> members = cgd.members().stream()
                    .map(member -> new KafkaConsumerGroupMember(member.consumerId(), member.clientId(), member.host(), this.getPartitionAssignments(metadata, member)))
                    .collect(Collectors.toSet());
            res.add(new KafkaConsumerGroup(cgd.groupId(), cgd.state().name(), cgd.coordinator().host(), cgd.coordinator().id(), cgd.partitionAssignor(), this.getTotalLag(members), members));
        }
        return res;
    }

    /** Adds up the lag of every partition assignment of every member. */
    private long getTotalLag(Set<KafkaConsumerGroupMember> members) {
        return members.stream()
                .map(KafkaConsumerGroupMember::getPartitions)
                .flatMap(Collection::stream)
                .mapToLong(KafkaConsumerGroupMemberPartitionAssignment::getLag)
                .sum();
    }

    /**
     * Computes, for each partition assigned to {@code member}, the lag between the partition's
     * end offset and the group's committed offset. A partition without an entry in
     * {@code topicOffsetMap} is treated as having offset {@code 0}.
     */
    private Set<KafkaConsumerGroupMemberPartitionAssignment> getPartitionAssignments(Map<TopicPartition, OffsetAndMetadata> topicOffsetMap, MemberDescription member) {
        Set<TopicPartition> topicPartitions = member.assignment().topicPartitions();
        // The consumer exists only to query end offsets and is closed when the block exits.
        try (Consumer<Bytes, Bytes> consumer = ConsumerFactory.createConsumer(topicPartitions, this.config)) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            return topicPartitions.stream().map(tp -> {
                long topicOffset = Optional.ofNullable(topicOffsetMap.get(tp)).map(OffsetAndMetadata::offset).orElse(0L);
                return new KafkaConsumerGroupMemberPartitionAssignment(tp.partition(), tp.topic(), this.getLag(topicOffset, endOffsets.get(tp)));
            }).collect(Collectors.toSet());
        }
    }

    /** Lag is the distance from the given offset to the partition's end offset. */
    private long getLag(long topicOffset, long endOffset) {
        return endOffset - topicOffset;
    }

    /**
     * Looks up per-partition offsets for the topic, partitions and order given in the request.
     *
     * @param request offset request
     * @return partition id to offset, as returned by the topic client
     * @throws ExecutionException if the lookup fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Map<Integer, Long> getOffset(KafkaOffsetRequest request) throws ExecutionException, InterruptedException {
        return this.kafkaTopicClient.getPagePartitionOffset(request.getTopicName(), request.getRequestedPartitions(), request.getOrder());
    }

    /**
     * Builds the ACL overview: cluster id, controller name, authorized operations and ACL entries.
     * Listing the ACL entries is best-effort; if it fails, the result carries the entries
     * collected so far (normally none) instead of propagating the error.
     *
     * @return the ACL overview
     * @throws InterruptedException if interrupted while describing the cluster
     * @throws ExecutionException if describing the cluster fails
     */
    public KafkaAclInfo getAclInfo() throws InterruptedException, ExecutionException {
        KafkaClusterInfo clusterInfo = this.clusterInfo(this.kafkaAdminClient.getCluster());
        List<KafkaAclEntry> entries = new ArrayList<>();
        try {
            for (AclBinding acl : this.kafkaAdminClient.getAclInfo()) {
                entries.add(new KafkaAclEntry(acl.entry().operation().name(), acl.entry().principal(), acl.entry().permissionType().name(), acl.pattern().toString()));
            }
        }
        catch (Exception ignored) {
            // Deliberately tolerated so the page still renders without ACL entries
            // (presumably the broker may not support or permit ACL queries - TODO confirm).
            // An interrupt must not be lost, though: restore the flag for the caller.
            if (ignored instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        return new KafkaAclInfo(clusterInfo.getId(), clusterInfo.getController().asFullNodeName(), clusterInfo.getAclOperations(), entries);
    }

    /**
     * Serializes {@code o} to a JSON string.
     *
     * @param o object to serialize
     * @return the JSON text, or an empty string when serialization fails
     */
    public String toJson(Object o) {
        try {
            return this.objectMapper.writeValueAsString(o);
        }
        catch (JsonProcessingException ex) {
            // Callers receive "" rather than an exception when the object cannot be serialized.
            return "";
        }
    }

    /**
     * Parses the buffer content as a JSON object.
     *
     * @param buffer buffer holding JSON text
     * @return a {@link JsonObject} wrapping the parsed map
     */
    public JsonObject fromJson(Buffer buffer) {
        @SuppressWarnings("unchecked") // Map.class can only yield a raw Map; it is wrapped as a string-keyed map
        Map<String, Object> map = this.fromJson(buffer, Map.class);
        return new JsonObject(map);
    }

    /**
     * Parses the buffer content as JSON into an instance of {@code type}.
     *
     * @param buffer buffer holding JSON text
     * @param type target class
     * @param <T> target type
     * @return the parsed value, or {@code null} when reading or parsing fails
     */
    public <T> T fromJson(Buffer buffer, Class<T> type) {
        // The InputStream cast selects the createParser overload: ByteBufInputStream is also a DataInput.
        try (JsonParser parser = this.objectMapper.createParser((InputStream)new ByteBufInputStream(buffer.getByteBuf()))) {
            return this.objectMapper.readValue(parser, type);
        }
        catch (IOException e) {
            return null;
        }
    }
}

