package org.apache.pulsar.bookie.rackawareness;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.class */
public class BookieRackAffinityMappingTest {
    // Metadata store shared by the tests; created in setUp() and closed in teardown().
    private MetadataStore store;
    // Bookie address fixtures; assigned in setUp() to 127.0.0.1, 127.0.0.2 and 127.0.0.3 on port 3181.
    private BookieSocketAddress BOOKIE1 = null;
    private BookieSocketAddress BOOKIE2 = null;
    private BookieSocketAddress BOOKIE3 = null;
    // JSON mapper used to serialize the rack-mapping fixtures that tests write under "/bookies".
    private final ObjectMapper jsonMapper = ObjectMapperFactory.create();

    /**
     * Creates a fresh {@code memory:local} metadata store and the three bookie
     * address fixtures before every test method.
     *
     * @throws Exception if the store or one of the addresses cannot be created
     */
    @BeforeMethod
    public void setUp() throws Exception {
        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder().build();
        store = MetadataStoreFactory.create("memory:local", storeConfig);

        BOOKIE1 = new BookieSocketAddress("127.0.0.1:3181");
        BOOKIE2 = new BookieSocketAddress("127.0.0.2:3181");
        BOOKIE3 = new BookieSocketAddress("127.0.0.3:3181");
    }

    /**
     * Closes the metadata store created by {@link #setUp()}.
     *
     * <p>Because this method is declared with {@code alwaysRun = true}, it is also
     * invoked when {@code setUp()} failed before the store was assigned. The null
     * guard keeps that case from raising a {@link NullPointerException} that would
     * hide the original failure, and clearing the field prevents closing the same
     * store twice.
     *
     * @throws Exception if closing the store fails
     */
    @AfterMethod(alwaysRun = true)
    void teardown() throws Exception {
        if (this.store != null) {
            this.store.close();
            this.store = null;
        }
    }

    /**
     * Writes a rack configuration for BOOKIE1 and BOOKIE2 under {@code /bookies}
     * before the mapping is configured, then checks that both resolve to their
     * configured racks and that BOOKIE3, which has no entry, resolves to {@code null}.
     */
    @Test
    public void testBasic() throws Exception {
        String rackConfig = "{\"group1\": {\"" + this.BOOKIE1
                + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + this.BOOKIE2
                + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
        // Encode explicitly as UTF-8 so the stored JSON does not depend on the platform default charset.
        this.store.put("/bookies", rackConfig.getBytes(StandardCharsets.UTF_8), Optional.empty()).join();

        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);

