/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.pipes;

import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import net.intelie.pipes.FullPipe;
import net.intelie.pipes.MapPropertySource;
import net.intelie.pipes.Pipe;
import net.intelie.pipes.PipeCompiler;
import net.intelie.pipes.PipeCompilers;
import net.intelie.pipes.PipeException;
import net.intelie.pipes.PipeInstance;
import net.intelie.pipes.PropertySource;
import net.intelie.pipes.RawEvent;
import net.intelie.pipes.Sink;
import net.intelie.pipes.Valve;
import net.intelie.pipes.filters.AndFilter;
import net.intelie.pipes.filters.Filter;
import net.intelie.pipes.filters.FilterOptimizer;
import net.intelie.pipes.filters.FilterRuntime;
import net.intelie.pipes.filters.ObjectSink;
import net.intelie.pipes.guava.collect.Lists;
import net.intelie.pipes.guava.collect.Sets;
import net.intelie.pipes.simple.Window;
import net.intelie.pipes.stateful.FullPipeImpl;
import net.intelie.pipes.stateless.EmptyPipe;
import net.intelie.pipes.time.DefaultScheduler;
import net.intelie.pipes.time.Scheduler;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.types.Metadata;
import net.intelie.pipes.util.AutomatonRepr;
import net.intelie.pipes.util.ThreadFactories;

/**
 * Hosts pipe instances on top of a shared {@link FilterRuntime}.
 *
 * <p>Each registered pipe is given a numeric query id, a {@link Filter} and a
 * {@link PipeInstance}. Objects pushed through {@link #flow(Object)} or
 * {@link #flowMany(Iterable)} are handed to the filter runtime, which delivers them to
 * the per-query sinks registered here; those sinks forward straight into the
 * corresponding pipe instance.
 *
 * <p>Every method that reads or mutates the set of attached valves is
 * {@code synchronized} on this runtime instance.
 */
public class PipeRuntime {
    private static final ThreadFactories FACTORIES = new ThreadFactories("pipes");
    /** Generator for query ids; incremented once per registration. */
    private final AtomicLong queries = new AtomicLong(0L);
    private final PropertySource source;
    private final Scheduler scheduler;
    private final FilterRuntime.Default runtime;
    /** Valves currently attached to this runtime. */
    private final Set<RuntimeValve> valves;
    private final PipeCompiler<FullPipe> compiler;

    /**
     * Creates a runtime backed by a fresh {@link MapPropertySource} and a fresh
     * {@link DefaultScheduler}.
     */
    public PipeRuntime() {
        this((PropertySource) new MapPropertySource(), (Scheduler) new DefaultScheduler());
    }

    /**
     * Creates a runtime with the given collaborators and {@code autoFlush} set to
     * {@code true}.
     *
     * @param source    property source handed to the pipe compiler
     * @param scheduler scheduler used to create contexts for online pipes
     */
    public PipeRuntime(PropertySource source, Scheduler scheduler) {
        this(source, scheduler, true);
    }

    /**
     * Creates a runtime with the given collaborators.
     *
     * @param source    property source handed to the pipe compiler
     * @param scheduler scheduler used to create contexts for online pipes
     * @param autoFlush passed unchanged to the {@link FilterRuntime.Default} constructor
     */
    public PipeRuntime(PropertySource source, Scheduler scheduler, boolean autoFlush) {
        this.source = source;
        this.scheduler = scheduler;
        this.runtime = new FilterRuntime.Default(autoFlush);
        this.valves = Sets.newHashSet();
        this.compiler = PipeCompilers.get().withSource(source);
    }

    /**
     * Returns the filter runtime's automaton representation.
     *
     * @param simplify forwarded to the filter runtime
     */
    public AutomatonRepr repr(boolean simplify) {
        return runtime.repr(simplify);
    }

    /** Delegates to the filter runtime's {@code forceUpdate()}. */
    public void forceUpdate() {
        runtime.forceUpdate();
    }

