/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.logsmonitor.flow;

import br.pucrio.tecgraf.soma.logsmonitor.manager.WebSocketSessionManager;
import br.pucrio.tecgraf.soma.logsmonitor.model.Notification;
import br.pucrio.tecgraf.soma.logsmonitor.model.Topic;
import br.pucrio.tecgraf.soma.logsmonitor.model.mapper.TopicEventMapper;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorEvent;
import br.pucrio.tecgraf.soma.logsmonitor.service.TopicService;
import br.pucrio.tecgraf.soma.logsmonitor.service.TopicServiceFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

/**
 * {@link Flow.Subscriber} that forwards {@link ResourceMonitorEvent}s of a single {@link Topic}
 * to one WebSocket session, serialized as JSON {@link Notification}s.
 *
 * <p>The subscriber tracks the last sequence number it has handled ({@code currentSeqnum}) and,
 * when an incoming event does not line up with it, asks the topic's {@link TopicService} for the
 * events it needs before sending. It requests one item at a time from its subscription and
 * cancels the subscription when the session is missing or closed, or when sending fails.
 *
 * <p>All {@code Flow.Subscriber} callbacks are {@code synchronized} on this instance.
 * NOTE(review): that only serializes sends issued by this subscriber; whether sends from other
 * subscribers sharing the same session are serialized depends on what
 * {@link WebSocketSessionManager} returns - confirm.
 *
 * <p>Declared as a prototype-scoped Spring component; collaborators are field-injected, so they
 * are only available after the container has populated the instance.
 */
