/*
 * Decompiled with CFR 0.152.
 */
package csbase.util.messages;

import csbase.util.data.dispatcher.ExecutorDispatcher;
import csbase.util.data.dispatcher.IDispatcher;
import csbase.util.messages.ConsumerId;
import csbase.util.messages.IMessageListener;
import csbase.util.messages.Message;
import csbase.util.messages.MessageListenerDispatcher;
import csbase.util.messages.MessageStore;
import csbase.util.messages.Topic;
import csbase.util.messages.dao.IMessageStoreDAO;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import tecgraf.javautils.core.filter.IFilter;

/**
 * Routes {@link Message}s to named {@link Topic}s and periodically persists the
 * topics' {@link MessageStore}s through an {@link IMessageStoreDAO}.
 *
 * <p>Life cycle: a broker does nothing until {@link #start()} is called, and may be
 * started again after {@link #stop()}. While the broker is not started,
 * {@link #receive} returns an empty array, {@link #removeMessageListener} is a no-op,
 * {@link #send} logs a warning per destination and {@link #setMessageListener} throws
 * {@link IllegalStateException}.
 *
 * <p>Thread-safety: {@link #start()} and {@link #stop()} are mutually exclusive, and
 * every access to the topic table is guarded by a private lock, so topics may be
 * looked up and created from several threads while the persistence timer is running.
 */
public class MessageBroker {
    /** Lower bound, in milliseconds, applied to the requested persistence period. */
    private static final long MIN_MANAGER_PERIOD = TimeUnit.MINUTES.toMillis(10L);
    private static final Logger LOGGER = Logger.getLogger(MessageBroker.class.getName());

    /** Identity stamped on every consumer id issued by this broker. */
    private final UUID brokerId = UUID.randomUUID();
    /** Guards {@link #topicsByName} and the transitions of {@link #started}. */
    private final Object topicsLock = new Object();
    private IDispatcher<IMessageListener, Message> dispatcher;
    private IMessageStoreDAO dao;
    /** Timer driving periodic persistence; non-null only between start() and stop(). */
    private Timer managerTimer;
    private long managerPeriod;
    private long receiveTimeout;
    private HashMap<String, Topic> topicsByName;
    private AtomicBoolean started;

    /**
     * Creates a broker that dispatches to listeners through a fixed-size thread pool
     * backed by an unbounded queue.
     *
     * <p>NOTE(review): the pool created here is never shut down by this class.
     *
     * @param dao storage used to load and save the message stores.
     * @param persistPeriod requested interval between saves, in milliseconds; values
     *        below ten minutes are raised to ten minutes.
     * @param receiveTimeout value handed to every {@link MessageStore} (unit is defined
     *        by {@code MessageStore}).
     * @param maxThreads number of threads of the dispatch pool.
     */
    public MessageBroker(IMessageStoreDAO dao, long persistPeriod, long receiveTimeout, int maxThreads) {
        this(new ThreadPoolExecutor(maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()), dao, persistPeriod, receiveTimeout);
    }

    /**
     * Creates a broker that dispatches to listeners through the given executor.
     *
     * @param executor executor that runs the listener notifications.
     * @param dao storage used to load and save the message stores.
     * @param persistPeriod requested interval between saves, in milliseconds; values
     *        below ten minutes are raised to ten minutes.
     * @param receiveTimeout value handed to every {@link MessageStore}.
     */
    public MessageBroker(ExecutorService executor, IMessageStoreDAO dao, long persistPeriod, long receiveTimeout) {
        ExecutorDispatcher<IMessageListener, Message> executorDispatcher = new ExecutorDispatcher<IMessageListener, Message>(executor, new MessageListenerDispatcher());
        this.init(executorDispatcher, dao, persistPeriod, receiveTimeout);
    }

    /**
     * Creates a broker that uses the given dispatcher as is.
     *
     * @param dispatcher dispatcher handed to every {@link Topic}.
     * @param dao storage used to load and save the message stores.
     * @param persistPeriod requested interval between saves, in milliseconds; values
     *        below ten minutes are raised to ten minutes.
     * @param receiveTimeout value handed to every {@link MessageStore}.
     */
    public MessageBroker(IDispatcher<IMessageListener, Message> dispatcher, IMessageStoreDAO dao, long persistPeriod, long receiveTimeout) {
        this.init(dispatcher, dao, persistPeriod, receiveTimeout);
    }

