package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.2.jar:org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.class */
public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
    private final KTableImpl<K, ?, V> parent1;
    private final KTableImpl<K, ?, V> parent2;
    private final String queryableName;
    private boolean sendOldValues = false;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.2.jar:org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.class */
    private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
        private KeyValueStore<K, V> store;
        private TupleForwarder<K, V> tupleForwarder;

        private KTableKTableJoinMergeProcessor() {
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            if (KTableKTableJoinMerger.this.queryableName != null) {
                this.store = (KeyValueStore) processorContext.getStateStore(KTableKTableJoinMerger.this.queryableName);
                this.tupleForwarder = new TupleForwarder<>(this.store, processorContext, new ForwardingCacheFlushListener(processorContext, KTableKTableJoinMerger.this.sendOldValues), KTableKTableJoinMerger.this.sendOldValues);
            }
        }

        public void process(K k, Change<V> change) {
            if (KTableKTableJoinMerger.this.queryableName == null) {
                context().forward(k, change);
            } else {
                this.store.put(k, change.newValue);
                this.tupleForwarder.maybeForward(k, change.newValue, change.oldValue);
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
            process((KTableKTableJoinMergeProcessor) obj, (Change) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KTableKTableJoinMerger(KTableImpl<K, ?, V> kTableImpl, KTableImpl<K, ?, V> kTableImpl2, String str) {
        this.parent1 = kTableImpl;
        this.parent2 = kTableImpl2;
        this.queryableName = str;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorSupplier
    public Processor<K, Change<V>> get() {
        return new KTableKTableJoinMergeProcessor();
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public KTableValueGetterSupplier<K, V> view() {
        return this.queryableName != null ? new KTableMaterializedValueGetterSupplier(this.queryableName) : new KTableValueGetterSupplier<K, V>() { // from class: org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger.1
            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public KTableValueGetter<K, V> get() {
                return KTableKTableJoinMerger.this.parent1.valueGetterSupplier().get();
            }

            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public String[] storeNames() {
                String[] storeNames = KTableKTableJoinMerger.this.parent1.valueGetterSupplier().storeNames();
                String[] storeNames2 = KTableKTableJoinMerger.this.parent2.valueGetterSupplier().storeNames();
                String[] strArr = new String[storeNames.length + storeNames2.length];
                int i = 0;
                for (String str : storeNames) {
                    strArr[i] = str;
                    i++;
                }
                for (String str2 : storeNames2) {
                    strArr[i] = str2;
                    i++;
                }
                return strArr;
            }
        };
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public void enableSendingOldValues() {
        this.parent1.enableSendingOldValues();
        this.parent2.enableSendingOldValues();
        this.sendOldValues = true;
    }
}
