package br.pucrio.tecgraf.soma.websocketnotifier.application.service;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.infrastructure.persistence.message.KafkaEventReader;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierStatus;
import ch.qos.logback.classic.ClassicConstants;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.glassfish.hk2.utilities.BuilderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.propertyeditors.CustomBooleanEditor;
import org.springframework.stereotype.Service;

/**
 * Publishes per-user websocket notification offsets to Kafka and recovers events a user
 * may have missed.
 *
 * <p>The configured topic list and the "last events minimum number" list are parallel:
 * the entry at index {@code i} of one applies to the entry at index {@code i} of the other.
 * The constructor rejects configurations where the two lists differ in length.
 */
@Service
public class ProducerHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerHandler.class);

    private final String kafkaServer;
    private final String kafkaNotificationTopic;
    private final List<String> kafkaTopics;
    private final String kafkaSchemaRegistryUrl;
    private final List<Integer> lastEventsMinimumNumber;
    private final KtableService ktableService;
    private final KafkaEventReader eventReader;

    /** Lazily created by {@link #getProducer()}. */
    private Producer<String, String> producer;

    /**
     * Reads the Kafka settings from the service configuration and validates them.
     *
     * @param serviceConfiguration source of the Kafka server, topics, schema registry and
     *     minimum-number options
     * @param ktableService used to look up the last offset stored for a user
     * @param kafkaEventReader used to build consumers and read back events
     * @throws IllegalArgumentException if the topic list and the minimum-number list have
     *     different lengths
     * @throws NumberFormatException if a minimum-number entry is not an integer
     */
    @Autowired
    public ProducerHandler(ServiceConfiguration serviceConfiguration, KtableService ktableService, KafkaEventReader kafkaEventReader) {
        this.kafkaServer = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.kafkaNotificationTopic = serviceConfiguration.getValue(Constants.Config.KAFKA_NOTIFICATION_TOPIC.option.getLongName());
        this.kafkaTopics = splitConfigList(serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName()));
        this.kafkaSchemaRegistryUrl = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        this.lastEventsMinimumNumber = splitConfigList(serviceConfiguration.getValue(Constants.Config.LAST_EVENTS_MINIMUM_NUMBER.option.getLongName()))
                .stream()
                .map(Integer::parseInt)
                .collect(Collectors.toList());
        this.ktableService = ktableService;
        this.eventReader = kafkaEventReader;

        if (this.kafkaTopics.size() != this.lastEventsMinimumNumber.size()) {
            String message = String.format(
                    "List arguments value of --%s and --%s must have the same length",
                    Constants.Config.KAFKA_TOPIC.option.getLongName(),
                    Constants.Config.LAST_EVENTS_MINIMUM_NUMBER.option.getLongName());
            LOGGER.error(message);
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Splits a list-valued configuration string into its trimmed entries.
     *
     * <p>NOTE(review): the separator is taken from {@code BuilderHelper.TOKEN_SEPARATOR};
     * it appears to be reused here only for its value - confirm and consider a local constant.
     */
    private static List<String> splitConfigList(String rawValue) {
        return Arrays.stream(rawValue.split(BuilderHelper.TOKEN_SEPARATOR))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    /**
     * Delegates to {@code KtableService.getLastUserOffset}.
     *
     * @param websocketNotifierInfo the user/application/topic to look up
     * @return whatever the ktable service returns for that lookup
     */
    public WebsocketNotifierInfo getLastOffset(WebsocketNotifierInfo websocketNotifierInfo) {
        return this.ktableService.getLastUserOffset(websocketNotifierInfo);
    }

    /**
     * Returns the shared Kafka producer, creating it on first use.
     *
     * <p>NOTE(review): the lazy initialisation is not synchronized; confirm whether this
     * bean can be called from several threads before the producer exists.
     *
     * @return the producer used to publish notification offsets
     */
    protected Producer<String, String> getProducer() {
        if (this.producer == null) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", this.kafkaServer);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // NOTE(review): CustomBooleanEditor.VALUE_1 is reused as the acks / linger.ms value;
            // it looks like a stand-in for a plain string literal - confirm.
            properties.setProperty(ProducerConfig.ACKS_CONFIG, CustomBooleanEditor.VALUE_1);
            properties.setProperty("retries", "3");
            properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, CustomBooleanEditor.VALUE_1);
            this.producer = new KafkaProducer<>(properties);
        }
        return this.producer;
    }

    /**
     * Serialises the given info to JSON and publishes it on the notification topic, keyed
     * by the JSON member named by {@code ClassicConstants.USER_MDC_KEY}.
     *
     * <p>Nothing is published when the status is {@code OLD}, when serialisation fails or
     * yields an empty string, or when the key member is absent; failures are logged.
     *
     * @param websocketNotifierInfo the payload to publish
     * @param websocketNotifierStatus status of the notification; {@code OLD} is skipped
     */
    public void publishKafkaOffset(WebsocketNotifierInfo websocketNotifierInfo, WebsocketNotifierStatus websocketNotifierStatus) {
        if (websocketNotifierStatus.equals(WebsocketNotifierStatus.OLD)) {
            return;
        }

        final JsonObject payload;
        final String recordKey;
        try {
            String json = new Gson().toJson(websocketNotifierInfo);
            if (json == null || json.isEmpty()) {
                return;
            }
            // The cast fails for non-object JSON (e.g. "null"); that is handled below.
            payload = (JsonObject) new JsonParser().parse(json);
            if (!payload.has(ClassicConstants.USER_MDC_KEY)) {
                // Without the key member the record cannot be keyed; skip instead of
                // failing with a NullPointerException.
                LOGGER.error("Error during push event: {}", websocketNotifierInfo);
                return;
            }
            recordKey = payload.get(ClassicConstants.USER_MDC_KEY).getAsString();
        } catch (RuntimeException e) {
            LOGGER.error("Error during push event: {}", websocketNotifierInfo, e);
            return;
        }

        getProducer().send(new ProducerRecord<>(this.kafkaNotificationTopic, recordKey, payload.toString()));
        LOGGER.debug("Published topic {} user {}", this.kafkaNotificationTopic, payload);
    }

    /**
     * Builds one info object per topic for the given user and application and resolves
     * each through {@link #getLastOffset(WebsocketNotifierInfo)}.
     *
     * @param str the user set on every info object
     * @param str2 the application set on every info object
     * @param list the topic names; {@code null} yields an empty result
     * @return the resolved info objects, in the order of {@code list}
     */
    public List<WebsocketNotifierInfo> buildWebsocketInfos(String str, String str2, List<String> list) {
        List<WebsocketNotifierInfo> infos = new ArrayList<>();
        if (list == null) {
            return infos;
        }
        for (String topicName : list) {
            WebsocketNotifierInfo info = new WebsocketNotifierInfo();
            info.setTopicName(topicName);
            info.setUser(str);
            info.setApplication(str2);
            infos.add(getLastOffset(info));
        }
        return infos;
    }

    /**
     * Returns the configured minimum number for the topic, clamped to be non-negative;
     * 0 when the topic is null or not configured.
     */
    private int getLastMinimumNumberParam(CharSequence topicName) {
        if (topicName == null) {
            return 0;
        }
        // Compare on the String form so non-String CharSequence values also match.
        int index = this.kafkaTopics.indexOf(topicName.toString());
        if (index < 0) {
            return 0;
        }
        return Math.max(0, this.lastEventsMinimumNumber.get(index));
    }

    /**
     * Subtracts the topic's configured minimum number from {@code offset}; returns that
     * difference plus one when it is positive, otherwise 0.
     */
    private int getLastMinimumNumber(String topicName, int offset) {
        int remaining = offset - getLastMinimumNumberParam(topicName);
        return remaining > 0 ? remaining + 1 : 0;
    }

    /**
     * Sets the info's offset to the highest position among the consumer's assigned
     * partitions (at least 1) and publishes it with status {@code LIVE}.
     */
    private void publishUserFirstOffset(Consumer<String, Object> consumer, WebsocketNotifierInfo websocketNotifierInfo) {
        long highestPosition = 1;
        for (TopicPartition partition : consumer.assignment()) {
            highestPosition = Math.max(highestPosition, consumer.position(partition));
        }
        // NOTE(review): the model stores the offset as Integer, so the long position is
        // narrowed here; values above Integer.MAX_VALUE would wrap.
        websocketNotifierInfo.setOffset(Integer.valueOf((int) highestPosition));
        publishKafkaOffset(websocketNotifierInfo, WebsocketNotifierStatus.LIVE);
    }

    /**
     * For each info, opens a dedicated consumer on its topic and reads records starting
     * from the offset computed by {@code getLastMinimumNumber}. When nothing is read, the
     * user's first offset is published instead. Each consumer is closed before moving on,
     * including when reading fails.
     *
     * @param list the per-topic infos to recover events for; {@code null} yields an empty result
     * @return all records read across the given infos
     */
    public List<JsonObject> getLostMessagesToUser(List<WebsocketNotifierInfo> list) {
        List<JsonObject> lostMessages = new ArrayList<>();
        if (list == null) {
            return lostMessages;
        }
        for (WebsocketNotifierInfo info : list) {
            String topicName = info.getTopicName().toString();
            // Time-stamped group id so every recovery run uses its own consumer group.
            String groupId = topicName + "-lostevents-" + System.currentTimeMillis();
            try (Consumer<String, Object> consumer =
                    this.eventReader.buildKafkaConsumer(this.kafkaServer, this.kafkaSchemaRegistryUrl, topicName, groupId)) {
                int startOffset = getLastMinimumNumber(topicName, info.getOffset().intValue());
                List<JsonObject> records = this.eventReader.readRecordsFromOffset(consumer, info, startOffset);
                if (records.isEmpty()) {
                    publishUserFirstOffset(consumer, info);
                } else {
                    lostMessages.addAll(records);
                }
            }
        }
        return lostMessages;
    }
}
