package br.pucrio.tecgraf.soma.logsmonitor.flow;

import br.pucrio.tecgraf.soma.logsmonitor.model.Topic;
import br.pucrio.tecgraf.soma.logsmonitor.model.error.ErrorType;
import br.pucrio.tecgraf.soma.logsmonitor.model.error.ResourceErrorEvent;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorEvent;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener;
import br.pucrio.tecgraf.soma.logsmonitor.websocket.WebSocketNotificatioErrorService;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/* loaded from: input_file:BOOT-INF/classes/br/pucrio/tecgraf/soma/logsmonitor/flow/TopicPublisher.class */
public class TopicPublisher {
    private static final Log logger = LogFactory.getLog((Class<?>) TopicPublisher.class);
    private final SubmissionPublisher<ResourceMonitorEvent> delegate;
    private final Map<String, TopicSubscriber> subscribers;
    private final Topic topic;
    private final Integer topicUUID;
    private final ResourceMonitorListener<ResourceMonitorEvent> listener;
    private Flow.Subscription monitorSubscription;

    @Autowired
    private WebSocketNotificatioErrorService webSocketErrorService;

    public TopicPublisher(final Topic topic) {
        logger.debug("Topic [%s]: new TopicPublisher".formatted(topic));
        this.topic = topic;
        this.topicUUID = topic.getUUID();
        this.delegate = new SubmissionPublisher<>();
        this.subscribers = new HashMap();
        this.listener = new ResourceMonitorListener<ResourceMonitorEvent>() { // from class: br.pucrio.tecgraf.soma.logsmonitor.flow.TopicPublisher.1
            @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener
            public void onSubscribe(Flow.Subscription subscription) {
                TopicPublisher.logger.debug("MonitorListener: onSubscribe activated for topic [%s] on its MonitorListener".formatted(topic));
                TopicPublisher.this.monitorSubscription = subscription;
            }

            @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener
            public void onNext(ResourceMonitorEvent resourceMonitorEvent) {
                TopicPublisher.logger.debug("MonitorListener: onNext activated for topic [%s] on its MonitorListener".formatted(topic));
                if (TopicPublisher.this.delegate.isClosed()) {
                    return;
                }
                TopicPublisher.this.delegate.submit(resourceMonitorEvent);
            }

            @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener
            public void onError(Throwable th) {
                TopicPublisher.logger.debug("MonitorListener: onError activated for topic [%s] on its MonitorListener".formatted(topic));
                TopicPublisher.this.notifyError(th.getMessage());
                TopicPublisher.this.delegate.close();
            }

            @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener
            public void onComplete() {
            }
        };
    }

    public Topic getTopic() {
        return this.topic;
    }

    public ResourceMonitorListener<ResourceMonitorEvent> getListener() {
        return this.listener;
    }

    public synchronized boolean hasSubscribers() {
        return !this.subscribers.isEmpty();
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return this.topicUUID.equals(((TopicPublisher) obj).topicUUID);
    }

    public int hashCode() {
        return Objects.hash(this.topicUUID);
    }

    public synchronized boolean isSubscribed(String str) {
        return this.subscribers.containsKey(str);
    }

    public synchronized void subscribe(String str, TopicSubscriber topicSubscriber) {
        logger.debug("Topic [%s]: subscribing session [%s]".formatted(this.topic, str));
        if (!this.subscribers.containsKey(str)) {
            this.delegate.subscribe(topicSubscriber);
            this.subscribers.put(topicSubscriber.getSessionId(), topicSubscriber);
        }
        logger.debug("Topic [%s]: active sessions numbers [%d]".formatted(this.topic, Integer.valueOf(this.subscribers.keySet().size())));
    }

    public synchronized void unsubscribe(String str) {
        logger.debug("Topic [%s]: unsubscribing session [%s]".formatted(this.topic, str));
        this.subscribers.remove(str).onComplete();
        logger.debug("Topic [%s]: active sessions numbers [%d]".formatted(this.topic, Integer.valueOf(this.subscribers.keySet().size())));
        if (hasSubscribers()) {
            return;
        }
        logger.debug("Topic [%s]: removing listener from monitor".formatted(this.topic));
        if (this.monitorSubscription != null) {
            this.monitorSubscription.cancel();
        }
        this.delegate.close();
    }

    private synchronized void notifyError(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(ResourceErrorEvent.JSON_PROPERTY_DETAILS, str);
        for (String str2 : this.subscribers.keySet()) {
            this.webSocketErrorService.onErrorNotify(str2, this.subscribers.get(str2).getSubscriptionId(), ErrorType.RESOURCE_ERROR, "Internal error in monitor", hashMap);
        }
    }
}
