Skip to content

Commit e02b1da

Browse files
morningmanYour Name
authored and
Your Name
committed
[fix](auditlog) add missing audit log fields and duplicate audit log error (#42262)
### What problem does this PR solve? Problem Summary: #### Issue 1 Some fields are missing from the audit log table. This PR adds them all: - shuffle_send_rows - shuffle_send_bytes - scan_bytes_from_local_storage - scan_bytes_from_remote_storage - is_nereids - compute_group Note that `compute_group` was previously named `cloudClusterName` in fe.audit.log, which was incorrect, so this PR changes it to the right name. After this PR, all of these fields will be saved in both the audit log table and fe.audit.log. #### Issue 2 The `AuditEventBuilder` needs to be reset at each run; otherwise there will be duplicate audit logs. #### Issue 3 Add a new statement `call flush_audit_log()`. It flushes the audit log to the audit_log table immediately. This is useful in test cases, so that we don't need to wait 1 min for the audit log data to be flushed. ### Release note [fix](auditlog) add missing audit log fields and duplicate audit log error
1 parent 757b0ea commit e02b1da

File tree

11 files changed

+165
-25
lines changed

11 files changed

+165
-25
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java

+17
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,23 @@ public class InternalSchema {
139139
AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
140140
AUDIT_SCHEMA
141141
.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
142+
AUDIT_SCHEMA
143+
.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT),
144+
ColumnNullableType.NULLABLE));
145+
AUDIT_SCHEMA
146+
.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT),
147+
ColumnNullableType.NULLABLE));
148+
AUDIT_SCHEMA
149+
.add(new ColumnDef("scan_bytes_from_local_storage", TypeDef.create(PrimitiveType.BIGINT),
150+
ColumnNullableType.NULLABLE));
151+
AUDIT_SCHEMA
152+
.add(new ColumnDef("scan_bytes_from_remote_storage", TypeDef.create(PrimitiveType.BIGINT),
153+
ColumnNullableType.NULLABLE));
142154
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
143155
AUDIT_SCHEMA.add(new ColumnDef("stmt_type", TypeDef.createVarchar(48), ColumnNullableType.NULLABLE));
144156
AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
157+
AUDIT_SCHEMA.add(
158+
new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
145159
AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
146160
AUDIT_SCHEMA
147161
.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
@@ -151,6 +165,9 @@ public class InternalSchema {
151165
new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
152166
AUDIT_SCHEMA.add(
153167
new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
168+
AUDIT_SCHEMA.add(
169+
new ColumnDef("compute_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
170+
// Keep stmt as last column. So that in fe.audit.log, it will be easier to get sql string
154171
AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
155172
}
156173

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java

+13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.nereids.trees.plans.commands;
1919

20+
import org.apache.doris.analysis.RedirectStatus;
2021
import org.apache.doris.analysis.StmtType;
2122
import org.apache.doris.nereids.analyzer.UnboundFunction;
2223
import org.apache.doris.nereids.trees.plans.PlanType;
@@ -63,4 +64,16 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
6364
public StmtType stmtType() {
6465
return StmtType.CALL;
6566
}
67+
68+
@Override
69+
public RedirectStatus toRedirectStatus() {
70+
// Some of call statements may need to be redirected, some may not
71+
String funcName = unboundFunction.getName().toUpperCase();
72+
switch (funcName) {
73+
case "FLUSH_AUDIT_LOG":
74+
return RedirectStatus.NO_FORWARD;
75+
default:
76+
return RedirectStatus.FORWARD_WITH_SYNC;
77+
}
78+
}
6679
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.nereids.trees.plans.commands.call;
19+
20+
import org.apache.doris.analysis.UserIdentity;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.mysql.privilege.PrivPredicate;
23+
import org.apache.doris.nereids.exceptions.AnalysisException;
24+
import org.apache.doris.nereids.trees.expressions.Expression;
25+
26+
import java.util.List;
27+
28+
/**
29+
* call flush_audit_log()
30+
* This will flush audit log immediately to the audit_log table.
31+
* Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log.
32+
*/
33+
public class CallFlushAuditLogFunc extends CallFunc {
34+
35+
private UserIdentity user;
36+
37+
private CallFlushAuditLogFunc(UserIdentity user) {
38+
this.user = user;
39+
}
40+
41+
public static CallFunc create(UserIdentity user, List<Expression> args) {
42+
if (!args.isEmpty()) {
43+
throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter");
44+
}
45+
return new CallFlushAuditLogFunc(user);
46+
}
47+
48+
@Override
49+
public void run() {
50+
// check priv
51+
if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) {
52+
throw new AnalysisException("Only admin can flush audit log");
53+
}
54+
// flush audit log
55+
Env.getCurrentEnv().getPluginMgr().flushAuditLog();
56+
}
57+
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public static CallFunc getFunc(ConnectContext ctx, UserIdentity user, UnboundFun
3636
// TODO, built-in functions require a separate management
3737
case "EXECUTE_STMT": // Call built-in functions first
3838
return CallExecuteStmtFunc.create(user, unboundFunction.getArguments());
39+
case "FLUSH_AUDIT_LOG":
40+
return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments());
3941
default:
4042
return CallProcedure.create(ctx, originSql);
4143
}

fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public enum EventType {
8080
public String queryId = "";
8181
@AuditField(value = "IsQuery")
8282
public boolean isQuery = false;
83-
@AuditField(value = "isNereids")
83+
@AuditField(value = "IsNereids")
8484
public boolean isNereids = false;
85-
@AuditField(value = "feIp")
85+
@AuditField(value = "FeIp")
8686
public String feIp = "";
8787
@AuditField(value = "StmtType")
8888
public String stmtType = "";
@@ -96,22 +96,20 @@ public enum EventType {
9696
public long shuffleSendRows = -1;
9797
@AuditField(value = "SqlHash")
9898
public String sqlHash = "";
99-
@AuditField(value = "peakMemoryBytes")
99+
@AuditField(value = "PeakMemoryBytes")
100100
public long peakMemoryBytes = -1;
101101
@AuditField(value = "SqlDigest")
102102
public String sqlDigest = "";
103-
@AuditField(value = "cloudClusterName")
103+
@AuditField(value = "ComputeGroupName")
104104
public String cloudClusterName = "";
105-
@AuditField(value = "TraceId")
106-
public String traceId = "";
107105
@AuditField(value = "WorkloadGroup")
108106
public String workloadGroup = "";
109107
// note: newly added fields should be always before fuzzyVariables
110108
@AuditField(value = "FuzzyVariables")
111109
public String fuzzyVariables = "";
112-
@AuditField(value = "scanBytesFromLocalStorage")
110+
@AuditField(value = "ScanBytesFromLocalStorage")
113111
public long scanBytesFromLocalStorage = -1;
114-
@AuditField(value = "scanBytesFromRemoteStorage")
112+
@AuditField(value = "ScanBytesFromRemoteStorage")
115113
public long scanBytesFromRemoteStorage = -1;
116114

117115
public long pushToAuditLogQueueTime;

fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public class PluginMgr implements Writable {
6161
// all dynamic plugins should have unique names,
6262
private final Set<String> dynamicPluginNames;
6363

64+
// Save this handler for external call
65+
private AuditLoader auditLoader = null;
66+
6467
public PluginMgr() {
6568
plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE];
6669
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
@@ -113,8 +116,8 @@ private void initBuiltinPlugins() {
113116
}
114117

115118
// AuditLoader: log audit log to internal table
116-
AuditLoader auditLoaderPlugin = new AuditLoader();
117-
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
119+
this.auditLoader = new AuditLoader();
120+
if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) {
118121
LOG.warn("failed to register audit log builder");
119122
}
120123

@@ -363,6 +366,12 @@ public List<List<String>> getPluginShowInfos() {
363366
return rows;
364367
}
365368

369+
public void flushAuditLog() {
370+
if (auditLoader != null) {
371+
auditLoader.loadIfNecessary(true);
372+
}
373+
}
374+
366375
public void readFields(DataInputStream dis) throws IOException {
367376
int size = dis.readInt();
368377
for (int i = 0; i < size; i++) {

fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.apache.logging.log4j.Logger;
3636

3737
import java.io.IOException;
38-
import java.time.ZoneId;
39-
import java.time.format.DateTimeFormatter;
4038
import java.util.concurrent.BlockingQueue;
4139
import java.util.concurrent.TimeUnit;
4240

@@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements AuditPlugin {
4846

4947
public static final String AUDIT_LOG_TABLE = "audit_log";
5048

51-
private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
52-
.withZone(ZoneId.systemDefault());
53-
5449
private StringBuilder auditLogBuffer = new StringBuilder();
5550
private int auditLogNum = 0;
5651
private long lastLoadTimeAuditLog = 0;
@@ -90,7 +85,7 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
9085
// GlobalVariable.audit_plugin_max_batch_bytes.
9186
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
9287
this.streamLoader = new AuditStreamLoader();
93-
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
88+
this.loadThread = new Thread(new LoadWorker(), "audit loader thread");
9489
this.loadThread.start();
9590

9691
isInit = true;
@@ -143,6 +138,7 @@ private void assembleAudit(AuditEvent event) {
143138
}
144139

145140
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
141+
// should be same order as InternalSchema.AUDIT_SCHEMA
146142
logBuffer.append(event.queryId).append("\t");
147143
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
148144
logBuffer.append(event.clientIp).append("\t");
@@ -156,15 +152,21 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
156152
logBuffer.append(event.scanBytes).append("\t");
157153
logBuffer.append(event.scanRows).append("\t");
158154
logBuffer.append(event.returnRows).append("\t");
155+
logBuffer.append(event.shuffleSendRows).append("\t");
156+
logBuffer.append(event.shuffleSendBytes).append("\t");
157+
logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
158+
logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
159159
logBuffer.append(event.stmtId).append("\t");
160160
logBuffer.append(event.stmtType).append("\t");
161161
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
162+
logBuffer.append(event.isNereids ? 1 : 0).append("\t");
162163
logBuffer.append(event.feIp).append("\t");
163164
logBuffer.append(event.cpuTimeMs).append("\t");
164165
logBuffer.append(event.sqlHash).append("\t");
165166
logBuffer.append(event.sqlDigest).append("\t");
166167
logBuffer.append(event.peakMemoryBytes).append("\t");
167168
logBuffer.append(event.workloadGroup).append("\t");
169+
logBuffer.append(event.cloudClusterName).append("\t");
168170
// already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog
169171
String stmt = event.stmt;
170172
if (LOG.isDebugEnabled()) {
@@ -173,10 +175,12 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
173175
logBuffer.append(stmt).append("\n");
174176
}
175177

176-
private void loadIfNecessary(AuditStreamLoader loader) {
178+
// public for external call.
179+
// synchronized to avoid concurrent load.
180+
public synchronized void loadIfNecessary(boolean force) {
177181
long currentTime = System.currentTimeMillis();
178182

179-
if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
183+
if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
180184
|| currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) {
181185
// begin to load
182186
try {
@@ -189,7 +193,7 @@ private void loadIfNecessary(AuditStreamLoader loader) {
189193
discardLogNum += auditLogNum;
190194
return;
191195
}
192-
AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
196+
AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token);
193197
if (LOG.isDebugEnabled()) {
194198
LOG.debug("audit loader response: {}", response);
195199
}
@@ -215,10 +219,8 @@ private void resetBatch(long currentTime) {
215219
}
216220

217221
private class LoadWorker implements Runnable {
218-
private AuditStreamLoader loader;
219222

220-
public LoadWorker(AuditStreamLoader loader) {
221-
this.loader = loader;
223+
public LoadWorker() {
222224
}
223225

224226
public void run() {
@@ -228,7 +230,7 @@ public void run() {
228230
if (event != null) {
229231
assembleAudit(event);
230232
// process all audit logs
231-
loadIfNecessary(loader);
233+
loadIfNecessary(false);
232234
}
233235
} catch (InterruptedException ie) {
234236
if (LOG.isDebugEnabled()) {
@@ -241,3 +243,4 @@ public void run() {
241243
}
242244
}
243245
}
246+

fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
116116
if (af.value().equals("Time(ms)")) {
117117
queryTime = (long) f.get(event);
118118
}
119-
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
119+
sb.append("|").append(af.value()).append("=").append(f.get(event));
120120
}
121121

122122
String auditLog = sb.toString();

fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java

+2
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
186186
String cluster = Config.isCloudMode() ? cloudCluster : "";
187187

188188
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
189+
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
190+
auditEventBuilder.reset();
189191
auditEventBuilder
190192
.setTimestamp(ctx.getStartTime())
191193
.setClientIp(ctx.getClientIP())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !audit_log_schema --
3+
query_id varchar(48) Yes true \N
4+
time datetime(3) Yes true \N
5+
client_ip varchar(128) Yes true \N
6+
user varchar(128) Yes false \N NONE
7+
catalog varchar(128) Yes false \N NONE
8+
db varchar(128) Yes false \N NONE
9+
state varchar(128) Yes false \N NONE
10+
error_code int Yes false \N NONE
11+
error_message text Yes false \N NONE
12+
query_time bigint Yes false \N NONE
13+
scan_bytes bigint Yes false \N NONE
14+
scan_rows bigint Yes false \N NONE
15+
return_rows bigint Yes false \N NONE
16+
shuffle_send_rows bigint Yes false \N NONE
17+
shuffle_send_bytes bigint Yes false \N NONE
18+
scan_bytes_from_local_storage bigint Yes false \N NONE
19+
scan_bytes_from_remote_storage bigint Yes false \N NONE
20+
stmt_id bigint Yes false \N NONE
21+
stmt_type varchar(48) Yes false \N NONE
22+
is_query tinyint Yes false \N NONE
23+
is_nereids tinyint Yes false \N NONE
24+
frontend_ip varchar(128) Yes false \N NONE
25+
cpu_time_ms bigint Yes false \N NONE
26+
sql_hash varchar(128) Yes false \N NONE
27+
sql_digest varchar(128) Yes false \N NONE
28+
peak_memory_bytes bigint Yes false \N NONE
29+
workload_group text Yes false \N NONE
30+
compute_group text Yes false \N NONE
31+
stmt text Yes false \N NONE
32+

regression-test/suites/audit/test_audit_log_behavior.groovy

+8-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ suite("test_audit_log_behavior") {
1919
try {
2020
sql "set global enable_audit_plugin = true"
2121
sql "set global audit_plugin_max_sql_length = 58"
22-
sql "set global audit_plugin_max_batch_interval_sec = 1"
22+
// sql "set global audit_plugin_max_batch_interval_sec = 1"
2323
} catch (Exception e) {
2424
log.warn("skip this case, because " + e.getMessage())
2525
assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
@@ -71,6 +71,8 @@ suite("test_audit_log_behavior") {
7171
]
7272
]
7373

74+
qt_audit_log_schema """desc internal.__internal_schema.audit_log"""
75+
7476
for (def on : [true, false]) {
7577
sql "set enable_nereids_planner=${on}"
7678
sql "truncate table __internal_schema.audit_log"
@@ -80,6 +82,10 @@ suite("test_audit_log_behavior") {
8082
sql tuple2[0]
8183
}
8284

85+
// make sure audit event is created.
86+
// see WorkloadRuntimeStatusMgr.getQueryNeedAudit()
87+
Thread.sleep(6000)
88+
sql """call flush_audit_log()"""
8389
// check result
8490
for (int i = 0; i < cnt; i++) {
8591
def tuple2 = sqls.get(i)
@@ -96,6 +102,7 @@ suite("test_audit_log_behavior") {
96102
assertEquals(res[0][0].toString(), tuple2[1].toString())
97103
}
98104
}
105+
// do not turn off
99106
sql "set global enable_audit_plugin = false"
100107
sql "set global audit_plugin_max_sql_length = 4096"
101108
sql "set global audit_plugin_max_batch_interval_sec = 60"

0 commit comments

Comments
 (0)