package edu.iu.dsc.tws.dataset.partition;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/* loaded from: input_file:edu/iu/dsc/tws/dataset/partition/BufferedCollectionPartition.class */
/**
 * A {@link CollectionPartition} that keeps at most {@code maxFramesInMemory} values in the
 * inherited in-memory list and serializes every further value to a byte array using the
 * configured {@link MessageType}'s data packer.
 *
 * <p>Serialized values ("frames") are collected in a buffer list. Once the buffered byte count
 * exceeds {@code maxBufferedBytes} the buffer is written to a new file named
 * {@code <counter>.pbck} under the root path supplied by the subclass.
 *
 * <p>Spill file layout, as written by {@link #flush()}: one {@code long} holding the number of
 * frames, followed by, for every frame, an {@code int} length and the frame bytes.
 *
 * <p>Logical element order, as used by {@link #getConsumer()} and {@link #get(int)}: in-memory
 * values, then the frames of each spill file in file order, then the not yet flushed buffers.
 *
 * <p>NOTE(review): the main constructor calls the abstract {@link #getFileSystem(Config)} and
 * {@link #getRootPath(Config)}, so subclasses must not rely on their own fields being
 * initialized inside those methods.
 *
 * @param <T> type of the values held by this partition
 */
public abstract class BufferedCollectionPartition<T> extends CollectionPartition<T> implements Closeable {
    private static final Logger LOG = Logger.getLogger(BufferedCollectionPartition.class.getName());
    private static final long DEFAULT_MAX_BUFFERED_BYTES = 10000000;
    private static final String EXTENSION = ".pbck";
    private static final MessageType DEFAULT_DATA_TYPE = MessageTypes.OBJECT;

    /** Values beyond this count are serialized instead of being kept in {@code dataList}. */
    private final long maxFramesInMemory;
    /** Provides the packer used to serialize and deserialize values. */
    private final MessageType dataType;
    /** Buffered byte count above which {@link #add(Object)} triggers a flush. */
    private final long maxBufferedBytes;
    private final String reference;

    /** Spill files written so far (or discovered by {@link #loadFromFS()}), in read order. */
    private List<Path> filesList = new ArrayList<>();
    /** Numeric name of the next spill file. */
    private long fileCounter;
    /** Serialized values not yet written to a spill file. */
    private List<byte[]> buffers = new ArrayList<>();
    /** Total length of the arrays currently held in {@link #buffers}. */
    private long bufferedBytes = 0L;
    private FileSystem fileSystem;
    private Path rootPath;
    /** Frames of the spill file most recently loaded by {@link #get(int)}. */
    private List<byte[]> currentFileCache = new ArrayList<>();
    /** Index into {@link #filesList} of the cached file, or -1 when nothing is cached. */
    private int cachedFileIndex = -1;

    /**
     * Creates a partition and makes sure its root directory exists.
     *
     * @param maxFramesInMemory number of values kept in the in-memory list before serializing
     * @param dataType message type whose data packer serializes the values
     * @param maxBufferedBytes buffered byte count above which the buffers are flushed to a file
     * @param config configuration handed to {@link #getFileSystem(Config)} and
     *     {@link #getRootPath(Config)}
     * @param reference identifier returned by {@link #getReference()}
     * @throws Twister2RuntimeException if the file system or the root directory cannot be set up
     */
    public BufferedCollectionPartition(long maxFramesInMemory, MessageType dataType, long maxBufferedBytes,
                                       Config config, String reference) {
        this.reference = reference;
        this.maxFramesInMemory = maxFramesInMemory;
        this.maxBufferedBytes = maxBufferedBytes;
        this.dataType = dataType;
        try {
            this.fileSystem = getFileSystem(config);
            this.rootPath = getRootPath(config);
            this.fileSystem.mkdirs(this.rootPath);
        } catch (IOException e) {
            throw new Twister2RuntimeException("Failed to initialize and create a directory to hold the partition", e);
        }
    }

    /** Creates a partition with the default data type, default buffer limit and a random reference. */
    public BufferedCollectionPartition(long maxFramesInMemory, Config config) {
        this(maxFramesInMemory, DEFAULT_DATA_TYPE, DEFAULT_MAX_BUFFERED_BYTES, config, UUID.randomUUID().toString());
    }

    /**
     * Creates a partition with the given reference and picks up the spill files that already
     * exist under the root path.
     */
    public BufferedCollectionPartition(long maxFramesInMemory, Config config, String reference) {
        this(maxFramesInMemory, DEFAULT_DATA_TYPE, DEFAULT_MAX_BUFFERED_BYTES, config, reference);
        loadFromFS();
    }

    /** Creates a partition with the default buffer limit and a random reference. */
    public BufferedCollectionPartition(long maxFramesInMemory, MessageType dataType, Config config) {
        this(maxFramesInMemory, dataType, DEFAULT_MAX_BUFFERED_BYTES, config, UUID.randomUUID().toString());
    }

