Skip to content

Commit

Permalink
[pinpoint-apm#11350] Add write option to AsyncBufferedMutatorBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 14, 2024
1 parent 9ddc23a commit 2cfc296
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 44 deletions.
5 changes: 5 additions & 0 deletions collector/src/main/resources/hbase-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ hbase.client.properties.hbase.client.retries.number=4
hbase.client.put-writer.concurrency-limit=100000
hbase.client.span-put-writer.concurrency-limit=0

# asyncBufferedMutator, asyncTable
hbase.client.put-writer=asyncBufferedMutator
hbase.client.put-writer.async-buffered-mutator.writeBufferSize=100
hbase.client.put-writer.async-buffered-mutator.writeBufferPeriodicFlush=100


# hbase async =================================================================
# enable hbase async operation. default: false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;

public interface AsyncBufferedMutatorCustomizer {
void customize(AsyncBufferedMutatorBuilder builder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;

import java.util.concurrent.ExecutorService;

public interface AsyncBufferedMutatorFactory {

AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool);

AsyncBufferedMutator getBufferedMutator(TableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;

public interface AsyncTableCustomizer {
void customize(AsyncTableBuilder<?> builder);

void customize(AsyncBufferedMutatorBuilder builder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ScanResultConsumer;

Expand All @@ -38,8 +37,4 @@ public interface AsyncTableFactory {
*/
AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorService pool);


AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool);

AsyncBufferedMutator getBufferedMutator(TableName tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.CompletableFuture;

public class BatchAsyncHbasePutWriter implements HbasePutWriter {
private final AsyncTableFactory asyncTableFactory;
private final AsyncBufferedMutatorFactory asyncTableFactory;

public BatchAsyncHbasePutWriter(AsyncTableFactory asyncTableFactory) {
public BatchAsyncHbasePutWriter(AsyncBufferedMutatorFactory asyncTableFactory) {
this.asyncTableFactory = Objects.requireNonNull(asyncTableFactory, "asyncTableFactory");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;

import java.util.concurrent.TimeUnit;

public class DefaultAsyncBufferedMutatorCustomizer implements AsyncBufferedMutatorCustomizer {

private long writeBufferSize = 100;
private long writeBufferPeriodicFlush = 100;

public DefaultAsyncBufferedMutatorCustomizer() {
}

public void setWriteBufferSize(long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public void setWriteBufferPeriodicFlush(long writeBufferPeriodicFlush) {
this.writeBufferPeriodicFlush = writeBufferPeriodicFlush;
}

@Override
public void customize(AsyncBufferedMutatorBuilder builder) {
builder.setWriteBufferSize(writeBufferSize);
builder.setWriteBufferPeriodicFlush(writeBufferPeriodicFlush, TimeUnit.MILLISECONDS);
}

@Override
public String toString() {
return "DefaultAsyncBufferedMutatorCustomizer{" +
"writeBufferSize=" + writeBufferSize +
", writeBufferPeriodicFlush=" + writeBufferPeriodicFlush +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@

package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;

import java.util.concurrent.TimeUnit;

public class DefaultAsyncTableCustomizer implements AsyncTableCustomizer {

@Override
public void customize(AsyncTableBuilder<?> builder) {

public DefaultAsyncTableCustomizer() {
}

@Override
public void customize(AsyncBufferedMutatorBuilder builder) {
public void customize(AsyncTableBuilder<?> builder) {

builder.setWriteBufferPeriodicFlush(500, TimeUnit.MILLISECONDS);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.navercorp.pinpoint.common.hbase.async;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.springframework.cache.annotation.Cacheable;

import java.util.Objects;
import java.util.concurrent.ExecutorService;

public class HbaseAsyncBufferedMutatorFactory implements AsyncBufferedMutatorFactory {

private final AsyncConnection connection;
private final AsyncBufferedMutatorCustomizer customizer;

public HbaseAsyncBufferedMutatorFactory(AsyncConnection connection, AsyncBufferedMutatorCustomizer customizer) {
this.connection = Objects.requireNonNull(connection, "connection");
this.customizer = Objects.requireNonNull(customizer, "customizer");
}


@Override
@Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool);
customizer.customize(builder);
return builder.build();
}

@Override
@Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName);
customizer.customize(builder);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;
Expand Down Expand Up @@ -56,21 +54,4 @@ public AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorServ
return builder.build();
}


@Override
@Cacheable(cacheNames = "bufferedMutator-pool", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName, pool);
customizer.customize(builder);
return builder.build();
}

@Override
@Cacheable(cacheNames = "bufferedMutator", keyGenerator = "tableNameAndPoolKeyGenerator", cacheManager = "hbaseAsyncBufferedMutatorManager")
public AsyncBufferedMutator getBufferedMutator(TableName tableName) {
AsyncBufferedMutatorBuilder builder = connection.getBufferedMutatorBuilder(tableName);
customizer.customize(builder);
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import com.navercorp.pinpoint.common.hbase.HbaseTemplate;
import com.navercorp.pinpoint.common.hbase.HbaseVersionCheckBean;
import com.navercorp.pinpoint.common.hbase.TableFactory;
import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorCustomizer;
import com.navercorp.pinpoint.common.hbase.async.AsyncBufferedMutatorFactory;
import com.navercorp.pinpoint.common.hbase.async.AsyncHbasePutWriter;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer;
import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory;
import com.navercorp.pinpoint.common.hbase.async.BatchAsyncHbasePutWriter;
import com.navercorp.pinpoint.common.hbase.async.ConcurrencyDecorator;
import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncBufferedMutatorCustomizer;
import com.navercorp.pinpoint.common.hbase.async.DefaultAsyncTableCustomizer;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncBufferedMutatorFactory;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncCacheConfiguration;
import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTableFactory;
import com.navercorp.pinpoint.common.hbase.async.HbasePutWriter;
Expand Down Expand Up @@ -78,6 +82,19 @@ public AsyncTableFactory hbaseAsyncTableFactory(@Qualifier("hbaseAsyncConnection
return new HbaseAsyncTableFactory(connection, customizer);
}

@Bean
@ConfigurationProperties(prefix = "hbase.client.put-writer.async-buffered-mutator")
public AsyncBufferedMutatorCustomizer asyncBufferedMutatorCustomizer() {
return new DefaultAsyncBufferedMutatorCustomizer();
}

@Bean
public AsyncBufferedMutatorFactory hbaseAsyncBufferedMutatorFactory(@Qualifier("hbaseAsyncConnection") AsyncConnection connection,
AsyncBufferedMutatorCustomizer customizer) {
logger.info("AsyncBufferedMutatorCustomizer {}", customizer);
return new HbaseAsyncBufferedMutatorFactory(connection, customizer);
}


@Bean
@ConditionalOnProperty(name = "hbase.client.parallel.scan.enable", havingValue = "true")
Expand Down Expand Up @@ -136,7 +153,7 @@ public AsyncHbasePutWriterConfig() {
@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", putWriter);
return putWriter;
Expand All @@ -149,7 +166,7 @@ public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-w

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter putWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbaseSpanPutWriter {}", putWriter);
return putWriter;
Expand Down Expand Up @@ -179,8 +196,8 @@ public AsyncBufferedHbasePutWriterConfig() {

@Primary
@Bean
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
public HbasePutWriter hbasePutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory,
@Qualifier("concurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("hbasePutWriter {}", hbasePutWriter);
return hbasePutWriter;
Expand All @@ -192,8 +209,8 @@ public HbasePutWriterDecorator concurrencyDecorator(@Value("${hbase.client.put-w
}

@Bean
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncTableFactory") AsyncTableFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
public HbasePutWriter spanPutWriter(@Qualifier("hbaseAsyncBufferedMutatorFactory") AsyncBufferedMutatorFactory asyncTableFactory,
@Qualifier("spanConcurrencyDecorator") HbasePutWriterDecorator decorator) {
HbasePutWriter hbasePutWriter = newPutWriter(asyncTableFactory, decorator);
logger.info("HbaseSpanPutWriter {}", hbasePutWriter);
return hbasePutWriter;
Expand All @@ -204,7 +221,7 @@ public HbasePutWriterDecorator spanConcurrencyDecorator(@Value("${hbase.client.s
return new ConcurrencyDecorator(concurrency);
}

private HbasePutWriter newPutWriter(AsyncTableFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
private HbasePutWriter newPutWriter(AsyncBufferedMutatorFactory asyncTableFactory, HbasePutWriterDecorator decorator) {
HbasePutWriter writer = new BatchAsyncHbasePutWriter(asyncTableFactory);
HbasePutWriter putWriter = decorator.decorator(writer);
return new LoggingHbasePutWriter(putWriter);
Expand Down

0 comments on commit 2cfc296

Please sign in to comment.