/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.cluster.coordination;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CleanableResponseHandler;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.cluster.coordination.ValidateJoinRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

public class JoinValidationService {
    private static final Logger logger = LogManager.getLogger(JoinValidationService.class);
    public static final String JOIN_VALIDATE_ACTION_NAME = "internal:cluster/coordination/join/validate";
    public static final Setting<TimeValue> JOIN_VALIDATION_CACHE_TIMEOUT_SETTING = Setting.timeSetting("cluster.join_validation.cache_timeout", TimeValue.timeValueSeconds((long)60L), TimeValue.timeValueMillis((long)1L), Setting.Property.NodeScope);
    private static final TransportRequestOptions REQUEST_OPTIONS = TransportRequestOptions.of(null, TransportRequestOptions.Type.STATE);
    private final TimeValue cacheTimeout;
    private final TransportService transportService;
    private final Supplier<ClusterState> clusterStateSupplier;
    private final AtomicInteger queueSize = new AtomicInteger();
    private final Queue<AbstractRunnable> queue = new ConcurrentLinkedQueue<AbstractRunnable>();
    private final Map<Version, ReleasableBytesReference> statesByVersion = new HashMap<Version, ReleasableBytesReference>();
    private final RefCounted executeRefs;
    private final AbstractRunnable processor = new AbstractRunnable(){

        @Override
        protected void doRun() {
            JoinValidationService.this.processNextItem();
        }

        @Override
        public void onRejection(Exception e) {
            EsRejectedExecutionException esre;
            assert (e instanceof EsRejectedExecutionException && (esre = (EsRejectedExecutionException)e).isExecutorShutdown());
            JoinValidationService.this.onShutdown();
        }

        @Override
        public void onFailure(Exception e) {
            logger.error("unexpectedly failed to process queue item", (Throwable)e);
            assert (false) : e;
        }

        public String toString() {
            return "process next task of join validation service";
        }
    };
    private final AbstractRunnable cacheClearer = new AbstractRunnable(){

        @Override
        public void onFailure(Exception e) {
            logger.error("unexpectedly failed to clear cache", (Throwable)e);
            assert (false) : e;
        }

        @Override
        protected void doRun() {
            for (ReleasableBytesReference bytes : JoinValidationService.this.statesByVersion.values()) {
                bytes.decRef();
            }
            JoinValidationService.this.statesByVersion.clear();
            logger.trace("join validation cache cleared");
        }

        public String toString() {
            return "clear join validation cache";
        }
    };

    public JoinValidationService(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
        this.cacheTimeout = JOIN_VALIDATION_CACHE_TIMEOUT_SETTING.get(settings);
        this.transportService = transportService;
        this.clusterStateSupplier = clusterStateSupplier;
        this.executeRefs = AbstractRefCounted.of(() -> this.execute(this.cacheClearer));
        List<String> dataPaths = Environment.PATH_DATA_SETTING.get(settings);
        transportService.registerRequestHandler(JOIN_VALIDATE_ACTION_NAME, "cluster_coordination", ValidateJoinRequest::new, (request, channel, task) -> {
            ClusterState remoteState = request.getOrReadState();
            ClusterState localState = (ClusterState)clusterStateSupplier.get();
            if (localState.metadata().clusterUUIDCommitted() && !localState.metadata().clusterUUID().equals(remoteState.metadata().clusterUUID())) {
                throw new CoordinationStateRejectedException("This node previously joined a cluster with UUID [" + localState.metadata().clusterUUID() + "] and is now trying to join a different cluster with UUID [" + remoteState.metadata().clusterUUID() + "]. This is forbidden and usually indicates an incorrect discovery or cluster bootstrapping configuration. Note that the cluster UUID persists across restarts and can only be changed by deleting the contents of the node's data " + (dataPaths.size() == 1 ? "path " : "paths ") + dataPaths + " which will also remove any data held by this node.", new Object[0]);
            }
            joinValidators.forEach(joinValidator -> joinValidator.accept(transportService.getLocalNode(), remoteState));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        });
    }

    public void validateJoin(DiscoveryNode discoveryNode, ActionListener<TransportResponse.Empty> listener) {
        if (discoveryNode.getVersion().onOrAfter(Version.V_8_3_0)) {
            if (this.executeRefs.tryIncRef()) {
                try {
                    this.execute(new JoinValidation(discoveryNode, listener));
                }
                finally {
                    this.executeRefs.decRef();
                }
            } else {
                listener.onFailure(new NodeClosedException(this.transportService.getLocalNode()));
            }
        } else {
            this.transportService.sendRequest(discoveryNode, JOIN_VALIDATE_ACTION_NAME, (TransportRequest)new ValidateJoinRequest(this.clusterStateSupplier.get()), REQUEST_OPTIONS, new ActionListenerResponseHandler<TransportResponse.Empty>(listener.delegateResponse((l, e) -> {
                logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", (Throwable)e);
                listener.onFailure(new IllegalStateException(String.format(Locale.ROOT, "failure when sending a join validation request from [%s] to [%s]", this.transportService.getLocalNode().descriptionWithoutAttributes(), discoveryNode.descriptionWithoutAttributes()), (Throwable)e));
            }), i -> TransportResponse.Empty.INSTANCE, "cluster_coordination"));
        }
    }

