package edu.iu.dsc.tws.comms.shuffle;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.util.CommonThreadPool;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger2.class */
public class FSKeyedSortedMerger2 implements Shuffle {
    private static final Logger LOG = Logger.getLogger(FSKeyedSortedMerger2.class.getName());
    private long maxBytesToKeepInMemory;
    private long maxBytesFile;
    private boolean groupByKey;
    private String folder;
    private String operationName;
    private MessageType keyType;
    private MessageType dataType;
    private Comparator keyComparator;
    private ComparatorWrapper comparatorWrapper;
    private int parallelIOAllowance;
    private volatile Semaphore concurrentIOs;
    private int target;
    private int noOfFileWritten = 0;
    private AtomicLong largestTupleSizeRecorded = new AtomicLong(Long.MIN_VALUE);
    private long numOfBytesInMemory = 0;
    private final Object exclusiveAccess = new Object();
    private volatile Semaphore fileWriteLock = new Semaphore(1);
    private FSStatus status = FSStatus.WRITING_MEMORY;
    private ArrayList<Tuple> recordsInMemory = new ArrayList<>();
    private ArrayList<Tuple> recordsToDisk = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger2$ComparatorWrapper.class */
    public class ComparatorWrapper implements Comparator<Tuple> {
        private Comparator comparator;

        ComparatorWrapper(Comparator comparator) {
            this.comparator = comparator;
        }

