/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.stateful.fork;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.PropertyVisitor;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.RowList;
import net.intelie.pipes.Scope;
import net.intelie.pipes.Sink;
import net.intelie.pipes.stateful.fork.DelayedInput;
import net.intelie.pipes.stateful.fork.DelayedOutput;
import net.intelie.pipes.stateful.fork.ForkInput;
import net.intelie.pipes.stateful.fork.ForkOutput;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.time.Task;
import net.intelie.pipes.time.TaskAction;
import net.intelie.pipes.time.TaskHandle;
import net.intelie.pipes.time.TimeQueue;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.Output;
import net.intelie.pipes.types.OutputType;
import net.intelie.pipes.types.RowFields;
import net.intelie.pipes.types.TimeOutput;
import net.intelie.pipes.util.Iterables;

/**
 * Pipe that hands every incoming item to a fixed set of child pipes and merges whatever those
 * children emit into a single downstream flush.
 *
 * <p>How input is distributed among the children is delegated to the {@link ForkInput}; how the
 * children's results are combined and delivered to the downstream {@link Sink} is delegated to
 * the {@link ForkOutput}.
 */
public class ForkPipe implements Pipe {
    private static final long serialVersionUID = 1L;
    private final Metadata oldMeta;
    private final ForkInput input;
    private final ForkOutput output;
    private final Pipe[] pipes;
    private final boolean split;
    private final OutputType restricted;

    /**
     * Creates a fork over the given child pipes.
     *
     * @param oldMeta metadata kept so that {@link #mapper()} can build a {@link DelayedOutput}
     * @param input   strategy used to push incoming data into the child instances
     * @param output  strategy used to flush the children's results; it also supplies this pipe's
     *                metadata and the (possibly {@code null}) restricted output type
     * @param pipes   child pipes; the array is stored as given, not copied
     */
    public ForkPipe(Metadata oldMeta, ForkInput input, ForkOutput output, Pipe ... pipes) {
        this.oldMeta = oldMeta;
        this.input = input;
        this.output = output;
        this.pipes = pipes;
        this.split = calculateSplit(pipes);
        this.restricted = output.restrictedOutput();
    }

    /** Returns {@code true} as soon as one of the given pipes reports {@code split()}. */
    private static boolean calculateSplit(Pipe[] pipes) {
        for (Pipe candidate : pipes) {
            if (candidate.split()) {
                return true;
            }
        }
        return false;
    }

    /** Returns whether any child pipe reported {@code split()} at construction time. */
    public boolean split() {
        return split;
    }

    /** Returns the metadata exposed by the configured {@link ForkOutput}. */
    public Metadata metadata() {
        return output.metadata();
    }

    /**
     * Creates a fresh runtime instance, which in turn instantiates every child pipe.
     *
     * @param listener downstream sink that receives the flushed results
     */
    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    /**
     * Builds the mapper half of this fork: each child is replaced by its own {@code mapper()},
     * the input strategy is kept, and the output becomes a {@link DelayedOutput} over the mapped
     * children.
     */
    public Pipe mapper() {
        Pipe[] mapped = new Pipe[pipes.length];
        int slot = 0;
        for (Pipe child : pipes) {
            mapped[slot++] = child.mapper();
        }
        return new ForkPipe(oldMeta, input, new DelayedOutput(oldMeta, mapped), mapped);
    }

    /**
     * Builds the reducer half of this fork: each child is replaced by its own {@code reducer()},
     * the input becomes a {@link DelayedInput}, and the output strategy is kept.
     */
    public Pipe reducer() {
        Pipe[] reduced = new Pipe[pipes.length];
        int slot = 0;
        for (Pipe child : pipes) {
            reduced[slot++] = child.reducer();
        }
        return new ForkPipe(oldMeta, new DelayedInput(), output, reduced);
    }

    /** Visits the input strategy first and passes the resulting visitor on to the output strategy. */
    public PropertyVisitor visit(Scope parent, PropertyVisitor visitor) {
        PropertyVisitor afterInput = input.visit(parent, visitor);
        return output.visit(parent, afterInput);
    }

    public String toString() {
        String children = Iterables.join(", ", (Object[]) pipes);
        return "**fork(" + children + ")";
    }

    /** An output type is accepted when no restriction is configured or when it is the restricted type. */
    private boolean accepts(OutputType type) {
        if (restricted == null) {
            return true;
        }
        return restricted == type;
    }

