package edu.iu.dsc.tws.comms.shuffle;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.comms.utils.Heap;
import edu.iu.dsc.tws.comms.utils.HeapNode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;

/* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger.class */
/**
 * Key-sorted {@link Shuffle} implementation that buffers serialized records in memory and
 * spills them to part files when the configured limits are exceeded.
 *
 * <p>Life cycle as implemented here:
 * <ol>
 *   <li>{@link #add(Object, byte[], int)} appends a serialized record to the in-memory buffer.</li>
 *   <li>{@link #run()} sorts the buffer by key and writes it to the next part file once the
 *       byte or record limit is exceeded.</li>
 *   <li>{@link #switchToReading()} unpacks the records still in memory and sorts them by key.</li>
 *   <li>{@link #readIterator()} merges all part files and the in-memory records through a heap
 *       ordered by the key comparator and exposes them grouped by key.</li>
 *   <li>{@link #clean()} empties the folder holding the part files.</li>
 * </ol>
 *
 * <p>{@code add}, {@code run} and {@code switchToReading} mutate the buffers while holding an
 * internal lock; the iterators returned by {@code readIterator} do not take that lock.
 */
public class FSKeyedSortedMerger implements Shuffle {
    private static final Logger LOG = Logger.getLogger(FSKeyedSortedMerger.class.getName());

    /** Id under which the in-memory records are registered among the open parts. */
    private static final int IN_MEMORY_PART_ID = -1;

    private final int maxBytesToKeepInMemory;
    private final int maxRecordsInMemory;
    private final String folder;
    private final String operationName;
    private final MessageType keyType;
    private final MessageType dataType;
    private final Comparator<Object> keyComparator;
    private final int target;

    /** Number of part files written so far; also the index of the next part file. */
    private int noOfFileWritten = 0;
    /** Length passed to {@code add} for every buffered record, in insertion order. */
    private final List<Integer> bytesLength = new ArrayList<>();
    /** Buffered records whose values are still serialized byte arrays. */
    private List<Tuple> recordsInMemory = new ArrayList<>();
    /** Records unpacked by {@link #switchToReading()}, sorted by key. */
    private final List<Tuple> objectsInMemory = new ArrayList<>();
    /** Per part file: the value returned by {@code FileLoader.saveKeyValues} for that part. */
    private final List<Long> maxTupleSize = new ArrayList<>();
    private long numOfBytesInMemory = 0;
    private final Lock lock = new ReentrantLock();
    private FSStatus status = FSStatus.WRITING;
    private final KryoSerializer kryoSerializer = new KryoSerializer();

    /** Orders tuples by applying the wrapped key comparator to their keys. */
    private static class ComparatorWrapper implements Comparator<Tuple> {
        private final Comparator<Object> comparator;

        ComparatorWrapper(Comparator<Object> comparator) {
            this.comparator = comparator;
        }

        @Override
        public int compare(Tuple left, Tuple right) {
            return comparator.compare(left.getKey(), right.getKey());
        }
    }

    /**
     * Merges every written part file and the in-memory records into a single stream of tuples
     * by repeatedly extracting the minimum from a heap that holds one pending tuple per source.
     */
    private class FSIterator implements Iterator<Tuple> {
        private final Heap heap;
        private final Map<Integer, OpenFilePart> openFiles = new HashMap<>();
        private int numValuesInHeap = 0;

        FSIterator() {
            // One heap slot per part file plus one for the in-memory records.
            heap = new Heap(noOfFileWritten + 1, keyComparator);
            for (int i = 0; i < noOfFileWritten; i++) {
                openFiles.put(i, FileLoader.openPart(getSaveFileName(i), 0L,
                        maxTupleSize.get(i), keyType, dataType, kryoSerializer));
            }
            openFiles.put(IN_MEMORY_PART_ID, new OpenFilePart(objectsInMemory, 0L, 0L, null));

            // Seed the heap with the first tuple of every source that has one.
            for (Map.Entry<Integer, OpenFilePart> entry : openFiles.entrySet()) {
                OpenFilePart part = entry.getValue();
                if (part.hasNext()) {
                    heap.insert(part.next(), entry.getKey());
                    numValuesInHeap++;
                } else {
                    LOG.log(Level.WARNING, String.format(
                            "File part without any initial values: %s target %d",
                            part.getFileName(), target));
                }
            }
        }

