/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.simple;

import net.intelie.pipes.ArrayRawEvent;
import net.intelie.pipes.GroupingTree;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.Property;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.Sink;
import net.intelie.pipes.model.BaseInstance;
import net.intelie.pipes.model.GroupingState;
import net.intelie.pipes.model.TopLevelExpression;
import net.intelie.pipes.simple.IntermediateEvent;
import net.intelie.pipes.simple.Window;
import net.intelie.pipes.stateless.EmptyPipe;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.time.Task;
import net.intelie.pipes.time.TaskHandle;
import net.intelie.pipes.time.TimeQueue;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.types.Output;
import net.intelie.pipes.types.TimeOutput;
import net.intelie.pipes.types.Type;
import net.intelie.pipes.types.WindowInfo;

public class TimePipeMapper
implements Pipe {
    private static final long serialVersionUID = 1L;
    private final Property<Double> timestamp;
    private final TopLevelExpression query;
    private final TimeOutput output;
    private final Metadata metadata;

    public TimePipeMapper(Metadata old, Property<Double> timestamp, TopLevelExpression query, TimeOutput output) {
        this.timestamp = timestamp;
        this.query = query;
        this.output = output;
        this.metadata = old.withType((Type)Type.OBJECT).withSafe(true).withWeight(query.weight()).withWindow(WindowInfo.NONE).withOutput(new Output[]{output});
    }

    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    public Metadata metadata() {
        return this.metadata;
    }

    public boolean split() {
        return false;
    }

    public Pipe mapper() {
        return this;
    }

    public Pipe reducer() {
        return new EmptyPipe(this.metadata, new Window[0]);
    }

    public String toString() {
        return this.query + " " + this.output;
    }

    private class MyInstance
    extends BaseInstance {
        private final GroupingState state;
        private final Sink listener;
        private final TimeQueue queue;
        private TaskHandle handle;

        private MyInstance(Sink listener) {
            this.state = TimePipeMapper.this.query.newState(0);
            this.queue = new TimeQueue();
            this.handle = null;
            this.listener = listener;
            this.queue.addTask((Period)TimePipeMapper.this.output, (Task)new MyTask());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void flow(Object obj) {
            if (this.handle == null) {
                this.maybeFlip((Double)TimePipeMapper.this.timestamp.eval(null, obj));
            }
            MyInstance myInstance = this;
            synchronized (myInstance) {
                this.queue.notifyEvent();
                this.state.yield(null, obj);
            }
        }

        @Override
        public synchronized void turnOn(SchedulerContext context) {
            if (this.handle != null) {
                throw new UnsupportedOperationException("Pipe instance already turned on");
            }
            this.advanceTo(context.now());
            this.handle = context.schedule((Period)TimePipeMapper.this.output, (begin, end) -> this.advanceTo(end));
        }

        private void maybeFlip(Double timestamp) {
            if (timestamp == null) {
                return;
            }
            this.advanceTo(timestamp.longValue() - 1L);
        }

        @Override
        public void advanceTo(long timestamp) {
            this.queue.offer(timestamp);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void destroy(boolean flushTimers) {
            MyInstance myInstance = this;
            synchronized (myInstance) {
                if (this.handle != null) {
                    this.handle.cancel();
                }
            }
            if (flushTimers) {
                this.queue.flush();
            }
        }

        private synchronized GroupingTree flip() {
            return this.state.flip();
        }

        private class MyTask
        implements Task {
            private MyTask() {
            }

            public void run(long begin, long end) {
                MyInstance.this.listener.onRaw((RawEvent)ArrayRawEvent.fromArray((Object[])new Object[]{new IntermediateEvent(end, MyInstance.this.flip())}));
            }
        }
    }
}

