/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.OrderedBytes;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.Segments;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;

class WindowKeySchema
implements SegmentedBytesStore.KeySchema {
    private static final int SUFFIX_SIZE = 12;
    private static final byte[] MIN_SUFFIX = new byte[12];
    private StateSerdes<Bytes, byte[]> serdes;

    WindowKeySchema() {
    }

    @Override
    public void init(String topic) {
        this.serdes = new StateSerdes(topic, Serdes.Bytes(), Serdes.ByteArray());
    }

    @Override
    public Bytes upperRange(Bytes key, long to) {
        byte[] maxSuffix = ByteBuffer.allocate(12).putLong(to).putInt(Integer.MAX_VALUE).array();
        return OrderedBytes.upperRange(key, maxSuffix);
    }

    @Override
    public Bytes lowerRange(Bytes key, long from) {
        return OrderedBytes.lowerRange(key, MIN_SUFFIX);
    }

    @Override
    public Bytes lowerRangeFixedSize(Bytes key, long from) {
        return WindowStoreUtils.toBinaryKey(key, Math.max(0L, from), 0, this.serdes);
    }

    @Override
    public Bytes upperRangeFixedSize(Bytes key, long to) {
        return WindowStoreUtils.toBinaryKey(key, to, Integer.MAX_VALUE, this.serdes);
    }

    @Override
    public long segmentTimestamp(Bytes key) {
        return WindowStoreUtils.timestampFromBinaryKey(key.get());
    }

    @Override
    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
        return new HasNextCondition(){

            @Override
            public boolean hasNext(KeyValueIterator<Bytes, ?> iterator) {
                while (iterator.hasNext()) {
                    Bytes bytes = iterator.peekNextKey();
                    Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
                    long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
                    if (keyBytes.compareTo(binaryKeyFrom) >= 0 && keyBytes.compareTo(binaryKeyTo) <= 0 && time >= from && time <= to) {
                        return true;
                    }
                    iterator.next();
                }
                return false;
            }
        };
    }

    @Override
    public List<Segment> segmentsToSearch(Segments segments, long from, long to) {
        return segments.segments(from, to);
    }
}

