package org.huahinframework.core.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.huahinframework.core.Summarizer;
import org.huahinframework.core.io.Key;
import org.huahinframework.core.io.Record;
import org.huahinframework.core.io.Value;

/* loaded from: input_file:org/huahinframework/core/writer/CombineWriter.class */
public class CombineWriter implements Writer {
    private Record defaultRecord;
    private TaskInputOutputContext context;
    private Summarizer summarizer;
    private int cache;
    private Map<Key, List<Value>> buffer = new HashMap();
    private int flushCount = 0;

    public CombineWriter(Class<? extends Reducer<?, ?, ?, ?>> cls, int i) throws InstantiationException, IllegalAccessException {
        this.summarizer = (Summarizer) cls.newInstance();
        this.cache = i;
    }

    @Override // org.huahinframework.core.writer.Writer
    public void write(Record record) throws IOException, InterruptedException {
        if (record.isKeyEmpty()) {
            record.setKey(this.defaultRecord.getKey());
        } else if (record.getKey().isGroupingEmpty() && !record.getKey().isSortEmpty()) {
            record.getKey().setGrouping(this.defaultRecord.getKey().getGrouping());
        }
        if (record.isValueEmpty()) {
            record.setValue(this.defaultRecord.getValue());
        }
        this.flushCount++;
        if (this.flushCount > this.cache) {
            flush();
            this.flushCount = 0;
        }
        List<Value> list = this.buffer.get(record.getKey());
        if (list == null) {
            list = new ArrayList();
        }
        list.add(record.getValue());
        this.buffer.put(record.getKey(), list);
    }

    @Override // org.huahinframework.core.writer.Writer
    public void flush() throws IOException, InterruptedException {
        for (Map.Entry<Key, List<Value>> entry : this.buffer.entrySet()) {
            this.summarizer.combine(entry.getKey(), entry.getValue(), this.context);
            entry.getValue().clear();
        }
        this.buffer.clear();
    }

    @Override // org.huahinframework.core.writer.Writer
    public Record getDefaultRecord() {
        return this.defaultRecord;
    }

    @Override // org.huahinframework.core.writer.Writer
    public void setDefaultRecord(Record record) {
        this.defaultRecord = record;
    }

    @Override // org.huahinframework.core.writer.Writer
    public TaskInputOutputContext getContext() {
        return this.context;
    }

    @Override // org.huahinframework.core.writer.Writer
    public void setContext(TaskInputOutputContext taskInputOutputContext) {
        this.context = taskInputOutputContext;
    }
}
