/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes.simple;

import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.Sink;
import net.intelie.pipes.model.BaseInstance;
import net.intelie.pipes.simple.IntermediateEvent;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.types.Metadata;

/**
 * A {@link Pipe} assembled from two stages: a {@code mapper} pipe and a {@code reducer} pipe.
 *
 * <p>Each instance created by {@link #newInstance(Sink)} pushes incoming objects into a mapper
 * instance; every raw event the mapper emits is forwarded to a reducer instance, and the reducer
 * emits to the listener supplied by the caller. The reported {@link #metadata()} is the reducer's.
 */
public class SplittablePipe implements Pipe {
    private static final long serialVersionUID = 1L;

    /** First stage; receives the objects flowed into an instance. */
    private final Pipe mapper;
    /** Second stage; receives the mapper's raw output and feeds the external listener. */
    private final Pipe reducer;
    /** Text returned verbatim by {@link #toString()}. */
    private final String repr;

    /**
     * @param mapper  pipe used as the first stage
     * @param reducer pipe used as the second stage; also the source of {@link #metadata()}
     * @param repr    string representation returned by {@link #toString()}
     */
    public SplittablePipe(Pipe mapper, Pipe reducer, String repr) {
        this.mapper = mapper;
        this.reducer = reducer;
        this.repr = repr;
    }

    /**
     * Creates a new mapper/reducer instance pair wired together.
     *
     * @param listener sink handed to the reducer instance
     * @return an instance that drives both stages
     */
    public PipeInstance newInstance(Sink listener) {
        return new MyInstance(listener);
    }

    /** @return the metadata reported by the reducer pipe */
    public Metadata metadata() {
        return this.reducer.metadata();
    }

    /**
     * @return always {@code true}
     */
    // NOTE(review): presumably tells callers that mapper() and reducer() can be used as separate
    // stages - confirm against the Pipe contract.
    public boolean split() {
        return true;
    }

    /** @return the mapper pipe given at construction */
    public Pipe mapper() {
        return this.mapper;
    }

    /** @return the reducer pipe given at construction */
    public Pipe reducer() {
        return this.reducer;
    }

    /** @return the representation string given at construction */
    @Override
    public String toString() {
        return this.repr;
    }

    /**
     * Instance that owns one mapper instance and one reducer instance and delegates every
     * lifecycle call to both, mapper first.
     */
    private class MyInstance
    extends BaseInstance {
        private final PipeInstance mapperInstance;
        private final PipeInstance reducerInstance;

        /**
         * @param listener sink that receives the reducer's output
         */
        private MyInstance(Sink listener) {
            // The sink reads reducerInstance only when a raw event arrives, not when it is built.
            // NOTE(review): this assumes the mapper does not emit while newInstance() is still
            // running, since reducerInstance is assigned only afterwards - confirm.
            this.mapperInstance = SplittablePipe.this.mapper.newInstance((Sink)new Sink.Empty(){

                public void onRaw(RawEvent raw) {
                    // Forward the whole batch to the reducer first.
                    MyInstance.this.reducerInstance.flowMany((Iterable)raw);
                    if (raw.size() < 1) {
                        return;
                    }
                    // If the batch starts with an IntermediateEvent, move the reducer to that
                    // event's timestamp after the batch has been delivered.
                    Object first = raw.get(0);
                    if (first instanceof IntermediateEvent) {
                        MyInstance.this.reducerInstance.advanceTo(((IntermediateEvent)first).timestamp());
                    }
                }
            });
            this.reducerInstance = SplittablePipe.this.reducer.newInstance(listener);
        }

        /** Sends {@code obj} to the mapper; the reducer only sees what the mapper emits. */
        @Override
        public void flow(Object obj) {
            this.mapperInstance.flow(obj);
        }

        /** Turns on the mapper instance and then the reducer instance with the same context. */
        @Override
        public synchronized void turnOn(SchedulerContext context) {
            this.mapperInstance.turnOn(context);
            this.reducerInstance.turnOn(context);
        }

        /** Advances the mapper instance and then the reducer instance to {@code timestamp}. */
        @Override
        public void advanceTo(long timestamp) {
            this.mapperInstance.advanceTo(timestamp);
            this.reducerInstance.advanceTo(timestamp);
        }

        /**
         * Destroys the mapper instance and then the reducer instance.
         *
         * <p>The reducer is destroyed in a {@code finally} block so that an exception thrown while
         * destroying the mapper cannot leave the reducer undestroyed. If both calls throw, the
         * reducer's exception is the one that propagates.
         *
         * @param flushTimers passed unchanged to both instances
         */
        @Override
        public void destroy(boolean flushTimers) {
            try {
                this.mapperInstance.destroy(flushTimers);
            } finally {
                this.reducerInstance.destroy(flushTimers);
            }
        }
    }
}

