Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat:[ts-5837]commit timely #29832

Open
wants to merge 8 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1355,10 +1355,11 @@ typedef struct {
int8_t encryptAlgorithm;
char dnodeListStr[TSDB_DNODE_LIST_LEN];
// 1. add auto-compact parameters
int32_t compactInterval; // minutes
int32_t compactStartTime; // minutes
int32_t compactEndTime; // minutes
int8_t compactTimeOffset; // hour
int32_t compactInterval; // minutes
int32_t compactStartTime; // minutes
int32_t compactEndTime; // minutes
int8_t compactTimeOffset; // hour
int32_t flushInterval;
} SCreateDbReq;

int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
Expand Down Expand Up @@ -1395,6 +1396,7 @@ typedef struct {
int32_t compactStartTime;
int32_t compactEndTime;
int8_t compactTimeOffset;
int32_t flushInterval;
} SAlterDbReq;

int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
Expand Down Expand Up @@ -1541,6 +1543,7 @@ typedef struct {
int8_t schemaless;
int16_t sstTrigger;
int8_t withArbitrator;
int32_t flushInterval;
} SDbCfgRsp;

typedef SDbCfgRsp SDbCfgInfo;
Expand Down Expand Up @@ -1952,10 +1955,11 @@ void tFreeSConfigRsp(SConfigRsp* pRsp);

typedef struct {
int32_t reserved;
char db[TSDB_DB_FNAME_LEN];
} SMTimerReq;

int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
// int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);

typedef struct SOrphanTask {
int64_t streamId;
Expand Down
1 change: 1 addition & 0 deletions include/common/tmsgdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMMIT_DB_TIMER, "commit-db-tmr", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)

TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
Expand Down
1 change: 1 addition & 0 deletions include/libs/nodes/cmdnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ typedef struct SDatabaseOptions {
SValueNode* pCompactTimeOffsetNode;
SValueNode* pCompactIntervalNode;
SNodeList* pCompactTimeRangeList;
int32_t flushInterval;
// for cache
SDbCfgInfo* pDbCfg;
} SDatabaseOptions;
Expand Down
1 change: 1 addition & 0 deletions include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_COMPACT_TIME_OFFSET 0
#define TSDB_MAX_COMPACT_TIME_OFFSET 23
#define TSDB_DEFAULT_COMPACT_TIME_OFFSET 0
#define TSDB_DEFAULT_FLUSH_INTERVAL 0

#define TSDB_MIN_EXPLAIN_RATIO 0
#define TSDB_MAX_EXPLAIN_RATIO 1
Expand Down
40 changes: 40 additions & 0 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -4058,6 +4058,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactStartTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactEndTime));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->compactTimeOffset));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->flushInterval));

tEndEncode(&encoder);

Expand Down Expand Up @@ -4164,6 +4165,12 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
pReq->compactTimeOffset = TSDB_DEFAULT_COMPACT_TIME_OFFSET;
}

if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->flushInterval));
} else {
pReq->flushInterval = TSDB_DEFAULT_FLUSH_INTERVAL;
}

tEndDecode(&decoder);

_exit:
Expand Down Expand Up @@ -4219,6 +4226,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactStartTime));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pReq->compactEndTime));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->compactTimeOffset));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->flushInterval));
tEndEncode(&encoder);

_exit:
Expand Down Expand Up @@ -4299,6 +4307,13 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
pReq->compactEndTime = TSDB_DEFAULT_COMPACT_END_TIME;
pReq->compactTimeOffset = TSDB_DEFAULT_COMPACT_TIME_OFFSET;
}

if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->flushInterval));
} else {
pReq->flushInterval = TSDB_DEFAULT_FLUSH_INTERVAL;
}

tEndDecode(&decoder);

