package com.mongodb.flume;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/flume/MongoDBSink.class */
public class MongoDBSink extends EventSink.Base {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
    private final MongoClientURI uri;
    private MongoClient mongo;
    private DBCollection collection;

    public MongoDBSink(String str) {
        this.uri = new MongoClientURI(str);
    }

    public void open() {
        try {
            this.mongo = new MongoClient(this.uri);
            try {
                this.collection = this.mongo.getDB(this.uri.getDatabase()).getCollection(this.uri.getCollection());
            } catch (Exception e) {
                LOG.error("Connected to MongoDB but failed in acquiring collection.", e);
                throw new MongoException("Could not acquire specified collection.", e);
            }
        } catch (Exception e2) {
            LOG.error("Connecting to MongoDB failed.", e2);
            throw new MongoException("Failed to connect to MongoDB. ", e2);
        }
    }

    public void append(Event event) throws IOException {
        BasicDBObjectBuilder start = BasicDBObjectBuilder.start("timestamp", new Date(event.getTimestamp()));
        start.append("nanoseconds", Long.valueOf(event.getNanos()));
        start.append("hostname", event.getHost());
        start.append("priority", event.getPriority().name());
        start.append("message", new String(event.getBody()));
        start.append("metadata", new BasicDBObject(event.getAttrs()));
        this.collection.insert(new DBObject[]{start.get()});
    }

    public void close() throws IOException {
        this.mongo.close();
    }

    public static SinkFactory.SinkBuilder builder() {
        return new SinkFactory.SinkBuilder() { // from class: com.mongodb.flume.MongoDBSink.1
            public EventSink build(Context context, String... strArr) {
                Preconditions.checkArgument(strArr.length == 1, "usage: mongoDBSink(\"mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]\")\n ... See http://www.mongodb.org/display/DOCS/Connections for information on the MongoDB Connection URI Format.\n\t Note that using [?options] you can specify Write Concern related settings: \n\t\t safe={true|false} (default: false) Whether or not the driver should send getLastError to verify each write operation.\n\t\t w={n} (default: 0) Specify the number of servers to replicate a write to before returning success. When non-zero, implies safe=true.\n\t\t wtimeout={ms} (default: wait forever) The number of milliseconds to wait for W replications to complete.  When non-zero, implies safe=true.\n\t\t fsync={true|false} (default: false) When enabled, forces an fsync after each write operation to increase durability.  You probably *don't* want to do this; see the MongoDB docs for info.  When 'true', implies safe=true");
                return new MongoDBSink(strArr[0]);
            }
        };
    }

    public MongoClientURI getUri() {
        return this.uri;
    }

    public MongoClient getMongo() {
        return this.mongo;
    }

    public DBCollection getCollection() {
        return this.collection;
    }

    public static List<Pair<String, SinkFactory.SinkBuilder>> getSinkBuilders() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Pair("mongoDBSink", builder()));
        return arrayList;
    }
}
