package br.pucrio.tecgraf.soma.websocketnotifier.application.service;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/br/pucrio/tecgraf/soma/websocketnotifier/application/service/KtableService.class */
public class KtableService {
    private final Logger logger = LoggerFactory.getLogger((Class<?>) KtableService.class);
    private final String KSQLDB_URL;
    private final String KAFKA_NOTIFICATION_TOPIC;

    @Autowired
    public KtableService(ServiceConfiguration serviceConfiguration) {
        this.KSQLDB_URL = serviceConfiguration.getValue(Constants.Config.KAFKA_KSQLDB_URL.option.getLongName());
        this.KAFKA_NOTIFICATION_TOPIC = serviceConfiguration.getValue(Constants.Config.KAFKA_NOTIFICATION_TOPIC.option.getLongName());
    }

    private String getLastUserOffsetKQuery(WebsocketNotifierInfo websocketNotifierInfo) {
        return "{\"ksql\": \"select * from ktable_websocket_notifier where ROWKEY = '" + websocketNotifierInfo.getUser() + "|+|" + websocketNotifierInfo.getApplication() + "|+|" + websocketNotifierInfo.getTopicName() + "';\", \"streamsProperties\": {}}";
    }

    private String getKStreamCreateKSQL() {
        return "{ \"ksql\": \"CREATE STREAM websocket_notifier_stream (user VARCHAR, application VARCHAR, topic_name VARCHAR, offset BIGINT) WITH (KAFKA_TOPIC = '" + this.KAFKA_NOTIFICATION_TOPIC + "', VALUE_FORMAT='JSON');\",  \"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"} }";
    }

    private String getKTableCreateKSQL() {
        return "{ \"ksql\": \"CREATE TABLE ktable_websocket_notifier AS SELECT user, application, topic_name, MAX(offset) AS offset FROM WEBSOCKET_NOTIFIER_STREAM GROUP BY user, application, topic_name;\",  \"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"} }";
    }

    private WebTarget getWebTarget(String str) {
        return ClientBuilder.newClient().target(str);
    }

    protected Response makeRequest(String str, String str2, String str3) {
        this.logger.info("makeRequest {} {}", str2, str3);
        return getWebTarget(str2).path(str3).request("application/vnd.ksql.v1+json").post(Entity.json(str));
    }

    public WebsocketNotifierInfo getLastUserOffset(WebsocketNotifierInfo websocketNotifierInfo) {
        String lastUserOffsetKQuery = getLastUserOffsetKQuery(websocketNotifierInfo);
        try {
            this.logger.debug(lastUserOffsetKQuery);
            Response makeRequest = makeRequest(lastUserOffsetKQuery, this.KSQLDB_URL, "query");
            if (makeRequest.getStatus() != 200) {
                this.logger.error("Error during getLastUserOffset.", makeRequest.getEntity());
            }
            return parseJsonArray(websocketNotifierInfo, (String) makeRequest.readEntity(String.class));
        } catch (ProcessingException e) {
            this.logger.error("Error during getLastUserOffset.", (Throwable) e);
            return websocketNotifierInfo;
        }
    }

    private boolean ktableDoesNotExists(Object obj) {
        if (!(obj instanceof JsonObject)) {
            return false;
        }
        JsonObject jsonObject = (JsonObject) obj;
        return jsonObject.has("@type") && jsonObject.get("@type").getAsString().equals("statement_error") && jsonObject.get("error_code").getAsString().equals("40001");
    }

    private boolean ktableReturnHasTuple(Object obj) {
        return (obj instanceof JsonArray) && ((JsonArray) obj).size() > 1;
    }

    private WebsocketNotifierInfo parseJsonArray(WebsocketNotifierInfo websocketNotifierInfo, String str) {
        JsonElement parse = new JsonParser().parse(str);
        if (ktableDoesNotExists(parse)) {
            if (createKtableNotifier()) {
                return getLastUserOffset(websocketNotifierInfo);
            }
            this.logger.info("Websocket notification topic does not exist yet.");
            return websocketNotifierInfo;
        }
        if (ktableReturnHasTuple(parse)) {
            websocketNotifierInfo.setOffset(Integer.valueOf(((JsonArray) parse).get(1).getAsJsonObject().get("row").getAsJsonObject().get("columns").getAsJsonArray().get(5).getAsInt()));
            return websocketNotifierInfo;
        }
        this.logger.warn("Json return empty from KSQL query: {}", str);
        return websocketNotifierInfo;
    }

    private boolean createKSQLAccessStructure(String str) {
        try {
            Response makeRequest = makeRequest(str, this.KSQLDB_URL, "ksql");
            this.logger.debug("Response create KSQL ACCESS STRUCTURE", makeRequest.readEntity(String.class));
            if (makeRequest.getStatus() == 200) {
                return true;
            }
            this.logger.error("Error while creating KSQL ACCESS STRUCTURE URI {} RESPONSE {} ", this.KSQLDB_URL, makeRequest);
            return false;
        } catch (ProcessingException e) {
            this.logger.error("Response create KSQL ACCESS STRUCTURE", (Throwable) e);
            return false;
        }
    }

    protected boolean createKtableNotifier() {
        this.logger.info("Creating KTABLE websocket notifier");
        return createKSQLAccessStructure(getKStreamCreateKSQL()) && createKSQLAccessStructure(getKTableCreateKSQL());
    }
}
