package com.mulesoft.connectivity.rest.commons.api.source;

import com.mulesoft.connectivity.rest.commons.api.config.RestConfiguration;
import com.mulesoft.connectivity.rest.commons.api.connection.RestConnection;
import com.mulesoft.connectivity.rest.commons.api.operation.HttpResponseResult;
import com.mulesoft.connectivity.rest.commons.internal.http.HttpResponseAttributes;
import com.mulesoft.connectivity.rest.commons.internal.util.MediaTypeUtils;
import com.mulesoft.connectivity.rest.commons.internal.util.RestUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.el.ExpressionLanguage;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectivity/rest/commons/api/source/RestPollingSource.class */
public abstract class RestPollingSource<C extends RestConnection, S extends Serializable, A> extends PollingSource<InputStream, A> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RestPollingSource.class);

    @Connection
    protected ConnectionProvider<C> connectionProvider;

    @Inject
    private ExpressionLanguage expressionLanguage;
    protected C connection;

    @Config
    protected RestConfiguration config;
    protected RestPollingSourceStrategy<S, A> restPollingSourceStrategy;

    protected ExpressionLanguage getExpressionLanguage() {
        return this.expressionLanguage;
    }

    protected abstract RestPollingSourceStrategy<S, A> getRestPollingSourceStrategy() throws MuleException;

    protected MediaType getDefaultResponseMediaType() {
        return MediaType.APPLICATION_JSON;
    }

    protected void doStart() throws MuleException {
        this.connection = (C) this.connectionProvider.connect();
        this.restPollingSourceStrategy = getRestPollingSourceStrategy();
    }

    protected void doStop() {
        this.connectionProvider.disconnect(this.connection);
    }

    private HttpResponseResult send(HttpRequest httpRequest, PollContext<InputStream, A> pollContext, MediaType mediaType) {
        String simpleName = getClass().getSimpleName();
        try {
            return this.connection.send(httpRequest, mediaType);
        } catch (AccessTokenExpiredException e) {
            pollContext.onConnectionException(new ConnectionException(e));
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Exception found while executing poll: '%s'. Access token expiration. '%s'", simpleName, e.getMessage())), e);
        } catch (Exception e2) {
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Exception found while executing poll: '%s'. '%s'", simpleName, e2.getMessage())), e2);
        }
    }

    public void poll(PollContext<InputStream, A> pollContext) {
        Optional<S> apply = this.restPollingSourceStrategy.getLastWatermark().apply(pollContext);
        HttpResponseResult send = send(getRequest(apply), pollContext, MediaTypeUtils.resolveDefaultResponseMediaType(this.config, getDefaultResponseMediaType()));
        TypedValue<String> consumeStringAndClose = RestUtils.consumeStringAndClose(send.getEntityContent(), send.getMediaType(), send.getCharset());
        if (StringUtils.isBlank((String) consumeStringAndClose.getValue())) {
            return;
        }
        HttpResponseAttributes httpResponseAttributes = send.getHttpResponseAttributes();
        List<TypedValue<String>> extractItems = this.restPollingSourceStrategy.extractItems(apply, consumeStringAndClose, httpResponseAttributes.getStatusCode(), httpResponseAttributes.getReasonPhrase(), httpResponseAttributes.getHeaders());
        if (Objects.isNull(extractItems)) {
            throw new IllegalArgumentException("Extracted items must not be null. An empty list must be returned instead of null.");
        }
        for (TypedValue<String> typedValue : extractItems) {
            pollContext.accept(getPollItemConsumer(apply, consumeStringAndClose, typedValue, this.restPollingSourceStrategy.getItemAttributes(httpResponseAttributes.getStatusCode(), httpResponseAttributes.getReasonPhrase(), httpResponseAttributes.getHeaders(), typedValue)));
        }
    }

    private Consumer<PollContext.PollItem<InputStream, A>> getPollItemConsumer(Optional<S> optional, TypedValue<String> typedValue, TypedValue<String> typedValue2, A a) {
        return pollItem -> {
            pollItem.setResult(Result.builder().output(new ByteArrayInputStream(((String) typedValue2.getValue()).getBytes())).mediaType(typedValue2.getDataType().getMediaType()).attributes(a).build());
            pollItem.setWatermark(this.restPollingSourceStrategy.getItemWatermark(optional, typedValue, typedValue2));
            pollItem.setId(this.restPollingSourceStrategy.getItemIdentity(optional, typedValue, typedValue2));
        };
    }

    public void onRejectedItem(Result<InputStream, A> result, SourceCallbackContext sourceCallbackContext) {
        if (result.getOutput() != null) {
            RestUtils.closeStream(result.getOutput());
        }
        LOGGER.debug("Item Rejected");
    }

    protected abstract HttpRequest getRequest(Optional<S> optional);
}
