/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import net.intelie.pipes.ArgQueue;
import net.intelie.pipes.ArrayRawEvent;
import net.intelie.pipes.ArrayRowList;
import net.intelie.pipes.Export;
import net.intelie.pipes.Fallback;
import net.intelie.pipes.GroupBy;
import net.intelie.pipes.Help;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeException;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.Property;
import net.intelie.pipes.PropertyVisitor;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.Row;
import net.intelie.pipes.RowList;
import net.intelie.pipes.Scope;
import net.intelie.pipes.Sink;
import net.intelie.pipes.UnsafeRow;
import net.intelie.pipes.model.BaseInstance;
import net.intelie.pipes.modules.FallbackToConstant;
import net.intelie.pipes.simple.Window;
import net.intelie.pipes.stateless.EmptyPipe;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.RowFields;
import net.intelie.pipes.types.SeqType;
import net.intelie.pipes.types.Type;
import net.intelie.pipes.util.Preconditions;
import net.intelie.pipes.util.Throttler;

@Export(value={"@throttle"})
@Help(key="pipe-throttle")
public class ThrottlePipe
implements Pipe {
    private static final long serialVersionUID = 1L;
    private final Metadata metadata;
    private final int points;
    private final Iterable<Period> period;
    private final String periodRepr;
    private final GroupBy group;
    private final Property<Double> timestamp;
    private final RowFields fields;

    public ThrottlePipe(ArgQueue queue) throws PipeException {
        queue.ensureSafe();
        this.group = queue.groupBy().ensureNoExpiry();
        this.points = ((Double)queue.constantValue((Type)Type.NUMBER).getSafe(new FallbackToConstant<Double>(1.0))).intValue();
        this.periodRepr = String.valueOf(queue.copyFromHere().getSafe());
        this.period = this.getPeriod(queue);
        this.timestamp = queue.context().timestamp();
        Preconditions.checkArgument((this.points >= 1 ? 1 : 0) != 0, (Object)"Points must be positive");
        Metadata old = queue.context().metadata();
        this.metadata = old.resetWeights((long)this.decideWeight(this.points, this.group));
        this.fields = this.metadata.getRowFields();
    }

    private Iterable<Period> getPeriod(ArgQueue queue) throws PipeException {
        return (Iterable)queue.constantValue((Type)new SeqType((Type)Type.PERIOD)).getSafe((Fallback)new Fallback<Iterable>(){

            public Iterable create(ArgQueue queue) throws PipeException {
                return Arrays.asList((Period)queue.constantValue((Type)Type.PERIOD).get());
            }
        });
    }

    private int decideWeight(int points, GroupBy group) {
        return (points * 8 + (group.size() + 1) * 32) * (group.size() == 0 ? 1 : 64);
    }

    public boolean split() {
        return true;
    }

    public Pipe mapper() {
        return new EmptyPipe(this.metadata, new Window[0]);
    }

    public Pipe reducer() {
        return this;
    }

    public Metadata metadata() {
        return this.metadata;
    }

    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    public PropertyVisitor visit(Scope parent, PropertyVisitor visitor) {
        this.timestamp.visit(parent, visitor);
        this.group.visit(parent, visitor);
        return visitor;
    }

    public String toString() {
        String s = "@throttle " + (this.points != 1 ? this.points + ", " : "") + this.periodRepr;
        if (this.group.size() > 0) {
            s = s + " " + this.group;
        }
        return s;
    }

    private class MyInstance
    extends BaseInstance {
        private final Sink listener;
        private final Throttler throttler;
        private final UnsafeRow buffer;

        public MyInstance(Sink listener) {
            this.listener = listener;
            this.buffer = new UnsafeRow(ThrottlePipe.this.group.size());
            this.throttler = new Throttler(ThrottlePipe.this.points, ThrottlePipe.this.period);
        }

        private boolean offer(Object obj) {
            Double x = (Double)ThrottlePipe.this.timestamp.eval(null, obj);
            if (x == null) {
                return false;
            }
            ThrottlePipe.this.group.evalUnsafe(null, obj, this.buffer);
            return this.throttler.offer(this.buffer, x.longValue());
        }

        @Override
        public void flowMany(Iterable iterable) {
            if (ThrottlePipe.this.fields != null) {
                List<Row> rows = this.extract(iterable, Row.class);
                if (rows.size() > 0) {
                    this.listener.onEvent(ThrottlePipe.this.fields, (RowList)new ArrayRowList(rows));
                }
            } else {
                List<Object> rows = this.extract(iterable, Object.class);
                if (rows.size() > 0) {
                    this.listener.onRaw((RawEvent)new ArrayRawEvent(rows));
                }
            }
        }

        private synchronized <T> List<T> extract(Iterable iterable, Class<T> clazz) {
            ArrayList<T> rows = new ArrayList<T>();
            if (iterable != null) {
                for (Object o : iterable) {
                    if (!clazz.isInstance(o) || !this.offer(o)) continue;
                    rows.add(clazz.cast(o));
                }
            }
            return rows;
        }
    }
}

