/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.Collectors;
import net.intelie.pipes.ArgQueue;
import net.intelie.pipes.CompilerContext;
import net.intelie.pipes.Export;
import net.intelie.pipes.Help;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeException;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.PropertyVisitor;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.RowList;
import net.intelie.pipes.Scope;
import net.intelie.pipes.Sink;
import net.intelie.pipes.simple.Window;
import net.intelie.pipes.stateful.RedirectToSink;
import net.intelie.pipes.stateless.EmptyPipe;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.types.CompositeInfo;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.RowFields;
import net.intelie.pipes.types.WindowInfo;
import net.intelie.pipes.util.Preconditions;

/**
 * A pipe made of an ordered, non-empty chain of other pipes.
 *
 * <p>Instances created by {@link #newInstance(Sink)} feed incoming objects to the
 * first pipe of the chain; each pipe is instantiated with a {@link RedirectToSink}
 * wrapping the next pipe's instance as its listener, and the last pipe is
 * instantiated with the caller's listener (wrapped so that it may be {@code null}).
 *
 * <p>The sequence is considered "split" when at least one of its pipes reports
 * {@code split() == true}; the first such pipe is where {@link #mapper()} and
 * {@link #reducer()} cut the chain.
 */
@Export(value={"@sequence"})
@Help(omit=true)
public class SequencePipe implements Pipe {
    private static final long serialVersionUID = 1L;

    /** Private copy of the chained pipes, in flow order. Never empty. */
    private final Pipe[] pipes;
    /** Metadata of the whole sequence, see {@link #decideMetadata(Pipe[])}. */
    private final Metadata metadata;
    /** Index of the first pipe that is split, or {@code pipes.length} when none is. */
    private final int splitId;

    /**
     * Builds a sequence from every pipe remaining in the argument queue.
     *
     * @param queue argument queue to read the pipes from
     * @throws PipeException if reading a pipe from the queue fails
     */
    public SequencePipe(ArgQueue queue) throws PipeException {
        this(SequencePipe.makePipes(queue));
    }

    /**
     * Reads pipes from {@code queue} until it is exhausted. Each pipe after the
     * first is read with a context derived (via {@code newSource}) from the
     * metadata of the pipe read just before it.
     */
    private static Pipe[] makePipes(ArgQueue queue) throws PipeException {
        CompilerContext context = queue.context();
        ArrayList<Pipe> parsed = new ArrayList<Pipe>();
        while (queue.hasNext()) {
            Pipe pipe = (Pipe)queue.withContext(context).get(Pipe.class);
            parsed.add(pipe);
            // The next pipe in the chain is read against this pipe's output metadata.
            context = context.newSource(pipe.metadata());
        }
        return parsed.toArray(new Pipe[0]);
    }

    /**
     * Builds a sequence from the given pipes, in flow order.
     *
     * @param pipes the pipes to chain; must be non-null and contain at least one
     *              pipe, otherwise the {@code Preconditions.checkArgument} check fails
     */
    public SequencePipe(Pipe ... pipes) {
        Preconditions.checkArgument(pipes != null && pipes.length > 0, (Object)"must have any pipe");
        this.pipes = Arrays.copyOf(pipes, pipes.length);
        // Derive everything from the private copy so that later changes to the
        // caller's array cannot make these fields disagree with this.pipes.
        this.splitId = SequencePipe.calculateSplitId(this.pipes);
        this.metadata = SequencePipe.decideMetadata(this.pipes);
    }

    /**
     * Computes the metadata of the sequence: the last pipe's metadata, with the
     * weight replaced by the sum of all pipes' weights and the window replaced
     * by a {@link CompositeInfo} over all pipes' windows (in flow order).
     */
    private static Metadata decideMetadata(Pipe[] pipes) {
        Metadata last = pipes[pipes.length - 1].metadata();
        ArrayList<WindowInfo> windows = new ArrayList<WindowInfo>(pipes.length);
        long weight = 0L;
        for (Pipe pipe : pipes) {
            Metadata meta = pipe.metadata();
            weight += meta.weight();
            windows.add(meta.window());
        }
        return last.withWeight(weight).withWindow((WindowInfo)new CompositeInfo(windows));
    }

    /**
     * Returns the index of the first pipe whose {@code split()} is true, or
     * {@code pipes.length} when no pipe is split.
     */
    private static int calculateSplitId(Pipe[] pipes) {
        for (int i = 0; i < pipes.length; ++i) {
            if (pipes[i].split()) {
                return i;
            }
        }
        return pipes.length;
    }