        @Override
        public boolean hasNext() {
            return numValuesInHeap > 0;
        }

        /**
         * Returns the smallest pending tuple and refills the heap from the source it came from.
         *
         * @throws NoSuchElementException if all sources are exhausted
         */
        @Override
        public Tuple next() {
            if (numValuesInHeap <= 0) {
                throw new NoSuchElementException("There are no more tuples to merge");
            }
            HeapNode min = heap.extractMin();
            numValuesInHeap--;

            int partId = min.listNo;
            OpenFilePart part = openFiles.get(partId);
            if (part.hasNext()) {
                heap.insert(part.next(), partId);
                numValuesInHeap++;
            } else if (partId == IN_MEMORY_PART_ID || part.getFileSize() <= part.getReadOffSet()) {
                // Source fully consumed; the in-memory part has no backing file to reload.
                openFiles.remove(partId);
            } else {
                // Load the next window of the part file. The read offset goes in the same
                // argument position as the 0L used for the initial open in the constructor.
                OpenFilePart reloaded = FileLoader.openPart(getSaveFileName(partId),
                        part.getReadOffSet(), maxTupleSize.get(partId),
                        keyType, dataType, kryoSerializer);
                openFiles.put(partId, reloaded);
                if (reloaded.hasNext()) {
                    heap.insert(reloaded.next(), partId);
                    numValuesInHeap++;
                }
            }
            return (Tuple) min.data;
        }
    }

    /** Phases of this merger: accepting records, serving reads, or cleaned up. */
    private enum FSStatus {
        WRITING,
        READING,
        DONE
    }

    /**
     * Creates a merger.
     *
     * @param maxBytesInMemory byte limit above which {@link #run()} spills to disk
     * @param maxRecsInMemory record-count limit above which {@link #run()} spills to disk
     * @param dir base folder for the part files
     * @param opName sub-folder name (under {@code dir}) for this operation's part files
     * @param kType key type handed to {@code FileLoader} when saving and opening parts
     * @param dType data type used to unpack values and handed to {@code FileLoader}
     * @param kComparator comparator defining the key order
     * @param tar target id, used only in log messages here
     */
    public FSKeyedSortedMerger(int maxBytesInMemory, int maxRecsInMemory, String dir,
                               String opName, MessageType kType, MessageType dType,
                               Comparator<Object> kComparator, int tar) {
        this.maxBytesToKeepInMemory = maxBytesInMemory;
        this.maxRecordsInMemory = maxRecsInMemory;
        this.folder = dir;
        this.operationName = opName;
        this.keyType = kType;
        this.dataType = dType;
        this.keyComparator = kComparator;
        this.target = tar;
    }