    /**
     * Runtime state of one fork: the child instances, the latest value emitted by each child, and
     * a queue of pending flushes towards the downstream listener.
     *
     * <p>Mutations of the child instances and of {@code values} happen while holding this
     * object's monitor. Pending flushes are drained by {@link #flushActions()} after the
     * synchronized section of each entry point; nested calls (for example
     * {@code maybeFlip -> advanceTo}) drain while the caller still holds the monitor.
     */
    private class MyInstance implements PipeInstance {
        private final Sink listener;
        private final PipeInstance[] instances = new PipeInstance[ForkPipe.this.pipes.length];
        private final TimeQueue queue;
        // Slot i holds the most recent emission of child i since the last copyOutput().
        private final Iterable[] values = new Iterable[ForkPipe.this.pipes.length];
        private final List<TaskHandle> handles = new ArrayList<>();
        private final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
        // True when at least one child declares an output of type TIMED.
        private final boolean anyTimed;
        private volatile boolean started;
        private volatile boolean disposed;

        /**
         * Instantiates every child pipe with a sink that records its emissions, and registers a
         * queue task for each TIMED output a child declares.
         *
         * @param listener downstream sink that eventually receives the merged results
         */
        public MyInstance(Sink listener) {
            this.listener = listener;
            // Queue callback: run the due task actions, then snapshot the children's output,
            // all under the instance monitor.
            this.queue = new TimeQueue((time, dueTasks) -> {
                synchronized (this) {
                    for (TaskAction due : dueTasks) {
                        due.execute();
                    }
                    enqueueActions(time);
                }
            });
            boolean timedFound = false;
            for (int i = 0; i < ForkPipe.this.pipes.length; ++i) {
                Pipe child = ForkPipe.this.pipes[i];
                instances[i] = child.newInstance((Sink) new InstanceSink(i));
                for (Output declared : child.metadata().output()) {
                    if (declared.type() == OutputType.TIMED) {
                        queue.addTask((Period) ((TimeOutput) declared), (Task) new InstanceTask(i));
                        timedFound = true;
                    }
                }
            }
            this.anyTimed = timedFound;
        }

        /**
         * Pushes a single object into the children and flushes any resulting output.
         *
         * <p>NOTE(review): unlike {@link #flowMany(Iterable)}, there is no
         * {@code maybeFlip(ts, false)} after the input is handed over; confirm this asymmetry
         * is intended.
         */
        public void flow(Object obj) {
            synchronized (this) {
                // The timestamp is only looked up when some child has a timed output.
                Long ts = null;
                if (anyTimed) {
                    ts = ForkPipe.this.input.timestamp(obj);
                }
                maybeFlip(ts, true);
                queue.notifyEvent();
                ForkPipe.this.input.flow(obj, instances);
                if (ForkPipe.this.accepts(OutputType.ITEM)) {
                    enqueueActions(ts);
                }
            }
            flushActions();
        }

        /** Queues a snapshot of the children's current output, unless this instance is disposed. */
        private void enqueueActions(Long ts) {
            if (!disposed) {
                actions.add(new Action(ts, copyOutput()));
            }
        }

        /**
         * Pushes a batch into the children and flushes any resulting output. Time is advanced
         * (see {@link #maybeFlip(Long, boolean)}) both before and after the batch is handed over.
         */
        public void flowMany(Iterable iterable) {
            synchronized (this) {
                // The timestamp is only looked up when some child has a timed output.
                Long ts = null;
                if (anyTimed) {
                    ts = ForkPipe.this.input.timestampMany(iterable);
                }
                maybeFlip(ts, true);
                queue.notifyEvent();
                ForkPipe.this.input.flowMany(iterable, instances);
                maybeFlip(ts, false);
                if (ForkPipe.this.accepts(OutputType.ITEM)) {
                    enqueueActions(ts);
                }
            }
            flushActions();
        }

        /** Drains the pending queue, flushing every entry that actually carries events. */
        private void flushActions() {
            for (Action pending = actions.poll(); pending != null; pending = actions.poll()) {
                if (pending.events != null) {
                    ForkPipe.this.output.flush(listener, pending.timestamp, pending.events);
                }
            }
        }