    /** Shared constructor body: stores the collaborators and creates the empty state. */
    private void init(IDispatcher<IMessageListener, Message> dispatcher, IMessageStoreDAO dao, long persistPeriod, long receiveTimeout) {
        this.dispatcher = dispatcher;
        this.dao = dao;
        this.managerPeriod = Math.max(persistPeriod, MIN_MANAGER_PERIOD);
        this.receiveTimeout = receiveTimeout;
        this.topicsByName = new HashMap<String, Topic>();
        this.started = new AtomicBoolean(false);
    }

    /**
     * Loads the persisted message stores, rebuilds the topics and starts the periodic
     * persistence. Does nothing when the broker is already started.
     *
     * <p>If loading fails, the exception propagates and the broker is left untouched
     * (still stopped), so the call can simply be retried.
     */
    public synchronized void start() {
        if (this.started.get()) {
            return;
        }
        // Load into a scratch map first so that a DAO failure changes nothing.
        HashMap<String, Topic> loaded = new HashMap<String, Topic>();
        for (MessageStore store : this.dao.getAllMessageStores()) {
            store.setReceiveTimeout(this.receiveTimeout);
            loaded.put(store.getName(), new Topic(store, this.dispatcher));
        }
        Timer timer = new Timer();
        synchronized (this.topicsLock) {
            this.topicsByName.clear();
            this.topicsByName.putAll(loaded);
            this.started.set(true);
        }
        this.managerTimer = timer;
        // A TimerTask can be scheduled only once, so every start gets a fresh one.
        timer.schedule(this.newPersistTask(), 0L, this.managerPeriod);
    }

    /**
     * Stops the periodic persistence and saves all message stores one last time.
     * Does nothing when the broker is not started.
     */
    public synchronized void stop() {
        ArrayList<MessageStore> stores;
        synchronized (this.topicsLock) {
            if (!this.started.compareAndSet(true, false)) {
                return;
            }
            stores = this.collectStores();
        }
        this.managerTimer.cancel();
        this.managerTimer = null;
        this.dao.saveAllMessageStores(stores);
    }

    /**
     * Creates an identifier bound to this broker; the other methods only accept
     * identifiers created here.
     *
     * @return a new consumer identifier.
     */
    public Serializable createConsumerId() {
        return new ConsumerId(this.brokerId);
    }

    /**
     * Receives messages from a topic on behalf of a consumer.
     *
     * @param topicName name of the topic.
     * @param consumerId identifier obtained from {@link #createConsumerId()}.
     * @param filter filter handed to the topic.
     * @return whatever the topic returns, or an empty array when the topic does not
     *         exist or the broker is not started.
     * @throws NullPointerException if {@code consumerId} is null.
     * @throws IllegalArgumentException if {@code consumerId} was not issued by this broker.
     */
    public Message[] receive(String topicName, Serializable consumerId, IFilter<Message> filter) {
        this.validateConsumerId(consumerId);
        Topic topic = this.getTopic(topicName, false);
        if (topic == null) {
            return new Message[0];
        }
        return topic.receive(consumerId, filter);
    }

    /**
     * Subscribes a listener to a topic, creating the topic if needed.
     *
     * @param topicName name of the topic.
     * @param consumerId identifier obtained from {@link #createConsumerId()}.
     * @param listener listener to subscribe.
     * @param filter filter handed to the topic.
     * @throws NullPointerException if {@code consumerId} is null.
     * @throws IllegalArgumentException if {@code consumerId} was not issued by this broker.
     * @throws IllegalStateException if the broker is not started.
     */
    public void setMessageListener(String topicName, Serializable consumerId, IMessageListener listener, IFilter<Message> filter) {
        this.validateConsumerId(consumerId);
        this.requireTopic(topicName).subscribe(consumerId, listener, filter);
    }