        @Override // java.util.Comparator
        public int compare(Tuple tuple, Tuple tuple2) {
            return this.comparator.compare(tuple.getKey(), tuple2.getKey());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger2$FSIterator.class */
    public class FSIterator implements RestorableIterator<Object> {
        private static final String RP_SAME_KEY_READER = "SAME_KEY_READER";
        private static final String RP_FILE_READERS = "FILE_READERS";
        private PriorityQueue<ControlledReader<Tuple>> controlledFileReaders;
        private ControlledReader<Tuple> sameKeyReader;
        private RestorePoint restorePoint;
        private ControlledFileReaderFlags meta;

        FSIterator() {
            this.controlledFileReaders = new PriorityQueue<>(1 + FSKeyedSortedMerger2.this.noOfFileWritten);
            this.meta = new ControlledFileReaderFlags(Math.max(FSKeyedSortedMerger2.this.numOfBytesInMemory, FSKeyedSortedMerger2.this.largestTupleSizeRecorded.get()), FSKeyedSortedMerger2.this.keyComparator);
            if (!FSKeyedSortedMerger2.this.recordsInMemory.isEmpty()) {
                ControlledMemoryReader controlledMemoryReader = new ControlledMemoryReader(FSKeyedSortedMerger2.this.recordsInMemory, FSKeyedSortedMerger2.this.keyComparator);
                if (controlledMemoryReader.hasNext()) {
                    this.controlledFileReaders.add(controlledMemoryReader);
                }
            }
            for (int i = 0; i < FSKeyedSortedMerger2.this.noOfFileWritten; i++) {
                ControlledFileReader controlledFileReader = new ControlledFileReader(this.meta, FSKeyedSortedMerger2.this.getSaveFileName(i), FSKeyedSortedMerger2.this.keyType, FSKeyedSortedMerger2.this.dataType, FSKeyedSortedMerger2.this.keyComparator);
                if (controlledFileReader.hasNext()) {
                    this.controlledFileReaders.add(controlledFileReader);
                } else {
                    controlledFileReader.releaseResources();
                    FSKeyedSortedMerger2.LOG.warning("Found a controlled file reader without any data");
                }
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return (this.sameKeyReader == null && this.controlledFileReaders.isEmpty()) ? false : true;
        }

        @Override // java.util.Iterator
        public Tuple next() {
            ControlledReader<Tuple> controlledReader = this.sameKeyReader;
            if (controlledReader == null || !controlledReader.hasNext()) {
                controlledReader = this.controlledFileReaders.poll();
                controlledReader.open();
            }
            Tuple tuple = (Tuple) controlledReader.next();
            if (controlledReader.hasNext() && tuple.getKey().equals(controlledReader.nextKey())) {
                this.sameKeyReader = controlledReader;
            } else if (controlledReader.hasNext()) {
                this.sameKeyReader = null;
                this.controlledFileReaders.add(controlledReader);
            } else {
                this.sameKeyReader = null;
                controlledReader.releaseResources();
            }
            return tuple;
        }

        @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
        public void createRestorePoint() {
            this.restorePoint = new RestorePoint();
            if (this.sameKeyReader != null) {
                this.sameKeyReader.createRestorePoint();
                this.restorePoint.put(RP_SAME_KEY_READER, this.sameKeyReader);
            }
            ArrayList arrayList = new ArrayList(this.controlledFileReaders);
            arrayList.forEach((v0) -> {
                v0.createRestorePoint();
            });
            this.restorePoint.put(RP_FILE_READERS, arrayList);
        }

        @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
        public void restore() {
            if (!hasRestorePoint()) {
                throw new RuntimeException("Couldn't find a valid restore point to restore from.");
            }
            this.meta.reset();
            this.sameKeyReader = (ControlledReader) this.restorePoint.get(RP_SAME_KEY_READER);
            if (this.sameKeyReader != null) {
                this.sameKeyReader.restore();
            }
            this.controlledFileReaders.clear();
            ((List) this.restorePoint.get(RP_FILE_READERS)).forEach(controlledReader -> {
                controlledReader.restore();
                this.controlledFileReaders.add(controlledReader);
            });
        }

        @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
        public boolean hasRestorePoint() {
            return this.restorePoint != null;
        }

        @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
        public void clearRestorePoint() {
            this.restorePoint = null;
            this.controlledFileReaders.iterator().forEachRemaining((v0) -> {
                v0.clearRestorePoint();
            });
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger2$FSStatus.class */
    private enum FSStatus {
        WRITING_MEMORY,
        WRITING_DISK,
        READING,
        DONE
    }

    /* loaded from: input_file:edu/iu/dsc/tws/comms/shuffle/FSKeyedSortedMerger2$FileSaveWorker.class */
    private class FileSaveWorker implements Runnable {
        private ArrayList<Tuple> referenceToRecordsInMemory;
        private String fileName;
        private long bytesInMemory;

        FileSaveWorker(ArrayList<Tuple> arrayList, int i, long j) {
            this.referenceToRecordsInMemory = arrayList;
            this.bytesInMemory = j;
            this.fileName = FSKeyedSortedMerger2.this.getSaveFileName(i);
        }

        @Override // java.lang.Runnable
        public void run() {
            FSKeyedSortedMerger2.LOG.info(String.format("Shuffle saving to temporary file bytes %d, file %s", Long.valueOf(this.bytesInMemory), this.fileName));
            this.referenceToRecordsInMemory.sort(FSKeyedSortedMerger2.this.comparatorWrapper);
            FSKeyedSortedMerger2.this.largestTupleSizeRecorded.set(Math.max(FSKeyedSortedMerger2.this.largestTupleSizeRecorded.get(), FileLoader.saveKeyValues(this.referenceToRecordsInMemory, this.bytesInMemory, this.fileName, FSKeyedSortedMerger2.this.keyType)));
            FSKeyedSortedMerger2.this.concurrentIOs.release();
            synchronized (FSKeyedSortedMerger2.this.exclusiveAccess) {
                if (FSKeyedSortedMerger2.this.concurrentIOs.availablePermits() == FSKeyedSortedMerger2.this.parallelIOAllowance) {
                    FSKeyedSortedMerger2.this.fileWriteLock.release();
                }
            }
        }
    }

    public FSKeyedSortedMerger2(long j, long j2, String str, String str2, MessageType messageType, MessageType messageType2, Comparator comparator, int i, boolean z, int i2) {
        this.maxBytesToKeepInMemory = j;
        this.maxBytesFile = j2;
        this.groupByKey = z;
        this.folder = str;
        this.operationName = str2;
        this.keyType = messageType;
        this.dataType = messageType2;
        this.keyComparator = comparator;
        this.comparatorWrapper = new ComparatorWrapper(this.keyComparator);
        this.parallelIOAllowance = i2;
        this.concurrentIOs = new Semaphore(i2);
        this.target = i;
        LOG.info("Disk merger configured. Folder : " + this.folder + ", Bytes in memory :" + j + ", File size: " + this.maxBytesFile);
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public synchronized void add(Tuple tuple) {
        if (this.status == FSStatus.READING) {
            throw new RuntimeException("Cannot add after switching to reading");
        }
        if (this.status != FSStatus.WRITING_MEMORY) {
            this.recordsToDisk.add(tuple);
            this.numOfBytesInMemory += ((byte[]) tuple.getValue()).length;
            return;
        }
        this.recordsInMemory.add(tuple);
        this.numOfBytesInMemory += ((byte[]) tuple.getValue()).length;
        if (this.numOfBytesInMemory >= this.maxBytesToKeepInMemory) {
            this.status = FSStatus.WRITING_DISK;
            this.numOfBytesInMemory = 0L;
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public synchronized void add(Object obj, byte[] bArr, int i) {
        if (this.status == FSStatus.READING) {
            throw new RuntimeException("Cannot add after switching to reading");
        }
        if (this.status != FSStatus.WRITING_MEMORY) {
            this.recordsToDisk.add(new Tuple(obj, bArr));
            this.numOfBytesInMemory += bArr.length;
            return;
        }
        this.recordsInMemory.add(new Tuple(obj, bArr));
        this.numOfBytesInMemory += bArr.length;
        if (this.numOfBytesInMemory >= this.maxBytesToKeepInMemory) {
            LOG.info(String.format("Switching to write to disk memory %d >= maxMemory %d", Long.valueOf(this.numOfBytesInMemory), Long.valueOf(this.maxBytesToKeepInMemory)));
            this.status = FSStatus.WRITING_DISK;
            this.numOfBytesInMemory = 0L;
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public synchronized void switchToReading() {
        try {
            try {
                this.fileWriteLock.acquire();
                LOG.info(String.format("Reading from %d files", Integer.valueOf(this.noOfFileWritten)));
                this.status = FSStatus.READING;
                this.recordsInMemory.addAll(this.recordsToDisk);
                this.recordsToDisk = new ArrayList<>();
                this.numOfBytesInMemory = 0L;
                deserializeObjects();
                long currentTimeMillis = System.currentTimeMillis();
                this.recordsInMemory.sort(this.comparatorWrapper);
                LOG.info("Memory sorting time: " + (System.currentTimeMillis() - currentTimeMillis));
                this.fileWriteLock.release();
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Couldn't switch to reading", (Throwable) e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.fileWriteLock.release();
            throw th;
        }
    }

    private void deserializeObjects() {
        int threadCount = CommonThreadPool.getThreadCount() + 1;
        ArrayList arrayList = new ArrayList();
        int size = this.recordsInMemory.size() / threadCount;
        if (this.recordsInMemory.size() % threadCount != 0) {
            size++;
        }
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= this.recordsInMemory.size()) {
                break;
            }
            int min = Math.min(this.recordsInMemory.size(), i2 + size);
            if (min == this.recordsInMemory.size()) {
                ListIterator<Tuple> listIterator = this.recordsInMemory.listIterator(i2);
                int i3 = min - i2;
                for (int i4 = 0; listIterator.hasNext() && i4 < i3; i4++) {
                    Tuple next = listIterator.next();
                    next.setValue(this.dataType.getDataPacker().unpackFromByteArray((byte[]) next.getValue()));
                }
            } else {
                arrayList.add(CommonThreadPool.getExecutor().submit(() -> {
                    ListIterator<Tuple> listIterator2 = this.recordsInMemory.listIterator(i2);
                    int i5 = min - i2;
                    for (int i6 = 0; listIterator2.hasNext() && i6 < i5; i6++) {
                        Tuple next2 = listIterator2.next();
                        next2.setValue(this.dataType.getDataPacker().unpackFromByteArray((byte[]) next2.getValue()));
                    }
                    return true;
                }));
            }
            i = i2 + size;
        }
        for (int i5 = 0; i5 < arrayList.size(); i5++) {
            try {
                ((Future) arrayList.get(i5)).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Error in deserializing records in memory", e);
            }
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public synchronized void run() {
        if (this.status == FSStatus.WRITING_DISK && this.numOfBytesInMemory >= this.maxBytesFile) {
            if (this.concurrentIOs.availablePermits() == 0) {
                LOG.fine("Communication thread will block on disk IO thread, since " + this.parallelIOAllowance + " io operations are already ongoing.");
            }
            try {
                synchronized (this.exclusiveAccess) {
                    this.fileWriteLock.tryAcquire();
                }
                this.concurrentIOs.acquire();
                ArrayList<Tuple> arrayList = this.recordsToDisk;
                int i = this.noOfFileWritten;
                long j = this.numOfBytesInMemory;
                if (arrayList != null) {
                    FileSaveWorker fileSaveWorker = new FileSaveWorker(arrayList, i, j);
                    if (CommonThreadPool.isActive()) {
                        CommonThreadPool.getExecutor().execute(fileSaveWorker);
                    } else {
                        fileSaveWorker.run();
                    }
                }
                this.recordsToDisk = new ArrayList<>();
                this.noOfFileWritten++;
                this.numOfBytesInMemory = 0L;
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Couldn't write to the file", (Throwable) e);
                this.concurrentIOs.release();
            }
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public RestorableIterator<Object> readIterator() {
        try {
            return new RestorableIterator<Object>() { // from class: edu.iu.dsc.tws.comms.shuffle.FSKeyedSortedMerger2.1
                private static final String RP_NEXT_TUPLE = "NEXT_TUPLE";
                private static final String RP_IT_OF_CURR_KEY = "IT_OF_CURR_KEY";
                private FSIterator fsIterator;
                private Tuple nextTuple;
                private Iterator itOfCurrentKey;
                private RestorePoint restorePoint;

                {
                    this.fsIterator = new FSIterator();
                    this.nextTuple = this.fsIterator.hasNext() ? this.fsIterator.next() : null;
                    this.itOfCurrentKey = null;
                }

                @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
                public void createRestorePoint() {
                    this.restorePoint = new RestorePoint();
                    this.restorePoint.put(RP_NEXT_TUPLE, this.nextTuple);
                    if (FSKeyedSortedMerger2.this.groupByKey) {
                        this.restorePoint.put(RP_IT_OF_CURR_KEY, this.itOfCurrentKey);
                    }
                    this.fsIterator.createRestorePoint();
                }

                @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
                public void restore() {
                    if (!hasRestorePoint()) {
                        throw new RuntimeException("Couldn't find a valid restore point to restore from.");
                    }
                    this.nextTuple = (Tuple) this.restorePoint.get(RP_NEXT_TUPLE);
                    this.itOfCurrentKey = (Iterator) this.restorePoint.get(RP_IT_OF_CURR_KEY);
                    this.fsIterator.restore();
                }

                @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
                public boolean hasRestorePoint() {
                    return this.restorePoint != null;
                }

                @Override // edu.iu.dsc.tws.comms.shuffle.Restorable
                public void clearRestorePoint() {
                    this.restorePoint = null;
                    this.fsIterator.clearRestorePoint();
                }

                private void skipKeys() {
                    if (this.itOfCurrentKey != null) {
                        while (this.itOfCurrentKey.hasNext()) {
                            this.itOfCurrentKey.next();
                        }
                    }
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    skipKeys();
                    return this.nextTuple != null;
                }

                @Override // java.util.Iterator
                public Tuple<Object, Object> next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException("There are no more keys to iterate");
                    }
                    final Object key = this.nextTuple.getKey();
                    Tuple<Object, Object> tuple = new Tuple<>();
                    tuple.setKey(key);
                    if (FSKeyedSortedMerger2.this.groupByKey) {
                        this.itOfCurrentKey = new Iterator<Object>() { // from class: edu.iu.dsc.tws.comms.shuffle.FSKeyedSortedMerger2.1.1
                            @Override // java.util.Iterator
                            public boolean hasNext() {
                                return AnonymousClass1.this.nextTuple != null && AnonymousClass1.this.nextTuple.getKey().equals(key);
                            }

                            @Override // java.util.Iterator
                            public Object next() {
                                if (!hasNext()) {
                                    throw new NoSuchElementException("There are no more values for key " + key);
                                }
                                Object value = AnonymousClass1.this.nextTuple.getValue();
                                if (AnonymousClass1.this.fsIterator.hasNext()) {
                                    AnonymousClass1.this.nextTuple = AnonymousClass1.this.fsIterator.next();
                                } else {
                                    AnonymousClass1.this.nextTuple = null;
                                }
                                return value;
                            }
                        };
                        tuple.setValue(this.itOfCurrentKey);
                    } else {
                        tuple.setValue(this.nextTuple.getValue());
                        if (this.fsIterator.hasNext()) {
                            this.nextTuple = this.fsIterator.next();
                        } else {
                            this.nextTuple = null;
                        }
                    }
                    return tuple;
                }
            };
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Error in creating iterator", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // edu.iu.dsc.tws.comms.shuffle.Shuffle
    public void clean() {
        for (int i = 0; i < this.noOfFileWritten; i++) {
            File file = new File(getSaveFileName(i));
            if (file.exists() && !file.delete()) {
                LOG.warning("Couldn't delete file : " + file.getName());
            }
        }
        new File(getSaveFolderName()).deleteOnExit();
        this.status = FSStatus.DONE;
    }

    private String getSaveFolderName() {
        return this.folder + "/" + this.operationName;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getSaveFileName(int i) {
        return getSaveFolderName() + "/part_" + i;
    }

    private String getSizesFileName(int i) {
        return this.folder + "/" + this.operationName + "/part_sizes_" + i;
    }
}