_exit:
Expand Down Expand Up @@ -5436,6 +5451,7 @@ int32_t tSerializeSDbCfgRspImpl(SEncoder *encoder, const SDbCfgRsp *pRsp) {
TAOS_CHECK_RETURN(tEncodeI32v(encoder, pRsp->compactStartTime));
TAOS_CHECK_RETURN(tEncodeI32v(encoder, pRsp->compactEndTime));
TAOS_CHECK_RETURN(tEncodeI8(encoder, pRsp->compactTimeOffset));
TAOS_CHECK_RETURN(tEncodeI32(encoder, pRsp->flushInterval));

return 0;
}
Expand Down Expand Up @@ -5546,6 +5562,12 @@ int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) {
pRsp->compactTimeOffset = TSDB_DEFAULT_COMPACT_TIME_OFFSET;
}

if (!tDecodeIsEnd(decoder)) {
TAOS_CHECK_RETURN(tDecodeI32(decoder, &pRsp->flushInterval));
} else {
pRsp->flushInterval = TSDB_DEFAULT_FLUSH_INTERVAL;
}

return 0;
}

Expand Down Expand Up @@ -6649,6 +6671,7 @@ int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {

TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->reserved));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->db));
tEndEncode(&encoder);

_exit:
Expand All @@ -6661,6 +6684,23 @@ int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
return tlen;
}

int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pRsp){
SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen);

TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pRsp->reserved));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pRsp->db));

tEndDecode(&decoder);

_exit:
tDecoderClear(&decoder);
return code;
}

int32_t tSerializeDropOrphanTaskMsg(void *buf, int32_t bufLen, SMStreamDropOrphanMsg *pMsg) {
SEncoder encoder = {0};
int32_t code = 0;
Expand Down
1 change: 1 addition & 0 deletions source/common/src/systable.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "compact_interval", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "compact_time_range", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "compact_time_offset", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "flush_interval", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
};

static const SSysDbTableSchema userFuncSchema[] = {
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ typedef struct {
int32_t compactInterval; // minute
int32_t compactStartTime; // minute
int32_t compactEndTime; // minute
int32_t flushInterval;
} SDbCfg;

typedef struct {
Expand Down
18 changes: 17 additions & 1 deletion source/dnode/mnode/impl/src/mndDb.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "tjson.h"

#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE 14
#define DB_RESERVE_SIZE 10

static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
Expand Down Expand Up @@ -156,6 +156,7 @@ SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.compactStartTime, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.compactEndTime, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.compactInterval, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.flushInterval, _OVER)

SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
Expand Down Expand Up @@ -259,6 +260,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.compactStartTime, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.compactEndTime, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.compactInterval, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.flushInterval, _OVER)

SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pDb->lock);
Expand Down Expand Up @@ -380,6 +382,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->cfg.compactTimeOffset = pNew->cfg.compactTimeOffset;
pOld->compactStartTime = pNew->compactStartTime;
pOld->tsmaVersion = pNew->tsmaVersion;
pOld->cfg.flushInterval = pNew->cfg.flushInterval;
taosWUnLockLatch(&pOld->lock);
return 0;
}
Expand Down Expand Up @@ -510,6 +513,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
return code;
if (pCfg->compactStartTime != 0 && pCfg->compactEndTime != 0 && pCfg->compactStartTime >= pCfg->compactEndTime)
return code;
if (pCfg->flushInterval < 0) return code;
if (pCfg->compactTimeOffset < TSDB_MIN_COMPACT_TIME_OFFSET || pCfg->compactTimeOffset > TSDB_MAX_COMPACT_TIME_OFFSET)
return code;

Expand Down Expand Up @@ -593,6 +597,7 @@ static int32_t mndCheckInChangeDbCfg(SMnode *pMnode, SDbCfg *pOldCfg, SDbCfg *pN
if (pNewCfg->compactTimeOffset < TSDB_MIN_COMPACT_TIME_OFFSET ||
pNewCfg->compactTimeOffset > TSDB_MAX_COMPACT_TIME_OFFSET)
return code;
if (pNewCfg->flushInterval < 0) return code;

code = 0;
TAOS_RETURN(code);
Expand Down Expand Up @@ -847,6 +852,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.compactStartTime = pCreate->compactStartTime,
.compactEndTime = pCreate->compactEndTime,
.compactTimeOffset = pCreate->compactTimeOffset,
.flushInterval = pCreate->flushInterval,
};

dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
Expand Down Expand Up @@ -1206,6 +1212,12 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
code = 0;
}

