package br.pucrio.tecgraf.soma.websocketnotifier.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierStatus;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Observable;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.glassfish.hk2.utilities.BuilderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * Reads Avro-encoded events from one or more Kafka topics and publishes each of them, converted to
 * a {@link JsonObject}, to the observers registered on this {@link Observable}.
 *
 * <p>Topics and consumer groups are configured as two separator-delimited lists that are paired by
 * position: the i-th topic is consumed with the i-th consumer group.
 */
@Service
public class KafkaEventReader extends Observable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaEventReader.class);

    /** Poll timeout, in milliseconds, used by the live consumption loop and the warm-up poll. */
    private static final long LIVE_POLL_TIMEOUT_MS = 100L;

    /** Poll timeout, in milliseconds, used after seeking when replaying from an offset. */
    private static final long REPLAY_POLL_TIMEOUT_MS = 1000L;

    private final String kafkaServer;
    private final String kafkaSchemaRegistryUrl;
    private final List<String> kafkaTopics;
    private final List<String> kafkaGroups;

    /**
     * Reads the Kafka connection settings from the service configuration.
     *
     * @param serviceConfiguration source of the server address, schema registry URL, topic list and
     *     consumer group list
     */
    @Autowired
    public KafkaEventReader(ServiceConfiguration serviceConfiguration) {
        this.kafkaServer = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.kafkaSchemaRegistryUrl = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        // NOTE(review): the list separator comes from an HK2 constant; presumably the original
        // source used a plain literal here - confirm.
        this.kafkaTopics = Arrays.asList(serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName()).split(BuilderHelper.TOKEN_SEPARATOR));
        this.kafkaGroups = Arrays.asList(serviceConfiguration.getValue(Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName()).split(BuilderHelper.TOKEN_SEPARATOR));
    }

    /**
     * Builds one consumer per configured topic/group pair and polls them in turn until the current
     * thread is interrupted. This method does not return while the loop is running, so it occupies
     * the thread that delivers the {@link ApplicationReadyEvent}.
     *
     * <p>All consumers created here are closed when the loop ends, whether normally or because of
     * an exception.
     *
     * @throws IllegalArgumentException if the topic list and the consumer group list differ in size
     */
    @EventListener({ApplicationReadyEvent.class})
    public void run() {
        if (this.kafkaTopics.size() != this.kafkaGroups.size()) {
            String formatted = "List arguments value of --%s and --%s must have the same length".formatted(Constants.Config.KAFKA_TOPIC.option.getLongName(), Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName());
            logger.error(formatted);
            throw new IllegalArgumentException(formatted);
        }
        List<Consumer<String, Object>> consumers = new ArrayList<>(this.kafkaTopics.size());
        try {
            // Construction happens inside the try so that consumers already built are released
            // if a later one fails to be created.
            for (int i = 0; i < this.kafkaTopics.size(); i++) {
                consumers.add(buildKafkaConsumer(this.kafkaServer, this.kafkaSchemaRegistryUrl, this.kafkaTopics.get(i), this.kafkaGroups.get(i)));
            }
            while (!Thread.currentThread().isInterrupted()) {
                for (Consumer<String, Object> consumer : consumers) {
                    readRecords(consumer);
                }
            }
        } finally {
            closeConsumers(consumers);
        }
    }

    /** Closes every consumer, logging (not propagating) failures so the remaining ones still close. */
    private void closeConsumers(List<Consumer<String, Object>> consumers) {
        for (Consumer<String, Object> consumer : consumers) {
            try {
                consumer.close();
            } catch (Exception e) {
                logger.error("Error closing Kafka consumer.", e);
            }
        }
    }

    /**
     * Builds the consumer configuration: String keys, generic (non-specific) Avro values, auto
     * commit disabled and offset reset set to {@code earliest}.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param schemaRegistryUrl URL of the schema registry used by the Avro deserializer
     * @param groupId consumer group id
     * @return a new {@link Properties} instance holding the configuration
     */
    protected Properties buildProperties(String bootstrapServers, String schemaRegistryUrl, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    /**
     * Polls the consumer once and forwards every returned record to
     * {@link #readRecord(Consumer, ConsumerRecord, String)} with status
     * {@link WebsocketNotifierStatus#LIVE}.
     *
     * @param consumer consumer to poll
     */
    protected void readRecords(Consumer<String, Object> consumer) {
        ConsumerRecords<String, Object> polled = consumer.poll(LIVE_POLL_TIMEOUT_MS);
        if (!polled.isEmpty()) {
            logger.debug("readRecords Got {} records from Kafka", polled.count());
        }
        String liveStatus = WebsocketNotifierStatus.LIVE.name();
        for (TopicPartition partition : polled.partitions()) {
            for (ConsumerRecord<String, Object> record : polled.records(partition)) {
                readRecord(consumer, record, liveStatus);
            }
        }
    }

    /**
     * Seeks every partition assigned to the consumer to {@code offset} and returns the records
     * obtained from one poll per partition, converted to JSON and tagged with an event status.
     *
     * <p>Returns an empty list without touching the consumer when the offset stored in
     * {@code websocketNotifierInfo} is zero.
     *
     * <p>NOTE(review): each poll may return records from any assigned partition, not only the one
     * just sought, so with several partitions records could presumably be returned more than once -
     * confirm against the expected topic layout.
     *
     * @param consumer consumer to seek and poll
     * @param websocketNotifierInfo client information whose offset decides each record's status
     * @param offset offset to seek each assigned partition to
     * @return the records read, in the order they were returned by the polls
     */
    public List<JsonObject> readRecordsFromOffset(Consumer<String, Object> consumer, WebsocketNotifierInfo websocketNotifierInfo, long offset) {
        List<JsonObject> events = new ArrayList<>();
        if (websocketNotifierInfo.getOffset().intValue() == 0) {
            return events;
        }
        // Initial poll whose result is discarded; assignment() is read right after it.
        consumer.poll(LIVE_POLL_TIMEOUT_MS);
        for (TopicPartition assigned : consumer.assignment()) {
            consumer.seek(assigned, offset);
            ConsumerRecords<String, Object> polled = consumer.poll(REPLAY_POLL_TIMEOUT_MS);
            if (!polled.isEmpty()) {
                logger.info("readRecordsFromOffset Got {} records from topic {} Kafka", polled.count(), assigned.topic());
            }
            for (TopicPartition partition : polled.partitions()) {
                for (ConsumerRecord<String, Object> record : polled.records(partition)) {
                    events.add(parseKafkaRecord(record, getEventStatus(record, websocketNotifierInfo)));
                }
            }
        }
        return events;
    }

    /**
     * Classifies a replayed record: {@link WebsocketNotifierStatus#OLD} when its offset is not
     * greater than the offset stored in {@code websocketNotifierInfo}, otherwise
     * {@link WebsocketNotifierStatus#LOST}.
     */
    private String getEventStatus(ConsumerRecord<String, Object> consumerRecord, WebsocketNotifierInfo websocketNotifierInfo) {
        long knownOffset = websocketNotifierInfo.getOffset().intValue();
        if (consumerRecord.offset() <= knownOffset) {
            return WebsocketNotifierStatus.OLD.name();
        }
        return WebsocketNotifierStatus.LOST.name();
    }

    /**
     * Parses the string form of the record value as a JSON object and adds the {@code topic_name},
     * {@code offset} and {@code eventStatus} properties to it.
     */
    private JsonObject parseKafkaRecord(ConsumerRecord<String, Object> consumerRecord, String eventStatus) {
        JsonObject event = new JsonParser().parse(consumerRecord.value().toString()).getAsJsonObject();
        event.addProperty("topic_name", consumerRecord.topic());
        event.addProperty("offset", Long.valueOf(consumerRecord.offset()));
        event.addProperty("eventStatus", eventStatus);
        return event;
    }

    /**
     * Converts one record to JSON, notifies the observers with it and then commits the consumer.
     *
     * <p>Exceptions raised while parsing or notifying are logged and not propagated. The commit is
     * attempted in every case, and a failing commit is logged as well.
     *
     * <p>NOTE(review): {@code commitSync()} is called without explicit offsets; per the Kafka
     * client documentation that commits the positions of the last poll rather than this record's
     * offset - confirm this is the intended delivery guarantee.
     *
     * @param consumer consumer the record came from; used only to commit
     * @param consumerRecord record to publish
     * @param eventStatus value stored in the {@code eventStatus} property of the published JSON
     */
    public void readRecord(Consumer<String, Object> consumer, ConsumerRecord<String, Object> consumerRecord, String eventStatus) {
        try {
            setChanged();
            JsonObject event = parseKafkaRecord(consumerRecord, eventStatus);
            logger.debug("readRecord got Job {}", event);
            notifyObservers(event);
        } catch (MessagingException e) {
            logger.error("Error trying send message to job-info", e);
        } catch (Exception e) {
            logger.error("Unrecoverable error.", e);
        } finally {
            try {
                consumer.commitSync();
            } catch (Exception e) {
                logger.error("Unrecoverable error during commitSync.", e);
            }
        }
    }

    /**
     * Creates a Kafka consumer configured through
     * {@link #buildProperties(String, String, String)} and subscribes it to a single topic.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param schemaRegistryUrl URL of the schema registry used by the Avro deserializer
     * @param topic topic to subscribe to
     * @param groupId consumer group id
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer<String, Object> buildKafkaConsumer(String bootstrapServers, String schemaRegistryUrl, String topic, String groupId) {
        KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(buildProperties(bootstrapServers, schemaRegistryUrl, groupId));
        kafkaConsumer.subscribe(Collections.singletonList(topic));
        return kafkaConsumer;
    }
}