    /**
     * Pushes a single object into the filter runtime.
     *
     * @param obj the object to flow
     * @return the value returned by the filter runtime's {@code flow}
     */
    public int flow(Object obj) {
        return runtime.flow(obj);
    }

    /**
     * Pushes a batch of objects into the filter runtime.
     *
     * @param iterable the objects to flow
     * @return the value returned by the filter runtime's {@code flowMany}
     */
    public int flowMany(Iterable iterable) {
        return runtime.flowMany(iterable);
    }

    /**
     * Compiles {@code pipe} and registers it as an online pipe.
     *
     * @param pipe pipe expression to compile
     * @param sink receiver of the pipe output; may be {@code null}
     * @return the valve controlling the new registration
     * @throws PipeException propagated from compilation or registration
     */
    public Valve addOnline(String pipe, Sink sink) throws PipeException {
        Pipe compiled = (Pipe) newCompiler().compile(pipe);
        return addOnline(compiled, sink);
    }

    /**
     * Registers {@code pipe} and turns its instance on with a context from this
     * runtime's scheduler.
     *
     * @param pipe pipe to register
     * @param sink receiver of the pipe output; may be {@code null}
     * @return the valve controlling the new registration
     * @throws PipeException propagated from registration
     */
    public Valve addOnline(Pipe pipe, Sink sink) throws PipeException {
        return add(true, pipe, sink);
    }

    /**
     * Compiles {@code pipe} and registers it without turning it on.
     *
     * @param pipe pipe expression to compile
     * @param sink receiver of the pipe output; may be {@code null}
     * @return the valve controlling the new registration
     * @throws PipeException propagated from compilation or registration
     */
    public Valve addOffline(String pipe, Sink sink) throws PipeException {
        Pipe compiled = (Pipe) newCompiler().compile(pipe);
        return addOffline(compiled, sink);
    }

    /**
     * Registers {@code pipe} without turning its instance on.
     *
     * @param pipe pipe to register
     * @param sink receiver of the pipe output; may be {@code null}
     * @return the valve controlling the new registration
     * @throws PipeException propagated from registration
     */
    public Valve addOffline(Pipe pipe, Sink sink) throws PipeException {
        return add(false, pipe, sink);
    }

    /**
     * Registers a bare filter (offline). The filter is paired with an
     * {@link EmptyPipe} over {@link Metadata#RAW}, and raw events reaching the
     * pipe's sink are forwarded to {@code sink} as a batch.
     *
     * @param filter filter to register
     * @param sink   receiver of the raw events
     * @return the valve controlling the new registration
     */
    public Valve addFilter(Filter filter, ObjectSink sink) {
        Pipe rawPipe = (Pipe) new EmptyPipe(Metadata.RAW, new Window[0]);
        Sink adapter = (Sink) new FilterSinkSink(sink);
        return add(false, filter, rawPipe, adapter);
    }

    /**
     * Returns the compiler built at construction time. Despite the name, the same
     * instance is returned on every call.
     */
    public PipeCompiler<FullPipe> newCompiler() {
        return compiler;
    }

    /**
     * Registers {@code pipe}, first peeling off every {@link FullPipe} layer: each
     * layer's filter is collected and the unfiltered remainder is unwrapped again until
     * it is no longer a {@code FullPipe}. The collected filters are combined in an
     * {@link AndFilter}, optimized, and registered together with the innermost pipe.
     *
     * @param online whether to turn the new instance on immediately
     * @param pipe   pipe to register
     * @param sink   receiver of the pipe output; may be {@code null}
     * @return the valve controlling the new registration
     * @throws PipeException declared for callers; not thrown directly by this method
     */
    public Valve add(boolean online, Pipe pipe, Sink sink) throws PipeException {
        ArrayList<Filter> collected = new ArrayList<Filter>();
        Pipe innermost = pipe;
        while (innermost instanceof FullPipe) {
            FullPipe layer = (FullPipe) innermost;
            collected.add(layer.filter());
            innermost = layer.unfiltered();
        }
        Filter combined = FilterOptimizer.optimize((Filter) new AndFilter(collected));
        return add(online, combined, innermost, sink);
    }

