package edu.iu.dsc.tws.comms.shuffle;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;

/* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSMerger.class */
public class FSMerger implements Shuffle {
    private static final Logger LOG = Logger.getLogger(FSMerger.class.getName());
    private long maxBytesToKeepInMemory;
    private long maxRecordsInMemory;
    private String folder;
    private String operationName;
    private MessageType valueType;
    private int noOfFileWritten = 0;
    private List<Integer> bytesLength = new ArrayList();
    private List<byte[]> bytesInMemory = new ArrayList();
    private List<Object> objectsInMemory = new ArrayList();
    private List<Integer> filePartBytes = new ArrayList();
    private long numOfBytesInMemory = 0;
    private Lock lock = new ReentrantLock();
    private Condition notFull = this.lock.newCondition();
    private FSStatus status = FSStatus.WRITING;
    private KryoSerializer kryoSerializer = new KryoSerializer();

    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSMerger$FSIterator.class */
    private class FSIterator implements Iterator<Object> {
        private int currentFileIndex = -1;
        private int currentIndex = 0;
        private Iterator<Object> it;
        private List<Object> openValues;

        FSIterator() {
            this.it = FSMerger.this.objectsInMemory.iterator();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.currentFileIndex == -1) {
                if (this.it.hasNext()) {
                    return true;
                }
                if (FSMerger.this.noOfFileWritten <= 0) {
                    return false;
                }
                openFilePart();
            }
            if (this.currentFileIndex < 0) {
                return false;
            }
            if (this.currentIndex < this.openValues.size()) {
                return true;
            }
            if (this.currentFileIndex >= FSMerger.this.noOfFileWritten - 1) {
                return false;
            }
            openFilePart();
            return true;
        }

        private void openFilePart() {
            this.currentFileIndex++;
            this.openValues = FileLoader.readFile(FSMerger.this.getSaveFileName(this.currentFileIndex), FSMerger.this.valueType, FSMerger.this.kryoSerializer);
            this.currentIndex = 0;
        }

        @Override // java.util.Iterator
        public Object next() {
            if (this.currentFileIndex == -1) {
                return this.it.next();
            }
            if (this.currentFileIndex < 0) {
                return null;
            }
            Object obj = this.openValues.get(this.currentIndex);
            this.currentIndex++;
            return obj;
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSMerger$FSStatus.class */
    private enum FSStatus {
        WRITING,
        READING
    }

    public FSMerger(long j, long j2, String str, String str2, MessageType messageType) {
        this.maxBytesToKeepInMemory = j;
        this.maxRecordsInMemory = j2;
        this.folder = str;
        this.operationName = str2;
        this.valueType = messageType;
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public void add(byte[] bArr, int i) {
        if (this.status == FSStatus.READING) {
            throw new RuntimeException("Cannot add after switching to reading");
        }
        this.lock.lock();
        try {
            this.bytesInMemory.add(bArr);
            this.bytesLength.add(Integer.valueOf(i));
            this.numOfBytesInMemory += i;
            if (this.numOfBytesInMemory > this.maxBytesToKeepInMemory || this.bytesInMemory.size() > this.maxRecordsInMemory) {
                this.notFull.signal();
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public void switchToReading() {
        this.status = FSStatus.READING;
        deserializeObjects();
    }

    private void deserializeObjects() {
        for (int i = 0; i < this.bytesInMemory.size(); i++) {
            this.objectsInMemory.add(this.valueType.getDataPacker().unpackFromByteArray(this.bytesInMemory.get(i)));
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public void run() {
        this.lock.lock();
        try {
            if (this.numOfBytesInMemory > this.maxBytesToKeepInMemory || this.bytesInMemory.size() > this.maxRecordsInMemory) {
                LOG.log(Level.FINE, String.format("Save objects bytes %d objects %d", Long.valueOf(this.numOfBytesInMemory), Integer.valueOf(this.bytesInMemory.size())));
                FileLoader.saveObjects(this.bytesInMemory, this.bytesLength, this.numOfBytesInMemory, getSaveFileName(this.noOfFileWritten));
                this.bytesInMemory.clear();
                this.bytesLength.clear();
                this.noOfFileWritten++;
                this.numOfBytesInMemory = 0L;
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public Iterator<Object> readIterator() {
        return new FSIterator();
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public void clean() {
        File file = new File(getSaveFolderName());
        try {
            FileUtils.cleanDirectory(file);
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Failed to clear directory: " + file, (Throwable) e);
        }
    }

    private String getSaveFolderName() {
        return this.folder + "/" + this.operationName;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getSaveFileName(int i) {
        return this.folder + "/" + this.operationName + "/part_" + i;
    }

    private String getSizesFileName(int i) {
        return this.folder + "/" + this.operationName + "/part_sizes_" + i;
    }
}
