/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonKeys;
import org.redisson.api.RFuture;
import org.redisson.api.RKeysReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Reactive implementation of {@link RKeysReactive}.
 *
 * <p>Most operations delegate to the asynchronous methods of a {@link RedissonKeys}
 * instance and adapt the returned {@link RFuture} through
 * {@link CommandReactiveService#reactive}. Key iteration is implemented here directly
 * on top of the {@code SCAN} command, one scan per {@link MasterSlaveEntry}.
 */
public class RedissonKeysReactive implements RKeysReactive {
    private final CommandReactiveService commandExecutor;
    private final RedissonKeys instance;

    /**
     * Creates the reactive facade.
     *
     * @param commandExecutor executor used both for the delegated {@link RedissonKeys}
     *                        instance and for the {@code SCAN} commands issued by this class
     */
    public RedissonKeysReactive(CommandReactiveService commandExecutor) {
        this.instance = new RedissonKeys(commandExecutor);
        this.commandExecutor = commandExecutor;
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.getSlotAsync(key)}.
     *
     * @param key key passed to the delegate
     * @return publisher of the delegate's result
     */
    @Override
    public Publisher<Integer> getSlot(String key) {
        return this.commandExecutor.reactive(() -> this.instance.getSlotAsync(key));
    }

    /**
     * Streams the keys matching {@code pattern} from every entry returned by the
     * connection manager, merging the per-entry scans into one publisher.
     *
     * @param pattern value sent as the {@code MATCH} argument of {@code SCAN};
     *                {@code null} scans without a {@code MATCH} clause
     * @return merged publisher of keys; no ordering between entries is imposed here
     */
    @Override
    public Publisher<String> getKeysByPattern(String pattern) {
        List<Publisher<String>> publishers = new ArrayList<>();
        for (MasterSlaveEntry entry : this.commandExecutor.getConnectionManager().getEntrySet()) {
            publishers.add(this.createKeysIterator(entry, pattern));
        }
        return Flux.merge(publishers);
    }

    /**
     * Streams all keys; equivalent to {@link #getKeysByPattern(String)} with a
     * {@code null} pattern.
     *
     * @return publisher of keys
     */
    @Override
    public Publisher<String> getKeys() {
        return this.getKeysByPattern(null);
    }

    /**
     * Issues a single {@code SCAN} call against the given entry.
     *
     * @param entry    entry the command is sent to
     * @param startPos cursor to continue from ({@code 0} starts a new iteration)
     * @param pattern  optional {@code MATCH} pattern; omitted from the command when {@code null}
     * @return publisher of the scan reply (next cursor plus the returned keys)
     */
    private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) {
        if (pattern == null) {
            return this.commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
        }
        return this.commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
    }

    /**
     * Builds a demand-driven publisher that walks the whole keyspace of one entry.
     *
     * @param entry   entry to scan
     * @param pattern optional {@code MATCH} pattern, may be {@code null}
     * @return publisher emitting every key returned by the scan, completing once the
     *         server reports cursor {@code 0}
     */
    private Publisher<String> createKeysIterator(MasterSlaveEntry entry, String pattern) {
        return Flux.<String>create(emitter -> emitter.onRequest(new KeysScanner(entry, pattern, emitter)));
    }

    /**
     * Drives a cursor-based {@code SCAN} iteration for a single entry in response to
     * downstream demand.
     *
     * <p>Each scan reply is forwarded to the sink in full; the next scan is only issued
     * while the downstream has requested more elements than have been emitted so far.
     * At most one scan is in flight at any time. The stream completes when the server
     * returns cursor {@code 0}, never merely because the current demand was satisfied.
     */
    private final class KeysScanner implements LongConsumer {
        private final MasterSlaveEntry entry;
        private final String pattern;
        private final FluxSink<String> emitter;

        // All mutable state below is guarded by "this".
        /** Cursor to send with the next SCAN call. */
        private long cursor;
        /**
         * Requested-minus-emitted balance. May go negative by up to one batch, because a
         * batch is always emitted in full; {@link Long#MAX_VALUE} means unbounded.
         */
        private long demand;
        /** True while a SCAN call is in flight. */
        private boolean scanning;
        /** True once the iteration has ended (cursor 0, error, or no reply). */
        private boolean finished;

        KeysScanner(MasterSlaveEntry entry, String pattern, FluxSink<String> emitter) {
            this.entry = entry;
            this.pattern = pattern;
            this.emitter = emitter;
        }

        /**
         * Called by the sink for every downstream {@code request(n)}: accumulates the
         * demand and starts a scan if none is running.
         *
         * @param n number of additional elements requested
         */
        @Override
        public void accept(long n) {
            synchronized (this) {
                if (this.demand > 0L && n > Long.MAX_VALUE - this.demand) {
                    // Saturate instead of overflowing into a negative balance.
                    this.demand = Long.MAX_VALUE;
                } else {
                    this.demand += n;
                }
            }
            this.scanIfNeeded();
        }

        /** Issues the next SCAN call unless one is running, the scan ended, or there is no demand. */
        private void scanIfNeeded() {
            if (this.emitter.isCancelled()) {
                return;
            }
            final long position;
            synchronized (this) {
                if (this.scanning || this.finished || this.demand <= 0L) {
                    return;
                }
                this.scanning = true;
                position = this.cursor;
            }
            RedissonKeysReactive.this.scanIterator(this.entry, position, this.pattern)
                    .subscribe(new Subscriber<ListScanResult<String>>() {
                        /** Whether this SCAN call produced a reply before completing. */
                        private boolean received;

                        @Override
                        public void onSubscribe(Subscription s) {
                            s.request(Long.MAX_VALUE);
                        }

                        @Override
                        public void onNext(ListScanResult<String> res) {
                            this.received = true;
                            List<String> keys = res.getValues();
                            // Emit outside the lock: next() may synchronously re-enter accept().
                            for (String key : keys) {
                                KeysScanner.this.emitter.next(key);
                            }
                            synchronized (KeysScanner.this) {
                                if (KeysScanner.this.demand != Long.MAX_VALUE) {
                                    KeysScanner.this.demand -= keys.size();
                                }
                                KeysScanner.this.cursor = res.getPos();
                                // A returned cursor of 0 marks the end of a SCAN iteration.
                                if (KeysScanner.this.cursor == 0L) {
                                    KeysScanner.this.finished = true;
                                }
                            }
                        }

                        @Override
                        public void onError(Throwable error) {
                            synchronized (KeysScanner.this) {
                                KeysScanner.this.finished = true;
                                KeysScanner.this.scanning = false;
                            }
                            KeysScanner.this.emitter.error(error);
                        }

                        @Override
                        public void onComplete() {
                            boolean done;
                            synchronized (KeysScanner.this) {
                                if (!this.received) {
                                    // No reply means no new cursor; stop rather than
                                    // re-issuing the same SCAN call forever.
                                    KeysScanner.this.finished = true;
                                }
                                KeysScanner.this.scanning = false;
                                done = KeysScanner.this.finished;
                            }
                            if (done) {
                                KeysScanner.this.emitter.complete();
                            } else {
                                KeysScanner.this.scanIfNeeded();
                            }
                        }
                    });
        }
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.findKeysByPatternAsync(pattern)}.
     *
     * @param pattern pattern passed to the delegate
     * @return publisher of the delegate's result
     */
    @Override
    public Publisher<Collection<String>> findKeysByPattern(String pattern) {
        return this.commandExecutor.reactive(() -> this.instance.findKeysByPatternAsync(pattern));
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.randomKeyAsync()}.
     *
     * @return publisher of the delegate's result
     */
    @Override
    public Publisher<String> randomKey() {
        return this.commandExecutor.reactive(() -> this.instance.randomKeyAsync());
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.deleteByPatternAsync(pattern)}.
     *
     * @param pattern pattern passed to the delegate
     * @return publisher of the delegate's result
     */
    @Override
    public Publisher<Long> deleteByPattern(String pattern) {
        return this.commandExecutor.reactive(() -> this.instance.deleteByPatternAsync(pattern));
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.deleteAsync(keys)}.
     *
     * @param keys keys passed to the delegate
     * @return publisher of the delegate's result
     */
    @Override
    public Publisher<Long> delete(String ... keys) {
        return this.commandExecutor.reactive(() -> this.instance.deleteAsync(keys));
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.flushdbAsync()}.
     *
     * @return publisher that signals when the delegate's future finishes
     */
    @Override
    public Publisher<Void> flushdb() {
        return this.commandExecutor.reactive(() -> this.instance.flushdbAsync());
    }

    /**
     * Reactive wrapper around {@code RedissonKeys.flushallAsync()}.
     *
     * @return publisher that signals when the delegate's future finishes
     */
    @Override
    public Publisher<Void> flushall() {
        return this.commandExecutor.reactive(() -> this.instance.flushallAsync());
    }
}

