Skip to content

Commit

Permalink
for #1172, refactor SQLExecutionHook
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent 2d67276 commit 74c7bae
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolea
DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
for (List<Object> each : parameterSets) {
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData, isTrunkThread);
// TODO remove after BED removed
shardingEventBus.post(SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData));
}
try {
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread);
T result = executeSQL(statementExecuteUnit);
for (List<Object> each : parameterSets) {
sqlExecutionHook.finishSuccess();
Expand All @@ -92,8 +92,8 @@ private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolea
}
return result;
} catch (final SQLException ex) {
sqlExecutionHook.finishFailure(ex);
for (List<Object> each : parameterSets) {
sqlExecutionHook.finishFailure(ex);
// TODO remove after BED removed
SQLExecutionEvent finishEvent = SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData);
finishEvent.setExecuteFailure(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package io.shardingsphere.core.spi.executor;

import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.routing.RouteUnit;

import java.util.List;
import java.util.ServiceLoader;

/**
Expand All @@ -32,9 +32,9 @@ public final class SPISQLExecutionHook implements SQLExecutionHook {
private static final ServiceLoader<SQLExecutionHook> SERVICE_LOADER = ServiceLoader.load(SQLExecutionHook.class);

@Override
public synchronized void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
public synchronized void start(final RouteUnit routeUnit, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
for (SQLExecutionHook each : SERVICE_LOADER) {
each.start(dataSourceName, sql, parameters, dataSourceMetaData, isTrunkThread);
each.start(routeUnit, dataSourceMetaData, isTrunkThread);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package io.shardingsphere.core.spi.executor;

import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;

import java.util.List;
import io.shardingsphere.core.routing.RouteUnit;

/**
* SQL Execution hook.
Expand All @@ -31,13 +30,11 @@ public interface SQLExecutionHook {
/**
* Handle when SQL execution started.
*
* @param dataSourceName data source name
* @param sql SQL to be executed
* @param parameters parameters of SQL
* @param routeUnit route unit to be executed
* @param dataSourceMetaData data source meta data
* @param isTrunkThread is execution in trunk thread
*/
void start(String dataSourceName, String sql, List<Object> parameters, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread);
void start(RouteUnit routeUnit, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread);

/**
* Handle when SQL execution finished success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.routing.RouteUnit;
import io.shardingsphere.core.spi.executor.SQLExecutionHook;
import io.shardingsphere.opentracing.ShardingTracer;
import io.shardingsphere.opentracing.constant.ShardingTags;

import java.util.LinkedList;
import java.util.List;

/**
Expand All @@ -43,7 +45,7 @@ public final class OpenTracingSQLExecutionHook implements SQLExecutionHook {
private Span span;

@Override
public void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
public void start(final RouteUnit routeUnit, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
if (!isTrunkThread) {
activeSpan = ((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION)).activate();
}
Expand All @@ -53,12 +55,24 @@ public void start(final String dataSourceName, final String sql, final List<Obje
.withTag(Tags.PEER_HOSTNAME.getKey(), dataSourceMetaData.getHostName())
.withTag(Tags.PEER_PORT.getKey(), dataSourceMetaData.getPort())
.withTag(Tags.DB_TYPE.getKey(), "sql")
.withTag(Tags.DB_INSTANCE.getKey(), dataSourceName)
.withTag(Tags.DB_STATEMENT.getKey(), sql)
.withTag(ShardingTags.DB_BIND_VARIABLES.getKey(), parameters.isEmpty() ? "" : Joiner.on(",").join(parameters)).startManual();
.withTag(Tags.DB_INSTANCE.getKey(), routeUnit.getDataSourceName())
.withTag(Tags.DB_STATEMENT.getKey(), routeUnit.getSqlUnit().getSql())
.withTag(ShardingTags.DB_BIND_VARIABLES.getKey(), toString(routeUnit.getSqlUnit().getParameterSets())).startManual();

}

private String toString(final List<List<Object>> parameterSets) {
return parameterSets.isEmpty() ? "" : Joiner.on(", ").join(toStringList(parameterSets));
}

private List<String> toStringList(final List<List<Object>> parameterSets) {
List<String> parameterString = new LinkedList<>();
for (List<Object> each : parameterSets) {
parameterString.add(String.format("[%s]", Joiner.on(", ").join(each)));
}
return parameterString;
}

@Override
public void finishSuccess() {
if (null != span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.routing.RouteUnit;
import io.shardingsphere.core.routing.SQLUnit;
import io.shardingsphere.core.spi.executor.SPISQLExecutionHook;
import io.shardingsphere.core.spi.executor.SQLExecutionHook;
import io.shardingsphere.opentracing.constant.ShardingTags;
Expand All @@ -33,6 +35,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -71,7 +74,7 @@ public void assertExecuteSuccessForTrunkThread() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData, true);
sqlExecutionHook.start(createRouteUnit("success_ds", "SELECT * FROM success_tbl;", Collections.singletonList(Arrays.<Object>asList("1", 2))), dataSourceMetaData, true);
sqlExecutionHook.finishSuccess();
assertThat(getTracer().finishedSpans().size(), is(1));
MockSpan actual = getTracer().finishedSpans().get(0);
Expand All @@ -82,9 +85,9 @@ public void assertExecuteSuccessForTrunkThread() {
assertThat(actualTags.get(Tags.PEER_HOSTNAME.getKey()), CoreMatchers.<Object>is("localhost"));
assertThat(actualTags.get(Tags.PEER_PORT.getKey()), CoreMatchers.<Object>is(8888));
assertThat(actualTags.get(Tags.DB_TYPE.getKey()), CoreMatchers.<Object>is("sql"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("ds_test"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM XXX;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is("1,2"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("success_ds"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM success_tbl;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is("[1, 2]"));
verify(activeSpan, times(0)).deactivate();
}

Expand All @@ -93,7 +96,7 @@ public void assertExecuteSuccessForBranchThread() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData, false);
sqlExecutionHook.start(createRouteUnit("success_ds", "SELECT * FROM success_tbl;", Collections.singletonList(Arrays.<Object>asList("1", 2))), dataSourceMetaData, false);
sqlExecutionHook.finishSuccess();
assertThat(getTracer().finishedSpans().size(), is(1));
MockSpan actual = getTracer().finishedSpans().get(0);
Expand All @@ -104,9 +107,9 @@ public void assertExecuteSuccessForBranchThread() {
assertThat(actualTags.get(Tags.PEER_HOSTNAME.getKey()), CoreMatchers.<Object>is("localhost"));
assertThat(actualTags.get(Tags.PEER_PORT.getKey()), CoreMatchers.<Object>is(8888));
assertThat(actualTags.get(Tags.DB_TYPE.getKey()), CoreMatchers.<Object>is("sql"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("ds_test"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM XXX;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is("1,2"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("success_ds"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM success_tbl;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is("[1, 2]"));
verify(activeSpan).deactivate();
}

Expand All @@ -115,7 +118,7 @@ public void assertExecuteFailure() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Collections.emptyList(), dataSourceMetaData, true);
sqlExecutionHook.start(createRouteUnit("failure_ds", "SELECT * FROM failure_tbl;", Collections.<List<Object>>emptyList()), dataSourceMetaData, true);
sqlExecutionHook.finishFailure(new RuntimeException("SQL execution error"));
assertThat(getTracer().finishedSpans().size(), is(1));
MockSpan actual = getTracer().finishedSpans().get(0);
Expand All @@ -126,10 +129,15 @@ public void assertExecuteFailure() {
assertThat(actualTags.get(Tags.PEER_HOSTNAME.getKey()), CoreMatchers.<Object>is("localhost"));
assertThat(actualTags.get(Tags.PEER_PORT.getKey()), CoreMatchers.<Object>is(8888));
assertThat(actualTags.get(Tags.DB_TYPE.getKey()), CoreMatchers.<Object>is("sql"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("ds_test"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM XXX;"));
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("failure_ds"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM failure_tbl;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is(""));
assertSpanError(actual, RuntimeException.class, "SQL execution error");
verify(activeSpan, times(0)).deactivate();
}

private RouteUnit createRouteUnit(final String dataSourceName, final String sql, final List<List<Object>> parameterSets) {
SQLUnit sqlUnit = new SQLUnit(sql, parameterSets);
return new RouteUnit(dataSourceName, sqlUnit);
}
}

0 comments on commit 74c7bae

Please sign in to comment.