    public void stop() {
        this.executeRefs.decRef();
    }

    boolean isIdle() {
        return this.queue.isEmpty() && this.queueSize.get() == 0 && this.statesByVersion.isEmpty();
    }

    private void execute(AbstractRunnable task) {
        assert (task == this.cacheClearer || this.executeRefs.hasReferences());
        this.queue.add(task);
        if (this.queueSize.getAndIncrement() == 0) {
            this.runProcessor();
        }
    }

    private void runProcessor() {
        this.transportService.getThreadPool().executor("cluster_coordination").execute(this.processor);
    }

    private void processNextItem() {
        if (!this.executeRefs.hasReferences()) {
            this.onShutdown();
            return;
        }
        AbstractRunnable nextItem = this.queue.poll();
        assert (nextItem != null);
        try {
            nextItem.run();
        }
        finally {
            try {
                int remaining = this.queueSize.decrementAndGet();
                assert (remaining >= 0);
                if (remaining > 0) {
                    this.runProcessor();
                }
            }
            catch (Exception e) {
                assert (false) : e;
                throw e;
            }
        }
    }

    private void onShutdown() {
        try {
            this.cacheClearer.run();
            do {
                AbstractRunnable nextItem = this.queue.poll();
                assert (nextItem != null);
                if (nextItem == this.cacheClearer) continue;
                nextItem.onFailure(new NodeClosedException(this.transportService.getLocalNode()));
            } while (this.queueSize.decrementAndGet() > 0);
        }
        catch (Exception e) {
            assert (false) : e;
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ReleasableBytesReference serializeClusterState(DiscoveryNode discoveryNode) {
        RecyclerBytesStreamOutput bytesStream = this.transportService.newNetworkBytesStream();
        boolean success = false;
        try {
            ClusterState clusterState = this.clusterStateSupplier.get();
            Version version = discoveryNode.getVersion();
            try (OutputStreamStreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream)));){
                stream.setVersion(version);
                clusterState.writeTo(stream);
            }
            catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", (Throwable)e, discoveryNode);
            }
            ReleasableBytesReference newBytes = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
            logger.trace("serialized join validation cluster state version [{}] for node version [{}] with size [{}]", (Object)clusterState.version(), (Object)version, (Object)newBytes.length());
            ReleasableBytesReference previousBytes = this.statesByVersion.put(version, newBytes);
            assert (previousBytes == null);
            success = true;
            ReleasableBytesReference releasableBytesReference = newBytes;
            return releasableBytesReference;
        }
        finally {
            if (!success) {
                bytesStream.close();
                assert (false);
            }
        }
    }

    private class JoinValidation
    extends ActionRunnable<TransportResponse.Empty> {
        private final DiscoveryNode discoveryNode;

        JoinValidation(DiscoveryNode discoveryNode, ActionListener<TransportResponse.Empty> listener) {
            super(listener);
            this.discoveryNode = discoveryNode;
        }

        @Override
        protected void doRun() throws Exception {
            assert (this.discoveryNode.getVersion().onOrAfter(Version.V_8_3_0)) : this.discoveryNode.getVersion();
            ReleasableBytesReference cachedBytes = JoinValidationService.this.statesByVersion.get(this.discoveryNode.getVersion());
            ReleasableBytesReference bytes = Objects.requireNonNullElseGet(cachedBytes, () -> JoinValidationService.this.serializeClusterState(this.discoveryNode));
            assert (bytes.hasReferences()) : "already closed";
            bytes.incRef();
            JoinValidationService.this.transportService.sendRequest(this.discoveryNode, JoinValidationService.JOIN_VALIDATE_ACTION_NAME, (TransportRequest)new BytesTransportRequest(bytes, this.discoveryNode.getVersion().transportVersion), REQUEST_OPTIONS, new CleanableResponseHandler<TransportResponse.Empty>(this.listener, in -> TransportResponse.Empty.INSTANCE, "cluster_coordination", bytes::decRef));
            if (cachedBytes == null) {
                JoinValidationService.this.transportService.getThreadPool().schedule(new Runnable(){

                    @Override
                    public void run() {
                        JoinValidationService.this.execute(JoinValidationService.this.cacheClearer);
                    }

                    public String toString() {
                        return JoinValidationService.this.cacheClearer + " after timeout";
                    }
                }, JoinValidationService.this.cacheTimeout, "cluster_coordination");
            }
        }

        @Override
        public String toString() {
            return "send cached join validation request to " + this.discoveryNode;
        }
    }
}