        List<String> racks = mapping.resolve(Lists.newArrayList(
                this.BOOKIE1.getHostName(), this.BOOKIE2.getHostName(), this.BOOKIE3.getHostName()));
        Assert.assertEquals(racks.get(0), "/rack0");
        Assert.assertEquals(racks.get(1), "/rack1");
        // BOOKIE3 is absent from the rack configuration written above.
        Assert.assertNull(racks.get(2));
    }

    /**
     * Configures the mapping from a {@code metadataServiceUri} that lists two
     * comma-separated URIs instead of passing a store instance.
     *
     * <p>The test contains no assertions: it passes as long as
     * {@code setConf(...)} completes without throwing.
     */
    @Test
    public void testMultipleMetadataServiceUris() {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("metadataServiceUri", "memory:local,memory:local");
        conf.setProperty("zkTimeout", "100000");

        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);
    }

    /**
     * Stores a rack configuration whose rack names are {@code "/"} (BOOKIE1) and
     * the empty string (BOOKIE2), and checks that none of the three bookies
     * resolves to a rack.
     */
    @Test
    public void testInvalidRackName() {
        String rackConfig = "{\"group1\": {\"" + this.BOOKIE1
                + "\": {\"rack\": \"/\", \"hostname\": \"bookie1.example.com\"}, \"" + this.BOOKIE2
                + "\": {\"rack\": \"\", \"hostname\": \"bookie2.example.com\"}}}";
        // Encode explicitly as UTF-8 so the stored JSON does not depend on the platform default charset.
        this.store.put("/bookies", rackConfig.getBytes(StandardCharsets.UTF_8), Optional.empty()).join();

        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);

        List<String> racks = mapping.resolve(Lists.newArrayList(
                this.BOOKIE1.getHostName(), this.BOOKIE2.getHostName(), this.BOOKIE3.getHostName()));
        Assert.assertNull(racks.get(0));
        Assert.assertNull(racks.get(1));
        Assert.assertNull(racks.get(2));
    }

    /**
     * Starts the mapping against a store with no {@code /bookies} entry, checks
     * that nothing resolves, then writes a rack configuration and waits until
     * the mapping reflects it.
     */
    @Test
    public void testNoBookieInfo() throws Exception {
        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);

        // Nothing has been written under "/bookies" yet, so every lookup is expected to be null.
        List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
        Assert.assertNull(racks.get(0));
        Assert.assertNull(racks.get(1));
        Assert.assertNull(racks.get(2));

        Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new HashMap<>();
        Map<BookieSocketAddress, BookieInfo> mainGroup = new HashMap<>();
        mainGroup.put(this.BOOKIE1, BookieInfo.builder().rack("/rack0").build());
        mainGroup.put(this.BOOKIE2, BookieInfo.builder().rack("/rack1").build());
        bookieMapping.put("group1", mainGroup);
        this.store.put("/bookies", this.jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join();

        // The test polls here rather than asserting immediately after the write.
        Awaitility.await().untilAsserted(() -> {
            List<String> updated = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
            Assert.assertEquals(updated.get(0), "/rack0");
            Assert.assertEquals(updated.get(1), "/rack1");
            Assert.assertNull(updated.get(2));
        });
    }

    /**
     * Checks that the mapping follows successive changes of the {@code /bookies}
     * entry: an initial two-bookie configuration, the addition of a second group
     * containing BOOKIE3, and finally an empty configuration.
     *
     * <p>The fixtures use rack names without a leading slash ({@code "rack0"});
     * the assertions expect them to be resolved as {@code "/rack0"}.
     */
    @Test
    public void testBookieInfoChange() throws Exception {
        Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new HashMap<>();
        Map<BookieSocketAddress, BookieInfo> mainGroup = new HashMap<>();
        mainGroup.put(this.BOOKIE1, BookieInfo.builder().rack("rack0").build());
        mainGroup.put(this.BOOKIE2, BookieInfo.builder().rack("rack1").build());
        bookieMapping.put("group1", mainGroup);
        this.store.put("/bookies", this.jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join();

        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);

        List<String> racks = mapping.resolve(Lists.newArrayList(
                this.BOOKIE1.getHostName(), this.BOOKIE2.getHostName(), this.BOOKIE3.getHostName()));
        Assert.assertEquals(racks.get(0), "/rack0");
        Assert.assertEquals(racks.get(1), "/rack1");
        Assert.assertNull(racks.get(2));

        // Add BOOKIE3 in a second group and wait for the mapping to reflect it.
        Map<BookieSocketAddress, BookieInfo> secondaryGroup = new HashMap<>();
        secondaryGroup.put(this.BOOKIE3, BookieInfo.builder().rack("rack0").build());
        bookieMapping.put("group2", secondaryGroup);
        this.store.put("/bookies", this.jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join();
        Awaitility.await().untilAsserted(() -> {
            List<String> updated = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
            Assert.assertEquals(updated.get(0), "/rack0");
            Assert.assertEquals(updated.get(1), "/rack1");
            Assert.assertEquals(updated.get(2), "/rack0");
        });

        // Replace the configuration with an empty object; every lookup should become null again.
        // Encode explicitly as UTF-8 so the stored JSON does not depend on the platform default charset.
        this.store.put("/bookies", "{}".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
        Awaitility.await().untilAsserted(() -> {
            List<String> updated = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
            Assert.assertNull(updated.get(0));
            Assert.assertNull(updated.get(1));
            Assert.assertNull(updated.get(2));
        });
    }

    /**
     * Exercises the mapping together with a {@link PulsarRegistrationClient}-backed
     * address resolver and a {@link RackawareEnsemblePlacementPolicy}.
     *
     * <p>The rack configuration initially lists BOOKIE1 and BOOKIE2. The test then:
     * <ol>
     *   <li>asserts that no rack resolves before any bookie is registered;</li>
     *   <li>registers BOOKIE1, then BOOKIE2, under {@code /ledgers/available} and checks
     *       the resolved racks and the policy's {@code knownBookies} after each step;</li>
     *   <li>registers BOOKIE3, which has no rack entry, and expects it under
     *       {@code /default-rack};</li>
     *   <li>rewrites {@code /bookies} with only BOOKIE1 and expects BOOKIE2 to fall back
     *       to {@code /default-rack}.</li>
     * </ol>
     *
     * <p>Several BookKeeper and Pulsar internals are reached through reflection
     * ({@code racksWithHost}, {@code knownBookies}, {@code BookieWatcherImpl},
     * {@code BookieServiceInfoSerde.INSTANCE}).
     */
    @Test
    public void testWithPulsarRegistrationClient() throws Exception {
        String initialRacks = "{\"group1\": {\"" + this.BOOKIE1
                + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + this.BOOKIE2
                + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
        // Encode explicitly as UTF-8 so the stored JSON does not depend on the platform default charset.
        this.store.put("/bookies", initialRacks.getBytes(StandardCharsets.UTF_8), Optional.empty()).join();

        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        Field racksWithHostField = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
        racksWithHostField.setAccessible(true);

        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);

        PulsarRegistrationClient registrationClient = new PulsarRegistrationClient(this.store, "/ledgers");
        try {
            DefaultBookieAddressResolver resolver = new DefaultBookieAddressResolver(registrationClient);
            mapping.setBookieAddressResolver(resolver);
            mapping.setConf(conf);

            // No bookie has been registered under /ledgers/available yet: nothing is expected to resolve.
            Assert.assertEquals(resolveNonNullRacks(mapping).size(), 0);

            HashedWheelTimer timer = new HashedWheelTimer(
                    new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
                    conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
                    conf.getTimeoutTimerNumTicks());
            try {
                RackawareEnsemblePlacementPolicy policy = new RackawareEnsemblePlacementPolicy();
                mapping.registerRackChangeListener(policy);

                Field knownBookiesField = Class.forName(
                        "org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy")
                        .getDeclaredField("knownBookies");
                knownBookiesField.setAccessible(true);
                Map<?, ?> knownBookies = (Map<?, ?>) knownBookiesField.get(policy);

                policy.initialize(conf, Optional.of(mapping), timer, SettableFeatureProvider.DISABLE_ALL,
                        NullStatsLogger.INSTANCE, resolver);

                // Build a BookieWatcherImpl reflectively and run its initial blocking read.
                Class<?> watcherClass = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
                Constructor<?> watcherCtor = watcherClass.getDeclaredConstructor(
                        ClientConfiguration.class, EnsemblePlacementPolicy.class, RegistrationClient.class,
                        BookieAddressResolver.class, StatsLogger.class);
                watcherCtor.setAccessible(true);
                Object watcher = watcherCtor.newInstance(
                        conf, policy, registrationClient, resolver, NullStatsLogger.INSTANCE);
                Method initialRead = watcherClass.getDeclaredMethod("initialBlockingBookieRead");
                initialRead.setAccessible(true);
                initialRead.invoke(watcher);

                Field serdeInstanceField = BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
                serdeInstanceField.setAccessible(true);
                BookieServiceInfoSerde serde = (BookieServiceInfoSerde) serdeInstanceField.get(null);

                // Register BOOKIE1: only its rack should resolve.
                registerBookie(serde, this.BOOKIE1);
                awaitGroup1Size(racksWithHostField, mapping, 1);
                List<String> racks = resolveNonNullRacks(mapping);
                Assert.assertEquals(racks.size(), 1);
                Assert.assertEquals(racks.get(0), "/rack0");
                Assert.assertEquals(knownBookies.size(), 1);
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE1), "/rack0");

                // Register BOOKIE2: both configured racks should resolve.
                registerBookie(serde, this.BOOKIE2);
                awaitGroup1Size(racksWithHostField, mapping, 2);
                racks = resolveNonNullRacks(mapping);
                Assert.assertEquals(racks.size(), 2);
                Assert.assertEquals(racks.get(0), "/rack0");
                Assert.assertEquals(racks.get(1), "/rack1");
                Assert.assertEquals(knownBookies.size(), 2);
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE1), "/rack0");
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE2), "/rack1");

                // Register BOOKIE3, which has no rack entry: group1 is expected to stay at two entries.
                registerBookie(serde, this.BOOKIE3);
                awaitGroup1Size(racksWithHostField, mapping, 2);
                racks = resolveNonNullRacks(mapping);
                Assert.assertEquals(racks.size(), 2);
                Assert.assertEquals(racks.get(0), "/rack0");
                Assert.assertEquals(racks.get(1), "/rack1");
                // The size==2 condition above already held before BOOKIE3 was registered, so it
                // cannot act as a synchronization point. Poll until the policy has seen BOOKIE3.
                Awaitility.await().atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
                    Assert.assertEquals(knownBookies.size(), 3);
                    Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE1), "/rack0");
                    Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE2), "/rack1");
                    Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE3), "/default-rack");
                });

                // Drop BOOKIE2 from the rack configuration: it is expected to fall back to /default-rack.
                String bookie1Only = "{\"group1\": {\"" + this.BOOKIE1
                        + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}";
                this.store.put("/bookies", bookie1Only.getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
                awaitGroup1Size(racksWithHostField, mapping, 1);
                racks = resolveNonNullRacks(mapping);
                Assert.assertEquals(racks.size(), 1);
                Assert.assertEquals(racks.get(0), "/rack0");
                Assert.assertEquals(knownBookies.size(), 3);
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE1), "/rack0");
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE2), "/default-rack");
                Assert.assertEquals(networkLocation(knownBookies, this.BOOKIE3), "/default-rack");
            } finally {
                timer.stop();
            }
        } finally {
            registrationClient.close();
        }
    }

    /** Resolves the three fixture bookies by host name and returns only the non-null racks, in order. */
    private List<String> resolveNonNullRacks(BookieRackAffinityMapping mapping) {
        return mapping.resolve(Lists.newArrayList(
                        this.BOOKIE1.getHostName(), this.BOOKIE2.getHostName(), this.BOOKIE3.getHostName()))
                .stream()
                .filter(Objects::nonNull)
                .toList();
    }

    /** Waits up to 30 seconds until the mapping's reflected {@code racksWithHost} "group1" entry has the given size. */
    private static void awaitGroup1Size(Field racksWithHostField, BookieRackAffinityMapping mapping,
                                        int expectedSize) {
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() ->
                ((Map<?, ?>) ((BookiesRackConfiguration) racksWithHostField.get(mapping)).get("group1")).size()
                        == expectedSize);
    }

    /** Writes a legacy service-info record for the bookie under {@code /ledgers/available/} and waits for the put. */
    private void registerBookie(BookieServiceInfoSerde serde, BookieSocketAddress bookie) throws Exception {
        this.store.put("/ledgers/available/" + bookie,
                serde.serialize("", BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookie.toString())),
                Optional.of(-1L)).get();
    }

    /** Returns the network location recorded for the bookie in the policy's {@code knownBookies} map. */
    private static String networkLocation(Map<?, ?> knownBookies, BookieSocketAddress bookie) {
        return ((BookieNode) knownBookies.get(bookie.toBookieId())).getNetworkLocation();
    }

    /**
     * Runs two workers concurrently for about two seconds each and expects both
     * to finish within three seconds:
     * <ul>
     *   <li>one repeatedly invokes the mapping's private {@code handleUpdates}
     *       method with a {@code Modified} notification for {@code /bookies};</li>
     *   <li>the other repeatedly calls {@code onClusterChanged} on the placement
     *       policy, alternating between a set containing BOOKIE1 and an empty set.</li>
     * </ul>
     * If either worker does not count the latch down in time (for example because
     * the two code paths block each other), the final assertion fails.
     */
    @Test
    public void testNoDeadlockWithRackawarePolicy() throws Exception {
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProperty("METADATA_STORE_INSTANCE", this.store);
        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
        mapping.setConf(conf);

        // Look the private method up in the test thread so that a missing method fails the test
        // directly instead of surfacing only as a latch timeout.
        Method handleUpdates = BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates", Notification.class);
        handleUpdates.setAccessible(true);

        HashedWheelTimer timer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
                conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
                conf.getTimeoutTimerNumTicks());
        try {
            RackawareEnsemblePlacementPolicy policy = new RackawareEnsemblePlacementPolicy();
            policy.initialize(conf, Optional.of(mapping), timer, SettableFeatureProvider.DISABLE_ALL,
                    NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
            policy.withDefaultRack("/default-region/default-rack");
            mapping.registerRackChangeListener(policy);

            ExecutorService updatesExecutor = Executors.newSingleThreadExecutor();
            ExecutorService clusterChangeExecutor = Executors.newSingleThreadExecutor();
            try {
                CountDownLatch finished = new CountDownLatch(2);

                updatesExecutor.submit(() -> {
                    Notification notification = new Notification(NotificationType.Modified, "/bookies");
                    long start = System.currentTimeMillis();
                    try {
                        while (System.currentTimeMillis() - start < 2000) {
                            handleUpdates.invoke(mapping, notification);
                        }
                    } catch (ReflectiveOperationException e) {
                        // The latch is intentionally not counted down on failure, so the test fails.
                        throw new RuntimeException(e);
                    }
                    finished.countDown();
                });

                clusterChangeExecutor.submit(() -> {
                    var writableBookies = new HashSet<>(Collections.singleton(this.BOOKIE1.toBookieId()));
                    long start = System.currentTimeMillis();
                    while (System.currentTimeMillis() - start < 2000) {
                        policy.onClusterChanged(writableBookies, Collections.emptySet());
                        policy.onClusterChanged(Collections.emptySet(), Collections.emptySet());
                    }
                    finished.countDown();
                });

                Assert.assertTrue(finished.await(3L, TimeUnit.SECONDS));
            } finally {
                clusterChangeExecutor.shutdownNow();
                updatesExecutor.shutdownNow();
            }
        } finally {
            timer.stop();
        }
    }
}
