Skip to content

Commit e85e6c8

Browse files
authored
[fix](auditlog) add missing audit log fields and duplicate audit log error #42262 (#43015)
bp #42262
1 parent df2bd6d commit e85e6c8

File tree

11 files changed

+142
-22
lines changed

11 files changed

+142
-22
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java

+4
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,18 @@ public class InternalSchema {
8080
AUDIT_SCHEMA.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
8181
AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
8282
AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
83+
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT), true));
84+
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
8385
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
8486
AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
87+
AUDIT_SCHEMA.add(new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), true));
8588
AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
8689
AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
8790
AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
8891
AUDIT_SCHEMA.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
8992
AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
9093
AUDIT_SCHEMA.add(new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), true));
94+
// Keep stmt as last column. So that in fe.audit.log, it will be easier to get sql string
9195
AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
9296
}
9397

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java

+12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.nereids.trees.plans.commands;
1919

20+
import org.apache.doris.analysis.RedirectStatus;
2021
import org.apache.doris.nereids.analyzer.UnboundFunction;
2122
import org.apache.doris.nereids.trees.plans.PlanType;
2223
import org.apache.doris.nereids.trees.plans.commands.call.CallFunc;
@@ -58,4 +59,15 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
5859
return visitor.visitCallCommand(this, context);
5960
}
6061