    /** Creates a partition that keeps no values in the in-memory list and uses a random reference. */
    public BufferedCollectionPartition(MessageType dataType, long maxBufferedBytes, Config config) {
        this(0L, dataType, maxBufferedBytes, config, UUID.randomUUID().toString());
    }

    /**
     * Creates a partition that keeps no values in the in-memory list, uses the given reference
     * and picks up the spill files that already exist under the root path.
     */
    public BufferedCollectionPartition(MessageType dataType, long maxBufferedBytes, Config config, String reference) {
        this(0L, dataType, maxBufferedBytes, config, reference);
        loadFromFS();
    }

    /**
     * Supplies the file system used for the spill files. Called from the constructor.
     *
     * @throws IOException if the file system cannot be obtained
     */
    protected abstract FileSystem getFileSystem(Config config) throws IOException;

    /** Supplies the directory that holds the spill files. Called from the constructor. */
    protected abstract Path getRootPath(Config config);

    /**
     * Lists the root path, keeps the files whose name ends with {@code .pbck}, orders them by
     * their numeric name and continues the file counter after the highest existing number so
     * that later flushes never reuse an existing file name.
     */
    private void loadFromFS() {
        try {
            // toCollection(ArrayList::new) guarantees a mutable list; flush() appends to it.
            this.filesList = Arrays.stream(this.fileSystem.listFiles(this.rootPath))
                    .map(status -> status.getPath())
                    .filter(path -> path.getName().endsWith(EXTENSION))
                    .sorted(Comparator.comparingLong(path -> fileIndexOf(path)))
                    .collect(Collectors.toCollection(ArrayList::new));
            this.fileCounter = this.filesList.isEmpty()
                    ? 0L
                    : fileIndexOf(this.filesList.get(this.filesList.size() - 1)) + 1;
        } catch (IOException e) {
            throw new Twister2RuntimeException("Failed to load frames from file system", e);
        }
    }

    /** Parses the numeric part of a spill file name, i.e. the name without the extension. */
    private static long fileIndexOf(Path file) {
        String name = file.getName();
        return Long.parseLong(name.substring(0, name.length() - EXTENSION.length()));
    }

    /** Serializes a value with the configured data packer. */
    @SuppressWarnings("unchecked") // dataType is a raw MessageType, so the packer call is unchecked
    private byte[] pack(T value) {
        return this.dataType.getDataPacker().packToByteArray(value);
    }

    /** Deserializes a frame with the configured data packer. */
    @SuppressWarnings("unchecked") // the packer is raw; frames were produced by pack(T)
    private T unpack(byte[] frame) {
        return (T) this.dataType.getDataPacker().unpackFromByteArray(frame);
    }

    /**
     * Reads {@code frameCount} length-prefixed frames from {@code in} and appends them to
     * {@code sink}. The stream must be positioned right after the frame count header.
     */
    private static void readFrames(DataInputStream in, long frameCount, Collection<byte[]> sink)
            throws IOException {
        for (long frame = 0; frame < frameCount; frame++) {
            byte[] bytes = new byte[in.readInt()];
            // read(byte[]) may return early; readFully guarantees the whole frame is read.
            in.readFully(bytes);
            sink.add(bytes);
        }
    }

    /**
     * Adds a value. While the in-memory list holds fewer than {@code maxFramesInMemory} values
     * the value is stored there; otherwise it is serialized into the buffers, which are flushed
     * once the buffered byte count exceeds {@code maxBufferedBytes}.
     */
    @Override // edu.iu.dsc.tws.dataset.partition.CollectionPartition
    public void add(T t) {
        if (this.dataList.size() < this.maxFramesInMemory) {
            super.add(t);
            return;
        }
        byte[] packed = pack(t);
        this.buffers.add(packed);
        this.bufferedBytes += packed.length;
        if (this.bufferedBytes > this.maxBufferedBytes) {
            flush();
        }
    }

    /** Adds every value of the collection through {@link #add(Object)}. */
    @Override // edu.iu.dsc.tws.dataset.partition.CollectionPartition
    public void addAll(Collection<T> collection) {
        for (T value : collection) {
            add(value);
        }
    }

    /**
     * Returns a consumer over the in-memory values, then the frames of every spill file, then
     * the buffers that have not been flushed yet. The iterators are taken when this method is
     * called.
     */
    @Override // edu.iu.dsc.tws.dataset.partition.CollectionPartition
    public DataPartitionConsumer<T> getConsumer() {
        final Iterator<T> memoryIterator = this.dataList.iterator();
        final Iterator<Path> fileIterator = this.filesList.iterator();
        final Iterator<byte[]> bufferIterator = this.buffers.iterator();
        return new DataPartitionConsumer<T>() {
            /** Frames of the spill file currently being consumed. */
            private final Queue<byte[]> bufferFromDisk = new LinkedList<>();

            @Override
            public boolean hasNext() {
                return memoryIterator.hasNext() || fileIterator.hasNext() || bufferIterator.hasNext()
                        || !this.bufferFromDisk.isEmpty();
            }

            @Override
            public T next() {
                if (!this.bufferFromDisk.isEmpty()) {
                    return BufferedCollectionPartition.this.unpack(this.bufferFromDisk.poll());
                }
                if (memoryIterator.hasNext()) {
                    return memoryIterator.next();
                }
                // Load spill files until one yields a frame; files without frames are skipped.
                while (fileIterator.hasNext()) {
                    Path path = fileIterator.next();
                    try (DataInputStream in =
                                 new DataInputStream(BufferedCollectionPartition.this.fileSystem.open(path))) {
                        readFrames(in, in.readLong(), this.bufferFromDisk);
                    } catch (IOException e) {
                        throw new Twister2RuntimeException(
                                "Failed to read value from the temp file : " + path.toString(), e);
                    }
                    if (!this.bufferFromDisk.isEmpty()) {
                        return BufferedCollectionPartition.this.unpack(this.bufferFromDisk.poll());
                    }
                }
                if (bufferIterator.hasNext()) {
                    return BufferedCollectionPartition.this.unpack(bufferIterator.next());
                }
                throw new Twister2RuntimeException("No more frames available in this partition");
            }
        };
    }

