package csbase.util.messages;

import csbase.util.data.dispatcher.ExecutorDispatcher;
import csbase.util.data.dispatcher.IDispatcher;
import csbase.util.messages.dao.IMessageStoreDAO;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import tecgraf.javautils.core.filter.IFilter;
import tecgraf.javautils.core.lng.LNG;

/* loaded from: input_file:csbase/util/messages/MessageBroker.class */
/**
 * Routes {@link Message}s to named topics and periodically persists the
 * topics' message stores through an {@link IMessageStoreDAO}.
 *
 * <p>Life cycle: a broker is created stopped. {@link #start()} loads the
 * persisted message stores and schedules a periodic save; {@link #stop()}
 * cancels the schedule and performs a final save. A broker may be started
 * again after it has been stopped.
 *
 * <p>Threading: the topic map is shared between caller threads and the timer
 * thread that runs the periodic save, so every access to it is guarded by the
 * map's own monitor.
 */
public class MessageBroker {
    /** Identity of this broker; embedded in every consumer id it issues. */
    private final UUID brokerId = UUID.randomUUID();
    private final IDispatcher<IMessageListener, Message> dispatcher;
    private final IMessageStoreDAO dao;
    /** Timer running the periodic save; non-null only while started. Guarded by {@code this}. */
    private Timer managerTimer;
    /** Interval between periodic saves, in milliseconds (never below {@link #MIN_MANAGER_PERIOD}). */
    private final long managerPeriod;
    /** Value handed to every {@link MessageStore} this broker creates or loads. */
    private final long receiveTimeout;
    /** Topics indexed by name. Every access must hold this map's monitor. */
    private final HashMap<String, Topic> topicsByName = new HashMap<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    static final String className = MessageBroker.class.getSimpleName();
    private static final long MIN_MANAGER_PERIOD = TimeUnit.MINUTES.toMillis(10);
    private static final Logger LOGGER = Logger.getLogger(MessageBroker.class.getName());

    /**
     * Creates a broker that dispatches to listeners on a fixed-size thread pool.
     *
     * @param iMessageStoreDAO storage used to load and save message stores
     * @param j requested interval between periodic saves, in milliseconds;
     *        values below ten minutes are raised to ten minutes
     * @param j2 receive timeout applied to every message store
     *        (NOTE(review): unit is defined by {@code MessageStore} - confirm)
     * @param i number of threads in the pool (used as both core and maximum size)
     */
    public MessageBroker(IMessageStoreDAO iMessageStoreDAO, long j, long j2, int i) {
        this(new ThreadPoolExecutor(i, i, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()), iMessageStoreDAO, j, j2);
    }

    /**
     * Creates a broker that dispatches to listeners through the given executor.
     *
     * @param executorService executor that runs listener notifications
     * @param iMessageStoreDAO storage used to load and save message stores
     * @param j requested interval between periodic saves, in milliseconds;
     *        values below ten minutes are raised to ten minutes
     * @param j2 receive timeout applied to every message store
     */
    public MessageBroker(ExecutorService executorService, IMessageStoreDAO iMessageStoreDAO, long j, long j2) {
        this(executorDispatcher(executorService), iMessageStoreDAO, j, j2);
    }

    /**
     * Creates a broker that uses the given dispatcher to notify listeners.
     *
     * @param iDispatcher dispatcher handed to every topic
     * @param iMessageStoreDAO storage used to load and save message stores
     * @param j requested interval between periodic saves, in milliseconds;
     *        values below ten minutes are raised to ten minutes
     * @param j2 receive timeout applied to every message store
     */
    public MessageBroker(IDispatcher<IMessageListener, Message> iDispatcher, IMessageStoreDAO iMessageStoreDAO, long j, long j2) {
        this.dispatcher = iDispatcher;
        this.dao = iMessageStoreDAO;
        this.managerPeriod = Math.max(j, MIN_MANAGER_PERIOD);
        this.receiveTimeout = j2;
    }

    /**
     * Wraps an executor in the dispatcher type the broker works with. Kept as
     * a typed helper so constructor delegation resolves to the
     * {@code IDispatcher} overload.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // ExecutorDispatcher's type parameters are not visible here
    private static IDispatcher<IMessageListener, Message> executorDispatcher(ExecutorService executorService) {
        return new ExecutorDispatcher(executorService, new MessageListenerDispatcher());
    }

    /**
     * Starts the broker: replaces the topic map with one topic per persisted
     * message store and schedules the periodic save. Does nothing if the
     * broker is already started.
     *
     * <p>The broker only becomes visible as started once loading and
     * scheduling have succeeded, so a failure here leaves it cleanly stopped.
     */
    public synchronized void start() {
        if (this.started.get()) {
            return;
        }
        synchronized (this.topicsByName) {
            this.topicsByName.clear();
            for (MessageStore messageStore : this.dao.getAllMessageStores()) {
                messageStore.setReceiveTimeout(this.receiveTimeout);
                this.topicsByName.put(messageStore.getName(), new Topic(messageStore, this.dispatcher));
            }
        }
        Timer timer = new Timer();
        // A TimerTask can be scheduled only once, so each start needs its own instance.
        timer.schedule(newPersistTask(), 0L, this.managerPeriod);
        this.managerTimer = timer;
        this.started.set(true);
    }

    /**
     * Stops the broker: cancels the periodic save and saves all message
     * stores one last time. Does nothing if the broker is not started.
     */
    public synchronized void stop() {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.managerTimer.cancel();
        this.managerTimer = null;
        this.dao.saveAllMessageStores(snapshotMessageStores());
    }

