package org.linkedopenactors.code.csvimporter;

import de.naturzukunft.rdf4j.vocabulary.SCHEMA_ORG;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.IterableUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.SimpleNamespace;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/linkedopenactors/code/csvimporter/AbstractCsvImporter.class */
/**
 * Skeleton for {@link CsvImporter} implementations that read a CSV stream,
 * convert every record into RDF statements and write those statements to an
 * RDF4J {@link Repository} in batches.
 *
 * <p>Subclasses supply the CSV header definition ({@link #getHeaderEnum()}) and
 * the per-record conversion ({@link #convert(CSVRecord, Namespace)}).
 */
public abstract class AbstractCsvImporter implements CsvImporter {
    private static final Logger log = LoggerFactory.getLogger(AbstractCsvImporter.class);

    /** Number of CSV records that are converted and committed per transaction. */
    private static final int BATCH_SIZE = 5000;

    /** Namespaces registered on the repository in addition to the built-in ones. */
    private final Set<Namespace> additionalNamespaces = new HashSet<>();

    /**
     * Parses the CSV data from {@code inputStream}, converts the records batch by
     * batch and stores the resulting statements in {@code repository}.
     *
     * <p>The given {@code namespace} is remembered in the additional namespaces
     * and handed to {@link #convert(CSVRecord, Namespace)} for every record. A
     * batch whose transaction fails is rolled back and logged; the import then
     * continues with the next batch.
     *
     * <p>The stream is decoded as UTF-8 and is not closed by this method.
     *
     * @param repository  the repository the statements are written to
     * @param inputStream the CSV data
     * @param namespace   the namespace passed to the record conversion
     * @return the number of {@code rdf:type schema:CreativeWork} statements found
     *         in the repository after the import
     * @throws RuntimeException wrapping any exception raised while parsing,
     *                          converting or counting
     */
    @Override
    public long doImport(Repository repository, InputStream inputStream, Namespace namespace) {
        log.debug("->{} starting initialization.", getClass().getSimpleName());
        this.additionalNamespaces.add(namespace);
        try {
            // Explicit charset: the result must not depend on the platform default encoding.
            List<CSVRecord> records = IterableUtils.toList(
                    CSVFormat.DEFAULT
                            .withFirstRecordAsHeader()
                            .withHeader(getHeaderEnum())
                            .parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8)));

            for (List<CSVRecord> batch : ListUtils.partition(records, BATCH_SIZE)) {
                log.debug("processing partial list of csvRecords: {}", batch.size());
                toRdfStore(repository, toStatements(batch, namespace));
            }

            // try-with-resources guarantees the connection is closed even if counting fails.
            try (RepositoryConnection connection = repository.getConnection()) {
                int size = Iterations
                        .asList(connection.getStatements(null, RDF.TYPE, SCHEMA_ORG.CreativeWork))
                        .size();
                log.debug("initial load added {} creativeWorks (LOA Publications)", size);
                return size;
            }
        } catch (Exception e) {
            throw new RuntimeException("error while initial load", e);
        }
    }

    /**
     * Converts a single CSV record into its RDF representation.
     *
     * @param cSVRecord the record to convert
     * @param namespace the namespace that was passed to {@link #doImport}
     * @return the conversion result; its model provides the statements to store
     */
    protected abstract SubjectModelPair convert(CSVRecord cSVRecord, Namespace namespace);

    /**
     * Provides the enum whose constants define the CSV header names.
     *
     * @return the header enum class handed to the CSV parser
     */
    protected abstract Class<? extends Enum<?>> getHeaderEnum();

    /**
     * Converts one batch of records in parallel and merges all resulting
     * statements into a single set.
     */
    private Set<Statement> toStatements(List<CSVRecord> batch, Namespace namespace) {
        return batch.stream()
                .parallel()
                .map(csvRecord -> convert(csvRecord, namespace))
                .map(SubjectModelPair::getModel)
                .flatMap(model -> model.stream())
                .collect(Collectors.toSet());
    }

    /**
     * Writes the statements to the repository inside a single transaction,
     * registering all namespaces from {@link #getNamespaces()} beforehand.
     *
     * @param repository the target repository
     * @param set        the statements to add
     * @return {@code set} if the transaction was committed, otherwise an empty
     *         set after the transaction has been rolled back
     */
    private Set<Statement> toRdfStore(Repository repository, Set<Statement> set) {
        log.debug("now adding {} statements to the repository {}", set.size(), repository);
        try (RepositoryConnection connection = repository.getConnection()) {
            connection.begin();
            try {
                for (Namespace namespace : getNamespaces()) {
                    connection.setNamespace(namespace.getPrefix(), namespace.getName());
                }
                connection.add(set);
                connection.commit();
                log.debug("statements added sucessfully");
                return set;
            } catch (Exception e) {
                // Best effort: a failed batch is logged and rolled back so the remaining
                // batches can still be imported. JVM Errors are deliberately not caught.
                log.error("ROLLBACK: {}", e.getMessage(), e);
                connection.rollback();
                return Collections.emptySet();
            }
        }
    }

    /**
     * Collects the namespaces that are registered on the repository connection:
     * the fixed {@code schema} and {@code as} prefixes plus everything returned
     * by {@link #getAdditionalNamespaces()}.
     *
     * @return a new, mutable set of namespaces
     */
    protected Set<Namespace> getNamespaces() {
        Set<Namespace> namespaces = new HashSet<>();
        namespaces.add(new SimpleNamespace("schema", "https://schema.org/"));
        namespaces.add(new SimpleNamespace("as", "https://www.w3.org/ns/activitystreams#"));
        namespaces.addAll(getAdditionalNamespaces());
        return namespaces;
    }

    /**
     * Returns the namespaces collected from {@link #doImport} calls.
     *
     * @return the internal, live set of additional namespaces
     */
    protected Set<Namespace> getAdditionalNamespaces() {
        return this.additionalNamespaces;
    }
}
