package edu.iu.dsc.tws.task.dataobjects;

import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.api.formatters.LocalTextInputPartitioner;
import edu.iu.dsc.tws.data.fs.io.InputSplit;
import edu.iu.dsc.tws.dataset.DataSource;
import edu.iu.dsc.tws.executor.core.ExecutionRuntime;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/dataobjects/DataObjectSource.class */
public class DataObjectSource<T> extends BaseSource {
    private static final Logger LOG = Logger.getLogger(DataObjectSource.class.getName());
    private static final long serialVersionUID = -1;
    private DataSource<?, ?> source;
    private String edgeName;
    private String dataDirectory;

    public DataObjectSource() {
    }

    public DataObjectSource(String str, String str2) {
        this.edgeName = str;
        this.dataDirectory = str2;
    }

    public String getDataDirectory() {
        return this.dataDirectory;
    }

    public void setDataDirectory(String str) {
        this.dataDirectory = str;
    }

    public String getEdgeName() {
        return this.edgeName;
    }

    public void setEdgeName(String str) {
        this.edgeName = str;
    }

    public void execute() {
        InputSplit nextSplit = this.source.getNextSplit(this.context.taskIndex());
        while (nextSplit != null) {
            while (!nextSplit.reachedEnd()) {
                try {
                    Object nextRecord = nextSplit.nextRecord((Object) null);
                    if (nextRecord != null) {
                        this.context.write(getEdgeName(), nextRecord);
                    }
                } catch (IOException e) {
                    LOG.log(Level.SEVERE, "Failed to read the input", (Throwable) e);
                }
            }
            nextSplit = this.source.getNextSplit(this.context.taskIndex());
        }
        this.context.end(getEdgeName());
    }

    public void prepare(Config config, TaskContext taskContext) {
        super.prepare(config, taskContext);
        this.source = ((ExecutionRuntime) config.get("_twister2.runtime_")).createInput(config, taskContext, new LocalTextInputPartitioner(new Path(getDataDirectory()), taskContext.getParallelism(), config));
    }
}
