package io.bigdime.handler.webhdfs;

import com.google.common.base.Preconditions;
import io.bigdime.alert.Logger;
import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.ActionEvent;
import io.bigdime.core.AdaptorConfigurationException;
import io.bigdime.core.HandlerException;
import io.bigdime.core.SinkHandlerException;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.commons.PropertyHelper;
import io.bigdime.core.config.AdaptorConfig;
import io.bigdime.core.handler.AbstractHandler;
import io.bigdime.core.handler.HandlerJournal;
import io.bigdime.core.runtimeinfo.RuntimeInfo;
import io.bigdime.core.runtimeinfo.RuntimeInfoStore;
import io.bigdime.core.runtimeinfo.RuntimeInfoStoreException;
import io.bigdime.handler.constants.WebHDFSWriterHandlerConstants;
import io.bigdime.libs.hdfs.WebHdfs;
import io.bigdime.libs.hdfs.WebHdfsWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component
/*
 * Handler that accumulates ActionEvent bodies and writes them to HDFS through WebHDFS.
 * Decompiled from: input_file:io/bigdime/handler/webhdfs/WebHDFSWriterHandler.class
 */
public class WebHDFSWriterHandler extends AbstractHandler {
    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger(WebHDFSWriterHandler.class));
    private String hostNames;
    private int port;
    private String hdfsFileName;
    private String hdfsFileNamePrefix = "";
    private String hdfsFileNameExtension = "";
    private String hdfsPath;
    private String hdfsUser;
    private String hdfsOverwrite;
    private String hdfsPermission;
    private WebHdfs webHdfs;

    @Autowired
    private RuntimeInfoStore<RuntimeInfo> runtimeInfoStore;
    private String handlerPhase;
    private String channelDesc;
    private Map<String, String> tokenToHeaderNameMap;

    /**
     * Configures this handler from its property map.
     *
     * <p>Reads the host names, port, file-name prefix and extension, channel description,
     * HDFS path, user, permissions and overwrite flag, then scans the HDFS path for
     * <code>${name}</code> placeholders and records each one in {@code tokenToHeaderNameMap}
     * (placeholder text mapped to the name between the braces).
     *
     * @throws AdaptorConfigurationException if the parent build fails, or wrapping any
     *         exception raised while reading the properties or parsing the HDFS path
     */
    public void build() throws AdaptorConfigurationException {
        super.build();
        try {
            this.handlerPhase = "building WebHDFSWriterHandler";
            logger.info(this.handlerPhase, "building WebHDFSWriterHandler");
            this.hostNames = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HOST_NAMES);
            this.port = PropertyHelper.getIntProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.PORT);
            this.hdfsFileNamePrefix = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_FILE_NAME_PREFIX, "");
            this.hdfsFileNameExtension = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_FILE_NAME_EXTENSION, "");
            this.channelDesc = (String) getPropertyMap().get("channel-desc");
            logger.info(this.handlerPhase, "hdfsFileNamePrefix={} hdfsFileNameExtension={} channelDesc={} hdfsFileName={}", new Object[]{this.hdfsFileNamePrefix, this.hdfsFileNameExtension, this.channelDesc, this.hdfsFileName});
            this.hdfsPath = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_PATH);
            this.hdfsUser = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_USER);
            logger.info(this.handlerPhase, "hdfsUser={}", new Object[]{this.hdfsUser});
            this.hdfsPermission = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_PERMISSIONS);
            this.hdfsOverwrite = PropertyHelper.getStringProperty(getPropertyMap(), WebHDFSWriterHandlerConstants.HDFS_OVERWRITE);

            // Insertion-ordered so tokens keep the order in which they appear in the path.
            Map<String, String> tokens = new LinkedHashMap<>();
            // group() is the whole placeholder (e.g. "${foo}"), group(1) the name inside the braces.
            Matcher tokenMatcher = Pattern.compile("\\$\\{(\\w+)\\}+").matcher(this.hdfsPath);
            while (tokenMatcher.find()) {
                tokens.put(tokenMatcher.group(), tokenMatcher.group(1));
            }
            this.tokenToHeaderNameMap = tokens;

            logger.info(this.handlerPhase, "hostNames={} port={} hdfsFileName={} hdfsPath={} hdfsUser={} hdfsPermission={} hdfsOverwrite={}", new Object[]{this.hostNames, Integer.valueOf(this.port), this.hdfsFileName, this.hdfsPath, this.hdfsUser, this.hdfsPermission, this.hdfsOverwrite});
        } catch (Exception e) {
            // Any failure while reading configuration is reported as a configuration error.
            throw new AdaptorConfigurationException(e);
        }
    }

    /**
     * Processes the pending events for this handler.
     *
     * <p>When the journal already holds an event list, that list is processed; otherwise the
     * event list from the handler context is used.
     *
     * @return the status produced by {@code process0}, or {@code null} when the journal holds
     *         an empty event list
     * @throws HandlerException if processing the events fails
     */
    public ActionEvent.Status process() throws HandlerException {
        this.handlerPhase = "processing WebHDFSWriterHandler";
        logger.info(this.handlerPhase, "webHdfs processing event");

        WebHDFSWriterHandlerJournal pendingJournal = getJournal(WebHDFSWriterHandlerJournal.class);
        boolean hasJournaledEvents = pendingJournal != null && pendingJournal.getEventList() != null;

        if (hasJournaledEvents) {
            // Resume with the events that an earlier invocation left in the journal.
            List<ActionEvent> journaledEvents = pendingJournal.getEventList();
            logger.info(this.handlerPhase, "journal is not null, actionEvents==null={}", new Object[]{Boolean.valueOf(journaledEvents == null)});
            boolean nothingLeft = journaledEvents == null || journaledEvents.isEmpty();
            if (nothingLeft) {
                return null;
            }
            return process0(pendingJournal.getEventList());
        }

        // No journaled work: take the events from the handler context.
        List<ActionEvent> contextEvents = getHandlerContext().getEventList();
        Preconditions.checkNotNull(contextEvents, "eventList in HandlerContext can't be null");
        logger.info(this.handlerPhase, "journal is null, actionEvents.size={} id={} ", new Object[]{Integer.valueOf(contextEvents.size()), getId()});
        return process0(contextEvents);
    }

    /**
     * Returns the HDFS path currently recorded in the given journal.
     *
     * @param journal journal to read from
     * @return the journal's current HDFS path
     */
    private String getPreviousHdfsPath(final WebHDFSWriterHandlerJournal journal) {
        final String recordedPath = journal.getCurrentHdfsPath();
        return recordedPath;
    }

    /**
     * Returns the HDFS path including file name currently recorded in the given journal.
     *
     * @param journal journal to read from
     * @return the journal's current HDFS path with file name
     */
    private String getPreviousHdfsPathWithName(final WebHDFSWriterHandlerJournal journal) {
        final String recordedPathWithName = journal.getCurrentHdfsPathWithName();
        return recordedPathWithName;
    }

    /**
     * Returns the HDFS file name currently recorded in the given journal.
     *
     * @param journal journal to read from
     * @return the journal's current HDFS file name
     */
    private String getPreviousHdfsFileName(final WebHDFSWriterHandlerJournal journal) {
        final String recordedFileName = journal.getCurrentHdfsFileName();
        return recordedFileName;
    }

    /**
     * Computes the target file name for the given event and stores it in {@code hdfsFileName}.
     *
     * <p>The name is assembled by {@code HdfsFileNameBuilder} from the channel description,
     * the configured prefix and extension, and the event's "sourceFileName" header.
     *
     * @param event event whose headers supply the source file name
     */
    private void buildFileName(ActionEvent event) {
        final String sourceFileName = (String) event.getHeaders().get("sourceFileName");
        this.hdfsFileName = new HdfsFileNameBuilder()
                .withChannelDesc(this.channelDesc)
                .withPrefix(this.hdfsFileNamePrefix)
                .withSourceFileName(sourceFileName)
                .withExtension(this.hdfsFileNameExtension)
                .build();
    }

    /**
     * Buffers the bodies of the given events and writes them to HDFS, one file per distinct
     * target path.
     *
     * <p>For each event the target file name and detokenized path are computed. While the
     * target matches the one recorded in the journal (or the journal has none yet), the event
     * body is appended to an in-memory buffer and the event is removed from the list. When an
     * event targets a different file and the buffer is non-empty, the buffer is written to the
     * previous target, the remaining events are stored in the journal, the journal is pointed
     * at the new target, and {@code CALLBACK} is returned so the remainder is handled on a
     * later invocation. Once the list has been fully consumed, any buffered payload is written
     * and the journal is cleared.
     *
     * @param actionEvents events to process; consumed events are removed from this list
     * @return {@code CALLBACK} when events remain in the journal, otherwise {@code READY}
     * @throws SinkHandlerException if an {@link IOException} occurs while buffering or writing
     * @throws HandlerException wrapping any other exception raised during processing
     */
    private ActionEvent.Status process0(List<ActionEvent> actionEvents) throws HandlerException {
        // Declared with the concrete journal type: the helpers below require it.
        WebHDFSWriterHandlerJournal journal = getJournal(WebHDFSWriterHandlerJournal.class);
        if (journal == null) {
            logger.debug(this.handlerPhase, "jounral is null, initializing");
            journal = new WebHDFSWriterHandlerJournal();
            getHandlerContext().setJournal(getId(), journal);
        }
        logger.debug(this.handlerPhase, "previousHdfsPath={}", new Object[]{getPreviousHdfsPath(journal)});
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        ActionEvent.Status statusToReturn = ActionEvent.Status.READY;
        try {
            Iterator<ActionEvent> eventIterator = actionEvents.iterator();
            boolean payloadEmpty = true;
            String detokenizedHdfsPath = null;
            String detokenizedHdfsPathWithName = null;
            ActionEvent currentEvent = null;
            ActionEvent lastAppendedEvent = null;
            HdfsFilePathBuilder pathBuilder = null;
            while (eventIterator.hasNext()) {
                currentEvent = eventIterator.next();
                buildFileName(currentEvent);
                pathBuilder = new HdfsFilePathBuilder();
                detokenizedHdfsPath = pathBuilder.withActionEvent(currentEvent).withHdfsPath(this.hdfsPath).withTokenHeaderMap(this.tokenToHeaderNameMap).build();
                currentEvent.getHeaders().put(WebHDFSWriterHandlerConstants.HDFS_PATH, pathBuilder.getBaseHdfsPath());

                // Constant-first comparison is null-safe, so a missing header simply skips cleanup.
                if ("true".equalsIgnoreCase((String) currentEvent.getHeaders().get("cleanupRequired"))) {
                    logger.debug(this.handlerPhase, "\"will cleanuphdfs dir={}\"", new Object[]{currentEvent.getHeaders()});
                    handleExceptionCondition(currentEvent);
                }

                detokenizedHdfsPathWithName = detokenizedHdfsPath.endsWith(File.separator)
                        ? detokenizedHdfsPath + this.hdfsFileName
                        : detokenizedHdfsPath + File.separator + this.hdfsFileName;
                logger.debug(this.handlerPhase, "previousHdfsPath={} detokenizedHdfsPath={} previousHdfsPathWithName={} detokenizedHdfsPathWithName={} previousHdfsFileName={}", new Object[]{getPreviousHdfsPath(journal), detokenizedHdfsPath, getPreviousHdfsPathWithName(journal), detokenizedHdfsPathWithName, getPreviousHdfsFileName(journal)});

                if (!StringUtils.isBlank(getPreviousHdfsPathWithName(journal)) && !detokenizedHdfsPathWithName.equals(getPreviousHdfsPathWithName(journal))) {
                    // This event targets a different file than the one being accumulated.
                    logger.debug(this.handlerPhase, "new hdfspath, payloadEmpty={} hdfsFileName={} previousHdfsFileName={}", new Object[]{Boolean.valueOf(payloadEmpty), this.hdfsFileName, getPreviousHdfsFileName(journal)});
                    if (!payloadEmpty) {
                        logger.info(this.handlerPhase, "writing to hdfs, validation should be performed");
                        // Flush what was buffered for the previous target, using the last appended event's headers.
                        ActionEvent writtenEvent = writeToHdfs(getPreviousHdfsPath(journal), payload.toByteArray(), getPreviousHdfsFileName(journal), pathBuilder, lastAppendedEvent);
                        payloadEmpty = true;
                        // The current event was not consumed; keep the remaining list for the next invocation.
                        journal.setEventList(actionEvents);
                        logger.info(this.handlerPhase, "setting event in journal, actionEvents.size={}", new Object[]{Integer.valueOf(actionEvents.size())});
                        statusToReturn = ActionEvent.Status.CALLBACK;
                        getHandlerContext().createSingleItemEventList(writtenEvent);
                        journal.setCurrentHdfsPath(detokenizedHdfsPath);
                        journal.setCurrentHdfsPathWithName(detokenizedHdfsPathWithName);
                        journal.setCurrentHdfsFileName(this.hdfsFileName);
                        journal.setRecordCount(0);
                        break;
                    }
                } else {
                    logger.debug(this.handlerPhase, "appending payload, record_count={} actionEvents.size={}", new Object[]{Integer.valueOf(journal.getRecordCount()), Integer.valueOf(actionEvents.size())});
                    payload.write(currentEvent.getBody());
                    eventIterator.remove();
                    initializeRecordCountInJournal(currentEvent, journal);
                    journal.incrementRecordCount();
                    payloadEmpty = false;
                    lastAppendedEvent = currentEvent;
                    logger.debug(this.handlerPhase, "appended payload, record_count={} actionEvents.size={}", new Object[]{Integer.valueOf(journal.getRecordCount()), Integer.valueOf(actionEvents.size())});
                }

                // First event seen by this journal: remember its target as the current one.
                if (StringUtils.isBlank(getPreviousHdfsPathWithName(journal))) {
                    journal.setCurrentHdfsPath(detokenizedHdfsPath);
                    journal.setCurrentHdfsPathWithName(detokenizedHdfsPathWithName);
                    journal.setCurrentHdfsFileName(this.hdfsFileName);
                }
            }

            // Every event was consumed: flush whatever is still buffered and drop the journal.
            if (currentEvent != null && actionEvents.isEmpty()) {
                if (!payloadEmpty) {
                    logger.info(this.handlerPhase, "writing to hdfs. previousHdfsPath={} detokenizedHdfsPath={} previousHdfsPathWithName={} detokenizedHdfsPathWithName={}", new Object[]{getPreviousHdfsPath(journal), detokenizedHdfsPath, getPreviousHdfsPathWithName(journal), detokenizedHdfsPathWithName});
                    getHandlerContext().createSingleItemEventList(writeToHdfs(detokenizedHdfsPath, payload.toByteArray(), this.hdfsFileName, pathBuilder, currentEvent));
                }
                logger.info(this.handlerPhase, "clearing the journal, actionEvents.size={} id={}", new Object[]{Integer.valueOf(actionEvents.size()), getId()});
                getHandlerContext().clearJournal(getId());
            }
            logger.debug(this.handlerPhase, "statusToReturn={}", new Object[]{statusToReturn});
            return statusToReturn;
        } catch (IOException e) {
            logger.alert(Logger.ALERT_TYPE.INGESTION_FAILED, Logger.ALERT_CAUSE.APPLICATION_INTERNAL_ERROR, Logger.ALERT_SEVERITY.BLOCKER, "\"IOException received\" error={}", new Object[]{e.toString()});
            throw new SinkHandlerException("WebHDFSWriterHandler processing event failed", e);
        } catch (Exception e) {
            throw new HandlerException(e.getMessage(), e);
        }
    }

    /**
     * Writes the payload to HDFS and builds the event that describes the written file.
     *
     * <p>The target directory is rebuilt from {@code actionEvent} and the configured HDFS
     * path; the {@code unusedHdfsPath} and {@code unusedPathBuilder} arguments are accepted
     * for signature compatibility but are not read.
     *
     * @param unusedHdfsPath not used by this method
     * @param payload bytes to write
     * @param fileName name of the file to write under the target directory
     * @param unusedPathBuilder not used by this method
     * @param actionEvent event whose headers drive path detokenization and are copied into
     *        the returned event
     * @return a copy of {@code actionEvent} whose headers additionally carry the partition
     *         names/values, file name, record count, paths, host names, port and user
     * @throws IOException if the write fails
     * @throws RuntimeInfoStoreException declared for callers; not raised directly here
     * @throws HandlerException declared for callers; not raised directly here
     */
    private ActionEvent writeToHdfs(String unusedHdfsPath, byte[] payload, String fileName, HdfsFilePathBuilder unusedPathBuilder, ActionEvent actionEvent) throws IOException, RuntimeInfoStoreException, HandlerException {
        WebHdfsWriter writer = new WebHdfsWriter();
        HdfsFilePathBuilder pathBuilder = new HdfsFilePathBuilder();
        String targetDir = pathBuilder.withActionEvent(actionEvent).withHdfsPath(this.hdfsPath).withTokenHeaderMap(this.tokenToHeaderNameMap).build();
        WebHDFSWriterHandlerJournal journal = getJournal(WebHDFSWriterHandlerJournal.class);
        logger.debug(this.handlerPhase, "journal={}", new Object[]{journal});
        logger.debug(this.handlerPhase, "_message=\"writing to hdfs\" hdfsPath={} hdfsFileName={} hdfsUser={}", new Object[]{targetDir, fileName, this.hdfsUser});
        if (this.webHdfs == null) {
            this.webHdfs = WebHdfs.getInstance(this.hostNames, this.port).addHeader("Content-Type", "application/octet-stream").addParameter("user.name", this.hdfsUser);
        }
        try {
            writer.write(this.webHdfs, targetDir, payload, fileName);
        } finally {
            // Always drop the client, even on failure, so the next call starts from a fresh instance.
            this.webHdfs = null;
        }
        logger.debug(this.handlerPhase, "wrote to hdfs");

        // Comma-join partition names and values; both stay null when there are no partitions.
        String partitionNames = null;
        String partitionValues = null;
        for (Map.Entry<String, String> partition : pathBuilder.getPartitionNameValueMap().entrySet()) {
            if (partitionValues == null) {
                partitionValues = partition.getValue();
                partitionNames = partition.getKey();
            } else {
                partitionValues = partitionValues + "," + partition.getValue();
                partitionNames = partitionNames + "," + partition.getKey();
            }
        }
        logger.debug(this.handlerPhase, "partitionNames={} partitionValues={} hdfsFileName={}", new Object[]{partitionNames, partitionValues, fileName});

        ActionEvent writtenEvent = new ActionEvent(actionEvent);
        // Copy the headers so additions here do not leak into the input event.
        Map<String, String> headers = new HashMap<>(actionEvent.getHeaders());
        writtenEvent.setHeaders(headers);
        headers.put(WebHDFSWriterHandlerConstants.HIVE_PARTITION_NAMES, partitionNames);
        headers.put(WebHDFSWriterHandlerConstants.HIVE_PARTITION_VALUES, partitionValues);
        headers.put(WebHDFSWriterHandlerConstants.HDFS_FILE_NAME, fileName);
        headers.put("recordCount", String.valueOf(journal.getRecordCount()));
        headers.put("completeHdfsPath", targetDir + File.separator);
        headers.put(WebHDFSWriterHandlerConstants.HDFS_PATH, pathBuilder.getBaseHdfsPath());
        headers.put(WebHDFSWriterHandlerConstants.HOST_NAMES, this.hostNames);
        headers.put(WebHDFSWriterHandlerConstants.PORT, String.valueOf(this.port));
        headers.put("user.name", this.hdfsUser);
        logger.debug(this.handlerPhase, "headers_from_hdfswriter={}", new Object[]{headers});
        return writtenEvent;
    }

    /**
     * Seeds the journal's record count when it has not been initialised yet (negative value).
     *
     * <p>The count is taken from the "recordCount" property of the runtime info stored for
     * the event's entity name and input descriptor; it falls back to 0 when no such value is
     * available or an {@link IllegalArgumentException} is raised while reading it.
     *
     * @param actionEvent event whose "entityName" and "inputDescriptor" headers identify the
     *        runtime info entry
     * @param journal journal whose record count is initialised
     * @throws RuntimeInfoStoreException if the runtime info store lookup fails
     */
    private void initializeRecordCountInJournal(ActionEvent actionEvent, WebHDFSWriterHandlerJournal journal) throws RuntimeInfoStoreException {
        if (journal.getRecordCount() >= 0) {
            // Already initialised; nothing to do.
            return;
        }
        try {
            String adaptorName = AdaptorConfig.getInstance().getName();
            String entityName = (String) actionEvent.getHeaders().get("entityName");
            String inputDescriptor = (String) actionEvent.getHeaders().get("inputDescriptor");
            RuntimeInfo stored = this.runtimeInfoStore.get(adaptorName, entityName, inputDescriptor);

            boolean hasStoredCount = stored != null
                    && stored.getProperties() != null
                    && stored.getProperties().get("recordCount") != null;
            if (hasStoredCount) {
                int storedCount = PropertyHelper.getIntProperty(stored.getProperties().get("recordCount"));
                logger.debug(this.handlerPhase, "read from runtime_store, recordCount={}", new Object[]{Integer.valueOf(storedCount)});
                journal.setRecordCount(storedCount);
            } else {
                journal.setRecordCount(0);
            }
        } catch (IllegalArgumentException badValue) {
            // The lookup or the integer conversion rejected its input: start counting from zero.
            journal.setRecordCount(0);
        }
    }

    /**
     * Returns the label of the phase this handler is currently in, as last set by
     * {@code build()} or {@code process()}.
     *
     * @return the current handler phase, or {@code null} if none has been set yet
     */
    protected String getHandlerPhase() {
        final String currentPhase = handlerPhase;
        return currentPhase;
    }

    /**
     * Exception hook; this handler performs no action here.
     */
    public void handleException() {
        // Intentionally empty.
    }

    /**
     * Moves the file left over from a previous run into a backup directory.
     *
     * <p>The source file is located from the event's HDFS base path header, its comma-separated
     * Hive partition values header (one path segment per value) and the current
     * {@code hdfsFileName}. The backup directory is created under
     * {@code backup/<adaptor name>/<partition segments>/} and the file is renamed into it with
     * a "-&lt;current time millis&gt;" suffix. An {@link IOException} is logged as a warning
     * and not rethrown.
     *
     * @param actionEvent event whose headers supply the HDFS base path and partition values
     */
    public void handleExceptionCondition(ActionEvent actionEvent) {
        logger.info(this.handlerPhase, "\"cleaning up file from previous run\" hdfsFileNamePrefix={} hdfsFileNameExtension={} channelDesc={} hdfsFileName={}", new Object[]{this.hdfsFileNamePrefix, this.hdfsFileNameExtension, this.channelDesc, this.hdfsFileName});
        // NOTE(review): 11 presumably is the length of a "/webhdfs/v1" prefix on the base path - confirm.
        final int basePathPrefixLength = 11;
        String renameTargetDir = "";
        try {
            String hdfsBasePath = (String) actionEvent.getHeaders().get(WebHDFSWriterHandlerConstants.HDFS_PATH);
            String hivePartitionValues = (String) actionEvent.getHeaders().get(WebHDFSWriterHandlerConstants.HIVE_PARTITION_VALUES);
            logger.info(this.handlerPhase, "\"cleaning up file from previous run\" hdfsBasePath={} hivePartitionValues={}", new Object[]{hdfsBasePath, hivePartitionValues});

            // Turn "a, b" into "a/b/"; stays empty when there are no partition values.
            String partitionSubPath = "";
            if (StringUtils.isNotBlank(hivePartitionValues)) {
                StringBuilder segments = new StringBuilder();
                for (String partitionValue : hivePartitionValues.split(",")) {
                    segments.append(partitionValue.trim()).append('/');
                }
                partitionSubPath = segments.toString();
            }
            String sourceFilePath = hdfsBasePath + partitionSubPath + this.hdfsFileName;

            String backupSuffix = "backup/" + AdaptorConfig.getInstance().getAdaptorContext().getAdaptorName() + "/" + partitionSubPath + "/";
            // The rename destination uses the base path without its leading prefix,
            // while the directory is created with the full base path.
            renameTargetDir = hdfsBasePath.substring(basePathPrefixLength) + backupSuffix;
            String backupDirToCreate = hdfsBasePath + backupSuffix;

            if (this.webHdfs == null) {
                this.webHdfs = WebHdfs.getInstance(this.hostNames, this.port).addHeader("Content-Type", "application/octet-stream").addParameter("user.name", this.hdfsUser);
            }
            WebHdfsWriter writer = new WebHdfsWriter();
            logger.info(this.handlerPhase, "\"creating directory\" toPath={}", new Object[]{renameTargetDir});
            writer.createDirectory(this.webHdfs, backupDirToCreate);
            logger.info(this.handlerPhase, "\"moving file\" fromPath={} toPath={}", new Object[]{sourceFilePath, renameTargetDir});
            moveErrorRecordCountFile(sourceFilePath, renameTargetDir + this.hdfsFileName + "-" + System.currentTimeMillis());
        } catch (IOException e) {
            logger.warn(this.handlerPhase, "Exception occurs, Failed to move to provided location: toPath={}", new Object[]{renameTargetDir, e});
        } finally {
            // Drop the client on every exit path so a later write never reuses an instance
            // that still carries the "destination" parameter added for the rename.
            this.webHdfs = null;
        }
    }

    /**
     * Renames a file on HDFS through the current {@code webHdfs} client and logs the outcome.
     *
     * <p>Status 200/201 is logged as success, 404 as "file does not exist", and anything else
     * as a warning. The client's connection is released whether or not the rename succeeds.
     *
     * @param sourcePath path of the file to move
     * @param destinationPath value passed as the "destination" parameter of the rename
     * @throws IOException if the rename request fails
     */
    private void moveErrorRecordCountFile(String sourcePath, String destinationPath) throws IOException {
        this.webHdfs.addParameter("destination", destinationPath);
        try {
            HttpResponse response = this.webHdfs.rename(sourcePath);
            int statusCode = response.getStatusLine().getStatusCode();
            String reasonPhrase = response.getStatusLine().getReasonPhrase();
            Object[] details = new Object[]{Integer.valueOf(statusCode), destinationPath, reasonPhrase};
            if (statusCode == 200 || statusCode == 201) {
                logger.debug(this.handlerPhase, "\"file moved successfully\" responseCode={} dest={} responseMessage={}", details);
            } else if (statusCode == 404) {
                logger.debug(this.handlerPhase, "file does not exist, responseCode={} dest={} responseMessage={}", details);
            } else {
                logger.warn(this.handlerPhase, "file existence not known, responseCode={} dest={} responseMessage={}", details);
            }
        } finally {
            // Release the connection even when rename throws, so it is not leaked.
            this.webHdfs.releaseConnection();
        }
    }
}
