package io.streamthoughts.kafka.connect.filepulse.fs.client;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
import io.streamthoughts.kafka.connect.filepulse.fs.stream.ConnectionAwareInputStream;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.net.URI;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClient.class */
/**
 * Client for the remote SFTP server described by a {@link SftpFilesystemListingConfig}.
 *
 * <p>Every public operation opens its own {@link SftpConnection} through
 * {@link #createSftpConnection()}. Operations that fail with a
 * {@link ConnectFilePulseException} are retried up to {@code config.getSftpRetries()} times,
 * pausing {@code config.getSftpDelayBetweenRetriesMs()} milliseconds before each retry.
 */
public class SftpClient {
    private static final Logger log = LoggerFactory.getLogger(SftpClient.class);

    /** Key of the user-defined metadata entry telling whether the remote entry is a regular file. */
    static final String IS_REGULAR_FILE = "is.regular.file";

    /** Name of the SFTP channel type. */
    public static final String CHANNEL_TYPE = "sftp";

    /** SFTP status codes for which a failed {@code delete}/{@code move} is retried instead of reported. */
    static final List<Integer> TRANSIENT_SFTP_ERROR_CODES = List.of(
            ChannelSftp.SSH_FX_NO_CONNECTION,
            ChannelSftp.SSH_FX_CONNECTION_LOST,
            ChannelSftp.SSH_FX_BAD_MESSAGE);

    private final SftpFilesystemListingConfig config;

    /**
     * Creates a client bound to the given configuration.
     *
     * @param sftpFilesystemListingConfig connection, listing-directory and retry settings
     */
    public SftpClient(SftpFilesystemListingConfig sftpFilesystemListingConfig) {
        this.config = sftpFilesystemListingConfig;
    }

    /**
     * Lists the regular files found directly under a remote path.
     *
     * @param str the remote path to list
     * @return the metadata of every entry of {@code str} that is a regular file
     * @throws ConnectFilePulseException if the listing still fails after all retries
     */
    public Stream<FileObjectMeta> listFiles(String str) {
        log.info("Listing files in path '{}'", str);
        return listAll(str)
                .filter(this::isRegularFile)
                .map(this::buildFileMetadata)
                .peek(meta -> log.debug("Found object '{}'", meta));
    }

    /**
     * Lists every entry (whatever its type) found under a remote path, with retries.
     *
     * @param str the remote path to list
     * @return the raw entries returned by the server
     * @throws ConnectFilePulseException if the listing still fails after all retries
     */
    public Stream<ChannelSftp.LsEntry> listAll(String str) {
        return retryAction(() -> doInCloseableConnection(connection -> listAllCore(str, connection)));
    }

    /**
     * Performs a single {@code ls} on the given connection.
     *
     * @param str            the remote path to list
     * @param sftpConnection the connection to use; it is not closed by this method
     * @return the entries returned by the server
     * @throws ConnectFilePulseException if the SFTP call fails
     */
    @SuppressWarnings("unchecked") // depending on the JSch version, ls() may return a raw Vector
    Stream<ChannelSftp.LsEntry> listAllCore(String str, SftpConnection sftpConnection) {
        try {
            return sftpConnection.getChannel().ls(str).stream();
        } catch (SftpException e) {
            throw new ConnectFilePulseException("Cannot list files", e);
        }
    }

    /**
     * Builds the file metadata of a listing entry.
     *
     * @param lsEntry an entry returned by {@link #listAll(String)}
     * @return the corresponding metadata
     */
    public FileObjectMeta buildFileMetadata(ChannelSftp.LsEntry lsEntry) {
        return buildFileMetadata(lsEntry.getFilename(), lsEntry.getAttrs());
    }

    /**
     * Builds the file metadata for a file name and its remote attributes.
     *
     * @param str       the file name, resolved against the listing directory by {@link #buildUri(String)}
     * @param sftpATTRS the attributes of the file
     * @return the corresponding metadata
     */
    FileObjectMeta buildFileMetadata(String str, SftpATTRS sftpATTRS) {
        // The SFTP protocol transmits mtime as an unsigned 32-bit number of seconds, which JSch
        // stores in a signed int: read it as unsigned so dates after 2038 do not turn negative.
        final Instant lastModified = Instant.ofEpochSecond(Integer.toUnsignedLong(sftpATTRS.getMTime()));
        return new GenericFileObjectMeta.Builder()
                .withName(str)
                .withUri(buildUri(str))
                .withLastModified(lastModified)
                .withContentLength(sftpATTRS.getSize())
                .withUserDefinedMetadata(Map.of(IS_REGULAR_FILE, Boolean.valueOf(sftpATTRS.isReg())))
                .build();
    }