    /**
     * Buffers one serialized record.
     *
     * @param key the record key
     * @param data the serialized value
     * @param length the length recorded for this record and added to the in-memory byte count
     * @throws RuntimeException if {@link #switchToReading()} has already been called
     */
    @Override
    public void add(Object key, byte[] data, int length) {
        if (status == FSStatus.READING) {
            throw new RuntimeException("Cannot add after switching to reading");
        }
        lock.lock();
        try {
            recordsInMemory.add(new Tuple(key, data));
            bytesLength.add(length);
            numOfBytesInMemory += length;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves the merger to the reading phase: unpacks the records still buffered in memory
     * and sorts them by key so they can be merged with the part files.
     */
    @Override
    public void switchToReading() {
        lock.lock();
        try {
            status = FSStatus.READING;
            deserializeObjects();
            objectsInMemory.sort(new ComparatorWrapper(keyComparator));
        } finally {
            lock.unlock();
        }
    }

    /**
     * Unpacks every buffered record's value with the data type's packer and appends the
     * resulting tuple to {@code objectsInMemory}. Must be called with the lock held.
     */
    private void deserializeObjects() {
        for (Tuple serialized : recordsInMemory) {
            Object value = dataType.getDataPacker()
                    .unpackFromByteArray((byte[]) serialized.getValue());
            objectsInMemory.add(new Tuple(serialized.getKey(), value));
        }
        // The serialized copies are no longer needed. Dropping them frees memory and makes
        // sure neither a repeated switchToReading() nor a late run() emits them a second time.
        recordsInMemory.clear();
        bytesLength.clear();
        numOfBytesInMemory = 0L;
    }

    /**
     * Spills the in-memory buffer to the next part file when the byte limit or the record
     * limit is exceeded; otherwise does nothing.
     */
    @Override
    public void run() {
        lock.lock();
        try {
            boolean overLimit = numOfBytesInMemory > maxBytesToKeepInMemory
                    || recordsInMemory.size() > maxRecordsInMemory;
            if (!overLimit) {
                return;
            }
            List<Tuple> toSpill = recordsInMemory;
            recordsInMemory = new ArrayList<>();
            toSpill.sort(new ComparatorWrapper(keyComparator));
            // NOTE(review): bytesLength stays in insertion order while the records are sorted;
            // confirm FileLoader.saveKeyValues does not rely on index alignment between them.
            long saved = FileLoader.saveKeyValues(toSpill, bytesLength, numOfBytesInMemory,
                    getSaveFileName(noOfFileWritten), keyType);
            // Kept per part; handed back to FileLoader.openPart when the part is read.
            maxTupleSize.add(saved);
            bytesLength.clear();
            noOfFileWritten++;
            numOfBytesInMemory = 0L;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the merged, key-ordered data grouped by key.
     *
     * <p>Each element is a {@code Tuple} whose key is the group key and whose value is an
     * {@code Iterator} over the values of consecutive tuples with an equal key. The value
     * iterators share the underlying merge stream: calling {@code hasNext()} or {@code next()}
     * on the outer iterator first drains whatever is left of the current group.
     */
    @Override
    public Iterator<Object> readIterator() {
        return new Iterator<Object>() {
            private final FSIterator fsIterator = new FSIterator();
            /** Look-ahead tuple: the next tuple not yet handed out, or null when exhausted. */
            private Tuple nextTuple = fsIterator.hasNext() ? fsIterator.next() : null;
            private Iterator<Object> itOfCurrentKey = null;

            /** Consumes the values of the current group that the caller did not read. */
            private void skipKeys() {
                if (itOfCurrentKey != null) {
                    while (itOfCurrentKey.hasNext()) {
                        itOfCurrentKey.next();
                    }
                }
            }

            @Override
            public boolean hasNext() {
                skipKeys();
                return nextTuple != null;
            }

            @Override
            public Object next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("There are no more keys to iterate");
                }
                final Object key = nextTuple.getKey();
                itOfCurrentKey = new Iterator<Object>() {
                    @Override
                    public boolean hasNext() {
                        // NOTE(review): groups are delimited with equals() while ordering uses
                        // keyComparator; confirm both agree for the key types in use.
                        return nextTuple != null && nextTuple.getKey().equals(key);
                    }

                    @Override
                    public Object next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException(
                                    "There are no more values for key " + key);
                        }
                        Object value = nextTuple.getValue();
                        // Advance the shared look-ahead of the enclosing iterator.
                        nextTuple = fsIterator.hasNext() ? fsIterator.next() : null;
                        return value;
                    }
                };
                Tuple group = new Tuple();
                group.setKey(key);
                group.setValue(itOfCurrentKey);
                return group;
            }
        };
    }

    /**
     * Deletes the contents of this operation's save folder and marks the merger as done.
     * A failure to delete is logged, not thrown.
     */
    @Override
    public void clean() {
        File saveFolder = new File(getSaveFolderName());
        // The folder may not exist (e.g. nothing was ever spilled); in that case there is
        // nothing to delete and cleanDirectory must not be called on it.
        if (saveFolder.isDirectory()) {
            try {
                FileUtils.cleanDirectory(saveFolder);
            } catch (IOException e) {
                LOG.log(Level.SEVERE, "Failed to clear directory: " + saveFolder, e);
            }
        }
        status = FSStatus.DONE;
    }

    /** Returns the folder that holds this operation's part files. */
    private String getSaveFolderName() {
        return folder + "/" + operationName;
    }

    /**
     * Returns the path of the part file with the given index.
     *
     * @param filePart index of the part file
     */
    public String getSaveFileName(int filePart) {
        return folder + "/" + operationName + "/part_" + filePart;
    }

    /**
     * Returns the path of the sizes file with the given index.
     *
     * @param filePart index of the part
     */
    private String getSizesFileName(int filePart) {
        return folder + "/" + operationName + "/part_sizes_" + filePart;
    }
}
