Skip to content

Commit

Permalink
Merge pull request #1275 from hbs/tenant.safety
Browse files Browse the repository at this point in the history
Make calls to setRawAccess conditonal
  • Loading branch information
hbs authored Jul 26, 2023
2 parents 556b01f + f0e7cfd commit 09ea531
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 17 deletions.
45 changes: 34 additions & 11 deletions warp10/src/main/java/io/warp10/continuum/store/Directory.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -83,18 +89,11 @@
import org.slf4j.LoggerFactory;

import com.apple.foundationdb.Database;
import com.apple.foundationdb.FDB;
import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.ServiceType;

import io.warp10.SmartPattern;
import io.warp10.ThriftUtils;
Expand All @@ -109,7 +108,6 @@
import io.warp10.continuum.store.thrift.data.DirectoryRequest;
import io.warp10.continuum.store.thrift.data.DirectoryStatsRequest;
import io.warp10.continuum.store.thrift.data.DirectoryStatsResponse;
import io.warp10.continuum.store.thrift.data.DirectoryRequest;
import io.warp10.continuum.store.thrift.data.Metadata;
import io.warp10.continuum.thrift.data.LoggingEvent;
import io.warp10.crypto.CryptoUtils;
Expand Down Expand Up @@ -284,6 +282,7 @@ public int compare(Long o1, Long o2) {

private final ReentrantLock metadatasLock = new ReentrantLock(true);

private RuntimeException initializationError = null;
private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
private final AtomicBoolean fullyInitialized = new AtomicBoolean(false);

Expand Down Expand Up @@ -762,7 +761,17 @@ public void run() {

done = true;
} catch (Throwable t) {
FDBUtils.errorMetrics("directory", t.getCause());
if (t.getCause() instanceof FDBException) {
FDBException fdbe = (FDBException) t.getCause();
int code = fdbe.getCode();
// Tenant related errors are not recoverable
if (2130 == code || 2131 == code) {
self.initializationError = new RuntimeException("Error while accessing FoundationDB.", t);
done = true;
self.cachePopulated.set(true);
throw self.initializationError;
}
try {
db.close();
} catch (Exception e) {}
Expand Down Expand Up @@ -862,10 +871,14 @@ public void run() {
// Wait until directory is fully initialized
//

while(!self.fullyInitialized.get()) {
while(!self.fullyInitialized.get() && null == self.initializationError) {
LockSupport.parkNanos(1000000000L);
}

if (null != self.initializationError) {
return;
}

Sensision.set(SensisionConstants.SENSISION_CLASS_CONTINUUM_DIRECTORY_CLASSES, Sensision.EMPTY_LABELS, classNames.size());

//
Expand Down Expand Up @@ -1062,10 +1075,14 @@ public void run() {
// Wait for initialization to be done
//

while(!this.fullyInitialized.get()) {
while(!this.fullyInitialized.get() && null == initializationError) {
LockSupport.parkNanos(1000000000L);
}

if (null != initializationError) {
throw initializationError;
}

try {
server.start();
} catch (Exception e) {
Expand Down Expand Up @@ -1106,6 +1123,10 @@ public void run() {
LockSupport.parkNanos(1000000000L);
}

if (null != initializationError) {
throw initializationError;
}

//
// Let's call GC once after populating so we take the trash out.
//
Expand Down Expand Up @@ -1372,7 +1393,9 @@ private void flushMutations() throws IOException {
retry = false;
txn = db.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
txn.options().setRawAccess();
if (fdbContext.hasTenant()) {
txn.options().setRawAccess();
}

for (FDBMutation mutation: mutations) {
mutation.apply(txn);
Expand Down
4 changes: 3 additions & 1 deletion warp10/src/main/java/io/warp10/continuum/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,9 @@ private void flushMutations() throws IOException {
retry = false;
txn = db.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
txn.options().setRawAccess();
if (store.fdbContext.hasTenant() || store.FDBUseTenantPrefix) {
txn.options().setRawAccess();
}
int sets = 0;
int clearranges = 0;

Expand Down
4 changes: 4 additions & 0 deletions warp10/src/main/java/io/warp10/fdb/FDBContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public byte[] getTenantPrefix() {
return this.tenantPrefix;
}

public boolean hasTenant() {
return null != this.tenantPrefix;
}

public byte[] getTenantName() {
return this.tenantName;
}
Expand Down
4 changes: 3 additions & 1 deletion warp10/src/main/java/io/warp10/fdb/FDBKVScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public boolean hasNext() {
if (null == txn) {
this.txn = db.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
this.txn.options().setRawAccess();
if (this.context.hasTenant() || null != scan.getTenantPrefix()) {
this.txn.options().setRawAccess();
}
this.txn.options().setCausalReadRisky();
this.txn.options().setReadYourWritesDisable();
this.txn.options().setSnapshotRywDisable();
Expand Down
3 changes: 2 additions & 1 deletion warp10/src/main/java/io/warp10/fdb/FDBUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public static void errorMetrics(String component, Throwable t) {

public static byte[] getKey(Database db, byte[] key) {
Transaction txn = db.createTransaction();
// setRawAccess is called unconditionally because we do not know if a tenant was set or not
txn.options().setRawAccess();
txn.options().setAccessSystemKeys();

Expand All @@ -147,7 +148,6 @@ public static Map<String,Object> getTenantInfo(FDBContext context, String tenant
public static Map<String,Object> getTenantInfo(Database db, String tenant) {
Transaction txn = null;


int attempts = 2;
Map<String,Object> map = new LinkedHashMap<String,Object>();

Expand Down Expand Up @@ -198,6 +198,7 @@ public static Map<Object,Object> getStatus(FDBContext context) throws WarpScript
while(attempts > 0) {
try {
txn = db.createTransaction();
// setRawAccess is called unconditionally because we are accessing a system key without tenant
txn.options().setRawAccess();
txn.options().setAccessSystemKeys();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,9 @@ public void unregister(Metadata metadata) throws IOException {
retry = false;
txn = this.fdb.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
txn.options().setRawAccess();
if (fdbContext.hasTenant()) {
txn.options().setRawAccess();
}

FDBMutation delete = new FDBClear(this.fdbContext.getTenantPrefix(), bytes);
delete.apply(txn);
Expand Down Expand Up @@ -973,7 +975,9 @@ private void store(byte[] key, byte[] value) throws IOException {
if (!mutations.isEmpty()) {
txn = this.fdb.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
txn.options().setRawAccess();
if (fdbContext.hasTenant()) {
txn.options().setRawAccess();
}

for (FDBMutation mutation: mutations) {
mutation.apply(txn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ private void flushMutations() throws IOException {
retry = false;
txn = this.fdb.createTransaction();
// Allow RAW access because we may manually force a tenant key prefix without actually setting a tenant
txn.options().setRawAccess();
if (fdbContext.hasTenant() || FDBUseTenantPrefix) {
txn.options().setRawAccess();
}
int sets = 0;
int clearranges = 0;

Expand Down

0 comments on commit 09ea531

Please sign in to comment.