    private boolean isRegularFile(ChannelSftp.LsEntry lsEntry) {
        return lsEntry.getAttrs().isReg();
    }

    /**
     * Retrieves the attributes of a remote file, with retries.
     *
     * @param str the remote path of the file
     * @return the attributes of the file
     * @throws ConnectFilePulseException if the call still fails after all retries
     */
    public SftpATTRS statFile(String str) {
        return retryAction(() -> doInCloseableConnection(connection -> statFileCore(str, connection)));
    }

    /**
     * Performs a single {@code stat} on the given connection.
     *
     * @param str            the remote path of the file
     * @param sftpConnection the connection to use; it is not closed by this method
     * @return the attributes of the file
     * @throws ConnectFilePulseException if the SFTP call fails
     */
    SftpATTRS statFileCore(String str, SftpConnection sftpConnection) {
        log.info("Getting attributes for file '{}'", str);
        try {
            return sftpConnection.getChannel().stat(str);
        } catch (SftpException e) {
            throw new ConnectFilePulseException("Cannot stat file: " + str, e);
        }
    }

    /**
     * Retrieves the metadata of a single remote file.
     *
     * @param uri the location of the file; its string form is used as the remote path
     * @return a one-element stream holding the metadata of the file
     * @throws ConnectFilePulseException if the attributes cannot be read after all retries
     */
    public Stream<FileObjectMeta> getObjectMetadata(URI uri) {
        final String remotePath = uri.toString();
        final String fileName = FilenameUtils.getName(remotePath);
        // The attributes are fetched eagerly (as before); only the metadata mapping is lazy.
        return Stream.of(statFile(remotePath)).map(attrs -> buildFileMetadata(fileName, attrs));
    }

    /**
     * Resolves a file name against the configured listing directory.
     *
     * @param str the file name
     * @return the URI {@code <listing directory>/<str>}
     */
    public URI buildUri(String str) {
        return URI.create(String.format("%s/%s", this.config.getSftpListingDirectoryPath(), str));
    }

    /**
     * Opens a stream on a remote file, with retries.
     *
     * <p>The connection is handed over to the returned stream. When the stream cannot be
     * opened the connection is closed here, so that failed attempts do not leak it.
     *
     * @param uri the location of the file; its string form is used as the remote path
     * @return a stream on the content of the file
     * @throws ConnectFilePulseException if the stream cannot be opened after all retries
     */
    public ConnectionAwareInputStream sftpFileInputStream(URI uri) {
        log.info("Getting InputStream for '{}'", uri);
        final String remotePath = uri.toString();
        return retryAction(() -> doInConnection(connection -> openInputStream(connection, remotePath)));
    }

    /** Opens the stream on {@code remotePath}, releasing {@code connection} if that fails. */
    private ConnectionAwareInputStream openInputStream(SftpConnection connection, String remotePath) {
        try {
            return new ConnectionAwareInputStream(connection, remotePath, connection.getChannel().get(remotePath));
        } catch (SftpException e) {
            log.error("Cannot open sftp InputStream for " + remotePath, e);
            closeAfterFailure(connection, e);
            throw new ConnectFilePulseException(e);
        } catch (RuntimeException e) {
            closeAfterFailure(connection, e);
            throw e;
        }
    }

    /** Closes {@code connection}, recording any close failure as suppressed by {@code failure}. */
    private static void closeAfterFailure(SftpConnection connection, Throwable failure) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Deletes a remote file.
     *
     * @param uri the location of the file; its string form is used as the remote path
     * @return {@code true} if the file was removed, {@code false} if the server rejected the
     *         removal with a non-transient error
     * @throws ConnectFilePulseException if a transient error persists after all retries
     */
    public boolean delete(URI uri) {
        log.info("Deleting file on '{}'", uri);
        final String remotePath = uri.toString();
        final Boolean deleted = retryAction(() -> doInCloseableConnection(connection -> {
            try {
                connection.getChannel().rm(remotePath);
                return true;
            } catch (SftpException e) {
                log.error("Failed to remove file from " + uri, e);
                if (isRetryableException(e)) {
                    // Rethrown so that retryAction tries again.
                    throw new ConnectFilePulseException(e);
                }
                return false;
            }
        }));
        return deleted;
    }

