package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.2.jar:org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.class */
/**
 * A {@link SessionStore} over {@link Bytes} keys and {@code byte[]} values that forwards
 * every call to a wrapped store and, for {@link #put} and {@link #remove}, additionally
 * records the change through a {@link StoreChangeLogger}.
 *
 * <p>The change logger is created in {@link #init}; {@code put} and {@code remove} dereference
 * it unconditionally, so they must not be called before {@code init}.
 */
public class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]> {
    /** The wrapped store that actually holds the sessions. */
    private final SessionStore<Bytes, byte[]> bytesStore;

    /** Records every put/remove; assigned in {@link #init} and {@code null} until then. */
    private StoreChangeLogger<Bytes, byte[]> changeLogger;

    /**
     * Wraps the given store.
     *
     * <p>NOTE(review): the decompiler reports that this constructor's access was widened from
     * package-private; it is kept public here so existing callers keep compiling.
     *
     * @param sessionStore the store to delegate to
     */
    public ChangeLoggingSessionBytesStore(SessionStore<Bytes, byte[]> sessionStore) {
        super(sessionStore);
        this.bytesStore = sessionStore;
    }

    /**
     * Initializes the wrapped store and then builds the change logger.
     *
     * <p>The changelog topic name is derived from the application id and the wrapped store's
     * name via {@link ProcessorStateManager#storeChangelogTopic}; the serdes handed to the
     * logger come from {@code WindowStoreUtils.getInnerStateSerde} for that topic.
     *
     * @param processorContext context passed to the wrapped store and to the change logger
     * @param stateStore       root store, passed through to the wrapped store unchanged
     */
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.bytesStore.init(processorContext, stateStore);

        // Topic and serdes are only needed to construct the logger, so they stay local.
        final String topic = ProcessorStateManager.storeChangelogTopic(
                processorContext.applicationId(), this.bytesStore.name());
        final StateSerdes<Bytes, byte[]> innerStateSerde = WindowStoreUtils.getInnerStateSerde(topic);

        this.changeLogger = new StoreChangeLogger<>(name(), processorContext, innerStateSerde);
    }

    /**
     * Delegates to the wrapped store's single-key {@code findSessions}.
     *
     * @param bytes the key to look up
     * @param j     first time bound, passed through unchanged
     * @param j2    second time bound, passed through unchanged
     * @return the iterator returned by the wrapped store
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, long j, long j2) {
        return this.bytesStore.findSessions(bytes, j, j2);
    }

    /**
     * Delegates to the wrapped store's key-range {@code findSessions}.
     *
     * @param bytes  first key bound, passed through unchanged
     * @param bytes2 second key bound, passed through unchanged
     * @param j      first time bound, passed through unchanged
     * @param j2     second time bound, passed through unchanged
     * @return the iterator returned by the wrapped store
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes bytes, Bytes bytes2, long j, long j2) {
        return this.bytesStore.findSessions(bytes, bytes2, j, j2);
    }

    /**
     * Removes the session from the wrapped store, then logs its key with a {@code null} value.
     *
     * @param windowed the windowed key of the session to remove
     */
    @Override
    public void remove(Windowed<Bytes> windowed) {
        this.bytesStore.remove(windowed);
        // Encode the key exactly as put() does, so both operations log under the same
        // binary form; the key is already Bytes, so no serializer or topic is involved.
        this.changeLogger.logChange(SessionKeySerde.bytesToBinary(windowed), null);
    }

    /**
     * Writes the session to the wrapped store, then logs the key/value pair.
     *
     * @param windowed the windowed key of the session
     * @param bArr     the value to store and log
     */
    @Override
    public void put(Windowed<Bytes> windowed, byte[] bArr) {
        this.bytesStore.put(windowed, bArr);
        this.changeLogger.logChange(SessionKeySerde.bytesToBinary(windowed), bArr);
    }

    /**
     * Looks up the key via {@link #findSessions(Bytes, long, long)} using the time bounds
     * {@code 0} and {@link Long#MAX_VALUE}.
     *
     * @param bytes the key to look up
     * @return the iterator produced by {@code findSessions}
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes) {
        return findSessions(bytes, 0L, Long.MAX_VALUE);
    }

    /**
     * Looks up the key range via {@link #findSessions(Bytes, Bytes, long, long)} using the
     * time bounds {@code 0} and {@link Long#MAX_VALUE}.
     *
     * @param bytes  first key bound
     * @param bytes2 second key bound
     * @return the iterator produced by {@code findSessions}
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2) {
        return findSessions(bytes, bytes2, 0L, Long.MAX_VALUE);
    }
}