    /**
     * Returns the mapper part of this sequence.
     *
     * @return this sequence itself when it is not split; otherwise a new sequence
     *         with the pipes up to and including the split pipe, where the split
     *         pipe is replaced by its own {@code mapper()}
     */
    public Pipe mapper() {
        if (!this.split()) {
            return this;
        }
        Pipe[] head = Arrays.copyOfRange(this.pipes, 0, this.splitId + 1);
        head[this.splitId] = head[this.splitId].mapper();
        return new SequencePipe(head);
    }

    /**
     * Returns the reducer part of this sequence.
     *
     * @return an {@link EmptyPipe} carrying this sequence's metadata and no windows
     *         when it is not split; otherwise a new sequence with the pipes from
     *         the split pipe to the end, where the split pipe is replaced by its
     *         own {@code reducer()}
     */
    public Pipe reducer() {
        if (!this.split()) {
            return new EmptyPipe(this.metadata, new Window[0]);
        }
        Pipe[] tail = Arrays.copyOfRange(this.pipes, this.splitId, this.pipes.length);
        tail[0] = tail[0].reducer();
        return new SequencePipe(tail);
    }

    /**
     * Instantiates every pipe of the chain and wires them together.
     *
     * <p>Instances are created from the last pipe to the first, because each pipe
     * needs the sink of its successor at creation time.
     *
     * @param listener sink given to the last pipe of the chain; may be
     *                 {@code null}, in which case its notifications are dropped
     * @return a handle that feeds the first pipe and manages the lifecycle of all of them
     */
    public PipeInstance newInstance(Sink listener) {
        PipeInstance[] handles = new PipeInstance[this.pipes.length];
        Sink downstream = new SafeSink(listener);
        for (int i = this.pipes.length - 1; i >= 0; --i) {
            handles[i] = this.pipes[i].newInstance(downstream);
            downstream = new RedirectToSink(handles[i]);
        }
        return new FirstHandle(handles);
    }

    /**
     * Passes the visitor through every pipe in flow order, each time with the same
     * {@code parent} scope and the visitor returned by the previous pipe.
     *
     * @return the visitor returned by the last pipe
     */
    public PropertyVisitor visit(Scope parent, PropertyVisitor visitor) {
        PropertyVisitor current = visitor;
        for (Pipe pipe : this.pipes) {
            current = pipe.visit(parent, current);
        }
        return current;
    }

    /** Returns the metadata computed for the whole sequence at construction time. */
    public Metadata metadata() {
        return this.metadata;
    }

    /** Returns {@code true} when at least one pipe of the chain is split. */
    public boolean split() {
        return this.splitId < this.pipes.length;
    }

    /**
     * Returns the string forms of the pipes joined by {@code " => "}, skipping
     * pipes whose string form is empty.
     */
    @Override
    public String toString() {
        return Arrays.stream(this.pipes)
                .map(String::valueOf)
                .filter(text -> !text.isEmpty())
                .collect(Collectors.joining(" => "));
    }

    /**
     * Sink wrapper that tolerates a {@code null} delegate: notifications are
     * forwarded when a delegate exists and silently dropped otherwise.
     *
     * <p>Declared static because it never uses the enclosing sequence.
     */
    private static class SafeSink implements Sink {
        private final Sink listener;

        public SafeSink(Sink listener) {
            this.listener = listener;
        }

        public void onEvent(RowFields fields, RowList rows) {
            if (this.listener != null) {
                this.listener.onEvent(fields, rows);
            }
        }

        public void onRaw(RawEvent raw) {
            if (this.listener != null) {
                this.listener.onRaw(raw);
            }
        }
    }

    /**
     * Handle over all instances of the chain: data is pushed only into the first
     * instance, while lifecycle calls are applied to every instance in flow order.
     */
    private static class FirstHandle implements PipeInstance {
        private final PipeInstance[] handles;

        public FirstHandle(PipeInstance[] handles) {
            this.handles = handles;
        }

        public void flow(Object obj) {
            this.handles[0].flow(obj);
        }

        public void flowMany(Iterable iterable) {
            this.handles[0].flowMany(iterable);
        }

        public void turnOn(SchedulerContext context) {
            for (PipeInstance handle : this.handles) {
                handle.turnOn(context);
            }
        }

        public void advanceTo(long timestamp) {
            for (PipeInstance handle : this.handles) {
                handle.advanceTo(timestamp);
            }
        }

        /**
         * Destroys every instance in flow order. All instances except the last
         * are always destroyed with {@code true}; only the last one receives the
         * caller's {@code flushTimers} value.
         * NOTE(review): presumably earlier stages always flush so that their
         * pending output still reaches the later stages - confirm.
         */
        public void destroy(boolean flushTimers) {
            int lastIndex = this.handles.length - 1;
            for (int i = 0; i < this.handles.length; ++i) {
                this.handles[i].destroy(flushTimers || i < lastIndex);
            }
        }

        /** Destroys every instance, flushing timers on all of them. */
        public void destroy() {
            this.destroy(true);
        }
    }
}

