Skip to content

Commit

Permalink
fix: deleted ledgers are not removed from the cache (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Apr 30, 2024
1 parent 989b095 commit 4d02c27
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 26 deletions.
1 change: 0 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ RUN test -n "$TARBALL"

ENV WEB_PORT=4500
EXPOSE $WEB_PORT
ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64

RUN apt-get update \
&& apt-get -y dist-upgrade \
Expand Down
48 changes: 48 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
version: '3'
services:
zk:
image: zookeeper:3.9.2
hostname: zk
ports:
- "2181:2181"
healthcheck:
test: ["CMD", "curl", "-sf", "http://localhost:8080/commands/stat"]
interval: 5s
timeout: 5s
retries: 20
restart: always

bookie-1:
image: apache/bookkeeper:4.16.5
hostname: bk-1
ports:
- "3181:3181"
- "8080:8080"
environment:
- BK_zkServers=zk:2181
- BK_metadataServiceUri=zk+null://zk:2181/ledgers
- BK_DATA_DIR=/data/bookkeeper
- BK_useHostNameAsBookieID=true
- BK_bookiePort=3181
- BK_httpServerEnabled=true
depends_on:
- "zk"
restart: always

bookie-1-insert:
image: apache/bookkeeper:4.16.5
depends_on:
- bookie-1
restart: on-failure
environment:
- BK_zkServers=zk:2181
- BK_metadataServiceUri=zk+null://zk:2181/ledgers
entrypoint: [ "bash", "-c", "source /opt/bookkeeper/scripts/common.sh && bin/bookkeeper shell simpletest -e 1 -a 1 -w 1" ]

bkvm:
ports:
- "4500:4500"
image: bkvm/bkvm:latest
environment:
BKVM_metadataServiceUri: zk://zk:2181/ledgers

6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<jersey.version>2.35</jersey.version>
<libs.checkstyle>3.1.1</libs.checkstyle>
<libs.spotbugs>4.1.4</libs.spotbugs>
<eclipselink.version>2.7.5</eclipselink.version>
<eclipselink.version>2.7.14</eclipselink.version>
<libs.spotbugsannotations>3.1.7</libs.spotbugsannotations>

<libs.jaxws.jaxb-api>2.3.1</libs.jaxws.jaxb-api>
Expand All @@ -70,7 +70,7 @@
<dependency-check-maven.version>6.1.6</dependency-check-maven.version>
<bc-fips.version>1.0.2.3</bc-fips.version>
<guava.version>33.1.0-jre</guava.version>
<protobuf-java.version>3.19.6</protobuf-java.version>
<protobuf-java.version>3.24.0</protobuf-java.version>
<kotlin-stdlib-jdk8.version>1.6.10</kotlin-stdlib-jdk8.version>
<kubernetes-client.version>6.5.0</kubernetes-client.version>
</properties>
Expand Down Expand Up @@ -107,7 +107,7 @@
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>org.eclipse.persistence.asm</artifactId>
<version>${eclipselink.version}</version>
<version>9.7.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.persistence</groupId>
Expand Down
3 changes: 1 addition & 2 deletions web/src/main/java/org/bkvm/auth/AuthManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.EnumUtils;
import org.bkvm.config.ConfigurationStore;

/**
Expand Down Expand Up @@ -57,7 +56,7 @@ public AuthManager(ConfigurationStore store) {
}

private boolean checkUserRole(String role) {
return EnumUtils.isValidEnum(UserRole.class, role);
return UserRole.Fields.isValidRole(role);
}

public boolean login(String username, String password) {
Expand Down
15 changes: 10 additions & 5 deletions web/src/main/java/org/bkvm/auth/UserRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
*/
package org.bkvm.auth;

import lombok.experimental.FieldNameConstants;
import java.util.Set;

@FieldNameConstants
public enum UserRole {
@FieldNameConstants.Include Admin,
@FieldNameConstants.Include User
public class UserRole {
public static class Fields {
public static final String Admin = "Admin";
public static final String User = "User";
private static final Set<String> ALL = Set.of(Admin, User);
public static boolean isValidRole(String role) {
return ALL.contains(role);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BookkeeperCluster ensureCluster(int clusterId, String metadataServiceUri,
conf.setProperty(p, properties.get(p));
}
} catch (IOException ex) {
LOG.log(Level.INFO, "Wrong configuration passed {0}", configuration);
LOG.log(Level.SEVERE, "Wrong configuration passed {0}", configuration);
}

LOG.log(Level.INFO, "Creating cluster {0}, at {1}", new Object[]{clusterId, metadataServiceUri});
Expand Down
18 changes: 12 additions & 6 deletions web/src/main/java/org/bkvm/bookkeeper/BookkeeperManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public enum RefreshStatus {
IDLE,
WORKING
}

private volatile long lastMetadataCacheRefresh;
private final ConcurrentHashMap<Integer, ClusterWideConfiguration> lastClusterWideConfiguration = new ConcurrentHashMap<>();
private final AtomicReference<RefreshStatus> refreshStatus = new AtomicReference<>(RefreshStatus.IDLE);
Expand Down Expand Up @@ -167,7 +168,7 @@ public static final class RefreshCacheWorkerStatus {
private final Map<Integer, ClusterWideConfiguration> lastClusterWideConfiguration;

public RefreshCacheWorkerStatus(RefreshStatus status, long lastMetadataCacheRefresh,
Map<Integer, ClusterWideConfiguration> lastClusterWideConfiguration) {
Map<Integer, ClusterWideConfiguration> lastClusterWideConfiguration) {
this.status = status;
this.lastMetadataCacheRefresh = lastMetadataCacheRefresh;
this.lastClusterWideConfiguration = lastClusterWideConfiguration;
Expand Down Expand Up @@ -281,15 +282,15 @@ public void doRefreshMetadataCache() {
metadataCache.deleteBookie(b.getClusterId(), b.getBookieId());
}
}
Set<Long> deletedLedgers = new HashSet<>(metadataCache.listLedgers(clusterId));

Iterable<Long> ledgersIds = bkAdmin.listLedgers();
for (long ledgerId : ledgersIds) {
LedgerMetadata ledgerMetadata = readLedgerMetadata(ledgerId, clusterId);
if (ledgerMetadata == null) {
// ledger disappeared
metadataCache.deleteLedger(clusterId, ledgerId);
return;
continue;
}
deletedLedgers.remove(ledgerId);
Ledger ledger = new Ledger(ledgerId, clusterId,
ledgerMetadata.getLength(),
new java.sql.Timestamp(ledgerMetadata.getCtime()),
Expand All @@ -308,6 +309,10 @@ public void doRefreshMetadataCache() {
LOG.log(Level.FINE, "Updating ledger {0} metadata", ledgerId);
metadataCache.updateLedger(ledger, bookies, metadataEntries);
}
for (Long deletedLedger : deletedLedgers) {
metadataCache.deleteLedger(clusterId, deletedLedger);
}

}
topologyCache.refreshBookiesTopology();

Expand Down Expand Up @@ -528,7 +533,8 @@ private static ClusterWideConfiguration getClusterWideConfiguration(int clusterI
autoRecoveryEnabled = underreplicationManager.isLedgerReplicationEnabled();
lostBookieRecoveryDelay = underreplicationManager.getLostBookieRecoveryDelay();
}
} catch (ReplicationException.UnavailableException | ReplicationException.CompatibilityException notConfigured) {
} catch (ReplicationException.UnavailableException
| ReplicationException.CompatibilityException notConfigured) {
// auto replication stuff never initialized
LOG.log(Level.INFO, "Cannot get auditor info: {0}", notConfigured + ""); // do not write stacktrace
}
Expand All @@ -542,7 +548,7 @@ private static ClusterWideConfiguration getClusterWideConfiguration(int clusterI
autoRecoveryEnabled, lostBookieRecoveryDelay,
layoutFormatVersion, layoutManagerFactoryClass, layoutManagerVersion);
} catch (InterruptedException
| MetadataException | IOException ex) {
| MetadataException | IOException ex) {
LOG.log(Level.SEVERE, "Error", ex);
throw new BookkeeperManagerException(ex);
} finally {
Expand Down
12 changes: 10 additions & 2 deletions web/src/main/java/org/bkvm/cache/MetadataCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,21 @@ public List<Ledger> listLedgers() {
}
}

public List<Long> listLedgers(int clusterId) {
try (EntityManagerWrapper emw = getEntityManager()) {
EntityManager em = emw.em;
Query q = em.createQuery("select l.ledgerId from ledger l where l.clusterId=" + clusterId, Ledger.class);
return q.getResultList();
}
}

public void updateLedger(Ledger ledger, List<LedgerBookie> bookies,
List<LedgerMetadataEntry> metadataEntries) {
try (EntityManagerWrapper emw = getEntityManager()) {
EntityManager em = emw.em;
em.getTransaction().begin();
long ledgerId = ledger.getLedgerId();
innerDeleteLedger(ledger.getClusterId(), ledger.getLedgerId(), em);
innerDeleteLedger(ledger.getClusterId(), ledgerId, em);
em.persist(ledger);
bookies.forEach((lb) -> {
if (ledgerId != lb.getLedgerId()) {
Expand Down Expand Up @@ -243,7 +251,7 @@ public void deleteLedger(int clusterId, long ledgerId) {
}
}

private void innerDeleteLedger(int clusterId, long ledgerId, EntityManager em) {
private static void innerDeleteLedger(int clusterId, long ledgerId, EntityManager em) {
em.createQuery("DELETE FROM ledger_metadata lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate();
em.createQuery("DELETE FROM ledger_bookie lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate();
em.createQuery("DELETE FROM ledger lm where lm.ledgerId=" + ledgerId + " and lm.clusterId=" + clusterId).executeUpdate();
Expand Down
67 changes: 67 additions & 0 deletions web/src/test/java/org/bkvm/cache/LoadMetadataCacheTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Licensed to Diennea S.r.l. under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. Diennea S.r.l. licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.bkvm.cache;

import static org.junit.Assert.assertEquals;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.bkvm.bookkeeper.BookkeeperManager;
import org.bkvm.utils.BookkeeperManagerTestUtils;
import org.junit.Test;


public class LoadMetadataCacheTest extends BookkeeperManagerTestUtils {

@Test
public void testLoad() throws Exception {
startBookie(false, -1);
ClientConfiguration bkConf = new ClientConfiguration();
bkConf.setMetadataServiceUri(getMetadataServiceUri());

BookKeeper bk = BookKeeper.forConfig(bkConf).build();
WriteHandle wr0 = createLedger(bk);
WriteHandle wr = createLedger(bk);
wr.append("test".getBytes());
final BookkeeperManager bookkeeperManager = getBookkeeperManager();
bookkeeperManager.doRefreshMetadataCache();
assertEquals(2, bookkeeperManager.getAllLedgers().size());

bk.newDeleteLedgerOp()
.withLedgerId(wr.getId())
.execute().get();
bookkeeperManager.doRefreshMetadataCache();
assertEquals(1, bookkeeperManager.getAllLedgers().size());
}

private static WriteHandle createLedger(BookKeeper bk) throws InterruptedException, ExecutionException {
WriteHandle wr = bk.newCreateLedgerOp()
.withAckQuorumSize(1)
.withEnsembleSize(1)
.withWriteQuorumSize(1)
.withPassword("p".getBytes())
.withCustomMetadata(Map.of("meta1", "value1".getBytes()))
.execute().get();
return wr;
}

}
11 changes: 5 additions & 6 deletions web/src/test/java/org/bkvm/utils/BookkeeperManagerTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.bkvm.cache.MetadataCache;
import org.bkvm.config.ConfigurationStore;
import org.bkvm.config.PropertiesConfigurationStore;
import org.bkvm.config.ServerConfiguration;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -55,17 +54,17 @@ public void beforeSetup() throws Exception {
datasource = new HerdDBEmbeddedDataSource();
datasource.setUrl("jdbc:herddb:local");
metadataCache = new MetadataCache(datasource);
final Properties properties = new Properties();
properties.put(ServerConfiguration.PROPERTY_BOOKKEEPER_METADATA_SERVICE_URI,
"zk+null://" + getZooKeeperAddress() + "/ledgers");

ConfigurationStore config = new PropertiesConfigurationStore(new Properties());
bookkeeperManager = new BookkeeperManager(config, metadataCache);
init();
}

protected void init() throws Exception {
createCluster("cluster1", "zk+null://" + getZooKeeperAddress() + "/ledgers", "");
createCluster("cluster1", getMetadataServiceUri(), "");
}

protected String getMetadataServiceUri() {
return "zk+null://" + getZooKeeperAddress() + "/ledgers";
}

protected Cluster createCluster(String name, String metadataServiceUri, String configuration) throws BookkeeperManagerException {
Expand Down

0 comments on commit 4d02c27

Please sign in to comment.