package br.pucrio.tecgraf.soma.logsmonitor.flow;

import br.pucrio.tecgraf.soma.logsmonitor.manager.WebSocketSessionManager;
import br.pucrio.tecgraf.soma.logsmonitor.model.Notification;
import br.pucrio.tecgraf.soma.logsmonitor.model.Topic;
import br.pucrio.tecgraf.soma.logsmonitor.model.mapper.TopicEventMapper;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorEvent;
import br.pucrio.tecgraf.soma.logsmonitor.service.TopicService;
import br.pucrio.tecgraf.soma.logsmonitor.service.TopicServiceFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

/**
 * {@link Flow.Subscriber} that forwards {@link ResourceMonitorEvent}s of a single {@link Topic}
 * to one WebSocket session, serialized as JSON {@link Notification} messages.
 *
 * <p>The subscriber pulls one event at a time ({@code request(1)}) and keeps track of the last
 * sequence number it has delivered ({@code currentSeqnum}). When an incoming event does not line
 * up with that sequence number, the gap is filled by querying the {@link TopicService} of the
 * topic's type before the notification is sent.
 *
 * <p>All {@code Flow.Subscriber} callbacks are {@code synchronized} on this instance.
 *
 * <p>Equality is defined by the session id and the topic; the hash code uses the session id and
 * the topic UUID. NOTE(review): this assumes {@code Topic.equals} never reports two topics with
 * different UUIDs as equal - confirm against {@code Topic}.
 */
@Scope("prototype")
@Component
public class TopicSubscriber implements Flow.Subscriber<ResourceMonitorEvent> {
    private static final Log logger = LogFactory.getLog(TopicSubscriber.class);

    private final String sessionId;
    private final String subscriptionId;
    private final Topic topic;
    private final TopicEventMapper mapper;

    /** Sequence number this subscription has reached; advanced after every delivered message. */
    private Long currentSeqnum;

    @Autowired
    private TopicServiceFactory serviceFactory;

    @Autowired
    ObjectMapper objectMapper;

    @Autowired
    WebSocketSessionManager webSocketSessionManager;

    private Flow.Subscription subscription;

    /** Set once {@link #onComplete()} has run, so a late {@link #onSubscribe} is cancelled. */
    private boolean completed;

    /**
     * Creates a subscriber bound to one WebSocket session and one topic.
     *
     * @param sessionId      id used to look the WebSocket session up on every event
     * @param subscriptionId client-side subscription id echoed in every notification
     * @param topic          topic whose events are forwarded
     * @param mapper         converts a monitor event into the notification payload
     * @param currentSeqnum  sequence number the subscription starts from
     */
    public TopicSubscriber(String sessionId, String subscriptionId, Topic topic, TopicEventMapper mapper, Long currentSeqnum) {
        debug("Topic [%s]: new TopicSubscriber with session [%s] and subscriptionId [%s]", topic, sessionId, subscriptionId);
        this.sessionId = sessionId;
        this.subscriptionId = subscriptionId;
        this.topic = topic;
        this.currentSeqnum = currentSeqnum;
        this.mapper = mapper;
        this.completed = false;
    }

    /** @return the id of the WebSocket session this subscriber writes to */
    public String getSessionId() {
        return this.sessionId;
    }

    /** @return the subscription id carried in every notification */
    public String getSubscriptionId() {
        return this.subscriptionId;
    }