    /**
     * Drops the references to the buffers and the in-memory list and releases the file cache
     * used by {@link #get(int)}. Spill files are left untouched. Methods that use the buffers
     * or the in-memory list must not be called afterwards.
     */
    public void dispose() {
        this.buffers = null;
        this.dataList = null;
        this.currentFileCache = new ArrayList<>();
        this.cachedFileIndex = -1;
    }

    /**
     * Deletes every spill file and resets the partition to an empty state, including the file
     * counter and the file cache used by {@link #get(int)}.
     *
     * @throws Twister2RuntimeException if a spill file cannot be deleted
     */
    @Override // edu.iu.dsc.tws.dataset.partition.CollectionPartition
    public void clear() {
        for (Path path : this.filesList) {
            try {
                this.fileSystem.delete(path, true);
            } catch (IOException e) {
                throw new Twister2RuntimeException("Failed to delete the temporary file : " + path.toString(), e);
            }
        }
        super.clear();
        this.filesList.clear();
        this.buffers.clear();
        this.bufferedBytes = 0L;
        this.fileCounter = 0L;
        // File numbers restart at 0, so a cached file must not be served for a new file.
        this.currentFileCache = new ArrayList<>();
        this.cachedFileIndex = -1;
    }

    /**
     * Writes the buffered frames to a new spill file and empties the buffers. Does nothing when
     * no frames are buffered, so that no zero-frame files are created.
     *
     * @throws Twister2RuntimeException if the file cannot be written
     */
    public void flush() {
        if (this.buffers.isEmpty()) {
            return;
        }
        Path target = new Path(this.rootPath, (this.fileCounter++) + EXTENSION);
        try (DataOutputStream out = new DataOutputStream(this.fileSystem.create(target))) {
            out.writeLong(this.buffers.size());
            for (byte[] frame : this.buffers) {
                out.writeInt(frame.length);
                out.write(frame);
            }
        } catch (IOException e) {
            throw new Twister2RuntimeException("Couldn't flush partitions to the disk", e);
        }
        // Only register the file and drop the buffers once the stream was closed successfully.
        this.filesList.add(target);
        this.buffers.clear();
        this.bufferedBytes = 0L;
    }

    /** Returns whether the value at the given index is held in the in-memory list. */
    public boolean hasIndexInMemory(int i) {
        return i < this.dataList.size();
    }

    /**
     * Returns the value at the given logical index: in-memory values first, then the frames of
     * the spill files in file order, then the unflushed buffers.
     *
     * <p>The header of each spill file is read until the file containing the index is found;
     * the frames of that file are cached, so consecutive lookups in the same file do not reload
     * its frames.
     *
     * @throws Twister2RuntimeException if a spill file cannot be read
     */
    public T get(int i) {
        if (i < this.dataList.size()) {
            return this.dataList.get(i);
        }
        // Number of elements that logically precede the file currently being inspected.
        long framesBefore = this.dataList.size();
        for (int fileIndex = 0; fileIndex < this.filesList.size(); fileIndex++) {
            Path path = this.filesList.get(fileIndex);
            try (DataInputStream in = new DataInputStream(this.fileSystem.open(path))) {
                long frameCount = in.readLong();
                if (i < framesBefore + frameCount) {
                    if (this.cachedFileIndex != fileIndex) {
                        // Publish the cache only after the whole file was read successfully.
                        List<byte[]> frames = new ArrayList<>();
                        readFrames(in, frameCount, frames);
                        this.currentFileCache = frames;
                        this.cachedFileIndex = fileIndex;
                    }
                    return unpack(this.currentFileCache.get((int) (i - framesBefore)));
                }
                framesBefore += frameCount;
            } catch (IOException e) {
                throw new Twister2RuntimeException("Failed to read from file : " + path, e);
            }
        }
        return unpack(this.buffers.get((int) (i - framesBefore)));
    }

    /** Flushes the buffered frames to a spill file. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        flush();
    }

    /** Returns the reference given at construction time (a random UUID string when none was given). */
    public String getReference() {
        return this.reference;
    }
}
