/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.operators.sort.BinaryMergeIterator;
import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.store.codegen.RecordComparator;
import org.apache.flink.table.store.file.sort.BinaryExternalMerger;
import org.apache.flink.table.store.file.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.store.file.sort.SortBuffer;
import org.apache.flink.util.MutableObjectIterator;

public class BinaryExternalSortBuffer
implements SortBuffer {
    private final BinaryRowDataSerializer serializer;
    private final int pageSize;
    private final BinaryInMemorySortBuffer inMemorySortBuffer;
    private final IOManager ioManager;
    private SpillChannelManager channelManager;
    private final int maxNumFileHandles;
    private final boolean compressionEnable;
    private final BlockCompressionFactory compressionCodecFactory;
    private final int compressionBlockSize;
    private final BinaryExternalMerger merger;
    private final FileIOChannel.Enumerator enumerator;
    private final List<ChannelWithMeta> spillChannelIDs;
    private int numRecords = 0;

    public BinaryExternalSortBuffer(BinaryRowDataSerializer serializer, RecordComparator comparator, int pageSize, BinaryInMemorySortBuffer inMemorySortBuffer, IOManager ioManager, int maxNumFileHandles) {
        this.serializer = serializer;
        this.pageSize = pageSize;
        this.inMemorySortBuffer = inMemorySortBuffer;
        this.ioManager = ioManager;
        this.channelManager = new SpillChannelManager();
        this.maxNumFileHandles = maxNumFileHandles;
        this.compressionEnable = true;
        this.compressionCodecFactory = new Lz4BlockCompressionFactory();
        this.compressionBlockSize = (int)MemorySize.parse("64 kb").getBytes();
        this.merger = new BinaryExternalMerger(ioManager, pageSize, maxNumFileHandles, this.channelManager, (BinaryRowDataSerializer)serializer.duplicate(), comparator, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize);
        this.enumerator = ioManager.createChannelEnumerator();
        this.spillChannelIDs = new ArrayList<ChannelWithMeta>();
    }

    @Override
    public int size() {
        return this.numRecords;
    }

    @Override
    public void clear() {
        this.numRecords = 0;
        this.inMemorySortBuffer.clear();
        this.spillChannelIDs.clear();
        this.channelManager.close();
        this.channelManager = new SpillChannelManager();
    }

    @Override
    public long getOccupancy() {
        return this.inMemorySortBuffer.getOccupancy();
    }

    @Override
    public boolean flushMemory() throws IOException {
        this.spill();
        return true;
    }

    @VisibleForTesting
    public void write(MutableObjectIterator<BinaryRowData> iterator) throws IOException {
        BinaryRowData row = this.serializer.createInstance();
        while ((row = iterator.next(row)) != null) {
            this.write(row);
        }
    }

    @Override
    public boolean write(RowData record) throws IOException {
        while (true) {
            boolean success;
            if (success = this.inMemorySortBuffer.write(record)) {
                ++this.numRecords;
                return true;
            }
            if (this.inMemorySortBuffer.isEmpty()) {
                throw new IOException("The record exceeds the maximum size of a sort buffer.");
            }
            this.spill();
            if (this.spillChannelIDs.size() < this.maxNumFileHandles) continue;
            List<ChannelWithMeta> merged = this.merger.mergeChannelList(this.spillChannelIDs);
            this.spillChannelIDs.clear();
            this.spillChannelIDs.addAll(merged);
        }
    }

    @Override
    public final MutableObjectIterator<BinaryRowData> sortedIterator() throws IOException {
        if (this.spillChannelIDs.isEmpty()) {
            return this.inMemorySortBuffer.sortedIterator();
        }
        return this.spilledIterator();
    }

    private MutableObjectIterator<BinaryRowData> spilledIterator() throws IOException {
        this.spill();
        ArrayList<FileIOChannel> openChannels = new ArrayList<FileIOChannel>();
        final BinaryMergeIterator iterator = this.merger.getMergingIterator(this.spillChannelIDs, openChannels);
        this.channelManager.addOpenChannels(openChannels);
        return new MutableObjectIterator<BinaryRowData>(){

            @Override
            public BinaryRowData next(BinaryRowData reuse) throws IOException {
                return this.next();
            }

            @Override
            public BinaryRowData next() throws IOException {
                BinaryRowData row = (BinaryRowData)iterator.next();
                return row == null ? null : row.copy();
            }
        };
    }

    private void spill() throws IOException {
        int blockCount;
        int bytesInLastBuffer;
        if (this.inMemorySortBuffer.isEmpty()) {
            return;
        }
        FileIOChannel.ID channel = this.enumerator.next();
        this.channelManager.addChannel(channel);
        AbstractChannelWriterOutputView output = null;
        try {
            output = FileChannelUtil.createOutputView(this.ioManager, channel, this.compressionEnable, this.compressionCodecFactory, this.compressionBlockSize, this.pageSize);
            new QuickSort().sort(this.inMemorySortBuffer);
            this.inMemorySortBuffer.writeToOutput(output);
            bytesInLastBuffer = output.close();
            blockCount = output.getBlockCount();
        }
        catch (IOException e) {
            if (output != null) {
                output.close();
                output.getChannel().deleteChannel();
            }
            throw e;
        }
        this.spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
        this.inMemorySortBuffer.clear();
    }
}