        /**
         * Moves every non-null slot of {@code values} into a new array and clears the slot.
         *
         * @return the snapshot, or {@code null} when no child had emitted anything
         */
        private synchronized Iterable[] copyOutput() {
            Iterable[] snapshot = null;
            for (int slot = 0; slot < values.length; ++slot) {
                Iterable emitted = values[slot];
                if (emitted != null) {
                    if (snapshot == null) {
                        // Allocated lazily so that "nothing emitted" stays null.
                        snapshot = new Iterable[values.length];
                    }
                    snapshot[slot] = emitted;
                    values[slot] = null;
                }
            }
            return snapshot;
        }

        /**
         * Advances time from an event timestamp. This only happens while the instance has timed
         * outputs, has not been turned on, and the timestamp is known.
         *
         * @param timestamp event timestamp, may be {@code null}
         * @param exclude   when {@code true}, advance to {@code timestamp - 1} instead of
         *                  {@code timestamp}
         */
        protected void maybeFlip(Long timestamp, boolean exclude) {
            boolean drivenByEvents = anyTimed && !started && timestamp != null;
            if (drivenByEvents) {
                advanceTo(exclude ? timestamp - 1L : timestamp);
            }
        }

        /**
         * Advances to {@code context.now()}, schedules an {@code advanceTo(end)} callback for
         * every TIMED output of every child, turns each child on, and marks this instance as
         * started.
         *
         * @throws UnsupportedOperationException if this instance was already turned on
         */
        public synchronized void turnOn(SchedulerContext context) {
            if (started) {
                throw new UnsupportedOperationException("Pipe instance already turned on");
            }
            advanceTo(context.now());
            for (int i = 0; i < instances.length; ++i) {
                PipeInstance child = instances[i];
                for (Output declared : ForkPipe.this.pipes[i].metadata().output()) {
                    if (declared.type() == OutputType.TIMED) {
                        TaskHandle handle = context.schedule(
                                (Period) ((TimeOutput) declared),
                                (begin, end) -> this.advanceTo(end));
                        handles.add(handle);
                    }
                }
                child.turnOn(context);
            }
            started = true;
        }

        /** Offers the timestamp to the time queue, advances every child to it, then flushes. */
        public void advanceTo(long timestamp) {
            synchronized (this) {
                queue.offer(timestamp);
                for (PipeInstance child : instances) {
                    child.advanceTo(timestamp);
                }
            }
            flushActions();
        }

        /** Destroys this instance, flushing the time queue first. */
        public void destroy() {
            destroy(true);
        }

        /**
         * Cancels the scheduled handles, optionally flushes the time queue, destroys every child
         * with {@code destroy(false)}, queues one last output snapshot, and marks the instance
         * as disposed.
         *
         * @param flushTimers whether the time queue is flushed before the children are destroyed
         */
        public void destroy(boolean flushTimers) {
            synchronized (this) {
                for (TaskHandle scheduled : handles) {
                    if (scheduled != null) {
                        scheduled.cancel();
                    }
                }
                if (flushTimers) {
                    queue.flush();
                }
                for (PipeInstance child : instances) {
                    child.destroy(false);
                }
                // The final snapshot must be queued before the disposed flag blocks enqueueing.
                enqueueActions(null);
                disposed = true;
            }
            flushActions();
        }

        /** Queue task that advances one child instance to the end of the elapsed period. */
        private class InstanceTask implements Task {
            private final int i;

            public InstanceTask(int i) {
                this.i = i;
            }

            public void run(long begin, long end) {
                PipeInstance target = MyInstance.this.instances[i];
                target.advanceTo(end);
            }
        }

        /** Sink given to child {@code i}; it overwrites slot {@code i} with each emission. */
        private class InstanceSink implements Sink {
            private final int i;

            public InstanceSink(int i) {
                this.i = i;
            }

            public void onEvent(RowFields fields, RowList rows) {
                MyInstance.this.values[i] = rows;
            }

            public void onRaw(RawEvent raw) {
                MyInstance.this.values[i] = raw;
            }
        }

        /** A pending flush: a timestamp (may be {@code null}) plus a per-child snapshot (may be {@code null}). */
        private class Action {
            private final Long timestamp;
            private final Iterable[] events;

            private Action(Long timestamp, Iterable[] events) {
                this.timestamp = timestamp;
                this.events = events;
            }
        }
    }
}

