package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.2.jar:org/apache/kafka/streams/processor/internals/AssignedTasks.class */
/**
 * Bookkeeping for the tasks assigned to one owner, grouped by lifecycle state.
 *
 * <p>A task is held in exactly one of the {@code created}, {@code restoring},
 * {@code running} or {@code suspended} maps. Two secondary indexes map a
 * {@link TopicPartition} to the restoring or running task that owns it.
 *
 * <p>NOTE(review): only {@code running} is a {@link ConcurrentHashMap}; every other
 * collection is a plain {@code HashMap}/{@code HashSet}. The class therefore looks
 * like it is meant to be mutated from a single thread -- confirm against callers.
 */
class AssignedTasks implements RestoringTasks {
    private final Logger log;
    /** Human-readable task kind, used only in log messages. */
    private final String taskTypeName;
    /** Tasks that were added but whose state stores are not initialized yet. */
    private final Map<TaskId, Task> created = new HashMap<>();
    /** Tasks that were successfully suspended by {@link #suspend()}. */
    private final Map<TaskId, Task> suspended = new HashMap<>();
    /** Tasks waiting for their changelog partitions to be reported as restored. */
    private final Map<TaskId, Task> restoring = new HashMap<>();
    /** Changelog partitions reported as restored so far via {@link #updateRestored(Collection)}. */
    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
    /** Ids of the tasks that were still in {@code running} at the end of the last {@link #suspend()}. */
    private final Set<TaskId> previousActiveTasks = new HashSet<>();
    /** Tasks that have been transitioned to running. */
    private final Map<TaskId, Task> running = new ConcurrentHashMap<>();
    /** Input and changelog partitions of running tasks, mapped to their owning task. */
    private final Map<TopicPartition, Task> runningByPartition = new HashMap<>();
    /** Input and changelog partitions of restoring tasks, mapped to their owning task. */
    private final Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
    /** Number of commits performed during the current {@link #maybeCommit()} pass. */
    private int committed = 0;

    /** Commits a task only when it reports {@code commitNeeded()}, counting each commit. */
    private final TaskAction maybeCommitAction = new TaskAction() {
        @Override
        public String name() {
            return "maybeCommit";
        }

        @Override
        public void apply(Task task) {
            if (task.commitNeeded()) {
                // Counted before committing, so a failing commit is still included in the count.
                AssignedTasks.this.committed++;
                task.commit();
                AssignedTasks.this.log.debug("Committed active task {} per user request in", task.id());
            }
        }
    };

    /** Commits a task unconditionally. */
    private final TaskAction commitAction = new TaskAction() {
        @Override
        public String name() {
            return "commit";
        }

        @Override
        public void apply(Task task) {
            task.commit();
        }
    };

    /**
     * @param logContext factory for this instance's logger
     * @param str        task type name used in log messages
     */
    public AssignedTasks(LogContext logContext, String str) {
        this.taskTypeName = str;
        this.log = logContext.logger(getClass());
    }

    /** Registers a new task in the {@code created} state, keyed by its id. */
    public void addNewTask(Task task) {
        this.created.put(task.id(), task);
    }

    /**
     * @return the partitions of every created task that has state stores; an immutable
     *         empty set when there are no created tasks
     */
    public Set<TopicPartition> uninitializedPartitions() {
        if (this.created.isEmpty()) {
            return Collections.emptySet();
        }
        Set<TopicPartition> partitions = new HashSet<>();
        for (Task task : this.created.values()) {
            if (task.hasStateStores()) {
                partitions.addAll(task.partitions());
            }
        }
        return partitions;
    }