@Component
@Scope(value="prototype")
public class TopicSubscriber
implements Flow.Subscriber<ResourceMonitorEvent> {
    private static final Log logger = LogFactory.getLog(TopicSubscriber.class);
    private final String sessionId;
    private final String subscriptionId;
    private final Topic topic;
    /** Sequence number this subscriber has reached; advanced after each successful send. */
    private Long currentSeqnum;
    private final TopicEventMapper mapper;
    @Autowired
    private TopicServiceFactory serviceFactory;
    @Autowired
    ObjectMapper objectMapper;
    @Autowired
    WebSocketSessionManager webSocketSessionManager;
    private Flow.Subscription subscription;
    /** Set by {@link #onComplete()}; makes a later {@link #onSubscribe} cancel immediately. */
    private boolean completed;

    /**
     * Creates a subscriber bound to one session/subscription/topic triple.
     *
     * @param sessionId id used to look the WebSocket session up in the session manager
     * @param subscriptionId id echoed back in every notification sent by this subscriber
     * @param topic topic whose events are forwarded
     * @param mapper converts a monitor event into the notification payload
     * @param initialSeqnum sequence number the subscriber starts from
     */
    public TopicSubscriber(String sessionId, String subscriptionId, Topic topic, TopicEventMapper mapper, Long initialSeqnum) {
        TopicSubscriber.debug("Topic [%s]: new TopicSubscriber with session [%s] and subscriptionId [%s]", topic, sessionId, subscriptionId);
        this.sessionId = sessionId;
        this.subscriptionId = subscriptionId;
        this.topic = topic;
        this.currentSeqnum = initialSeqnum;
        this.mapper = mapper;
        this.completed = false;
    }

    /** @return the id of the WebSocket session this subscriber writes to */
    public String getSessionId() {
        return this.sessionId;
    }

    /** @return the subscription id carried in every notification */
    public String getSubscriptionId() {
        return this.subscriptionId;
    }

    /**
     * Stores the subscription and requests the first item, unless {@link #onComplete()} has
     * already been invoked on this instance, in which case the subscription is cancelled.
     */
    @Override
    public synchronized void onSubscribe(Flow.Subscription subscription) {
        TopicSubscriber.debug("Session [%s]: onSubscribe activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        this.subscription = subscription;
        if (this.completed) {
            subscription.cancel();
        } else {
            subscription.request(1L);
        }
    }

    /**
     * Sends the events implied by {@code event} to the WebSocket session and requests the next
     * item. The subscription is cancelled instead (and nothing further is requested) when the
     * session is unknown, is not open, or an {@link IOException} occurs while sending.
     */
    @Override
    public synchronized void onNext(ResourceMonitorEvent event) {
        TopicSubscriber.debug("Session [%s]: onNext activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        WebSocketSession session = this.webSocketSessionManager.getSession(this.sessionId);
        if (session == null || !session.isOpen()) {
            this.subscription.cancel();
            return;
        }
        try {
            for (ResourceMonitorEvent pending : this.collectEventsToSend(event)) {
                Notification notification = new Notification(this.subscriptionId, this.topic.getTopicType(), this.mapper.map(pending, this.topic));
                session.sendMessage(new TextMessage(this.objectMapper.writeValueAsBytes(notification)));
                // NOTE(review): advances to the end seqnum of the triggering event after every
                // successful send, not to the seqnum of the individual event just sent. Kept as
                // in the original; confirm this is intended for partially delivered batches.
                this.currentSeqnum = event.getEndSeqnum();
            }
        } catch (IOException e) {
            logger.error("There was an IOException when trying to sendMessage", e);
            this.subscription.cancel();
            return;
        }
        this.subscription.request(1L);
    }

    /**
     * Builds the ordered list of events to send for an incoming {@code event}, based on how
     * {@code currentSeqnum} relates to the event's start/end sequence numbers.
     *
     * @throws IOException propagated so the caller's send-failure handling also covers lookups
     */
    private List<ResourceMonitorEvent> collectEventsToSend(ResourceMonitorEvent event) throws IOException {
        // Looked up unconditionally, as before, even for the branches that do not query it.
        TopicService service = this.serviceFactory.getServiceByTopicType(this.topic.getTopicType());
        List<ResourceMonitorEvent> events = new ArrayList<>();
        if (this.currentSeqnum < event.getStartSeqnum()) {
            // Behind the event: fetch what lies between, then append the event itself.
            TopicSubscriber.debug("The subscription current seqnum [%d] is lesser than the event start seqnum [%d]", this.currentSeqnum, event.getStartSeqnum());
            events.addAll(service.getEvents(this.topic, this.currentSeqnum, event.getStartSeqnum()));
            events.add(event);
        } else if (this.currentSeqnum > event.getStartSeqnum() && this.currentSeqnum <= event.getEndSeqnum()) {
            // Inside the event's range: the event is not sent; the service is asked for the tail.
            TopicSubscriber.debug("The subscription current seqnum [%d] is between the events start and end seqnum ]%d, %d]", this.currentSeqnum, event.getStartSeqnum(), event.getEndSeqnum());
            events.addAll(service.getEvents(this.topic, this.currentSeqnum, event.getEndSeqnum()));
        } else if (this.currentSeqnum > event.getEndSeqnum()) {
            // Already past the event: nothing to send.
            TopicSubscriber.debug("The subscription current seqnum [%d] is greater than the event end seqnum [%d]", this.currentSeqnum, event.getEndSeqnum());
        } else {
            // Remaining case: the event starts exactly where this subscriber is.
            TopicSubscriber.debug("The subscription current seqnum [%d] is equal to the event start seqnum [%d]", this.currentSeqnum, event.getStartSeqnum());
            events.add(event);
        }
        return events;
    }

    /** Logs the failure; no other state is changed here. */
    @Override
    public synchronized void onError(Throwable throwable) {
        logger.error(String.format("Session [%s]: onError activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId), throwable);
    }

    /**
     * Marks this subscriber as completed and cancels the current subscription, if one has been
     * received already.
     */
    @Override
    public synchronized void onComplete() {
        TopicSubscriber.debug("Session [%s]: onComplete activated for topic [%s] with subscriptionId [%s]", this.sessionId, this.topic, this.subscriptionId);
        this.completed = true;
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    /**
     * Two subscribers are equal when they share the same session id and an equal topic; the
     * subscription id does not take part in the comparison.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        TopicSubscriber that = (TopicSubscriber)o;
        return this.sessionId.equals(that.sessionId) && this.topic.equals(that.topic);
    }

    /** Hash of the session id and the topic's UUID. */
    @Override
    public int hashCode() {
        return Objects.hash(this.sessionId, this.topic.getUUID());
    }

    /** Formats and emits a debug message only when debug logging is enabled. */
    private static void debug(String format, Object... args) {
        if (logger.isDebugEnabled()) {
            logger.debug(String.format(format, args));
        }
    }
}

