package org.apache.flink.table.planner.delegation;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.resource.NodeResourceUtil;

@Internal
/* loaded from: input_file:org/apache/flink/table/planner/delegation/BatchExecutor.class */
public class BatchExecutor extends ExecutorBase {
    @VisibleForTesting
    public BatchExecutor(StreamExecutionEnvironment streamExecutionEnvironment) {
        super(streamExecutionEnvironment);
    }

    public JobExecutionResult execute(String str) throws Exception {
        return getExecutionEnvironment().execute(generateStreamGraph(str));
    }

    private void setBatchProperties(StreamExecutionEnvironment streamExecutionEnvironment) {
        ExecutionConfig config = streamExecutionEnvironment.getConfig();
        config.enableObjectReuse();
        config.setLatencyTrackingInterval(-1L);
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamExecutionEnvironment.setBufferTimeout(-1L);
        if (isShuffleModeAllBatch()) {
            config.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
        }
    }

    @Override // org.apache.flink.table.planner.delegation.ExecutorBase
    public StreamGraph generateStreamGraph(List<Transformation<?>> list, String str) {
        StreamExecutionEnvironment executionEnvironment = getExecutionEnvironment();
        setBatchProperties(executionEnvironment);
        executionEnvironment.getClass();
        list.forEach(executionEnvironment::addOperator);
        StreamGraph streamGraph = executionEnvironment.getStreamGraph(getNonEmptyJobName(str));
        ResourceSpec fromManagedMem = NodeResourceUtil.fromManagedMem(0);
        streamGraph.getStreamNodes().forEach(streamNode -> {
            if (streamNode.getMinResources().equals(ResourceSpec.DEFAULT)) {
                streamNode.setResources(fromManagedMem, fromManagedMem);
            }
        });
        streamGraph.setChaining(true);
        streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
        streamGraph.setStateBackend((StateBackend) null);
        if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
            throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
        }
        if (isShuffleModeAllBatch()) {
            streamGraph.setBlockingConnectionsBetweenChains(true);
        }
        return streamGraph;
    }

    private boolean isShuffleModeAllBatch() {
        String string = this.tableConfig.getConfiguration().getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
        if (string.equalsIgnoreCase(ShuffleMode.BATCH.toString())) {
            return true;
        }
        if (string.equalsIgnoreCase(ShuffleMode.PIPELINED.toString())) {
            return false;
        }
        throw new IllegalArgumentException(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.key() + " can only be set to " + ShuffleMode.BATCH.toString() + " or " + ShuffleMode.PIPELINED.toString());
    }
}
