package br.pucrio.tecgraf.soma.job.log.monitor.impl;

import br.pucrio.tecgraf.soma.job.log.monitor.event.FileChunk;
import br.pucrio.tecgraf.soma.job.log.monitor.event.FileChunkEvent;
import br.pucrio.tecgraf.soma.job.log.monitor.model.LogFileMonitoredResource;
import br.pucrio.tecgraf.soma.job.log.reader.FileReader;
import br.pucrio.tecgraf.soma.job.log.watcher.event.FileEvent;
import br.pucrio.tecgraf.soma.job.log.watcher.impl.DefaultFileWatcher;
import br.pucrio.tecgraf.soma.job.log.watcher.impl.JobLogFileWatcher;
import br.pucrio.tecgraf.soma.job.log.watcher.impl.PollingFileWatcher;
import br.pucrio.tecgraf.soma.job.log.watcher.interfaces.IFileWatchEventListener;
import br.pucrio.tecgraf.soma.job.log.watcher.interfaces.IFileWatcher;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitor;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorEvent;
import br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitorListener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/br/pucrio/tecgraf/soma/job/log/monitor/impl/JobLogMonitor.class */
/**
 * {@link ResourceMonitor} implementation for job log files.
 *
 * <p>Listeners are registered per file path. For every parent directory that contains at least
 * one monitored file a {@link JobLogFileWatcher} is created and run on the executor; when a
 * watcher reports a modification of a monitored file, the new content is read through the
 * {@link FileReader} and delivered to the file's listeners as a {@link FileChunkEvent}.
 *
 * <p>{@link #addListener} and {@link #removeListener} are synchronized on this instance. File
 * modification callbacks run on watcher threads without that lock, which is why the listener sets
 * created by this class are concurrent sets.
 */
public class JobLogMonitor implements ResourceMonitor {
    /** Key of the {@code Long} read position in the arguments map given to {@link #addListener}. */
    public static final String TIMESTAMP_PARAMETER = "timestamp";
    /** Key of the {@link Charset} in the arguments map given to {@link #addListener}. */
    public static final String ENCODING_PARAMETER = "encoding";

    private static final Logger LOG = LoggerFactory.getLogger(JobLogMonitor.class);

    private final Map<String, LogFileMonitoredResource> monitoredResourcesByPath;
    private final Map<String, JobLogFileWatcher> watchersByParentPath;
    private final Map<String, Set<ResourceMonitorListener<ResourceMonitorEvent>>> listenersByPath;
    private final ExecutorService threadPool;
    private final FileReader fileReader;
    private final boolean enableWatcherPolling;
    private final Integer watcherThreadPoolSize;
    private final Integer watcherPollingIntervalMillis;
    /** Written under the monitor lock, read without it by {@link #isAllListenersWereRemoved()}. */
    private volatile boolean allListenersWereRemoved;

