/*
 * Decompiled with CFR 0.152.
 */
package csbase.util.messages;

import csbase.util.data.channel.DataChannel;
import csbase.util.data.dispatcher.IDispatchListener;
import csbase.util.data.dispatcher.IDispatcher;
import csbase.util.messages.AsynchronousConsumer;
import csbase.util.messages.IMessageListener;
import csbase.util.messages.Message;
import csbase.util.messages.MessageStore;
import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import tecgraf.javautils.core.filter.IFilter;
import tecgraf.javautils.core.lng.LNG;

public class Topic
implements IDispatchListener<IMessageListener, Message> {
    static final String className = Topic.class.getSimpleName();
    private static final Logger LOGGER = Logger.getLogger(Topic.class.getName());
    private DataChannel<IMessageListener, Message> channel;
    private MessageStore store;
    private IDispatcher<IMessageListener, Message> dispatcher;

    public Topic(MessageStore store, IDispatcher<IMessageListener, Message> dispatcher) {
        this.store = store;
        this.dispatcher = dispatcher;
        this.channel = new DataChannel<IMessageListener, Message>(this.dispatcher, this);
    }

    public Message[] receive(Serializable consumerId, IFilter<Message> filter) {
        return this.store.receive(consumerId, filter);
    }

    public void publish(Message message, long timeToLive) {
        this.store.publish(message, timeToLive);
        this.channel.publish((Message[])new Message[]{message});
    }

    public int subscribe(Serializable consumerId, IMessageListener listener, IFilter<Message> selector) {
        AsynchronousConsumer consumer = new AsynchronousConsumer(consumerId, listener, this.store);
        int size = this.channel.subscribe(consumer, selector);
        Message[] messages = this.store.peek(consumerId, selector);
        if (messages.length > 0) {
            this.dispatcher.dispatch(this, consumer, (Message[])messages);
        }
        return size;
    }

    public int unsubscribe(Serializable consumerId) {
        return this.channel.unsubscribe(new AsynchronousConsumer(consumerId, null, null));
    }

    public MessageStore getMessageStore() {
        return this.store;
    }

    public void onExceptionThrown(Exception e, IMessageListener consumer, Message ... messages) {
        AsynchronousConsumer asyncConsumer = (AsynchronousConsumer)AsynchronousConsumer.class.cast(consumer);
        LogRecord record = new LogRecord(Level.WARNING, LNG.get((String)(className + ".warning.messagedelivery.consumer"), (Object[])new Object[]{asyncConsumer.getId()}));
        record.setThrown(e);
        LOGGER.log(record);
    }

    public void onDataDelivered(IMessageListener consumer, Message ... messages) {
    }
}