    /**
     * Creates a consumer id bound to this broker.
     *
     * @return a new id accepted only by this broker instance
     */
    public Serializable createConsumerId() {
        return new ConsumerId(this.brokerId);
    }

    /**
     * Receives messages from a topic on behalf of a consumer.
     *
     * @param str topic name
     * @param serializable consumer id obtained from {@link #createConsumerId()}
     * @param iFilter filter forwarded to the topic
     * @return whatever the topic returns, or an empty array when the topic
     *         does not exist or the broker is not started
     * @throws NullPointerException if the consumer id is null
     * @throws IllegalArgumentException if the consumer id was not issued by this broker
     */
    public Message[] receive(String str, Serializable serializable, IFilter<Message> iFilter) {
        validateConsumerId(serializable);
        Topic topic = getTopic(str, false);
        return topic != null ? topic.receive(serializable, iFilter) : new Message[0];
    }

    /**
     * Subscribes a listener to a topic, creating the topic if necessary.
     *
     * @param str topic name
     * @param serializable consumer id obtained from {@link #createConsumerId()}
     * @param iMessageListener listener to subscribe
     * @param iFilter filter forwarded to the topic
     * @throws NullPointerException if the consumer id is null
     * @throws IllegalArgumentException if the consumer id was not issued by this broker
     * @throws IllegalStateException if the broker is not started
     */
    public void setMessageListener(String str, Serializable serializable, IMessageListener iMessageListener, IFilter<Message> iFilter) {
        validateConsumerId(serializable);
        requireTopic(str).subscribe(serializable, iMessageListener, iFilter);
    }

    /**
     * Unsubscribes a consumer from a topic. Does nothing when the topic does
     * not exist or the broker is not started.
     *
     * @param str topic name
     * @param serializable consumer id obtained from {@link #createConsumerId()}
     * @throws NullPointerException if the consumer id is null
     * @throws IllegalArgumentException if the consumer id was not issued by this broker
     */
    public void removeMessageListener(String str, Serializable serializable) {
        validateConsumerId(serializable);
        Topic topic = getTopic(str, false);
        if (topic != null) {
            topic.unsubscribe(serializable);
        }
    }

    /**
     * Assigns a fresh random id to the message and publishes it to each named
     * topic, creating topics as needed. A failure on one topic is logged as a
     * warning and does not prevent publishing to the remaining topics.
     *
     * @param message message to publish; its id is overwritten
     * @param j value forwarded to {@code Topic.publish}
     *        (NOTE(review): presumably a time-to-live - confirm against Topic)
     * @param strArr names of the destination topics
     */
    public void send(Message message, long j, String... strArr) {
        message.setId(UUID.randomUUID());
        for (String str : strArr) {
            try {
                requireTopic(str).publish(message, j);
            } catch (Exception e) {
                LogRecord logRecord = new LogRecord(Level.WARNING, LNG.get(className + ".warning.message.publish", str));
                logRecord.setThrown(e);
                LOGGER.log(logRecord);
            }
        }
    }

    /**
     * Looks up a topic by name.
     *
     * @param str topic name
     * @param z whether to create and register the topic when it is missing
     * @return the topic, or {@code null} when the broker is not started or
     *         the topic is missing and {@code z} is false
     */
    private Topic getTopic(String str, boolean z) {
        if (!this.started.get()) {
            return null;
        }
        synchronized (this.topicsByName) {
            Topic topic = this.topicsByName.get(str);
            if (topic == null && z) {
                topic = new Topic(new MessageStore(str, this.receiveTimeout), this.dispatcher);
                this.topicsByName.put(str, topic);
            }
            return topic;
        }
    }

    /**
     * Returns the named topic, creating it if needed, or fails with a clear
     * error instead of handing back {@code null} when the broker is stopped.
     */
    private Topic requireTopic(String topicName) {
        Topic topic = getTopic(topicName, true);
        if (topic == null) {
            throw new IllegalStateException(className + " is not started");
        }
        return topic;
    }

    /**
     * Copies the current message stores into a new list while holding the
     * topic-map lock, so the DAO can be called without holding it.
     * Declared as {@code ArrayList} because the DAO's parameter type is not
     * visible from this file.
     */
    private ArrayList<MessageStore> snapshotMessageStores() {
        synchronized (this.topicsByName) {
            ArrayList<MessageStore> stores = new ArrayList<>(this.topicsByName.size());
            for (Topic topic : this.topicsByName.values()) {
                stores.add(topic.getMessageStore());
            }
            return stores;
        }
    }

    /** Builds the task that periodically saves every message store. */
    private TimerTask newPersistTask() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    MessageBroker.this.dao.saveAllMessageStores(snapshotMessageStores());
                } catch (RuntimeException e) {
                    // An exception escaping a TimerTask terminates the Timer thread,
                    // which would silently end all future saves.
                    LOGGER.log(Level.WARNING, "Periodic save of message stores failed", e);
                }
            }
        };
    }

    /**
     * Checks that a consumer id was issued by this broker.
     *
     * @throws NullPointerException if the id is null
     * @throws IllegalArgumentException if the id is not a {@code ConsumerId}
     *         or belongs to another broker
     */
    private void validateConsumerId(Serializable serializable) {
        if (serializable == null) {
            throw new NullPointerException("consumerId == null");
        }
        if (ConsumerId.class != serializable.getClass()) {
            throw new IllegalArgumentException(className + ".illegalarg.consumerid.type");
        }
        if (!((ConsumerId) serializable).getBrokerId().equals(this.brokerId)) {
            throw new IllegalArgumentException(className + ".illegalarg.consumerid.otherbroker");
        }
    }
}
