From 4d02c2756f08b3e9850fe3cc3b53908785f2bb50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 30 Apr 2024 13:35:29 +0200 Subject: [PATCH] fix: deleted ledgers are not removed from the cache (#137) --- docker/Dockerfile | 1 - docker/docker-compose.yaml | 48 +++++++++++++ pom.xml | 6 +- .../main/java/org/bkvm/auth/AuthManager.java | 3 +- web/src/main/java/org/bkvm/auth/UserRole.java | 15 +++-- .../bookkeeper/BookkeeperClusterPool.java | 2 +- .../bkvm/bookkeeper/BookkeeperManager.java | 18 +++-- .../java/org/bkvm/cache/MetadataCache.java | 12 +++- .../org/bkvm/cache/LoadMetadataCacheTest.java | 67 +++++++++++++++++++ .../utils/BookkeeperManagerTestUtils.java | 11 ++- 10 files changed, 157 insertions(+), 26 deletions(-) create mode 100644 docker/docker-compose.yaml create mode 100644 web/src/test/java/org/bkvm/cache/LoadMetadataCacheTest.java diff --git a/docker/Dockerfile b/docker/Dockerfile index f4d90028..ad2cf413 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -7,7 +7,6 @@ RUN test -n "$TARBALL" ENV WEB_PORT=4500 EXPOSE $WEB_PORT -ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 RUN apt-get update \ && apt-get -y dist-upgrade \ diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml new file mode 100644 index 00000000..af61e5b2 --- /dev/null +++ b/docker/docker-compose.yaml @@ -0,0 +1,48 @@ +version: '3' +services: + zk: + image: zookeeper:3.9.2 + hostname: zk + ports: + - "2181:2181" + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8080/commands/stat"] + interval: 5s + timeout: 5s + retries: 20 + restart: always + + bookie-1: + image: apache/bookkeeper:4.16.5 + hostname: bk-1 + ports: + - "3181:3181" + - "8080:8080" + environment: + - BK_zkServers=zk:2181 + - BK_metadataServiceUri=zk+null://zk:2181/ledgers + - BK_DATA_DIR=/data/bookkeeper + - BK_useHostNameAsBookieID=true + - BK_bookiePort=3181 + - BK_httpServerEnabled=true + depends_on: + - "zk" + restart: always + + bookie-1-insert: + image: apache/bookkeeper:4.16.5 + depends_on: + - bookie-1 + restart: on-failure + environment: + - BK_zkServers=zk:2181 + - BK_metadataServiceUri=zk+null://zk:2181/ledgers + entrypoint: [ "bash", "-c", "source /opt/bookkeeper/scripts/common.sh && bin/bookkeeper shell simpletest -e 1 -a 1 -w 1" ] + + bkvm: + ports: + - "4500:4500" + image: bkvm/bkvm:latest + environment: + BKVM_metadataServiceUri: zk://zk:2181/ledgers + diff --git a/pom.xml b/pom.xml index aeef1374..31e60d57 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 2.35 3.1.1 4.1.4 - 2.7.5 + 2.7.14 3.1.7 2.3.1 @@ -70,7 +70,7 @@ 6.1.6 1.0.2.3 33.1.0-jre - 3.19.6 + 3.24.0 1.6.10 6.5.0 @@ -107,7 +107,7 @@ org.eclipse.persistence org.eclipse.persistence.asm - ${eclipselink.version} + 9.7.0 org.eclipse.persistence diff --git a/web/src/main/java/org/bkvm/auth/AuthManager.java b/web/src/main/java/org/bkvm/auth/AuthManager.java index 2661dfa9..fd4898ad 100644 --- a/web/src/main/java/org/bkvm/auth/AuthManager.java +++ b/web/src/main/java/org/bkvm/auth/AuthManager.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.commons.lang3.EnumUtils; import org.bkvm.config.ConfigurationStore; /** @@ -57,7 +56,7 @@ public AuthManager(ConfigurationStore store) { } private boolean checkUserRole(String role) { - return EnumUtils.isValidEnum(UserRole.class, role); + return UserRole.Fields.isValidRole(role); } public boolean login(String username, String password) { diff --git a/web/src/main/java/org/bkvm/auth/UserRole.java b/web/src/main/java/org/bkvm/auth/UserRole.java index b9db9fb3..7dc141b0 100644 --- a/web/src/main/java/org/bkvm/auth/UserRole.java +++ b/web/src/main/java/org/bkvm/auth/UserRole.java @@ -19,10 +19,15 @@ */ package org.bkvm.auth; -import lombok.experimental.FieldNameConstants; +import java.util.Set; -@FieldNameConstants -public enum UserRole { - @FieldNameConstants.Include Admin, - @FieldNameConstants.Include User +public class UserRole { + public static class Fields { + public static final String Admin = "Admin"; + public static final String User = "User"; + private static final Set ALL = Set.of(Admin, User); + public static boolean isValidRole(String role) { + return ALL.contains(role); + } + } } diff --git a/web/src/main/java/org/bkvm/bookkeeper/BookkeeperClusterPool.java b/web/src/main/java/org/bkvm/bookkeeper/BookkeeperClusterPool.java index a9f9ca2b..ba810c68 100644 --- a/web/src/main/java/org/bkvm/bookkeeper/BookkeeperClusterPool.java +++ b/web/src/main/java/org/bkvm/bookkeeper/BookkeeperClusterPool.java @@ -65,7 +65,7 @@ public BookkeeperCluster ensureCluster(int clusterId, String metadataServiceUri, conf.setProperty(p, properties.get(p)); } } catch (IOException ex) { - LOG.log(Level.INFO, "Wrong configuration passed {0}", configuration); + LOG.log(Level.SEVERE, "Wrong configuration passed {0}", configuration); } LOG.log(Level.INFO, "Creating cluster {0}, at {1}", new Object[]{clusterId, metadataServiceUri}); diff --git a/web/src/main/java/org/bkvm/bookkeeper/BookkeeperManager.java b/web/src/main/java/org/bkvm/bookkeeper/BookkeeperManager.java index b4911286..31b46a43 100644 --- a/web/src/main/java/org/bkvm/bookkeeper/BookkeeperManager.java +++ b/web/src/main/java/org/bkvm/bookkeeper/BookkeeperManager.java @@ -129,6 +129,7 @@ public enum RefreshStatus { IDLE, WORKING } + private volatile long lastMetadataCacheRefresh; private final ConcurrentHashMap lastClusterWideConfiguration = new ConcurrentHashMap<>(); private final AtomicReference refreshStatus = new AtomicReference<>(RefreshStatus.IDLE); @@ -167,7 +168,7 @@ public static final class RefreshCacheWorkerStatus { private final Map lastClusterWideConfiguration; public RefreshCacheWorkerStatus(RefreshStatus status, long lastMetadataCacheRefresh, - Map lastClusterWideConfiguration) { + Map lastClusterWideConfiguration) { this.status = status; this.lastMetadataCacheRefresh = lastMetadataCacheRefresh; this.lastClusterWideConfiguration = lastClusterWideConfiguration; @@ -281,15 +282,15 @@ public void doRefreshMetadataCache() { metadataCache.deleteBookie(b.getClusterId(), b.getBookieId()); } } + Set deletedLedgers = new HashSet<>(metadataCache.listLedgers(clusterId)); Iterable ledgersIds = bkAdmin.listLedgers(); for (long ledgerId : ledgersIds) { LedgerMetadata ledgerMetadata = readLedgerMetadata(ledgerId, clusterId); if (ledgerMetadata == null) { - // ledger disappeared - metadataCache.deleteLedger(clusterId, ledgerId); - return; + continue; } + deletedLedgers.remove(ledgerId); Ledger ledger = new Ledger(ledgerId, clusterId, ledgerMetadata.getLength(), new java.sql.Timestamp(ledgerMetadata.getCtime()), @@ -308,6 +309,10 @@ public void doRefreshMetadataCache() { LOG.log(Level.FINE, "Updating ledger {0} metadata", ledgerId); metadataCache.updateLedger(ledger, bookies, metadataEntries); } + for (Long deletedLedger : deletedLedgers) { + metadataCache.deleteLedger(clusterId, deletedLedger); + } + } topologyCache.refreshBookiesTopology(); @@ -528,7 +533,8 @@ private static ClusterWideConfiguration getClusterWideConfiguration(int clusterI autoRecoveryEnabled = underreplicationManager.isLedgerReplicationEnabled(); lostBookieRecoveryDelay = underreplicationManager.getLostBookieRecoveryDelay(); } - } catch (ReplicationException.UnavailableException | ReplicationException.CompatibilityException notConfigured) { + } catch (ReplicationException.UnavailableException + | ReplicationException.CompatibilityException notConfigured) { // auto replication stuff never initialized LOG.log(Level.INFO, "Cannot get auditor info: {0}", notConfigured + ""); // do not write stacktrace } @@ -542,7 +548,7 @@ private static ClusterWideConfiguration getClusterWideConfiguration(int clusterI autoRecoveryEnabled, lostBookieRecoveryDelay, layoutFormatVersion, layoutManagerFactoryClass, layoutManagerVersion); } catch (InterruptedException - | MetadataException | IOException ex) { + | MetadataException | IOException ex) { LOG.log(Level.SEVERE, "Error", ex); throw new BookkeeperManagerException(ex); } finally { diff --git a/web/src/main/java/org/bkvm/cache/MetadataCache.java b/web/src/main/java/org/bkvm/cache/MetadataCache.java index 2026d99b..0479a58b 100644 --- a/web/src/main/java/org/bkvm/cache/MetadataCache.java +++ b/web/src/main/java/org/bkvm/cache/MetadataCache.java @@ -207,13 +207,21 @@ public List listLedgers() { } } + public List listLedgers(int clusterId) { + try (EntityManagerWrapper emw = getEntityManager()) { + EntityManager em = emw.em; + Query q = em.createQuery("select l.ledgerId from ledger l where l.clusterId=" + clusterId, Ledger.class); + return q.getResultList(); + } + } + public void updateLedger(Ledger ledger, List bookies, List metadataEntries) { try (EntityManagerWrapper emw = getEntityManager()) { EntityManager em = emw.em; em.getTransaction().begin(); long ledgerId = ledger.getLedgerId(); - innerDeleteLedger(ledger.getClusterId(), ledger.getLedgerId(), em); + innerDeleteLedger(ledger.getClusterId(), ledgerId, em); em.persist(ledger); bookies.forEach((lb) -> { if (ledgerId != lb.getLedgerId()) { @@ -243,7 +251,7 @@ public void deleteLedger(int clusterId, long ledgerId) { } } - private void innerDeleteLedger(int clusterId, long ledgerId, EntityManager em) { + private static void innerDeleteLedger(int clusterId, long ledgerId, EntityManager em) { em.createQuery("DELETE FROM ledger_metadata lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate(); em.createQuery("DELETE FROM ledger_bookie lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate(); em.createQuery("DELETE FROM ledger lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate(); diff --git a/web/src/test/java/org/bkvm/cache/LoadMetadataCacheTest.java b/web/src/test/java/org/bkvm/cache/LoadMetadataCacheTest.java new file mode 100644 index 00000000..839852f0 --- /dev/null +++ b/web/src/test/java/org/bkvm/cache/LoadMetadataCacheTest.java @@ -0,0 +1,67 @@ +/* + Licensed to Diennea S.r.l. under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. Diennea S.r.l. licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + */ +package org.bkvm.cache; + +import static org.junit.Assert.assertEquals; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.WriteHandle; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.bkvm.bookkeeper.BookkeeperManager; +import org.bkvm.utils.BookkeeperManagerTestUtils; +import org.junit.Test; + + +public class LoadMetadataCacheTest extends BookkeeperManagerTestUtils { + + @Test + public void testLoad() throws Exception { + startBookie(false, -1); + ClientConfiguration bkConf = new ClientConfiguration(); + bkConf.setMetadataServiceUri(getMetadataServiceUri()); + + BookKeeper bk = BookKeeper.forConfig(bkConf).build(); + WriteHandle wr0 = createLedger(bk); + WriteHandle wr = createLedger(bk); + wr.append("test".getBytes()); + final BookkeeperManager bookkeeperManager = getBookkeeperManager(); + bookkeeperManager.doRefreshMetadataCache(); + assertEquals(2, bookkeeperManager.getAllLedgers().size()); + + bk.newDeleteLedgerOp() + .withLedgerId(wr.getId()) + .execute().get(); + bookkeeperManager.doRefreshMetadataCache(); + assertEquals(1, bookkeeperManager.getAllLedgers().size()); + } + + private static WriteHandle createLedger(BookKeeper bk) throws InterruptedException, ExecutionException { + WriteHandle wr = bk.newCreateLedgerOp() + .withAckQuorumSize(1) + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withPassword("p".getBytes()) + .withCustomMetadata(Map.of("meta1", "value1".getBytes())) + .execute().get(); + return wr; + } + +} \ No newline at end of file diff --git a/web/src/test/java/org/bkvm/utils/BookkeeperManagerTestUtils.java b/web/src/test/java/org/bkvm/utils/BookkeeperManagerTestUtils.java index 59e06560..37a17f35 100644 --- a/web/src/test/java/org/bkvm/utils/BookkeeperManagerTestUtils.java +++ b/web/src/test/java/org/bkvm/utils/BookkeeperManagerTestUtils.java @@ -27,7 +27,6 @@ import org.bkvm.cache.MetadataCache; import org.bkvm.config.ConfigurationStore; import org.bkvm.config.PropertiesConfigurationStore; -import org.bkvm.config.ServerConfiguration; import org.junit.After; import org.junit.Before; @@ -55,17 +54,17 @@ public void beforeSetup() throws Exception { datasource = new HerdDBEmbeddedDataSource(); datasource.setUrl("jdbc:herddb:local"); metadataCache = new MetadataCache(datasource); - final Properties properties = new Properties(); - properties.put(ServerConfiguration.PROPERTY_BOOKKEEPER_METADATA_SERVICE_URI, - "zk+null://" + getZooKeeperAddress() + "/ledgers"); - ConfigurationStore config = new PropertiesConfigurationStore(new Properties()); bookkeeperManager = new BookkeeperManager(config, metadataCache); init(); } protected void init() throws Exception { - createCluster("cluster1", "zk+null://" + getZooKeeperAddress() + "/ledgers", ""); + createCluster("cluster1", getMetadataServiceUri(), ""); + } + + protected String getMetadataServiceUri() { + return "zk+null://" + getZooKeeperAddress() + "/ledgers"; } protected Cluster createCluster(String name, String metadataServiceUri, String configuration) throws BookkeeperManagerException {