    /**
     * Creates a monitor backed by a cached thread pool and empty concurrent maps.
     *
     * @param num first argument forwarded to the {@link FileReader} constructor
     * @param z {@code true} to create {@link PollingFileWatcher}s, {@code false} to create
     *     {@link DefaultFileWatcher}s
     * @param num2 thread pool size handed to each {@link DefaultFileWatcher}
     * @param num3 polling interval, in milliseconds, handed to each {@link PollingFileWatcher}
     * @param charset charset forwarded to the {@link FileReader} constructor
     * @param z2 flag forwarded to the {@link FileReader} constructor
     */
    public JobLogMonitor(Integer num, boolean z, Integer num2, Integer num3, Charset charset, boolean z2) {
        this(Executors.newCachedThreadPool(), new FileReader(num, charset, z2), z, num2, num3,
                new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
    }

    /**
     * Creates a monitor with explicitly supplied collaborators and state maps.
     *
     * <p>The maps are read from watcher threads without holding the monitor lock, so callers
     * should supply concurrent implementations.
     *
     * @param executorService executor that runs the watchers and the listener notifications
     * @param fileReader reader used to fetch file chunks
     * @param z {@code true} to create {@link PollingFileWatcher}s, {@code false} to create
     *     {@link DefaultFileWatcher}s
     * @param num thread pool size handed to each {@link DefaultFileWatcher}
     * @param num2 polling interval, in milliseconds, handed to each {@link PollingFileWatcher}
     * @param map monitored resources keyed by file path
     * @param map2 watchers keyed by parent directory path
     * @param map3 listener sets keyed by file path
     */
    public JobLogMonitor(ExecutorService executorService, FileReader fileReader, boolean z, Integer num, Integer num2, Map<String, LogFileMonitoredResource> map, Map<String, JobLogFileWatcher> map2, Map<String, Set<ResourceMonitorListener<ResourceMonitorEvent>>> map3) {
        this.monitoredResourcesByPath = map;
        this.watchersByParentPath = map2;
        this.listenersByPath = map3;
        this.fileReader = fileReader;
        this.threadPool = executorService;
        this.enableWatcherPolling = z;
        this.watcherThreadPoolSize = num;
        this.watcherPollingIntervalMillis = num2;
        this.allListenersWereRemoved = false;
    }

    /**
     * Registers a listener for a file and makes sure the file's directory is being watched.
     *
     * <p>If the file is missing, is not a regular file, is not readable, or the watcher cannot be
     * created, the listener receives {@code onError}. In every case the listener then receives
     * {@code onSubscribe} with a subscription whose {@code cancel()} removes it again.
     *
     * @param str path of the file to monitor
     * @param resourceMonitorListener listener to notify
     * @param map optional arguments; see {@link #TIMESTAMP_PARAMETER} and
     *     {@link #ENCODING_PARAMETER}. May be {@code null}.
     * @throws NullPointerException if {@code str} or {@code resourceMonitorListener} is null
     */
    @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitor
    public synchronized void addListener(String str, ResourceMonitorListener<ResourceMonitorEvent> resourceMonitorListener, Map<String, Object> map) {
        // Fail before any state is touched instead of half-registering the listener.
        Objects.requireNonNull(str, "str");
        Objects.requireNonNull(resourceMonitorListener, "resourceMonitorListener");
        Long timestamp = getTimestampFromArgs(map);
        Charset encoding = getEncodingFromArgs(map);
        LOG.info("Starting monitor with: [filepath={}, timestamp={}, encoding={}]", str, timestamp, encoding);
        createMonitoredResource(str, resourceMonitorListener, timestamp, encoding);
        try {
            createFileWatcher(str);
        } catch (FileNotFoundException e) {
            LOG.error("File {} not found", str);
            resourceMonitorListener.onError(e);
        } catch (IOException e) {
            LOG.error("Cannot create watch service!", e);
            resourceMonitorListener.onError(e);
        }
        // NOTE(review): on failure the listener stays registered and onError is delivered before
        // onSubscribe; kept as-is because callers may rely on it - confirm against the listener
        // implementations before changing the order.
        resourceMonitorListener.onSubscribe(createSubscription(str, resourceMonitorListener));
    }

    /**
     * Builds the subscription handed to a listener. {@code request} is ignored (no demand
     * tracking is done); {@code cancel} unregisters the listener from the given path.
     */
    private Flow.Subscription createSubscription(final String filePath, final ResourceMonitorListener<ResourceMonitorEvent> listener) {
        return new Flow.Subscription() { // from class: br.pucrio.tecgraf.soma.job.log.monitor.impl.JobLogMonitor.1
            @Override // java.util.concurrent.Flow.Subscription
            public void request(long j) {
                // Intentionally empty: events are pushed regardless of requested demand.
            }

            @Override // java.util.concurrent.Flow.Subscription
            public void cancel() {
                JobLogMonitor.this.removeListener(filePath, listener);
            }
        };
    }

    /**
     * Unregisters a listener. When it was the last listener of the path, the monitored resource
     * is dropped, the path is removed from its directory watcher, and the watcher is closed and
     * forgotten once it has no monitored paths left.
     *
     * <p>Calling this for a path that has no registered listeners is a no-op.
     *
     * @param str path the listener was registered for
     * @param resourceMonitorListener listener to remove
     */
    @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitor
    public synchronized void removeListener(String str, ResourceMonitorListener<ResourceMonitorEvent> resourceMonitorListener) {
        LOG.info("Stopping monitor for ID [{}]", str);
        this.allListenersWereRemoved = false;
        Set<ResourceMonitorListener<ResourceMonitorEvent>> listeners = this.listenersByPath.get(str);
        if (listeners == null) {
            // Unknown path or subscription cancelled twice: nothing to clean up.
            LOG.debug("Ignoring listener removal for non monitored path {}", str);
            return;
        }
        if (resourceMonitorListener != null) {
            listeners.remove(resourceMonitorListener);
        }
        if (!listeners.isEmpty()) {
            return;
        }
        this.listenersByPath.remove(str);
        this.monitoredResourcesByPath.remove(str);
        String parentDir = getParentDirFromPath(str);
        JobLogFileWatcher watcher = this.watchersByParentPath.get(parentDir);
        if (watcher == null) {
            LOG.debug("Ignoring non existent watcher removal for file path {}!", str);
        } else {
            watcher.removeMonitoredFilePath(str);
            if (watcher.isMonitoredFilePathsEmpty()) {
                try {
                    LOG.debug("Closing watcher for events...");
                    watcher.close();
                } catch (IOException e) {
                    LOG.error("Cannot close the watcher for directory: {}", parentDir, e);
                }
                LOG.debug("WATCHER: removing watcher from MAP");
                this.watchersByParentPath.remove(parentDir);
            }
        }
        this.allListenersWereRemoved = true;
    }

    /**
     * Reads the content of a monitored file between two positions as a list of chunk events.
     *
     * <p>Reading proceeds chunk by chunk, each chunk starting where the previous one ended, until
     * {@code l2} is reached or the reader yields no more data.
     *
     * @param str path of a file currently being monitored
     * @param l start position (inclusive); {@code null} yields an empty list
     * @param l2 end position (exclusive); {@code null} yields an empty list
     * @return the events read, possibly empty; never {@code null}
     */
    @Override // br.pucrio.tecgraf.soma.logsmonitor.monitor.ResourceMonitor
    public List<ResourceMonitorEvent> getEvents(String str, Long l, Long l2) {
        List<ResourceMonitorEvent> events = new ArrayList<>();
        if (l == null || l2 == null) {
            return events;
        }
        LogFileMonitoredResource resource = this.monitoredResourcesByPath.get(str);
        if (resource == null) {
            // The encoding is only known for monitored files.
            LOG.warn("Cannot get events for non monitored path {}", str);
            return events;
        }
        final long end = l2;
        long cursor = l;
        while (cursor < end) {
            // Clamp instead of casting: a plain (int) cast wraps for ranges of 2^31 or more.
            int length = (int) Math.min(end - cursor, (long) Integer.MAX_VALUE);
            FileChunk chunk = readFile(str, cursor, length, resource.getEncoding());
            if (chunk == null) {
                break;
            }
            FileChunkEvent event = new FileChunkEvent(chunk);
            events.add(event);
            Long next = event.getEndSeqnum();
            if (next == null || next <= cursor) {
                // No forward progress: stop rather than loop forever on the same position.
                break;
            }
            cursor = next;
        }
        return events;
    }

    /**
     * Initiates shutdown of the executor. Only {@code shutdown()} is invoked; watchers are not
     * closed here.
     */
    public void finishMonitoring() {
        LOG.info("Shutting down executor threads...");
        this.threadPool.shutdown();
    }

    /**
     * Tells whether the most recent {@link #removeListener} call removed the last listener of its
     * path.
     *
     * @return {@code true} if that call left its path without listeners
     */
    public boolean isAllListenersWereRemoved() {
        return this.allListenersWereRemoved;
    }

    /**
     * Validates the file and attaches it to the watcher of its parent directory, creating and
     * starting that watcher on the executor if the directory is not watched yet.
     *
     * @throws FileNotFoundException if the path is not an existing regular file or is not readable
     * @throws IOException if the watcher cannot be created
     */
    private synchronized void createFileWatcher(String filePath) throws IOException {
        String parentDir = getParentDirFromPath(filePath);
        File file = Paths.get(filePath).toFile();
        LOG.debug("File Name and Parent Dir Name: {} : {} ({})", file.getName(), new File(parentDir).getName(), filePath);
        if (!file.isFile()) {
            LOG.debug("File {} not found", filePath);
            throw new FileNotFoundException("File not found");
        }
        if (!file.canRead()) {
            LOG.debug("File {} is not readable", filePath);
            throw new FileNotFoundException("File is not readable");
        }
        JobLogFileWatcher existingWatcher = this.watchersByParentPath.get(parentDir);
        if (existingWatcher != null) {
            LOG.debug("Add file Path to watcher: {}", filePath);
            existingWatcher.addMonitoredFilePath(filePath);
            return;
        }
        JobLogFileWatcher newWatcher = createJobLogFileWatcher();
        newWatcher.addMonitoredFilePath(filePath);
        this.watchersByParentPath.put(parentDir, newWatcher);
        this.threadPool.execute(() -> runWatcher(newWatcher, filePath));
    }

    /**
     * Body of the watcher task: wires the event listener, registers the watcher and starts
     * watching. Failures are reported to the listeners of {@code filePath}.
     */
    private void runWatcher(JobLogFileWatcher watcher, String filePath) {
        LOG.debug("Thread is executing for filePath: [{}]", filePath);
        try {
            LOG.debug("Adding FileWatchEventListener...");
            watcher.addFileWatchEventListener(createFileWatchEventListener());
            LOG.debug("Registering WatchService...");
            watcher.register();
            LOG.debug("Start watch service to take for events...");
            watcher.startWatch();
        } catch (IOException e) {
            LOG.error("Watch was not registered", e);
            notifyErrorsToListeners(filePath, e);
        } catch (InterruptedException e) {
            LOG.error("Watcher was interrupted:", e);
            notifyErrorsToListeners(filePath, e);
            // Restore the flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
        }
        LOG.debug("Thread is stopping for path: [{}]", filePath);
    }

    /**
     * Creates the watcher for a directory, backed by a {@link PollingFileWatcher} when polling is
     * enabled and by a {@link DefaultFileWatcher} otherwise.
     *
     * @return a new, not yet registered watcher
     * @throws IOException declared for the underlying watcher constructors
     */
    protected JobLogFileWatcher createJobLogFileWatcher() throws IOException {
        IFileWatcher delegate;
        if (this.enableWatcherPolling) {
            delegate = new PollingFileWatcher(this.watcherPollingIntervalMillis);
            LOG.debug("Creating PollingFileWatcher...");
        } else {
            delegate = new DefaultFileWatcher(this.watcherThreadPoolSize);
            LOG.debug("Creating DefaultFileWatcher...");
        }
        return new JobLogFileWatcher(delegate);
    }

    /**
     * Creates the callback invoked by a watcher when a file changes. It reads the content added
     * since the resource's current position and dispatches it to every listener of the file on
     * the executor.
     */
    private IFileWatchEventListener createFileWatchEventListener() {
        return new IFileWatchEventListener() { // from class: br.pucrio.tecgraf.soma.job.log.monitor.impl.JobLogMonitor.2
            @Override // br.pucrio.tecgraf.soma.job.log.watcher.interfaces.IFileWatchEventListener
            public void onFileModified(FileEvent fileEvent) {
                File source = fileEvent.getSource();
                String absolutePath = Paths.get(source.getAbsolutePath()).toFile().getAbsolutePath();
                LogFileMonitoredResource resource = JobLogMonitor.this.monitoredResourcesByPath.get(absolutePath);
                if (resource == null) {
                    // The last listener may have been removed while this event was in flight.
                    LOG.debug("Ignoring modification of non monitored file {}", absolutePath);
                    return;
                }
                FileChunk chunk = JobLogMonitor.this.readFile(absolutePath, resource.getCurrentTimestamp(), null, resource.getEncoding());
                if (chunk == null) {
                    return;
                }
                LOG.debug("Subscription details [file: {}, current timestamp: {}, new timestamp: {}] ", source.getName(), resource.getCurrentTimestamp(), chunk.getFileLength());
                Set<ResourceMonitorListener<ResourceMonitorEvent>> listeners = JobLogMonitor.this.listenersByPath.get(absolutePath);
                if (listeners == null || listeners.isEmpty()) {
                    return;
                }
                // NOTE(review): the position jumps to the file length, not to the end of the chunk
                // that was read - confirm the reader never returns a chunk shorter than the rest
                // of the file.
                resource.setCurrentTimestamp(chunk.getFileLength());
                for (ResourceMonitorListener<ResourceMonitorEvent> listener : listeners) {
                    LOG.debug("Got monitor listener for {}", absolutePath);
                    JobLogMonitor.this.threadPool.execute(() -> {
                        LOG.debug("Calling monitor listener onNext");
                        listener.onNext(new FileChunkEvent(chunk));
                    });
                }
            }
        };
    }

    /**
     * Registers the monitored resource for the path (if not present yet) and adds the listener to
     * the path's listener set. An already existing resource keeps its original position and
     * encoding.
     */
    private void createMonitoredResource(String filePath, ResourceMonitorListener<ResourceMonitorEvent> listener, Long timestamp, Charset encoding) {
        this.monitoredResourcesByPath.computeIfAbsent(filePath, key -> new LogFileMonitoredResource(key, timestamp, encoding));
        Set<ResourceMonitorListener<ResourceMonitorEvent>> listeners = this.listenersByPath.get(filePath);
        if (listeners == null) {
            // Concurrent set: watcher threads iterate it while add/remove mutate it under the lock.
            listeners = ConcurrentHashMap.newKeySet();
            this.listenersByPath.put(filePath, listeners);
        }
        if (!listeners.add(listener)) {
            LOG.debug("Listener for path {} already registered!", filePath);
        }
    }

    /** Delivers {@code error} to every listener currently registered for the path, if any. */
    private void notifyErrorsToListeners(String filePath, Throwable error) {
        Set<ResourceMonitorListener<ResourceMonitorEvent>> listeners = this.listenersByPath.get(filePath);
        if (listeners == null || listeners.isEmpty()) {
            return;
        }
        listeners.forEach(listener -> listener.onError(error));
    }

    /**
     * Reads a chunk through the {@link FileReader}.
     *
     * @return the chunk, or {@code null} when the reader returned nothing, the chunk carries no
     *     data, or reading failed (in which case the path's listeners receive {@code onError})
     */
    private FileChunk readFile(String filePath, Long offset, Integer length, Charset encoding) {
        try {
            FileChunk chunk = this.fileReader.readFile(filePath, offset, length, encoding);
            if (chunk == null || chunk.getData().length() == 0) {
                return null;
            }
            LOG.debug("FileChunk [file: {}, fileLength:{}, offset:{}, data length: {}] ", chunk.getPath().toFile().getName(), chunk.getFileLength(), chunk.getOffset(), chunk.getLength());
            return chunk;
        } catch (IOException e) {
            // Trailing throwable (beyond the placeholders) makes SLF4J log the stack trace too.
            LOG.error("Cannot read file chunk from file [{}, {}, {}, {}]", filePath, offset, this.fileReader.getMaxLengthSize(), e.getMessage(), e);
            notifyErrorsToListeners(filePath, e);
            return null;
        }
    }

    /**
     * Extracts the {@link #TIMESTAMP_PARAMETER} value. Any {@link Number} is accepted and
     * converted with {@code longValue()}.
     *
     * @return the value, or {@code null} if the map is null or has no such entry
     * @throws ClassCastException if the entry is present but not numeric
     */
    private Long getTimestampFromArgs(Map<String, Object> args) {
        if (args == null) {
            return null;
        }
        Object value = args.get(TIMESTAMP_PARAMETER);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return (Long) value;
    }

    /**
     * Extracts the {@link #ENCODING_PARAMETER} value.
     *
     * @return the charset, or {@code null} if the map is null or has no such entry
     * @throws ClassCastException if the entry is present but not a {@link Charset}
     */
    private Charset getEncodingFromArgs(Map<String, Object> args) {
        if (args == null) {
            return null;
        }
        return (Charset) args.get(ENCODING_PARAMETER);
    }

    /**
     * Returns the key used to group files by directory: the path's parent as written, or - when
     * the path has no parent component (for example a bare file name) - the parent of its
     * absolute form. A path with no parent at all is returned unchanged.
     */
    private String getParentDirFromPath(String filePath) {
        Path path = Paths.get(filePath);
        Path parent = path.getParent();
        if (parent == null) {
            parent = path.toAbsolutePath().getParent();
        }
        return parent != null ? parent.toString() : path.toString();
    }
}
