/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.job.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.job.JobHistoryEvent;
import br.pucrio.tecgraf.soma.job.SomaJobHistoryConsumer;
import br.pucrio.tecgraf.soma.job.application.appservice.JobHistoryEventService;
import br.pucrio.tecgraf.soma.job.application.appservice.LostEventAppService;
import br.pucrio.tecgraf.soma.job.domain.model.LostEvent;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import javax.persistence.RollbackException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hibernate.exception.JDBCConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class JobHistoryEventReader {
    private final Logger logger = LoggerFactory.getLogger(SomaJobHistoryConsumer.class);
    private JobHistoryEventService jobHistoryEventService;
    private LostEventAppService lostEventAppService;
    private RecordReader recordReader;

    @Autowired
    public JobHistoryEventReader(JobHistoryEventService jobHistoryEventService, LostEventAppService lostEventAppService) {
        this.jobHistoryEventService = jobHistoryEventService;
        this.lostEventAppService = lostEventAppService;
        this.recordReader = new RecordReader();
    }

    public void run(String kafkaServer, String schemaRegistryUrl, String topic, String group) throws IOException, InterruptedException {
        KafkaConsumer<String, JobHistoryEvent> consumer = this.buildKafkaConsumer(kafkaServer, schemaRegistryUrl, topic, group);
        consumer.subscribe(Collections.singletonList(topic));
        while (!Thread.currentThread().isInterrupted()) {
            this.readRecords(consumer);
        }
    }

    protected void readRecords(KafkaConsumer<String, JobHistoryEvent> consumer) throws InterruptedException, IOException {
        ConsumerRecords<String, JobHistoryEvent> records = consumer.poll(Duration.ofMillis(100L));
        this.logger.debug("Got {} records from Kafka", (Object)records.count());
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, JobHistoryEvent>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, JobHistoryEvent> record : partitionRecords) {
                this.recordReader.readRecord(consumer, partition, partitionRecords, record);
            }
        }
    }

    protected KafkaConsumer<String, JobHistoryEvent> buildKafkaConsumer(String kafkaServer, String schemaRegistryUrl, String topic, String group) {
        return new KafkaConsumer<String, JobHistoryEvent>(this.buildProperties(kafkaServer, schemaRegistryUrl, topic, group));
    }

    private Properties buildProperties(String kafkaServer, String schemaRegistryUrl, String topic, String group) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", schemaRegistryUrl);
        properties.setProperty("specific.avro.reader", "true");
        properties.setProperty("group.id", group);
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    private class RecordReader {
        private RecordReader() {
        }

        public void readRecord(KafkaConsumer<String, JobHistoryEvent> consumer, TopicPartition partition, List<ConsumerRecord<String, JobHistoryEvent>> partitionRecords, ConsumerRecord<String, JobHistoryEvent> record) throws InterruptedException, IOException {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    int partitionId = partition.partition();
                    JobHistoryEventReader.this.logger.info("Job {} at Kafka topic {}:{}:{}", record.key(), record.topic(), partitionId, lastOffset);
                    JobHistoryEventReader.this.jobHistoryEventService.process(record.value(), partitionId, lastOffset);
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1L)));
                    break;
                }
                catch (JDBCConnectionException ce) {
                    long waitingTime = 300000L;
                    JobHistoryEventReader.this.logger.warn("No DB connection. Retrying in {} seconds.", (Object)waitingTime);
                    Thread.sleep(waitingTime);
                }
                catch (IOException | RollbackException ex) {
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    int partitionId = partition.partition();
                    String eventId = this.getEventId(record.value().getEvent());
                    LostEvent lostEvent = new LostEvent(eventId, partitionId, lastOffset, record.value().toString());
                    JobHistoryEventReader.this.lostEventAppService.saveLostEvent(lostEvent, partitionId, lastOffset);
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1L)));
                    JobHistoryEventReader.this.logger.error("Error trying to process a kafka message. A lost event with id {} was persisted in the database for further inspection.", (Object)lostEvent.getId(), (Object)ex);
                    break;
                }
                catch (Exception e) {
                    JobHistoryEventReader.this.logger.error("Unrecoverable error.", e);
                    break;
                }
            }
        }

        private String getEventId(Object event) {
            Method method;
            try {
                method = event.getClass().getMethod("getEventId", new Class[0]);
            }
            catch (NoSuchMethodException | SecurityException e) {
                return null;
            }
            try {
                return (String)method.invoke(event, new Object[0]);
            }
            catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                return null;
            }
        }
    }
}