if (pAlter->flushInterval >= TSDB_DEFAULT_FLUSH_INTERVAL && pAlter->flushInterval != pDb->cfg.flushInterval) {
pDb->cfg.flushInterval = pAlter->flushInterval;
pDb->vgVersion++;
code = 0;
}

TAOS_RETURN(code);
}

Expand Down Expand Up @@ -1446,6 +1458,7 @@ static void mndDumpDbCfgInfo(SDbCfgRsp *cfgRsp, SDbObj *pDb) {
cfgRsp->compactStartTime = pDb->cfg.compactStartTime;
cfgRsp->compactEndTime = pDb->cfg.compactEndTime;
cfgRsp->compactTimeOffset = pDb->cfg.compactTimeOffset;
cfgRsp->flushInterval = pDb->cfg.flushInterval;
}

static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
Expand Down Expand Up @@ -2603,6 +2616,9 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
if ((pColInfo = taosArrayGet(pBlock->pDataBlock, cols++))) {
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)durationVstr, false), &lino, _OVER);
}

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, rows, (const char *)&pDb->cfg.flushInterval, false), &lino, _OVER);
}
_OVER:
if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
Expand Down
48 changes: 44 additions & 4 deletions source/dnode/mnode/impl/src/mndMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
return pReq;
}

static void *mndBuildTimerMsgWithArg(int32_t *pContLen, char *db) {
terrno = 0;
SMTimerReq timerReq = {0};
tstrncpy(timerReq.db, db, TSDB_DB_FNAME_LEN);

int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq);
if (contLen <= 0) return NULL;
void *pReq = rpcMallocCont(contLen);
if (pReq == NULL) return NULL;

if (tSerializeSMTimerMsg(pReq, contLen, &timerReq) < 0) {
mError("failed to serialize timer msg since %s", terrstr());
}
*pContLen = contLen;
return pReq;
}

static void mndPullupTrans(SMnode *pMnode) {
mTrace("pullup trans msg");
int32_t contLen = 0;
Expand Down Expand Up @@ -125,7 +142,6 @@ static void mndPullupTtl(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
// TODO check return value
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
}
Expand All @@ -136,17 +152,27 @@ static void mndPullupTrimDb(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRIM_DB_TIMER, .pCont = pReq, .contLen = contLen};
// TODO check return value
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
}
}

static void mndPullupCommitDb(SMnode *pMnode, char *db) {
mTrace("pullup commit db");
int32_t contLen = 0;
int32_t code = 0;
void *pReq = mndBuildTimerMsgWithArg(&contLen, db);
mInfo("pull up commit db %s", db);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMMIT_DB_TIMER, .pCont = pReq, .contLen = contLen};
if ((code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)) < 0) {
mError("failed to put commit-db-timer into write-queue since %s, line:%d", tstrerror(code), __LINE__);
}
}

static void mndPullupS3MigrateDb(SMnode *pMnode) {
mTrace("pullup trim");
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
// TODO check return value
SRpcMsg rpcMsg = {.msgType = TDMT_MND_S3MIGRATE_DB_TIMER, .pCont = pReq, .contLen = contLen};
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
Expand Down Expand Up @@ -383,6 +409,20 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndPullupTrimDb(pMnode);
}

SDbObj *pDb = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
mDebug("db:%s, flush period:%d, sec:%" PRId64, pDb->name, pDb->cfg.flushInterval, sec);
if (pDb->cfg.flushInterval > 0) {
mDebug("db:%s, flush period:%d, sec:%" PRId64, pDb->name, pDb->cfg.flushInterval, sec);
if (sec % pDb->cfg.flushInterval == 0) {
mndPullupCommitDb(pMnode, pDb->name);
}
}
sdbRelease(pSdb, pDb);
}

if (tsS3MigrateEnabled && sec % tsS3MigrateIntervalSec == 0) {
mndPullupS3MigrateDb(pMnode);
}
Expand Down Expand Up @@ -897,7 +937,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER ||
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) {
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER || pMsg->msgType == TDMT_MND_COMMIT_DB_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state));
TAOS_RETURN(code);
Expand Down
Loading
Loading