package io.ray.serve.replica;

import io.ray.api.BaseActorHandle;
import io.ray.runtime.metric.Count;
import io.ray.runtime.metric.Gauge;
import io.ray.runtime.metric.Histogram;
import io.ray.runtime.metric.Metrics;
import io.ray.serve.api.Serve;
import io.ray.serve.common.Constants;
import io.ray.serve.config.DeploymentConfig;
import io.ray.serve.context.RequestContext;
import io.ray.serve.deployment.DeploymentId;
import io.ray.serve.deployment.DeploymentVersion;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.RequestMetadata;
import io.ray.serve.metrics.RayServeMetrics;
import io.ray.serve.router.Query;
import io.ray.serve.util.MessageFormatter;
import io.ray.serve.util.ReflectUtil;
import io.ray.shaded.com.google.common.collect.ImmutableMap;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ray/serve/replica/RayServeReplicaImpl.class */
/**
 * Replica-side wrapper around a user-supplied deployment object ("callable").
 *
 * <p>It dispatches incoming requests to the callable through reflection, tracks the number of
 * in-flight requests, records request/error/latency metrics, and implements the health-check,
 * reconfigure and graceful-shutdown hooks declared by {@link RayServeReplica}.
 */
public class RayServeReplicaImpl implements RayServeReplica {
    private static final Logger LOGGER = LoggerFactory.getLogger(RayServeReplicaImpl.class);

    private final DeploymentId deploymentId;
    private DeploymentConfig config;
    private final Object callable;

    // Metric handles are assigned inside RayServeMetrics.execute(...) callbacks in
    // registerMetrics(), so they cannot be final. Every later access is also routed through
    // RayServeMetrics.execute(...).
    private Count requestCounter;
    private Count errorCounter;
    private Count restartCounter;
    private Histogram processingLatencyTracker;
    private Gauge numProcessingItems;

    private DeploymentVersion version;
    private final AtomicInteger numOngoingRequests = new AtomicInteger();
    // Only read and written inside the synchronized prepareForShutdown().
    private boolean isDeleted = false;
    private final String deploymentName;
    private final String replicaTag;
    // Both lookups are optional: null means the callable does not declare the method.
    private final Method checkHealthMethod;
    private final Method callMethod;

    /**
     * Creates the replica wrapper and registers its metrics.
     *
     * @param obj the user deployment object that requests are dispatched to
     * @param deploymentConfig the initial deployment configuration
     * @param deploymentVersion the initial deployment version
     * @param baseActorHandle not used by this class; kept for interface compatibility
     * @param str the application name used to build the {@link DeploymentId}
     */
    public RayServeReplicaImpl(Object obj, DeploymentConfig deploymentConfig, DeploymentVersion deploymentVersion, BaseActorHandle baseActorHandle, String str) {
        this.deploymentName = Serve.getReplicaContext().getDeploymentName();
        this.replicaTag = Serve.getReplicaContext().getReplicaTag();
        this.deploymentId = new DeploymentId(this.deploymentName, str);
        this.callable = obj;
        this.config = deploymentConfig;
        this.version = deploymentVersion;
        // These lookups dereference this.callable, so they must run after it has been assigned.
        // As field initializers they would execute before the constructor body and fail with a
        // NullPointerException.
        this.checkHealthMethod = getRunnerMethod(Constants.CHECK_HEALTH_METHOD, null, true);
        this.callMethod = getRunnerMethod(Constants.CALL_METHOD, new Object[]{new Object()}, true);
        registerMetrics();
    }