    /**
     * Tries to initialize the state stores of every created task. A task whose
     * {@code initializeStateStores()} returns {@code true} moves straight to running,
     * otherwise it moves to restoring. A task that throws {@link LockException} stays
     * in {@code created} so that a later call can retry it.
     *
     * @return the input partitions of the stateful tasks that moved to running in this call
     */
    public Set<TopicPartition> initializeNewTasks() {
        Set<TopicPartition> readyPartitions = new HashSet<>();
        if (!this.created.isEmpty()) {
            this.log.debug("Initializing {}s {}", this.taskTypeName, this.created.keySet());
        }
        Iterator<Map.Entry<TaskId, Task>> it = this.created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, Task> entry = it.next();
            Task task = entry.getValue();
            try {
                if (task.initializeStateStores()) {
                    transitionToRunning(task, readyPartitions);
                } else {
                    this.log.debug("Transitioning {} {} to restoring", this.taskTypeName, entry.getKey());
                    addToRestoring(task);
                }
                it.remove();
            } catch (LockException e) {
                // Deliberately best-effort: the task is left in `created` for the next attempt.
                this.log.trace("Could not create {} {} due to {}; will retry", this.taskTypeName, entry.getKey(), e.getMessage());
            }
        }
        return readyPartitions;
    }

    /**
     * Records newly restored changelog partitions and moves every restoring task whose
     * changelog partitions are all restored to running.
     *
     * @param collection changelog partitions reported as restored
     * @return the input partitions of the stateful tasks that moved to running in this call;
     *         an immutable empty set when {@code collection} is empty
     */
    public Set<TopicPartition> updateRestored(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return Collections.emptySet();
        }
        this.log.trace("{} changelog partitions that have completed restoring so far: {}", this.taskTypeName, collection);
        Set<TopicPartition> resumed = new HashSet<>();
        this.restoredPartitions.addAll(collection);
        Iterator<Task> it = this.restoring.values().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (this.restoredPartitions.containsAll(task.changelogPartitions())) {
                transitionToRunning(task, resumed);
                it.remove();
                this.log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state", this.taskTypeName, task.id(), task.changelogPartitions());
            } else if (this.log.isTraceEnabled()) {
                // Only compute the outstanding set when it will actually be logged.
                Set<TopicPartition> outstanding = new HashSet<TopicPartition>(task.changelogPartitions());
                outstanding.removeAll(this.restoredPartitions);
                this.log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}", this.taskTypeName, task.id(), outstanding);
            }
        }
        if (allTasksRunning()) {
            this.restoredPartitions.clear();
        }
        return resumed;
    }

    /** @return {@code true} when no task is in the created, suspended or restoring state */
    public boolean allTasksRunning() {
        return this.created.isEmpty() && this.suspended.isEmpty() && this.restoring.isEmpty();
    }

    /** @return a live view of the running tasks (backed by the internal map) */
    public Collection<Task> running() {
        return this.running.values();
    }

    /**
     * Suspends all running tasks and closes all restoring and created tasks, then resets
     * the running/restoring/created bookkeeping. All three steps are always attempted.
     *
     * @return the first {@link RuntimeException} encountered, or {@code null} if none occurred
     */
    public RuntimeException suspend() {
        this.log.trace("Suspending running {} {}", this.taskTypeName, runningTaskIds());
        RuntimeException firstException = suspendTasks(this.running.values());

        this.log.trace("Close restoring {} {}", this.taskTypeName, this.restoring.keySet());
        RuntimeException restoringFailure = closeNonRunningTasks(this.restoring.values());
        if (firstException == null) {
            firstException = restoringFailure;
        }

        this.log.trace("Close created {} {}", this.taskTypeName, this.created.keySet());
        RuntimeException createdFailure = closeNonRunningTasks(this.created.values());
        if (firstException == null) {
            firstException = createdFailure;
        }

        // Snapshot the ids still in `running` before the map is wiped.
        this.previousActiveTasks.clear();
        this.previousActiveTasks.addAll(this.running.keySet());
        this.running.clear();
        this.restoring.clear();
        this.created.clear();
        this.runningByPartition.clear();
        this.restoringByPartition.clear();
        return firstException;
    }

    /**
     * Closes every given task with {@code close(false, false)}, logging failures.
     *
     * @return the first failure, or {@code null} if every close succeeded
     */
    private RuntimeException closeNonRunningTasks(Collection<Task> tasks) {
        RuntimeException firstException = null;
        for (Task task : tasks) {
            try {
                task.close(false, false);
            } catch (RuntimeException e) {
                this.log.error("Failed to close {}, {}", this.taskTypeName, task.id(), e);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        return firstException;
    }

    /**
     * Suspends every given task and records it in {@code suspended}. A task that throws
     * {@link TaskMigratedException} is closed as a zombie and removed from the given
     * collection. A task that fails with any other {@link RuntimeException} is closed on
     * a best-effort basis.
     *
     * @return the first non-migration failure, or {@code null} if there was none
     */
    private RuntimeException suspendTasks(Collection<Task> tasks) {
        RuntimeException firstException = null;
        Iterator<Task> it = tasks.iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                task.suspend();
                this.suspended.put(task.id(), task);
            } catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
            } catch (RuntimeException e) {
                this.log.error("Suspending {} {} failed due to the following error:", this.taskTypeName, task.id(), e);
                try {
                    task.close(false, false);
                } catch (Exception closeFailure) {
                    this.log.error("After suspending failed, closing the same {} {} failed again due to the following error:", this.taskTypeName, task.id(), closeFailure);
                }
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        return firstException;
    }

    /** Closes a migrated task with {@code close(false, true)}; any failure is logged and ignored. */
    private void closeZombieTask(Task task) {
        this.log.warn("{} {} got migrated to another thread already. Closing it as zombie.", this.taskTypeName, task.id());
        try {
            task.close(false, true);
        } catch (Exception e) {
            this.log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", this.taskTypeName, task.id(), e.getMessage());
        }
    }

    /** @return {@code true} when at least one task is running */
    public boolean hasRunningTasks() {
        return !this.running.isEmpty();
    }

    /**
     * Resumes a suspended task if it exists and its partitions equal the given set.
     *
     * @param taskId id of the task to resume
     * @param set    partitions now assigned to that task
     * @return {@code true} if the task was resumed and is running again; {@code false} if it
     *         is not suspended or its partitions differ from {@code set}
     * @throws TaskMigratedException if moving the task to running fails because it was
     *         migrated; the task is closed as a zombie and dropped before rethrowing
     */
    public boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> set) {
        Task task = this.suspended.get(taskId);
        if (task == null) {
            return false;
        }
        this.log.trace("found suspended {} {}", this.taskTypeName, taskId);
        if (!task.partitions().equals(set)) {
            this.log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, set, task.partitions());
            return false;
        }
        this.suspended.remove(taskId);
        task.resume();
        try {
            // The partitions collected by the transition are not needed here.
            transitionToRunning(task, new HashSet<TopicPartition>());
            this.log.trace("resuming suspended {} {}", this.taskTypeName, task.id());
            return true;
        } catch (TaskMigratedException e) {
            closeZombieTask(task);
            // transitionToRunning registers the task in `running` before it can fail.
            this.running.remove(task.id());
            throw e;
        }
    }

    /** Registers the task as restoring and indexes it by its input and changelog partitions. */
    private void addToRestoring(Task task) {
        this.restoring.put(task.id(), task);
        for (TopicPartition partition : task.partitions()) {
            this.restoringByPartition.put(partition, task);
        }
        for (TopicPartition changelog : task.changelogPartitions()) {
            this.restoringByPartition.put(changelog, task);
        }
    }

    /**
     * Registers the task as running, initializes its topology and indexes it by its input
     * and changelog partitions.
     *
     * @param task              task to move to running
     * @param readyPartitions   receives the task's input partitions when it has state stores
     */
    private void transitionToRunning(Task task, Set<TopicPartition> readyPartitions) {
        this.log.debug("transitioning {} {} to running", this.taskTypeName, task.id());
        this.running.put(task.id(), task);
        task.initializeTopology();
        for (TopicPartition partition : task.partitions()) {
            this.runningByPartition.put(partition, task);
            if (task.hasStateStores()) {
                readyPartitions.add(partition);
            }
        }
        for (TopicPartition changelog : task.changelogPartitions()) {
            this.runningByPartition.put(changelog, task);
        }
    }

    /** @return the restoring task indexed under the given partition, or {@code null} if none */
    @Override
    public Task restoringTaskFor(TopicPartition topicPartition) {
        return this.restoringByPartition.get(topicPartition);
    }

    /** @return the running task indexed under the given partition, or {@code null} if none */
    public Task runningTaskFor(TopicPartition topicPartition) {
        return this.runningByPartition.get(topicPartition);
    }

    /** @return a live view of the ids of the running tasks (backed by the internal map) */
    public Set<TaskId> runningTaskIds() {
        return this.running.keySet();
    }

    /** @return an unmodifiable view of the running tasks keyed by id */
    public Map<TaskId, Task> runningTaskMap() {
        return Collections.unmodifiableMap(this.running);
    }

    /**
     * Renders the running, suspended, restoring and new tasks, one section per state.
     *
     * @param str prefix written before every section header and every task
     */
    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        describe(sb, this.running.values(), str, "Running:");
        describe(sb, this.suspended.values(), str, "Suspended:");
        describe(sb, this.restoring.values(), str, "Restoring:");
        describe(sb, this.created.values(), str, "New:");
        return sb.toString();
    }

    /** Appends one section: prefix + header, then each task's description, then a newline. */
    private void describe(StringBuilder sb, Collection<Task> tasks, String indent, String header) {
        sb.append(indent).append(header);
        for (Task task : tasks) {
            sb.append(indent).append(task.toString(indent + "\t\t"));
        }
        sb.append("\n");
    }

    /** @return a new list with the running, suspended, restoring and created tasks, in that order */
    private List<Task> allTasks() {
        List<Task> tasks = new ArrayList<>();
        tasks.addAll(this.running.values());
        tasks.addAll(this.suspended.values());
        tasks.addAll(this.restoring.values());
        tasks.addAll(this.created.values());
        return tasks;
    }

    /** @return an unmodifiable view of the restoring tasks */
    Collection<Task> restoringTasks() {
        return Collections.unmodifiableCollection(this.restoring.values());
    }

    /** @return a new set with the ids of all running, suspended, restoring and created tasks */
    public Set<TaskId> allAssignedTaskIds() {
        Set<TaskId> taskIds = new HashSet<>();
        taskIds.addAll(this.running.keySet());
        taskIds.addAll(this.suspended.keySet());
        taskIds.addAll(this.restoring.keySet());
        taskIds.addAll(this.created.keySet());
        return taskIds;
    }

    /**
     * Drops every tracked task and partition index without closing anything.
     * {@code previousActiveTasks} is intentionally left untouched.
     */
    void clear() {
        this.runningByPartition.clear();
        this.restoringByPartition.clear();
        this.running.clear();
        this.created.clear();
        this.suspended.clear();
        // Restoring tasks must be dropped too; otherwise tasks already closed by
        // close(boolean) would still be reported as assigned and restoring.
        this.restoring.clear();
        this.restoredPartitions.clear();
    }

    /** @return the ids recorded by the last {@link #suspend()} (the internal set, not a copy) */
    public Set<TaskId> previousTaskIds() {
        return this.previousActiveTasks;
    }

    /**
     * Commits every running task.
     *
     * @return the number of running tasks after the pass
     * @throws RuntimeException the first failure raised while committing, after all tasks were attempted
     */
    public int commit() {
        applyToRunningTasks(this.commitAction);
        return this.running.size();
    }

    /**
     * Commits every running task that reports {@code commitNeeded()}.
     *
     * @return the number of tasks for which a commit was performed
     * @throws RuntimeException the first failure raised while committing, after all tasks were attempted
     */
    public int maybeCommit() {
        this.committed = 0;
        applyToRunningTasks(this.maybeCommitAction);
        return this.committed;
    }

    /**
     * Calls {@code process()} once on every running task.
     *
     * @return the number of tasks whose {@code process()} returned {@code true}
     * @throws TaskMigratedException if a task was migrated; it is closed as a zombie and
     *         removed from the running tasks first, and the remaining tasks are skipped
     * @throws RuntimeException any other processing failure, logged and rethrown immediately
     */
    public int process() {
        int processed = 0;
        Iterator<Task> it = this.running.values().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                if (task.process()) {
                    processed++;
                }
            } catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                throw e;
            } catch (RuntimeException e) {
                this.log.error("Failed to process {} {} due to the following error:", this.taskTypeName, task.id(), e);
                throw e;
            }
        }
        return processed;
    }

    /**
     * Gives every running task a chance to punctuate on stream time and on system time.
     *
     * @return the number of punctuation calls that returned {@code true}
     * @throws TaskMigratedException if a task was migrated; it is closed as a zombie and
     *         removed from the running tasks first, and the remaining tasks are skipped
     * @throws KafkaException any other Kafka failure, logged and rethrown immediately
     */
    public int punctuate() {
        int punctuated = 0;
        Iterator<Task> it = this.running.values().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            } catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                throw e;
            } catch (KafkaException e) {
                this.log.error("Failed to punctuate {} {} due to the following error:", this.taskTypeName, task.id(), e);
                throw e;
            }
        }
        return punctuated;
    }

    /**
     * Applies the action to every running task. Unlike {@link #process()}, a failure does
     * not stop the pass: the first exception is remembered and thrown after all tasks were
     * visited. Migrated tasks are closed as zombies and removed from the running tasks.
     */
    private void applyToRunningTasks(TaskAction taskAction) {
        RuntimeException firstException = null;
        Iterator<Task> it = running().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            try {
                taskAction.apply(task);
            } catch (TaskMigratedException e) {
                closeZombieTask(task);
                it.remove();
                if (firstException == null) {
                    firstException = e;
                }
            } catch (RuntimeException e) {
                this.log.error("Failed to {} {} {} due to the following error:", taskAction.name(), this.taskTypeName, task.id(), e);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Closes and forgets every suspended task that is absent from the new assignment or
     * whose partitions differ from the newly assigned ones.
     *
     * @param map new assignment: task id to assigned partitions
     */
    public void closeNonAssignedSuspendedTasks(Map<TaskId, Set<TopicPartition>> map) {
        Iterator<Task> it = this.suspended.values().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (!map.containsKey(task.id()) || !task.partitions().equals(map.get(task.id()))) {
                this.log.debug("Closing suspended and not re-assigned {} {}", this.taskTypeName, task.id());
                try {
                    task.closeSuspended(true, false, null);
                } catch (Exception e) {
                    this.log.error("Failed to remove suspended {} {} due to the following error:", this.taskTypeName, task.id(), e);
                } finally {
                    // The task is dropped whether or not closing succeeded.
                    it.remove();
                }
            }
        }
    }

    /**
     * Closes every tracked task, whatever its state, then resets all bookkeeping.
     *
     * @param z forwarded as the first argument of {@code Task.close(boolean, boolean)}
     *          (presumably a "clean shutdown" flag -- TODO confirm against Task)
     */
    public void close(boolean z) {
        close(allTasks(), z);
        clear();
    }

    /** Closes each task with {@code close(flag, false)}; failures are logged and do not stop the loop. */
    private void close(Collection<Task> tasks, boolean flag) {
        for (Task task : tasks) {
            try {
                task.close(flag, false);
            } catch (Throwable t) {
                this.log.error("Failed while closing {} {} due to the following error:", task.getClass().getSimpleName(), task.id(), t);
            }
        }
    }
}
