/*
 * Decompiled with CFR 0.152.
 */
package tech.powerjob.worker.core.executor;

import akka.actor.ActorSelection;
import com.google.common.base.Stopwatch;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.serialize.SerializerUtils;
import tech.powerjob.worker.common.ThreadLocalStore;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.common.utils.WorkflowContextUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;

public class ProcessorRunnable
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ProcessorRunnable.class);
    private final InstanceInfo instanceInfo;
    private final ActorSelection taskTrackerActor;
    private final TaskDO task;
    private final BasicProcessor processor;
    private final OmsLogger omsLogger;
    private final ClassLoader classLoader;
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    private final WorkerRuntime workerRuntime;

    public void innerRun() throws InterruptedException {
        ProcessResult processResult;
        String taskId = this.task.getTaskId();
        Long instanceId = this.task.getInstanceId();
        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", new Object[]{instanceId, taskId, this.task.getTaskName()});
        ThreadLocalStore.setTask(this.task);
        ThreadLocalStore.setRuntimeMeta(this.workerRuntime);
        WorkflowContext workflowContext = this.constructWorkflowContext();
        TaskContext taskContext = this.constructTaskContext();
        taskContext.setWorkflowContext(workflowContext);
        this.reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);
        ExecuteType executeType = ExecuteType.valueOf((String)this.instanceInfo.getExecuteType());
        if ("OMS_ROOT_TASK".equals(this.task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            this.handleBroadcastRootTask(instanceId, taskContext);
            return;
        }
        if ("OMS_LAST_TASK".equals(this.task.getTaskName())) {
            this.handleLastTask(taskId, instanceId, taskContext, executeType);
            return;
        }
        try {
            processResult = this.processor.process(taskContext);
            if (processResult == null) {
                processResult = new ProcessResult(false, "ProcessResult can't be null");
            }
        }
        catch (Throwable e) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", new Object[]{instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e});
            processResult = new ProcessResult(false, e.toString());
        }
        this.reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, this.suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
    }

    private TaskContext constructTaskContext() {
        TaskContext taskContext = new TaskContext();
        BeanUtils.copyProperties((Object)this.task, (Object)taskContext);
        taskContext.setJobId(this.instanceInfo.getJobId());
        taskContext.setMaxRetryTimes(this.instanceInfo.getTaskRetryNum());
        taskContext.setCurrentRetryTimes(this.task.getFailedCnt());
        taskContext.setJobParams(this.instanceInfo.getJobParams());
        taskContext.setInstanceParams(this.instanceInfo.getInstanceParams());
        taskContext.setOmsLogger(this.omsLogger);
        if (this.task.getTaskContent() != null && this.task.getTaskContent().length > 0) {
            taskContext.setSubTask(SerializerUtils.deSerialized((byte[])this.task.getTaskContent()));
        }
        taskContext.setUserContext(this.workerRuntime.getWorkerConfig().getUserContext());
        return taskContext;
    }

    private WorkflowContext constructWorkflowContext() {
        return new WorkflowContext(this.instanceInfo.getWfInstanceId(), this.instanceInfo.getInstanceParams());
    }

    private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
        ProcessResult processResult;
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", (Object)instanceId, (Object)taskId);
        List<TaskResult> taskResults = this.workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, this.task.getSubInstanceId());
        try {
            switch (executeType) {
                case BROADCAST: {
                    if (this.processor instanceof BroadcastProcessor) {
                        BroadcastProcessor broadcastProcessor = (BroadcastProcessor)this.processor;
                        processResult = broadcastProcessor.postProcess(taskContext, taskResults);
                        break;
                    }
                    processResult = BroadcastProcessor.defaultResult(taskResults);
                    break;
                }
                case MAP_REDUCE: {
                    if (this.processor instanceof MapReduceProcessor) {
                        MapReduceProcessor mapReduceProcessor = (MapReduceProcessor)this.processor;
                        processResult = mapReduceProcessor.reduce(taskContext, taskResults);
                        break;
                    }
                    processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
                    break;
                }
                default: {
                    processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
                    break;
                }
            }
        }
        catch (Throwable e) {
            processResult = new ProcessResult(false, e.toString());
            log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", new Object[]{instanceId, taskId, e});
        }
        TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
        this.reportStatus(status, this.suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());
        log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", (Object)instanceId, (Object)stopwatch);
    }

    private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
        ProcessResult processResult;
        if (this.processor instanceof BroadcastProcessor) {
            BroadcastProcessor broadcastProcessor = (BroadcastProcessor)this.processor;
            try {
                processResult = broadcastProcessor.preProcess(taskContext);
            }
            catch (Throwable e) {
                log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", (Object)instanceId, (Object)e);
                processResult = new ProcessResult(false, e.toString());
            }
        } else {
            processResult = new ProcessResult(true, "NO_PREPOST_TASK");
        }
        this.reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, this.suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData());
    }

    private void reportStatus(TaskStatus status, String result, Integer cmd, Map<String, String> appendedWfContext) {
        ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
        req.setInstanceId(this.task.getInstanceId());
        req.setSubInstanceId(this.task.getSubInstanceId());
        req.setTaskId(this.task.getTaskId());
        req.setStatus(status.getValue());
        req.setResult(result);
        req.setReportTime(System.currentTimeMillis());
        req.setCmd(cmd);
        if (this.instanceInfo.getWfInstanceId() != null && WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
            log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", (Object)this.instanceInfo.getInstanceId(), (Object)this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
            appendedWfContext = Collections.emptyMap();
        }
        req.setAppendedWfContext(appendedWfContext);
        if (TaskStatus.finishedStatus.contains(status.getValue())) {
            boolean success = AkkaUtils.reliableTransmit(this.taskTrackerActor, req);
            if (!success) {
                this.statusReportRetryQueue.add(req);
                log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", new Object[]{this.task.getInstanceId(), this.task.getTaskId(), status, result});
            }
        } else {
            this.taskTrackerActor.tell((Object)req, null);
        }
    }

    @Override
    public void run() {
        Thread.currentThread().setContextClassLoader(this.classLoader);
        try {
            this.innerRun();
        }
        catch (InterruptedException interruptedException) {
        }
        catch (Throwable e) {
            this.reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", (Object)this.task.getInstanceId(), (Object)e);
        }
        finally {
            ThreadLocalStore.clear();
        }
    }

    private String suit(String result) {
        if (StringUtils.isEmpty((Object)result)) {
            return "";
        }
        int maxLength = this.workerRuntime.getWorkerConfig().getMaxResultLength();
        if (result.length() <= maxLength) {
            return result;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.", new Object[]{this.task.getInstanceId(), this.task.getTaskId(), result.length(), maxLength});
        return result.substring(0, maxLength).concat("...");
    }

    public ProcessorRunnable(InstanceInfo instanceInfo, ActorSelection taskTrackerActor, TaskDO task, BasicProcessor processor, OmsLogger omsLogger, ClassLoader classLoader, Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue, WorkerRuntime workerRuntime) {
        this.instanceInfo = instanceInfo;
        this.taskTrackerActor = taskTrackerActor;
        this.task = task;
        this.processor = processor;
        this.omsLogger = omsLogger;
        this.classLoader = classLoader;
        this.statusReportRetryQueue = statusReportRetryQueue;
        this.workerRuntime = workerRuntime;
    }
}

