/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.disq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.intelie.disq.DisqBuilder;
import net.intelie.disq.PersistentQueue;
import net.intelie.disq.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Disq<T>
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Disq.class);
    private final List<Thread> threads;
    private final long autoFlushNanos;
    private final List<Object> locks;
    private final PersistentQueue<T> queue;
    private final AtomicLong nextFlush;
    private final AtomicBoolean open;

    public Disq(ThreadFactory factory, int threads, long autoFlushMs, Processor<T> processor, PersistentQueue<T> queue) {
        this.threads = new ArrayList<Thread>(threads);
        this.autoFlushNanos = autoFlushMs * 1000000L;
        this.locks = new ArrayList<Object>();
        this.queue = queue;
        this.open = new AtomicBoolean(true);
        this.nextFlush = this.autoFlushNanos > 0L ? new AtomicLong(System.nanoTime() + this.autoFlushNanos) : null;
        for (int i = 0; i < threads; ++i) {
            Object shutdownLock = new Object();
            Thread thread = factory.newThread(new WorkerRunnable(queue, shutdownLock, processor));
            this.locks.add(shutdownLock);
            this.threads.add(thread);
            thread.start();
        }
    }

    public static <T> DisqBuilder<T> builder() {
        return new DisqBuilder(null);
    }

    public static <T> DisqBuilder<T> builder(Processor<T> processor) {
        return new DisqBuilder<T>(processor);
    }

    public PersistentQueue<T> queue() {
        return this.queue;
    }

    public long count() {
        return this.queue.count();
    }

    public long bytes() {
        return this.queue.bytes();
    }

    public long remainingBytes() {
        return this.queue.remainingBytes();
    }

    public boolean submit(T obj) throws IOException {
        return this.open.get() && this.queue.push(obj);
    }

    public void pause() {
        this.queue.setPopPaused(true);
    }

    public void resume() {
        this.queue.setPopPaused(false);
    }

    public void clear() throws IOException {
        this.queue.clear();
    }

    public void flush() throws IOException {
        this.queue.flush();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException, InterruptedException {
        if (!this.open.getAndSet(false)) {
            return;
        }
        this.queue.setPushPaused(true);
        try {
            for (int i = 0; i < this.threads.size(); ++i) {
                Object object = this.locks.get(i);
                synchronized (object) {
                    this.threads.get(i).interrupt();
                    continue;
                }
            }
            for (Thread thread : this.threads) {
                thread.join();
            }
        }
        finally {
            this.queue.close();
        }
    }

    private class WorkerRunnable
    implements Runnable {
        private final PersistentQueue<T> queue;
        private final Object shutdownLock;
        private final Processor<T> processor;

        public WorkerRunnable(PersistentQueue<T> queue, Object shutdownLock, Processor<T> processor) {
            this.queue = queue;
            this.shutdownLock = shutdownLock;
            this.processor = processor;
        }

        @Override
        public void run() {
            while (Disq.this.open.get()) {
                try {
                    long nextFlushNanos = Disq.this.nextFlush != null ? Disq.this.nextFlush.get() : 0L;
                    Object obj = this.blockingPop(nextFlushNanos);
                    this.process(obj);
                    this.maybeFlush(nextFlushNanos);
                }
                catch (Throwable e) {
                    LOGGER.info("Exception processing element", e);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process(T obj) throws Exception {
            if (obj == null || this.processor == null) {
                return;
            }
            Object object = this.shutdownLock;
            synchronized (object) {
                boolean interrupted = Thread.interrupted();
                try {
                    this.processor.process(obj);
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private void maybeFlush(long nextFlushNanos) throws IOException {
            long now = System.nanoTime();
            if (Disq.this.nextFlush != null && now >= nextFlushNanos && Disq.this.nextFlush.compareAndSet(nextFlushNanos, now + Disq.this.autoFlushNanos)) {
                this.queue.flush();
            }
        }

        private T blockingPop(long nextFlushNanos) throws IOException {
            Object obj = null;
            try {
                if (Disq.this.nextFlush != null) {
                    long wait = Math.max(nextFlushNanos - System.nanoTime(), 0L);
                    obj = this.queue.blockingPop(wait, TimeUnit.NANOSECONDS);
                } else {
                    obj = this.queue.blockingPop();
                }
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            return obj;
        }
    }
}

