/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.simple;

import java.util.ArrayList;
import net.intelie.pipes.ArrayRowList;
import net.intelie.pipes.CompilerContext;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeException;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.Property;
import net.intelie.pipes.PropertyVisitor;
import net.intelie.pipes.Row;
import net.intelie.pipes.RowList;
import net.intelie.pipes.Scope;
import net.intelie.pipes.Sink;
import net.intelie.pipes.guava.collect.Iterables;
import net.intelie.pipes.model.BaseInstance;
import net.intelie.pipes.model.GroupingState;
import net.intelie.pipes.model.TopLevelExpression;
import net.intelie.pipes.simple.BatchWindow;
import net.intelie.pipes.simple.NoWindow;
import net.intelie.pipes.simple.Window;
import net.intelie.pipes.simple.WindowState;
import net.intelie.pipes.stateless.EmptyPipe;
import net.intelie.pipes.types.BatchOutput;
import net.intelie.pipes.types.ClauseInfo;
import net.intelie.pipes.types.FieldInfo;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.RowFields;
import net.intelie.pipes.types.RowType;
import net.intelie.pipes.types.Type;

/**
 * Pipe that feeds incoming events into the grouping state of a
 * {@link TopLevelExpression} and emits the aggregated rows once a configured
 * number of {@code flowMany} calls has been received.
 *
 * <p>The number of calls per emission is taken from {@link BatchOutput#amount()}.
 * Construction fails when the compiler context's metadata is not marked safe.
 */
