/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.simple;

import net.intelie.pipes.GroupingTree;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.Row;
import net.intelie.pipes.Sink;
import net.intelie.pipes.Tree;
import net.intelie.pipes.model.BaseInstance;
import net.intelie.pipes.model.BasePipe;
import net.intelie.pipes.model.GroupingInsertMerger;
import net.intelie.pipes.model.TopLevelExpression;
import net.intelie.pipes.simple.IntermediateEvent;
import net.intelie.pipes.simple.TimeWindow;
import net.intelie.pipes.simple.WindowState;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.time.Task;
import net.intelie.pipes.time.TaskHandle;
import net.intelie.pipes.time.TimeQueue;
import net.intelie.pipes.types.ClauseInfo;
import net.intelie.pipes.types.CompositeInfo;
import net.intelie.pipes.types.FieldInfo;
import net.intelie.pipes.types.KeepOutputInfo;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.Output;
import net.intelie.pipes.types.RowFields;
import net.intelie.pipes.types.RowType;
import net.intelie.pipes.types.TimeOutput;
import net.intelie.pipes.types.Type;
import net.intelie.pipes.types.WindowInfo;

/**
 * Pipe that merges the trees carried by incoming {@link IntermediateEvent}s and,
 * whenever the task registered for the output period runs, hands the merged
 * result (passed through the window state) to the downstream {@link Sink}.
 */
public class TimePipeReducer extends BasePipe {
    private static final long serialVersionUID = 1L;
    private final TopLevelExpression query;
    private final TimeOutput output;
    private final TimeWindow window;
    private final RowFields fields;

    /**
     * Builds the reducer; its metadata is derived from {@code old} by
     * {@link #decideMetadata}.
     *
     * @param old    metadata used as the base for this pipe's metadata
     * @param query  expression supplying grouping, selection, weight, ttl and the merger
     * @param output period on which merged results are emitted
     * @param window window used to create the per-instance window state
     */
    public TimePipeReducer(Metadata old, TopLevelExpression query, TimeOutput output, TimeWindow window) {
        super(TimePipeReducer.decideMetadata(old, query, output, window));
        // Row fields are read back from the metadata that was just handed to the superclass.
        this.fields = this.metadata().getRowFields();
        this.query = query;
        this.output = output;
        this.window = window;
    }

    /**
     * Derives this pipe's metadata: a row type made of the timestamp field plus the
     * query's group and select infos, marked safe, with weight, window info and
     * output replaced.
     */
    private static Metadata decideMetadata(Metadata old, TopLevelExpression query, TimeOutput output, TimeWindow window) {
        ClauseInfo timestampClause = new ClauseInfo(new FieldInfo[]{Row.TIMESTAMP_FIELD});
        RowFields rowFields = new RowFields(timestampClause, query.group().info(), query.select().info());
        return old
                .withType((Type)new RowType(rowFields))
                .withSafe(true)
                .withWeight(query.weight() * window.estimateMultiplier((Period)output))
                .withWindow(TimePipeReducer.decideWindowInfo(query, output, window))
                .withOutput(new Output[]{output});
    }

    /**
     * Combines the window's own info for the output period with a
     * {@link KeepOutputInfo} built from the query ttl and the same period.
     */
    private static WindowInfo decideWindowInfo(TopLevelExpression query, TimeOutput output, TimeWindow window) {
        WindowInfo[] parts = new WindowInfo[]{
                window.info(output.period()),
                new KeepOutputInfo(query.ttl(), output.period())
        };
        return new CompositeInfo(parts);
    }

    /**
     * Creates a new running instance that reports to {@code listener}.
     */
    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    /**
     * Running state of the reducer: a merger accumulating event trees, a time queue
     * driving the periodic emission, and an optional scheduler handle.
     */
    private class MyInstance extends BaseInstance {
        private final Sink listener;
        private final WindowState windowState = TimePipeReducer.this.window.newState(TimePipeReducer.this.query);
        private final GroupingInsertMerger merger = TimePipeReducer.this.query.newInsertMerger();
        private final TimeQueue queue = new TimeQueue();
        // True while the merger holds data pushed since the last flip.
        private volatile boolean buffered = false;
        // Non-null once turnOn has scheduled this instance on a scheduler context.
        private volatile TaskHandle handle = null;

        public MyInstance(Sink listener) {
            this.listener = listener;
            // Register the emission task on the queue for the output period.
            this.queue.addTask((Period)TimePipeReducer.this.output, (Task)new MyTask());
        }

        /**
         * Advances the queue to just before {@code timestamp}.
         */
        private void maybeFlip(long timestamp) {
            long justBefore = timestamp - 1L;
            this.advanceTo(justBefore);
        }

        /**
         * Offers {@code timestamp} to the time queue.
         */
        @Override
        public void advanceTo(long timestamp) {
            this.queue.offer(timestamp);
        }

        /**
         * Accepts an {@link IntermediateEvent} and merges its tree; any other object
         * is ignored. While no scheduler handle is set, the event's own timestamp is
         * used to advance the queue first.
         */
        @Override
        public void flow(Object obj) {
            if (!(obj instanceof IntermediateEvent)) {
                return;
            }
            IntermediateEvent incoming = (IntermediateEvent)obj;
            if (this.handle == null) {
                this.maybeFlip(incoming.timestamp());
            }
            synchronized (this) {
                this.queue.notifyEvent();
                this.buffered = true;
                this.merger.push((Tree)incoming.tree());
            }
        }

        /**
         * Advances the queue to the context's current time and schedules, on the
         * output period, a callback that advances the queue to each period end.
         *
         * @throws UnsupportedOperationException if a handle is already set
         */
        @Override
        public synchronized void turnOn(SchedulerContext context) {
            if (this.handle != null) {
                throw new UnsupportedOperationException("Pipe instance already turned on");
            }
            this.advanceTo(context.now());
            this.handle = context.schedule((Period)TimePipeReducer.this.output, (from, to) -> this.advanceTo(to));
        }

        /**
         * Cancels the scheduler handle, if any, and flushes the queue when
         * {@code flushTimers} is set. The whole method runs under the instance lock.
         */
        @Override
        public synchronized void destroy(boolean flushTimers) {
            if (this.handle != null) {
                this.handle.cancel();
            }
            if (flushTimers) {
                this.queue.flush();
            }
        }

        /**
         * Takes the merger's current content, clears the merger and resets the
         * buffered flag, all under the instance lock.
         */
        private synchronized GroupingTree flip() {
            GroupingTree merged = this.merger.get();
            this.merger.clear();
            this.buffered = false;
            return merged;
        }

        /**
         * Queue task that emits the buffered data, if there is any.
         */
        private class MyTask implements Task {
            private MyTask() {
            }

            public void run(long begin, long end) {
                if (!MyInstance.this.buffered) {
                    return;
                }
                GroupingTree flushed = MyInstance.this.flip();
                MyInstance.this.listener.onEvent(
                        TimePipeReducer.this.fields,
                        MyInstance.this.windowState.yield(end, begin, end, flushed));
            }
        }
    }
}