    /**
     * Instantiates {@code pipe} against {@code sink} and registers the instance under
     * {@code filter}.
     *
     * @param online whether to turn the new instance on before registering it
     * @param filter filter under which the instance is registered
     * @param pipe   pipe to instantiate
     * @param sink   receiver of the pipe output; {@code null} is replaced by an empty sink
     * @return the valve controlling the new registration
     */
    public synchronized Valve add(boolean online, Filter filter, Pipe pipe, Sink sink) {
        Sink target = sink;
        if (target == null) {
            target = new Sink.Empty(){};
        }
        PipeInstance created = pipe.newInstance(target);
        if (online) {
            turnOnDefault(created);
        }
        return add(created, filter, pipe);
    }

    /**
     * Re-homes an existing valve in this runtime. The valve is detached first; its
     * instance, filter and pipe are then registered here under a new query id.
     *
     * @param online whether to turn the valve's instance on before registering it
     * @param valve  valve to move; must still be attached
     * @return a new valve, owned by this runtime, wrapping the same instance
     * @throws IllegalArgumentException if {@code valve.dettach()} returns {@code false}
     */
    public synchronized Valve add(boolean online, Valve valve) {
        boolean wasAttached = valve.dettach();
        if (!wasAttached) {
            throw new IllegalArgumentException("Cannot add an already dettached valve");
        }
        PipeInstance adopted = valve.instance();
        if (online) {
            turnOnDefault(adopted);
        }
        return add(adopted, valve.filter(), valve.pipe());
    }

    /**
     * Registers an already-created instance: allocates the next query id, registers a
     * forwarding sink for it in the filter runtime, and records a new valve.
     *
     * @param instance instance that will receive the objects delivered for the query
     * @param filter   filter under which the instance is registered
     * @param pipe     pipe the instance was created from
     * @return the valve controlling the new registration
     */
    public synchronized Valve add(PipeInstance instance, Filter filter, Pipe pipe) {
        long id = queries.incrementAndGet();
        runtime.register(id, filter, new PipeSendingValve(instance));
        RuntimeValve created = new RuntimeValve(filter, pipe, instance, id);
        valves.add(created);
        return created;
    }

    /**
     * Creates a separate runtime sharing this runtime's property source and scheduler.
     * The new runtime is built through the two-argument constructor, so it starts with
     * no valves and with {@code autoFlush} set to {@code true}.
     */
    public PipeRuntime isolate() {
        return new PipeRuntime(source, scheduler);
    }

    /**
     * Drops {@code valve} from the attached set.
     *
     * @return {@code true} if the valve was present
     */
    private synchronized boolean remove(Valve valve) {
        return valves.remove(valve);
    }

    /** Returns the number of valves currently attached. */
    public synchronized int size() {
        return valves.size();
    }

    /** Destroys every attached valve. */
    public synchronized void destroy() {
        // Iterate over a copy: each destroy() removes the valve from the live set.
        ArrayList<RuntimeValve> snapshot = Lists.newArrayList(valves);
        for (Valve attached : snapshot) {
            attached.destroy();
        }
    }

    /**
     * Turns {@code instance} on with a fresh context from the scheduler, then starts
     * that context.
     */
    private void turnOnDefault(PipeInstance instance) {
        SchedulerContext ctx = scheduler.newContext();
        instance.turnOn(ctx);
        ctx.start();
    }

    /** Delegates to the filter runtime's {@code flush()}. */
    public void flush() {
        runtime.flush();
    }

    /** Returns the property source this runtime was built with. */
    public PropertySource source() {
        return source;
    }