public class BatchAggregationPipe
implements Pipe {
    private static final long serialVersionUID = 1L;
    private final Property<Double> timestamp;
    private final TopLevelExpression query;
    private final BatchOutput output;
    private final BatchWindow window;
    private final Metadata metadata;
    private final RowFields fields;

    /**
     * Builds the pipe, deriving the output row fields and the metadata from the
     * given query, output and window.
     *
     * @param config compiler context supplying the timestamp property and the base metadata
     * @param query  expression whose group/select clauses describe the emitted rows
     * @param output batch output; its {@code amount()} is the number of batches per emission
     * @param window batch window used to create the per-instance window state
     * @throws PipeException if {@code config.metadata().safe()} is false
     */
    public BatchAggregationPipe(CompilerContext config, TopLevelExpression query, BatchOutput output, BatchWindow window) throws PipeException {
        // Validate first so nothing else is evaluated for an unsafe context.
        this.timestamp = BatchAggregationPipe.checkTimestamp(config);
        this.query = query;
        this.output = output;
        this.window = window;

        // Emitted rows are laid out as: timestamp field, group fields, select fields.
        ClauseInfo timestampClause = new ClauseInfo(new FieldInfo[]{Row.TIMESTAMP_FIELD});
        this.fields = new RowFields(timestampClause, query.group().info(), query.select().info());

        this.metadata = config.metadata()
                .withType((Type)new RowType(this.fields))
                .withSafe(true)
                .withWeight(query.weight() * window.estimateMultiplier(output.amount()))
                .withWindow(window.info(output.amount()))
                .withOutput((Iterable)config.metadata().output());
    }

    /**
     * Returns the context's timestamp property, rejecting contexts whose
     * metadata is not marked safe.
     *
     * @param config compiler context to validate
     * @return {@code config.timestamp()}
     * @throws PipeException if {@code config.metadata().safe()} is false
     */
    private static Property<Double> checkTimestamp(CompilerContext config) throws PipeException {
        if (config.metadata().safe()) {
            return config.timestamp();
        }
        throw new PipeException((Object)"Batch aggregation pipe cannot be distributed. Please run it in a safe environment. Consider preceding it with a time aggregation pipe or an '@unsafe' pipe.");
    }

    /**
     * Creates a fresh instance that reports its results to {@code listener}.
     */
    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    /**
     * Returns the metadata computed at construction time.
     */
    public Metadata metadata() {
        return this.metadata;
    }

    /**
     * Always returns {@code true}.
     */
    public boolean split() {
        return true;
    }

    /**
     * Returns this pipe itself.
     */
    public Pipe mapper() {
        return this;
    }

    /**
     * Returns a new {@link EmptyPipe} carrying this pipe's metadata and no windows.
     */
    public Pipe reducer() {
        return new EmptyPipe(this.metadata, new Window[0]);
    }

    /**
     * Renders the query, then the window (omitted when it is a {@link NoWindow}),
     * then the output, separated by single spaces.
     */
    @Override
    public String toString() {
        String text = String.valueOf(this.query);
        if (!(this.window instanceof NoWindow)) {
            text = text + " " + this.window;
        }
        return text + " " + this.output;
    }

    /**
     * Visits the timestamp property and then the query, returning the query's result.
     */
    public PropertyVisitor visit(Scope parent, PropertyVisitor visitor) {
        this.timestamp.visit(parent, visitor);
        return this.query.visit(parent, visitor);
    }

    /**
     * Running instance of the pipe. State changes happen while holding this
     * instance's monitor; the listener is notified after the monitor is released.
     */
    private class MyInstance
    extends BaseInstance {
        private final GroupingState state;
        private final Sink listener;
        private final WindowState windowState;
        /** Timestamp evaluated from the most recently processed event; null until one is seen. */
        private volatile Double latestTimestamp;
        /** Value of {@link #windowEnd} at the last flush performed by {@code flowMany}. */
        private volatile long windowStart;
        /** Number of {@code flowMany} calls processed so far. */
        private volatile long windowEnd;

        private MyInstance(Sink listener) {
            this.state = BatchAggregationPipe.this.query.newState(0);
            this.latestTimestamp = null;
            this.windowStart = 0L;
            this.windowEnd = 0L;
            this.listener = listener;
            this.windowState = BatchAggregationPipe.this.window.newState(BatchAggregationPipe.this.query);
        }

        /**
         * Feeds every element of {@code iterable} into the grouping state and counts
         * the call as one batch. When the number of batches since the last flush
         * equals the configured amount, the state is flushed and any resulting rows
         * are sent to the listener.
         */
        @Override
        public void flowMany(Iterable iterable) {
            ArrayList rows = new ArrayList();
            synchronized (this) {
                for (Object event : iterable) {
                    this.state.yield(null, event);
                    this.latestTimestamp = (Double)BatchAggregationPipe.this.timestamp.eval(null, event);
                }
                // One call == one batch, regardless of how many events it carried.
                this.windowEnd++;
                long pendingBatches = this.windowEnd - this.windowStart;
                if (pendingBatches == (long)BatchAggregationPipe.this.output.amount()) {
                    Iterables.addAll(rows, this.flush(this.windowStart, this.windowEnd));
                    this.windowStart = this.windowEnd;
                }
            }
            // Notify outside the lock; nothing is sent when no rows were collected.
            if (rows.isEmpty()) {
                return;
            }
            RowList batch = new ArrayRowList(rows);
            this.listener.onEvent(BatchAggregationPipe.this.fields, batch);
        }

        /**
         * When {@code flushTimers} is true and batches have been received since the
         * last flush, flushes the state and sends the result to the listener.
         */
        @Override
        public void destroy(boolean flushTimers) {
            RowList pending = null;
            synchronized (this) {
                if (flushTimers) {
                    if (this.windowStart != this.windowEnd) {
                        pending = this.flush(this.windowStart, this.windowEnd);
                    }
                }
            }
            if (pending == null) {
                return;
            }
            this.listener.onEvent(BatchAggregationPipe.this.fields, pending);
        }

        /**
         * Flips the grouping state and yields it to the window state for the batch
         * range {@code begin}..{@code end}, stamped with the latest timestamp
         * truncated to a long (or null when no timestamp has been recorded).
         */
        private RowList flush(long begin, long end) {
            Double latest = this.latestTimestamp;
            Long flushTime;
            if (latest == null) {
                flushTime = null;
            } else {
                flushTime = Long.valueOf(latest.longValue());
            }
            return this.windowState.yield(flushTime, begin, end, this.state.flip());
        }
    }
}