62+
@Override
63+
public RedirectStatus toRedirectStatus() {
64+
// Some of call statements may need to be redirected, some may not
65+
String funcName = unboundFunction.getName().toUpperCase();
66+
switch (funcName) {
67+
case "FLUSH_AUDIT_LOG":
68+
return RedirectStatus.NO_FORWARD;
69+
default:
70+
return RedirectStatus.FORWARD_WITH_SYNC;
71+
}
72+
}
6173
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.nereids.trees.plans.commands.call;
19+
20+
import org.apache.doris.analysis.UserIdentity;
21+
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.mysql.privilege.PrivPredicate;
23+
import org.apache.doris.nereids.exceptions.AnalysisException;
24+
import org.apache.doris.nereids.trees.expressions.Expression;
25+
26+
import java.util.List;
27+
28+
/**
29+
* call flush_audit_log()
30+
* This will flush audit log immediately to the audit_log table.
31+
* Mainly for test cases, so that we don't need to wait 60 sec to flush the audit log.
32+
*/
33+
public class CallFlushAuditLogFunc extends CallFunc {
34+
35+
private UserIdentity user;
36+
37+
private CallFlushAuditLogFunc(UserIdentity user) {
38+
this.user = user;
39+
}
40+
41+
public static CallFunc create(UserIdentity user, List<Expression> args) {
42+
if (!args.isEmpty()) {
43+
throw new AnalysisException("FLUSH_AUDIT_LOG function requires no parameter");
44+
}
45+
return new CallFlushAuditLogFunc(user);
46+
}
47+
48+
@Override
49+
public void run() {
50+
// check priv
51+
if (!Env.getCurrentEnv().getAuth().checkGlobalPriv(user, PrivPredicate.ADMIN)) {
52+
throw new AnalysisException("Only admin can flush audit log");
53+
}
54+
// flush audit log
55+
Env.getCurrentEnv().getPluginMgr().flushAuditLog();
56+
}
57+
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public static CallFunc getFunc(ConnectContext ctx, UserIdentity user, UnboundFun
3636
// TODO, built-in functions require a separate management
3737
case "EXECUTE_STMT": // Call built-in functions first
3838
return CallExecuteStmtFunc.create(user, unboundFunction.getArguments());
39+
case "FLUSH_AUDIT_LOG":
40+
return CallFlushAuditLogFunc.create(user, unboundFunction.getArguments());
3941
default:
4042
return CallProcedure.create(ctx, originSql);
4143
}

fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public enum EventType {
8080
public String queryId = "";
8181
@AuditField(value = "IsQuery")
8282
public boolean isQuery = false;
83-
@AuditField(value = "isNereids")
83+
@AuditField(value = "IsNereids")
8484
public boolean isNereids = false;
85-
@AuditField(value = "feIp")
85+
@AuditField(value = "FeIp")
8686
public String feIp = "";
8787
@AuditField(value = "Stmt")
8888
public String stmt = "";
@@ -94,12 +94,10 @@ public enum EventType {
9494
public long shuffleSendRows = -1;
9595
@AuditField(value = "SqlHash")
9696
public String sqlHash = "";
97-
@AuditField(value = "peakMemoryBytes")
97+
@AuditField(value = "PeakMemoryBytes")
9898
public long peakMemoryBytes = -1;
9999
@AuditField(value = "SqlDigest")
100100
public String sqlDigest = "";
101-
@AuditField(value = "TraceId")
102-
public String traceId = "";
103101
@AuditField(value = "WorkloadGroup")
104102
public String workloadGroup = "";
105103
// note: newly added fields should be always before fuzzyVariables

fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public class PluginMgr implements Writable {
6161
// all dynamic plugins should have unique names,
6262
private final Set<String> dynamicPluginNames;
6363

64+
// Save this handler for external call
65+
private AuditLoader auditLoader = null;
66+
6467
public PluginMgr() {
6568
plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE];
6669
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
@@ -113,8 +116,8 @@ private void initBuiltinPlugins() {
113116
}
114117

115118
// AuditLoader: log audit log to internal table
116-
AuditLoader auditLoaderPlugin = new AuditLoader();
117-
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
119+
this.auditLoader = new AuditLoader();
120+
if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) {
118121
LOG.warn("failed to register audit log builder");
119122
}
120123

@@ -359,6 +362,12 @@ public List<List<String>> getPluginShowInfos() {
359362
return rows;
360363
}
361364

365+
public void flushAuditLog() {
366+
if (auditLoader != null) {
367+
auditLoader.loadIfNecessary(true);
368+
}
369+
}
370+
362371
public void readFields(DataInputStream dis) throws IOException {
363372
int size = dis.readInt();
364373
for (int i = 0; i < size; i++) {

fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.apache.logging.log4j.Logger;
3636

3737
import java.io.IOException;
38-
import java.time.ZoneId;
39-
import java.time.format.DateTimeFormatter;
4038
import java.util.concurrent.BlockingQueue;
4139
import java.util.concurrent.TimeUnit;
4240

@@ -48,9 +46,6 @@ public class AuditLoader extends Plugin implements AuditPlugin {
4846

4947
public static final String AUDIT_LOG_TABLE = "audit_log";
5048

51-
private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
52-
.withZone(ZoneId.systemDefault());
53-
5449
private StringBuilder auditLogBuffer = new StringBuilder();
5550
private int auditLogNum = 0;
5651
private long lastLoadTimeAuditLog = 0;
@@ -90,7 +85,7 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
9085
// GlobalVariable.audit_plugin_max_batch_bytes.
9186
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
9287
this.streamLoader = new AuditStreamLoader();
93-
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
88+
this.loadThread = new Thread(new LoadWorker(), "audit loader thread");
9489
this.loadThread.start();
9590

9691
isInit = true;
@@ -143,6 +138,7 @@ private void assembleAudit(AuditEvent event) {
143138
}
144139

145140
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
141+
// should be same order as InternalSchema.AUDIT_SCHEMA
146142
logBuffer.append(event.queryId).append("\t");
147143
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
148144
logBuffer.append(event.clientIp).append("\t");
@@ -156,8 +152,11 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
156152
logBuffer.append(event.scanBytes).append("\t");
157153
logBuffer.append(event.scanRows).append("\t");
158154
logBuffer.append(event.returnRows).append("\t");
155+
logBuffer.append(event.shuffleSendRows).append("\t");
156+
logBuffer.append(event.shuffleSendBytes).append("\t");
159157
logBuffer.append(event.stmtId).append("\t");
160158
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
159+
logBuffer.append(event.isNereids ? 1 : 0).append("\t");
161160
logBuffer.append(event.feIp).append("\t");
162161
logBuffer.append(event.cpuTimeMs).append("\t");
163162
logBuffer.append(event.sqlHash).append("\t");
@@ -172,10 +171,12 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
172171
logBuffer.append(stmt).append("\n");
173172
}
174173

175-
private void loadIfNecessary(AuditStreamLoader loader) {
174+
// public for external call.
175+
// synchronized to avoid concurrent load.
176+
public synchronized void loadIfNecessary(boolean force) {
176177
long currentTime = System.currentTimeMillis();
177178

178-
if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
179+
if (force || auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
179180
|| currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) {
180181
// begin to load
181182
try {
@@ -188,7 +189,7 @@ private void loadIfNecessary(AuditStreamLoader loader) {
188189
discardLogNum += auditLogNum;
189190
return;
190191
}
191-
AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
192+
AuditStreamLoader.LoadResponse response = streamLoader.loadBatch(auditLogBuffer, token);
192193
if (LOG.isDebugEnabled()) {
193194
LOG.debug("audit loader response: {}", response);
194195
}
@@ -214,10 +215,8 @@ private void resetBatch(long currentTime) {
214215
}
215216

216217
private class LoadWorker implements Runnable {
217-
private AuditStreamLoader loader;
218218

219-
public LoadWorker(AuditStreamLoader loader) {
220-
this.loader = loader;
219+
public LoadWorker() {
221220
}
222221

223222
public void run() {
@@ -227,7 +226,7 @@ public void run() {
227226
if (event != null) {
228227
assembleAudit(event);
229228
// process all audit logs
230-
loadIfNecessary(loader);
229+
loadIfNecessary(false);
231230
}
232231
} catch (InterruptedException ie) {
233232
if (LOG.isDebugEnabled()) {
@@ -240,3 +239,4 @@ public void run() {
240239
}
241240
}
242241
}
242+

fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
116116
if (af.value().equals("Time(ms)")) {
117117
queryTime = (long) f.get(event);
118118
}
119-
sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
119+
sb.append("|").append(af.value()).append("=").append(f.get(event));
120120
}
121121

122122
String auditLog = sb.toString();

fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
174174
CatalogIf catalog = ctx.getCurrentCatalog();
175175

176176
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
177+
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance will be used in the next query.
177178
auditEventBuilder.reset();
178179
auditEventBuilder
179180
.setTimestamp(ctx.getStartTime())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !audit_log_schema --
3+
query_id varchar(48) Yes true \N
4+
time datetime(3) Yes true \N
5+
client_ip varchar(128) Yes true \N
6+
user varchar(128) Yes false \N NONE
7+
catalog varchar(128) Yes false \N NONE
8+
db varchar(128) Yes false \N NONE
9+
state varchar(128) Yes false \N NONE
10+
error_code int Yes false \N NONE
11+
error_message text Yes false \N NONE
12+
query_time bigint Yes false \N NONE
13+
scan_bytes bigint Yes false \N NONE
14+
scan_rows bigint Yes false \N NONE
15+
return_rows bigint Yes false \N NONE
16+
shuffle_send_rows bigint Yes false \N NONE
17+
shuffle_send_bytes bigint Yes false \N NONE
18+
stmt_id bigint Yes false \N NONE
19+
stmt_type varchar(48) Yes false \N NONE
20+
is_query tinyint Yes false \N NONE
21+
is_nereids tinyint Yes false \N NONE
22+
frontend_ip varchar(128) Yes false \N NONE
23+
cpu_time_ms bigint Yes false \N NONE
24+
sql_hash varchar(128) Yes false \N NONE
25+
sql_digest varchar(128) Yes false \N NONE
26+
peak_memory_bytes bigint Yes false \N NONE
27+
workload_group text Yes false \N NONE
28+
compute_group text Yes false \N NONE
29+
stmt text Yes false \N NONE
30+

regression-test/suites/audit/test_audit_log_behavior.groovy

+8-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ suite("test_audit_log_behavior") {
1919
try {
2020
sql "set global enable_audit_plugin = true"
2121
sql "set global audit_plugin_max_sql_length = 58"
22-
sql "set global audit_plugin_max_batch_interval_sec = 1"
22+
// sql "set global audit_plugin_max_batch_interval_sec = 1"
2323
} catch (Exception e) {
2424
log.warn("skip this case, because " + e.getMessage())
2525
assertTrue(e.getMessage().toUpperCase().contains("ADMIN"))
@@ -71,6 +71,8 @@ suite("test_audit_log_behavior") {
7171
]
7272
]
7373

74+
qt_audit_log_schema """desc internal.__internal_schema.audit_log"""
75+
7476
for (def on : [true, false]) {
7577
sql "set enable_nereids_planner=${on}"
7678
sql "truncate table __internal_schema.audit_log"
@@ -80,6 +82,10 @@ suite("test_audit_log_behavior") {
8082
sql tuple2[0]
8183
}
8284

85+
// make sure audit event is created.
86+
// see WorkloadRuntimeStatusMgr.getQueryNeedAudit()
87+
Thread.sleep(6000)
88+
sql """call flush_audit_log()"""
8389
// check result
8490
for (int i = 0; i < cnt; i++) {
8591
def tuple2 = sqls.get(i)
@@ -96,6 +102,7 @@ suite("test_audit_log_behavior") {
96102
assertEquals(res[0][0].toString(), tuple2[1].toString())
97103
}
98104
}
105+
// do not turn off
99106
sql "set global enable_audit_plugin = false"
100107
sql "set global audit_plugin_max_sql_length = 4096"
101108
sql "set global audit_plugin_max_batch_interval_sec = 60"

0 commit comments

Comments
 (0)