    /**
     * Unsubscribes a consumer from a topic. Does nothing when the topic does not exist
     * or the broker is not started.
     *
     * @param topicName name of the topic.
     * @param consumerId identifier obtained from {@link #createConsumerId()}.
     * @throws NullPointerException if {@code consumerId} is null.
     * @throws IllegalArgumentException if {@code consumerId} was not issued by this broker.
     */
    public void removeMessageListener(String topicName, Serializable consumerId) {
        this.validateConsumerId(consumerId);
        Topic topic = this.getTopic(topicName, false);
        if (topic != null) {
            topic.unsubscribe(consumerId);
        }
    }

    /**
     * Assigns a fresh id to the message and publishes it on every destination topic,
     * creating topics as needed. A failure on one topic (including the broker not being
     * started) is logged as a warning and does not prevent the remaining topics from
     * being tried.
     *
     * @param message message to publish; its id is overwritten.
     * @param timeToLive time-to-live handed to each topic.
     * @param topicsDestination names of the destination topics.
     */
    public void send(Message message, long timeToLive, String ... topicsDestination) {
        message.setId(UUID.randomUUID());
        for (String aTopicName : topicsDestination) {
            try {
                this.requireTopic(aTopicName).publish(message, timeToLive);
            }
            catch (Exception e) {
                LogRecord record = new LogRecord(Level.WARNING, "Error while publishing a message on topic: " + aTopicName + " .");
                record.setThrown(e);
                LOGGER.log(record);
            }
        }
    }

    /**
     * Builds the task that periodically saves every message store. Unchecked failures
     * are logged instead of propagated, because an exception escaping a TimerTask
     * terminates the timer thread and would silently end all further saves.
     */
    private TimerTask newPersistTask() {
        return new TimerTask(){

            @Override
            public void run() {
                try {
                    ArrayList<MessageStore> stores;
                    synchronized (MessageBroker.this.topicsLock) {
                        stores = MessageBroker.this.collectStores();
                    }
                    MessageBroker.this.dao.saveAllMessageStores(stores);
                }
                catch (RuntimeException e) {
                    LogRecord record = new LogRecord(Level.WARNING, "Error while persisting the message stores.");
                    record.setThrown(e);
                    LOGGER.log(record);
                }
            }
        };
    }

    /** Copies the stores of all topics; the caller must hold {@link #topicsLock}. */
    private ArrayList<MessageStore> collectStores() {
        ArrayList<MessageStore> stores = new ArrayList<MessageStore>(this.topicsByName.size());
        for (Topic aTopic : this.topicsByName.values()) {
            stores.add(aTopic.getMessageStore());
        }
        return stores;
    }

    /** Returns the topic, creating it if needed, or fails when the broker is not started. */
    private Topic requireTopic(String name) {
        Topic topic = this.getTopic(name, true);
        if (topic == null) {
            throw new IllegalStateException("Message broker is not started.");
        }
        return topic;
    }

    /**
     * Looks a topic up by name.
     *
     * @param name name of the topic.
     * @param create whether a missing topic should be created and registered.
     * @return the topic, or null when the broker is not started or the topic is
     *         missing and {@code create} is false.
     */
    private Topic getTopic(String name, boolean create) {
        synchronized (this.topicsLock) {
            if (!this.started.get()) {
                return null;
            }
            Topic topic = this.topicsByName.get(name);
            if (topic == null && create) {
                topic = new Topic(new MessageStore(name, this.receiveTimeout), this.dispatcher);
                this.topicsByName.put(name, topic);
            }
            return topic;
        }
    }

    /**
     * Checks that the identifier is a {@link ConsumerId} issued by this broker.
     *
     * @throws NullPointerException if {@code consumerId} is null.
     * @throws IllegalArgumentException if it has another type or belongs to another broker.
     */
    private void validateConsumerId(Serializable consumerId) {
        if (consumerId == null) {
            throw new NullPointerException("consumerId == null");
        }
        if (ConsumerId.class != consumerId.getClass()) {
            throw new IllegalArgumentException("Wrong consumer ID type.");
        }
        ConsumerId id = (ConsumerId)consumerId;
        if (!id.getBrokerId().equals(this.brokerId)) {
            throw new IllegalArgumentException("Consumer ID belongs to other broker.");
        }
    }
}