    /**
     * Stores the subscription and requests the first event, or cancels it right away when
     * {@link #onComplete()} has already been signalled.
     */
    @Override
    public synchronized void onSubscribe(Flow.Subscription subscription) {
        debug("Session [%s]: onSubscribe activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        this.subscription = subscription;
        if (this.completed) {
            subscription.cancel();
        } else {
            subscription.request(1L);
        }
    }

    /**
     * Delivers the incoming event (and any events needed to close a sequence gap) to the session.
     *
     * <p>The subscription is cancelled when the session is unknown or closed, or when sending
     * fails with an {@link IOException}. Otherwise the events to send are chosen by comparing the
     * current sequence number {@code cur} with the event's {@code [start, end]} sequence numbers:
     * <ul>
     *   <li>{@code cur < start}: events returned by the topic service for {@code (cur, start)},
     *       followed by the incoming event;</li>
     *   <li>{@code start < cur <= end}: only the events returned by the topic service for
     *       {@code (cur, end)};</li>
     *   <li>{@code cur > end}: nothing is sent;</li>
     *   <li>{@code cur == start}: only the incoming event.</li>
     * </ul>
     * After the messages are sent, one more event is requested.
     */
    @Override
    public synchronized void onNext(ResourceMonitorEvent resourceMonitorEvent) {
        debug("Session [%s]: onNext activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        WebSocketSession session = this.webSocketSessionManager.getSession(this.sessionId);
        if (session == null) {
            this.subscription.cancel();
            return;
        }
        try {
            if (!session.isOpen()) {
                this.subscription.cancel();
                return;
            }
            // Resolved up front (even for branches that do not query it) so lookup failures surface
            // exactly as before.
            TopicService topicService = this.serviceFactory.getServiceByTopicType(this.topic.getTopicType());
            List<ResourceMonitorEvent> pending = new ArrayList<>();
            final long current = this.currentSeqnum.longValue();
            final long start = resourceMonitorEvent.getStartSeqnum().longValue();
            if (current < start) {
                debug("The subscription current seqnum [%d] is lesser than the event start seqnum [%d]", this.currentSeqnum, resourceMonitorEvent.getStartSeqnum());
                // Presumably getEvents returns the stored events between the two sequence numbers;
                // bound inclusivity is defined by the TopicService implementation.
                pending.addAll(topicService.getEvents(this.topic, this.currentSeqnum, resourceMonitorEvent.getStartSeqnum()));
                pending.add(resourceMonitorEvent);
            } else if (current > start && current <= resourceMonitorEvent.getEndSeqnum().longValue()) {
                debug("The subscription current seqnum [%d] is between the events start and end seqnum ]%d, %d]", this.currentSeqnum, resourceMonitorEvent.getStartSeqnum(), resourceMonitorEvent.getEndSeqnum());
                pending.addAll(topicService.getEvents(this.topic, this.currentSeqnum, resourceMonitorEvent.getEndSeqnum()));
            } else if (current > resourceMonitorEvent.getEndSeqnum().longValue()) {
                debug("The subscription current seqnum [%d] is greater than the event end seqnum [%d]", this.currentSeqnum, resourceMonitorEvent.getEndSeqnum());
            } else {
                debug("The subscription current seqnum [%d] is equal to the event start seqnum [%d]", this.currentSeqnum, resourceMonitorEvent.getStartSeqnum());
                pending.add(resourceMonitorEvent);
            }
            for (ResourceMonitorEvent event : pending) {
                send(session, event);
                // Advanced only after a successful send, and always to the end of the incoming
                // event; when nothing is sent the current sequence number is left untouched.
                this.currentSeqnum = resourceMonitorEvent.getEndSeqnum();
            }
            this.subscription.request(1L);
        } catch (IOException e) {
            logger.error("There was an IOException when trying to sendMessage", e);
            this.subscription.cancel();
        }
    }

    /** Logs the error; no other state is changed. */
    @Override
    public synchronized void onError(Throwable th) {
        logger.error("Session [%s]: onError activated for topic [%s] with subscriptionId [%s]".formatted(this.sessionId, this.topic, this.subscriptionId), th);
    }

    /** Marks the subscriber as completed and cancels the subscription if one was received. */
    @Override
    public synchronized void onComplete() {
        debug("Session [%s]: onComplete activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        this.completed = true;
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    /** Two subscribers are equal when they share the same session id and topic. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopicSubscriber other = (TopicSubscriber) obj;
        return this.sessionId.equals(other.sessionId) && this.topic.equals(other.topic);
    }

    /** Hash of the session id and the topic UUID. */
    @Override
    public int hashCode() {
        return Objects.hash(this.sessionId, this.topic.getUUID());
    }

    /** Serializes one event as a JSON {@link Notification} and writes it to the session. */
    private void send(WebSocketSession session, ResourceMonitorEvent event) throws IOException {
        Notification notification = new Notification(this.subscriptionId, this.topic.getTopicType(), this.mapper.map(event, this.topic));
        session.sendMessage(new TextMessage(this.objectMapper.writeValueAsBytes(notification)));
    }

    /** Formats and logs a debug message only when debug logging is enabled. */
    private static void debug(String format, Object... args) {
        if (logger.isDebugEnabled()) {
            logger.debug(format.formatted(args));
        }
    }
}