    /**
     * Moves (renames) a remote file.
     *
     * @param uri  the current location of the file
     * @param uri2 the target location of the file
     * @return {@code true} if the file was moved, {@code false} if the server rejected the
     *         move with a non-transient error
     * @throws ConnectFilePulseException if a transient error persists after all retries
     */
    public boolean move(URI uri, URI uri2) {
        log.info("Moving file from '{}' to {} ", uri, uri2);
        final Boolean moved = retryAction(() -> doInCloseableConnection(connection -> {
            try {
                connection.getChannel().rename(uri.toString(), uri2.toString());
                return true;
            } catch (SftpException e) {
                log.error("Failed to move file from {} to {} ", uri, uri2, e);
                if (isRetryableException(e)) {
                    // Rethrown so that retryAction tries again.
                    throw new ConnectFilePulseException(e);
                }
                return false;
            }
        }));
        return moved;
    }

    /**
     * Runs an action on a new connection without closing it; the action (or what it returns)
     * becomes responsible for closing the connection.
     *
     * @param function the action to run
     * @param <R>      the result type of the action
     * @return the result of the action
     */
    <R> R doInConnection(Function<SftpConnection, R> function) {
        return function.apply(createSftpConnection());
    }

    /**
     * Runs an action on a new connection and closes the connection afterwards, whether the
     * action succeeded or not.
     *
     * @param function the action to run
     * @param <R>      the result type of the action
     * @return the result of the action
     */
    <R> R doInCloseableConnection(Function<SftpConnection, R> function) {
        try (SftpConnection connection = createSftpConnection()) {
            return function.apply(connection);
        }
    }

    /**
     * Creates a new connection from the client configuration.
     *
     * @return a new connection
     */
    SftpConnection createSftpConnection() {
        return new SftpConnection(this.config);
    }

    /**
     * Runs an action and, if it fails with a {@link ConnectFilePulseException}, retries it up
     * to {@code config.getSftpRetries()} times.
     *
     * @param supplier the action to run
     * @param <T>      the result type of the action
     * @return the result of the first successful attempt
     * @throws ConnectFilePulseException the failure of the last attempt when all of them failed
     */
    <T> T retryAction(Supplier<T> supplier) {
        try {
            return supplier.get();
        } catch (ConnectFilePulseException e) {
            return retryActionCore(this.config.getSftpRetries(), supplier, e);
        }
    }

    /**
     * Retries an action that already failed once.
     *
     * <p>The configured delay is observed before each retry, and not after the last failed
     * one: there is nothing left to wait for at that point.
     *
     * @param num                       the number of retries left; zero or less rethrows
     *                                  {@code connectFilePulseException} immediately
     * @param supplier                  the action to retry
     * @param connectFilePulseException the failure of the previous attempt
     * @param <T>                       the result type of the action
     * @return the result of the first successful retry
     * @throws ConnectFilePulseException the failure of the last attempt when every retry failed,
     *                                   or if the thread is interrupted while waiting
     */
    <T> T retryActionCore(Integer num, Supplier<T> supplier, ConnectFilePulseException connectFilePulseException) {
        ConnectFilePulseException lastFailure = connectFilePulseException;
        for (int remaining = num.intValue(); remaining > 0; remaining--) {
            // Deliberately outside the try below: an interruption must abort the retries.
            pauseBeforeRetry();
            log.debug("Retrying action, attempt {}", this.config.getSftpRetries().intValue() - remaining + 1);
            try {
                return supplier.get();
            } catch (ConnectFilePulseException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    /** Sleeps for the configured delay between two attempts. */
    private void pauseBeforeRetry() {
        try {
            Thread.sleep(this.config.getSftpDelayBetweenRetriesMs().intValue());
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ConnectFilePulseException(buildConnectErrorMsg(), e);
        }
    }

    private String buildConnectErrorMsg() {
        return String.format("Cannot connect as user %s to %s:%d", this.config.getSftpListingUser(), this.config.getSftpListingHost(), this.config.getSftpListingPort());
    }

    /** Tells whether the status code of the exception is one of {@link #TRANSIENT_SFTP_ERROR_CODES}. */
    private static boolean isRetryableException(SftpException sftpException) {
        return TRANSIENT_SFTP_ERROR_CODES.contains(sftpException.id);
    }
}
