/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.redisson.RedissonTransferQueue;
import org.redisson.api.RFuture;
import org.redisson.reactive.ElementsStream;
import org.redisson.reactive.PublisherAdder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Reactive adapter around a {@link RedissonTransferQueue}.
 *
 * <p>Each method wraps one of the queue's asynchronous operations
 * ({@code takeAsync}, {@code getValueAsync}, {@code addAsync}) in a
 * Reactive Streams publisher.
 *
 * @param <V> element type of the wrapped queue
 */
public class RedissonTransferQueueReactive<V> {
    private final RedissonTransferQueue<V> queue;

    /**
     * Creates the adapter.
     *
     * @param queue the queue whose async operations back the publishers returned by this class
     */
    public RedissonTransferQueueReactive(RedissonTransferQueue<V> queue) {
        this.queue = queue;
    }

    /**
     * Builds a stream via {@link ElementsStream#takeElements}, using the queue's
     * {@code takeAsync} as the element source.
     *
     * @return the flux produced by {@code ElementsStream.takeElements}
     */
    public Flux<V> takeElements() {
        return ElementsStream.takeElements(this.queue::takeAsync);
    }

    /**
     * Returns a publisher that walks the queue by index, starting at index 0.
     *
     * <p>For every downstream {@code request(n)} up to {@code n} elements are fetched one at a
     * time with {@code getValueAsync(index)}. A {@code null} value completes the publisher; a
     * failed fetch is forwarded as an error signal.
     *
     * @return a publisher emitting the values returned by successive {@code getValueAsync} calls
     */
    public Publisher<V> iterator() {
        return Flux.<V>create(emitter -> emitter.onRequest(new LongConsumer() {
            // Index of the next element to fetch; kept across request(n) calls.
            // NOTE(review): not synchronized - overlapping request(n) calls would share this
            // counter. Confirm whether subscribers can issue overlapping requests.
            private int currentIndex = 0;

            @Override
            public void accept(long value) {
                this.onRequest(true, emitter, value);
            }

            /**
             * Fetches the element at the current index, emits it, then re-invokes itself
             * until {@code n} elements were emitted or the end is reached.
             *
             * @param forward {@code true} to advance the index after each emission,
             *                {@code false} to move it backwards
             * @param sink    sink receiving the signals
             * @param n       number of elements still owed to the subscriber
             */
            protected void onRequest(boolean forward, FluxSink<V> sink, long n) {
                // Stop issuing further fetches once the subscriber is gone.
                if (sink.isCancelled()) {
                    return;
                }
                RedissonTransferQueueReactive.this.queue.getValueAsync(this.currentIndex).onComplete((value, e) -> {
                    if (e != null) {
                        sink.error(e);
                        return;
                    }
                    if (value == null) {
                        // No value at this index: treated as end of iteration.
                        sink.complete();
                        return;
                    }
                    sink.next(value);
                    if (forward) {
                        this.currentIndex++;
                    } else {
                        this.currentIndex--;
                    }
                    long remaining = n - 1L;
                    if (remaining == 0L) {
                        return;
                    }
                    this.onRequest(forward, sink, remaining);
                });
            }
        }));
    }

    /**
     * Feeds the elements of {@code c} into the queue through a {@link PublisherAdder} whose
     * {@code add} step delegates to the queue's {@code addAsync}.
     *
     * @param c publisher supplying the elements to add
     * @return the publisher returned by {@code PublisherAdder.addAll}
     */
    public Publisher<Boolean> addAll(Publisher<? extends V> c) {
        return new PublisherAdder<V>() {

            @Override
            @SuppressWarnings("unchecked") // add(Object) is the adder's signature; the element is cast back to V
            public RFuture<Boolean> add(Object o) {
                return RedissonTransferQueueReactive.this.queue.addAsync((V) o);
            }
        }.addAll(c);
    }
}