    /** Returns the scheduler this runtime was built with. */
    public Scheduler scheduler() {
        return scheduler;
    }

    /**
     * Handle for one registration in the enclosing runtime. It keeps the filter, pipe,
     * instance and query id of the registration, and is the means of detaching or
     * destroying it.
     */
    public class RuntimeValve
    implements Valve {
        private final Filter filter;
        private final Pipe pipe;
        private final PipeInstance instance;
        private final long query;

        /**
         * @param filter   filter the instance is registered under
         * @param pipe     pipe the instance was created from
         * @param instance the running instance
         * @param query    query id assigned by the enclosing runtime
         */
        public RuntimeValve(Filter filter, Pipe pipe, PipeInstance instance, long query) {
            this.filter = filter;
            this.pipe = pipe;
            this.instance = instance;
            this.query = query;
        }

        /** Returns the query id of this registration. */
        public long id() {
            return query;
        }

        /** Returns the filter of this registration. */
        public Filter filter() {
            return filter;
        }

        /** Returns the pipe of this registration. */
        public Pipe pipe() {
            return pipe;
        }

        /** Returns the pipe instance of this registration. */
        public PipeInstance instance() {
            return instance;
        }

        /** Returns the metadata reported by the pipe. */
        public Metadata metadata() {
            return pipe.metadata();
        }

        /** Sends {@code obj} directly to the instance, bypassing the filter runtime. */
        public void flowUnfiltered(Object obj) {
            instance.flow(obj);
        }

        /** Sends a batch directly to the instance, bypassing the filter runtime. */
        public void flowManyUnfiltered(Iterable iterable) {
            instance.flowMany(iterable);
        }

        /** Turns the instance on with a fresh context from the enclosing runtime's scheduler. */
        public void turnOn() {
            PipeRuntime.this.turnOnDefault(instance);
        }

        /** Turns the instance on with the supplied context; the context is not started here. */
        public void turnOn(SchedulerContext context) {
            instance.turnOn(context);
        }

        /** Forwards to the instance's {@code advanceTo}. */
        public void advanceTo(long time) {
            instance.advanceTo(time);
        }

        /**
         * Unregisters the query from the filter runtime and removes this valve from the
         * enclosing runtime.
         *
         * @return {@code true} if this valve was still in the runtime's attached set
         */
        public boolean dettach() {
            PipeRuntime.this.runtime.unregister(query);
            return PipeRuntime.this.remove(this);
        }

        /**
         * Detaches this valve and, only if it was still attached, destroys the instance.
         *
         * @return {@code true} if the valve was attached and the instance was destroyed
         */
        public boolean destroy() {
            boolean detached = dettach();
            if (detached) {
                instance.destroy();
            }
            return detached;
        }

        /** Renders the filter and pipe via {@link FullPipeImpl#makeString}. */
        public String toString() {
            return FullPipeImpl.makeString(filter, pipe);
        }
    }

    /** Sink registered in the filter runtime; forwards everything it receives to a pipe instance. */
    private static class PipeSendingValve
    implements ObjectSink {
        private final PipeInstance instance;

        public PipeSendingValve(PipeInstance instance) {
            this.instance = instance;
        }

        /** Forwards a single object to the instance. */
        public void onSingle(Object obj) {
            instance.flow(obj);
        }

        /** Forwards a batch of objects to the instance. */
        public void onBatch(Iterable obj) {
            instance.flowMany(obj);
        }
    }

    /** Adapts an {@link ObjectSink} to a pipe {@link Sink}: raw events are delivered as a batch. */
    private static class FilterSinkSink
    extends Sink.Empty {
        private final ObjectSink sink;

        public FilterSinkSink(ObjectSink sink) {
            this.sink = sink;
        }

        /** Hands the raw event to the wrapped sink, treating it as an {@code Iterable}. */
        public void onRaw(RawEvent raw) {
            sink.onBatch((Iterable) raw);
        }
    }
}