    /**
     * Registers the per-replica metrics, tagged with the deployment name and replica tag, and
     * bumps the replica start counter once.
     */
    private void registerMetrics() {
        RayServeMetrics.execute(() -> {
            this.requestCounter = Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_REQUEST_COUNTER.getName()).description(RayServeMetrics.SERVE_DEPLOYMENT_REQUEST_COUNTER.getDescription()).unit("").tags(ImmutableMap.of(RayServeMetrics.TAG_DEPLOYMENT, this.deploymentName, RayServeMetrics.TAG_REPLICA, this.replicaTag)).register();
        });
        RayServeMetrics.execute(() -> {
            this.errorCounter = Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_ERROR_COUNTER.getName()).description(RayServeMetrics.SERVE_DEPLOYMENT_ERROR_COUNTER.getDescription()).unit("").tags(ImmutableMap.of(RayServeMetrics.TAG_DEPLOYMENT, this.deploymentName, RayServeMetrics.TAG_REPLICA, this.replicaTag)).register();
        });
        RayServeMetrics.execute(() -> {
            this.restartCounter = Metrics.count().name(RayServeMetrics.SERVE_DEPLOYMENT_REPLICA_STARTS.getName()).description(RayServeMetrics.SERVE_DEPLOYMENT_REPLICA_STARTS.getDescription()).unit("").tags(ImmutableMap.of(RayServeMetrics.TAG_DEPLOYMENT, this.deploymentName, RayServeMetrics.TAG_REPLICA, this.replicaTag)).register();
        });
        RayServeMetrics.execute(() -> {
            this.processingLatencyTracker = Metrics.histogram().name(RayServeMetrics.SERVE_DEPLOYMENT_PROCESSING_LATENCY_MS.getName()).description(RayServeMetrics.SERVE_DEPLOYMENT_PROCESSING_LATENCY_MS.getDescription()).unit("").boundaries(Constants.DEFAULT_LATENCY_BUCKET_MS).tags(ImmutableMap.of(RayServeMetrics.TAG_DEPLOYMENT, this.deploymentName, RayServeMetrics.TAG_REPLICA, this.replicaTag)).register();
        });
        RayServeMetrics.execute(() -> {
            this.numProcessingItems = Metrics.gauge().name(RayServeMetrics.SERVE_REPLICA_PROCESSING_QUERIES.getName()).description(RayServeMetrics.SERVE_REPLICA_PROCESSING_QUERIES.getDescription()).unit("").tags(ImmutableMap.of(RayServeMetrics.TAG_DEPLOYMENT, this.deploymentName, RayServeMetrics.TAG_REPLICA, this.replicaTag)).register();
        });
        RayServeMetrics.execute(() -> {
            this.restartCounter.inc(1.0d);
        });
    }

    /**
     * Handles one request by invoking the matching method on the callable.
     *
     * @param obj the request metadata; must be a {@link RequestMetadata}
     * @param obj2 the request arguments ({@code null}, a single object, or an {@code Object[]})
     * @return the value returned by the invoked method
     * @throws RayServeException if the target method cannot be resolved or its invocation fails
     */
    @Override
    public Object handleRequest(Object obj, Object obj2) {
        long startMillis = System.currentTimeMillis();
        Query query = new Query((RequestMetadata) obj, obj2);
        LOGGER.debug("Replica {} received request {}", this.replicaTag, query.getMetadata().getRequestId());
        this.numOngoingRequests.incrementAndGet();
        try {
            RayServeMetrics.execute(() -> {
                this.numProcessingItems.update(this.numOngoingRequests.get());
            });
            Object result = invokeSingle(query);
            LOGGER.debug("Replica {} finished request {} in {}ms", this.replicaTag, query.getMetadata().getRequestId(), Long.valueOf(System.currentTimeMillis() - startMillis));
            return result;
        } finally {
            // Always release the in-flight slot. If a failed request left the counter raised,
            // prepareForShutdown() would wait forever for it to reach zero.
            this.numOngoingRequests.decrementAndGet();
        }
    }

    /**
     * Resolves the target method for the query, invokes it and records metrics.
     *
     * @param query the request to execute
     * @return the value returned by the invoked method
     * @throws RayServeException wrapping any failure raised while resolving or invoking the method
     */
    private Object invokeSingle(Query query) {
        final long startMillis = System.currentTimeMillis();
        Method method = null;
        RequestMetadata metadata = query.getMetadata();
        try {
            RequestContext.set(metadata.getRoute(), metadata.getRequestId(), this.deploymentId.getApp(), metadata.getMultiplexedModelId());
            LOGGER.info("Started executing request {}", metadata.getRequestId());
            Object[] args = parseRequestItem(query);
            // A single-argument request goes to the pre-resolved call method when the callable
            // has one; everything else is resolved by the method name carried in the metadata.
            if (args.length == 1 && this.callMethod != null) {
                method = this.callMethod;
            } else {
                method = getRunnerMethod(metadata.getCallMethod(), args, false);
            }
            Object result = method.invoke(this.callable, args);
            RayServeMetrics.execute(() -> {
                this.requestCounter.inc(1.0d);
            });
            return result;
        } catch (Throwable th) {
            RayServeMetrics.execute(() -> {
                this.errorCounter.inc(1.0d);
            });
            // Method.invoke throws checked reflection exceptions; wrap them (cause preserved) in
            // the unchecked exception type this class already uses for dispatch failures.
            throw new RayServeException(MessageFormatter.format("Replica {} failed to invoke method {}", this.replicaTag, method == null ? "unknown" : method.getName()), th);
        } finally {
            RayServeMetrics.execute(() -> {
                this.processingLatencyTracker.update(System.currentTimeMillis() - startMillis);
            });
            RequestContext.clean();
        }
    }

    /**
     * Normalizes the query arguments into an argument array: {@code null} becomes an empty array,
     * an {@code Object[]} is used as-is, and any other value becomes a one-element array.
     */
    private Object[] parseRequestItem(Query query) {
        Object args = query.getArgs();
        if (args == null) {
            return new Object[0];
        }
        if (args instanceof Object[]) {
            return (Object[]) args;
        }
        return new Object[]{args};
    }

    /**
     * Looks up a method on the callable's class via {@link ReflectUtil}.
     *
     * @param str the method name
     * @param objArr the arguments used to select the overload (may be {@code null})
     * @param z when {@code true} a missing method is only logged and {@code null} is returned;
     *     when {@code false} a missing method is an error
     * @return the resolved method, or {@code null} if it is missing and {@code z} is {@code true}
     * @throws RayServeException if the method is missing and {@code z} is {@code false}
     */
    private Method getRunnerMethod(String str, Object[] objArr, boolean z) {
        try {
            return ReflectUtil.getMethod(this.callable.getClass(), str, objArr);
        } catch (NoSuchMethodException e) {
            String message = MessageFormatter.format("Tried to call a method {} that does not exist. Available methods: {}", str, ReflectUtil.getMethodStrings(this.callable.getClass()));
            if (z) {
                LOGGER.warn(message);
                return null;
            }
            LOGGER.error(message, (Throwable) e);
            throw new RayServeException(message, e);
        }
    }

    /**
     * Waits until no requests are in flight, then invokes the callable's {@code del} method once
     * (if it has one).
     *
     * <p>The wait sleeps for the configured graceful-shutdown loop interval between checks. An
     * interrupt received while waiting does not abort the drain; the thread's interrupt status is
     * restored before this method returns.
     *
     * @return always {@code true}
     */
    @Override
    public synchronized boolean prepareForShutdown() {
        boolean interrupted = false;
        try {
            int ongoing;
            do {
                try {
                    Thread.sleep((long) (this.config.getGracefulShutdownWaitLoopS().doubleValue() * 1000.0d));
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep draining. Re-interrupting here would make
                    // every following sleep fail immediately and turn this into a busy loop.
                    interrupted = true;
                    LOGGER.error("Replica {} was interrupted in sleep when draining pending queries", this.replicaTag);
                }
                ongoing = getNumOngoingRequests();
                if (ongoing > 0) {
                    LOGGER.info("Waiting for an additional {}s to shut down because there are {} ongoing requests.", this.config.getGracefulShutdownWaitLoopS(), Integer.valueOf(ongoing));
                }
            } while (ongoing > 0);
            LOGGER.info("Graceful shutdown complete; replica exiting.");
            try {
                if (!this.isDeleted) {
                    ReflectUtil.getMethod(this.callable.getClass(), "del", new Object[0]).invoke(this.callable, new Object[0]);
                }
            } catch (NoSuchMethodException e) {
                LOGGER.warn("Deployment {} has no del method.", this.deploymentName);
            } catch (Throwable th) {
                // Best effort: a failing del hook must not prevent shutdown, but keep the cause.
                LOGGER.error("Exception during graceful shutdown of replica.", th);
            } finally {
                this.isDeleted = true;
            }
            return true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns the number of requests currently being handled by this replica. */
    @Override
    public int getNumOngoingRequests() {
        return this.numOngoingRequests.get();
    }

    /**
     * Replaces the deployment configuration, derives a new version from it, and forwards the user
     * config (if any) to the callable.
     *
     * @param bArr the serialized deployment configuration
     * @return the new deployment version
     * @throws RayServeException if a user config is present and applying it fails
     */
    @Override
    public DeploymentVersion reconfigure(byte[] bArr) {
        this.config = DeploymentConfig.fromProtoBytes(bArr);
        Object userConfig = this.config.getUserConfig();
        this.version = new DeploymentVersion(this.version.getCodeVersion(), this.config, this.version.getRayActorOptions());
        if (userConfig != null) {
            updateUserConfig(userConfig);
        }
        return this.version;
    }

    /**
     * Passes a new user config to the callable's reconfigure method.
     *
     * @param obj the user config to apply
     * @throws RayServeException if the callable has no matching reconfigure method or the
     *     invocation fails
     */
    public void updateUserConfig(Object obj) {
        LOGGER.info("Replica {} of deployment {} reconfigure userConfig: {}", this.replicaTag, this.deploymentName, obj);
        try {
            ReflectUtil.getMethod(this.callable.getClass(), Constants.RECONFIGURE_METHOD, obj).invoke(this.callable, obj);
        } catch (NoSuchMethodException e) {
            String message = MessageFormatter.format("userConfig specified but deployment {} missing {} method", this.deploymentId, Constants.RECONFIGURE_METHOD);
            LOGGER.error(message);
            throw new RayServeException(message, e);
        } catch (Throwable th) {
            String message = MessageFormatter.format("Replica {} of deployment {} failed to reconfigure userConfig {}", this.replicaTag, this.deploymentId, obj);
            LOGGER.error(message, th);
            throw new RayServeException(message, th);
        } finally {
            // Logged on success and on failure alike.
            LOGGER.info("Replica {} of deployment {} finished reconfiguring userConfig: {}", this.replicaTag, this.deploymentId, obj);
        }
    }

    /** Returns the current deployment version. */
    public DeploymentVersion getVersion() {
        return this.version;
    }

    /**
     * Runs the callable's health-check method, if it declares one.
     *
     * @return {@code true} if the callable has no health-check method or the method returned
     *     {@code Boolean.TRUE}; {@code false} if it returned {@code Boolean.FALSE}, returned a
     *     non-Boolean value, or threw
     */
    @Override
    public boolean checkHealth() {
        if (this.checkHealthMethod == null) {
            return true;
        }
        boolean healthy = true;
        String callableClassName = this.callable.getClass().getName();
        try {
            LOGGER.info("Replica {} of deployment {} check health of {}", this.replicaTag, this.deploymentName, callableClassName);
            Object result = this.checkHealthMethod.invoke(this.callable, new Object[0]);
            if (result instanceof Boolean) {
                healthy = ((Boolean) result).booleanValue();
            } else {
                // Anything other than a Boolean (including null) is treated as unhealthy.
                String described = result == null ? "null" : result.getClass().getName() + ParameterizedMessage.ERROR_MSG_SEPARATOR + result;
                LOGGER.error("The health check result {} of {} in replica {} of deployment {} is illegal.", described, callableClassName, this.replicaTag, this.deploymentName);
                healthy = false;
            }
        } catch (Throwable th) {
            LOGGER.error("Replica {} of deployment {} failed to check health of {}", this.replicaTag, this.deploymentName, callableClassName, th);
            healthy = false;
        } finally {
            LOGGER.info("The health check result of {} in replica {} of deployment {} is {}.", callableClassName, this.replicaTag, this.deploymentName, Boolean.valueOf(healthy));
        }
        return healthy;
    }

    /** Returns the user deployment object wrapped by this replica. */
    public Object getCallable() {
        return this.callable;
    }
}
