package io.yggdrash.core.p2p;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import io.yggdrash.core.blockchain.BranchId;
import io.yggdrash.core.blockchain.Transaction;
import io.yggdrash.core.blockchain.TransactionImpl;
import io.yggdrash.proto.CommonProto;
import io.yggdrash.proto.Proto;
import io.yggdrash.proto.TransactionServiceGrpc;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/yggdrash/core/p2p/AbstractBlockChainHandler.class */
/**
 * Base peer handler that adds transaction synchronisation and transaction
 * broadcasting on top of {@link DiscoveryHandler}, using the asynchronous
 * {@code TransactionService} gRPC stub created from the supplied channel.
 *
 * @param <T> type parameter forwarded unchanged to {@link DiscoveryHandler}
 */
public abstract class AbstractBlockChainHandler<T> extends DiscoveryHandler<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractBlockChainHandler.class);

    /** Deadline, in seconds, applied to every broadcast stream opened by {@link #broadcastTx}. */
    private static final long BROADCAST_DEADLINE_SECONDS = 3L;

    private final TransactionServiceGrpc.TransactionServiceStub transactionAsyncStub;
    private final StreamObserver<CommonProto.Empty> emptyResponseStreamObserver;
    /** Request side of the most recently opened broadcast stream. */
    private StreamObserver<Proto.Transaction> broadcastTxRequestObserver;

    /**
     * Creates the handler and the response observer shared by all broadcast streams.
     *
     * @param managedChannel channel used to build the async transaction stub; it is shut down
     *                       when a broadcast stream reports completion
     * @param peer           remote peer, passed through to the superclass
     */
    public AbstractBlockChainHandler(final ManagedChannel managedChannel, Peer peer) {
        super(managedChannel, peer);
        this.transactionAsyncStub = TransactionServiceGrpc.newStub(managedChannel);
        this.emptyResponseStreamObserver = new StreamObserver<CommonProto.Empty>() {
            @Override
            public void onNext(CommonProto.Empty empty) {
                log.debug("[PeerHandler] Empty Received");
            }

            @Override
            public void onError(Throwable th) {
                // Best effort: a failed broadcast is not propagated, but it is no longer
                // dropped without a trace. The channel is intentionally left open here.
                log.trace("[PeerHandler] Broadcast Failed: {}", th.getMessage());
            }

            @Override
            public void onCompleted() {
                log.debug("[PeerHandler] Broadcast Finished");
                managedChannel.shutdown();
            }
        };
    }

    /**
     * Asynchronously requests the transactions of the given branch from the peer.
     *
     * <p>The returned future completes with the first transaction list received, with an empty
     * list if the stream finishes without delivering one, or exceptionally with the error
     * reported by the stream.
     *
     * @param branchId branch whose transactions are requested
     * @return future holding the transactions received from the peer
     */
    @Override // io.yggdrash.core.p2p.DiscoveryHandler, io.yggdrash.core.p2p.BlockChainHandler
    public Future<List<Transaction>> syncTx(BranchId branchId) {
        CommonProto.SyncLimit syncLimit = CommonProto.SyncLimit.newBuilder()
                .setBranch(ByteString.copyFrom(branchId.getBytes()))
                .build();
        log.debug("Requesting sync tx: branchId={}", branchId);
        final CompletableFuture<List<Transaction>> result = new CompletableFuture<>();
        this.transactionAsyncStub.syncTx(syncLimit, new StreamObserver<Proto.TransactionList>() {
            @Override
            public void onNext(Proto.TransactionList transactionList) {
                List<Transaction> transactions = transactionList.getTransactionsList().stream()
                        .<Transaction>map(TransactionImpl::new)
                        .collect(Collectors.toList());
                log.debug("[PeerHandler] TransactionList(size={}) Received", transactions.size());
                result.complete(transactions);
            }

            @Override
            public void onError(Throwable th) {
                result.completeExceptionally(th);
            }

            @Override
            public void onCompleted() {
                log.debug("[PeerHandler] Sync Tx Finished");
                // No effect when onNext already completed the future; otherwise yields an empty list.
                result.complete(Collections.emptyList());
            }
        });
        return result;
    }

    /**
     * Sends one transaction to the peer on a newly opened broadcast stream.
     *
     * <p>Every call opens its own stream with a deadline of
     * {@value #BROADCAST_DEADLINE_SECONDS} seconds. This method does not complete the request
     * stream itself. Failures are logged at trace level and never thrown to the caller.
     *
     * @param transaction transaction to broadcast
     */
    @Override // io.yggdrash.core.p2p.DiscoveryHandler, io.yggdrash.core.p2p.BlockChainHandler
    public void broadcastTx(Transaction transaction) {
        try {
            // Send through a local reference so the transaction always goes out on the stream
            // opened by this call, even if the field is reassigned by an overlapping call.
            StreamObserver<Proto.Transaction> requestObserver = this.transactionAsyncStub
                    .withDeadlineAfter(BROADCAST_DEADLINE_SECONDS, TimeUnit.SECONDS)
                    .broadcastTx(this.emptyResponseStreamObserver);
            this.broadcastTxRequestObserver = requestObserver;
            requestObserver.onNext(transaction.getInstance());
            log.trace("Broadcasting tx={} to={}", transaction.getHash(), getPeer().getYnodeUri());
        } catch (Exception e) {
            // A trailing Throwable argument is logged by SLF4J with its stack trace.
            log.trace("BroadcastingTx() is failed. {}", e.getMessage(), e);
        }
    }
}
