Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/3.0/td 33165 #29826

Open
wants to merge 12 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/

#define _DEFAULT_SOURCE
#include "tmsg.h"
#include "tglobal.h"
#include "tmsg.h"

#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
Expand Down Expand Up @@ -9284,7 +9284,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
return code;
}

void tDestroySMqPollReq(SMqPollReq *pReq) {
void tDestroySMqPollReq(SMqPollReq *pReq) {
tOffsetDestroy(&pReq->reqOffset);
if (pReq->uidHash != NULL) {
taosHashCleanup(pReq->uidHash);
Expand Down Expand Up @@ -11461,7 +11461,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data = NULL;
uint32_t bLen = 0;
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t**)&data, &bLen));
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t **)&data, &bLen));
if (taosArrayPush(pRsp->blockData, &data) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
Expand Down Expand Up @@ -11526,7 +11526,7 @@ int32_t tDecodeMqRawDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
if (pRsp->blockDataElementFree){
if (pRsp->blockDataElementFree) {
taosArrayDestroyP(pRsp->blockData, NULL);
} else {
taosArrayDestroy(pRsp->blockData);
Expand Down Expand Up @@ -11604,8 +11604,8 @@ void tDeleteSTaosxRsp(SMqDataRsp *pRsp) {
void tDeleteMqRawDataRsp(SMqDataRsp *pRsp) {
tOffsetDestroy(&pRsp->reqOffset);
tOffsetDestroy(&pRsp->rspOffset);
if (pRsp->rawData != NULL){
taosMemoryFree(POINTER_SHIFT(pRsp->rawData, - sizeof(SMqRspHead)));
if (pRsp->rawData != NULL) {
taosMemoryFree(POINTER_SHIFT(pRsp->rawData, -sizeof(SMqRspHead)));
}
}

Expand Down Expand Up @@ -11695,24 +11695,24 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq
_exit:
return code;
}
int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver){
int32_t code = 0;
int32_t lino = 0;
int32_t transformRawSSubmitTbData(void *data, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
int32_t lino = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
tDecoderInit(&decoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t *)data);

int32_t flags = 0;
TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &flags));
flags |= TD_REQ_FROM_TAOX;
flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;

SEncoder encoder = {0};
tEncoderInit(&encoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t*)data);
tEncoderInit(&encoder, (uint8_t *)POINTER_SHIFT(data, INT_BYTES), *(uint32_t *)data);
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, flags));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, suid));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, uid));
TAOS_CHECK_EXIT(tEncodeI32v(&encoder, sver));
_exit:
_exit:
return code;
}

Expand All @@ -11726,7 +11726,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
TAOS_CHECK_EXIT(tEncodeI32v(pCoder, flags));

// auto create table
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if ((pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
if (!(pSubmitTbData->pCreateTbReq)) {
return TSDB_CODE_INVALID_MSG;
}
Expand Down Expand Up @@ -11762,25 +11762,25 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
return code;
}

static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, void* rawData) {
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, void *rawData) {
int32_t code = 0;
int32_t lino;
int32_t flags;
uint8_t version;

uint8_t* dataAfterCreate = NULL;
uint8_t* dataStart = pCoder->data + pCoder->pos;
uint32_t posAfterCreate = 0;
uint8_t *dataAfterCreate = NULL;
uint8_t *dataStart = pCoder->data + pCoder->pos;
uint32_t posAfterCreate = 0;

TAOS_CHECK_EXIT(tStartDecode(pCoder));
uint32_t pos = pCoder->pos;
uint32_t pos = pCoder->pos;
TAOS_CHECK_EXIT(tDecodeI32v(pCoder, &flags));
uint32_t flagsLen = pCoder->pos - pos;

pSubmitTbData->flags = flags & 0xff;
version = (flags >> 8) & 0xff;

if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if ((pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pSubmitTbData->pCreateTbReq == NULL) {
TAOS_CHECK_EXIT(terrno);
Expand Down Expand Up @@ -11833,13 +11833,13 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pSubmitTbData->ctimeMs));
}

if (rawData != NULL){
if (dataAfterCreate != NULL){
if (rawData != NULL) {
if (dataAfterCreate != NULL) {
TAOS_MEMCPY(dataAfterCreate - INT_BYTES - flagsLen, dataStart, INT_BYTES + flagsLen);
*(int32_t*)(dataAfterCreate - INT_BYTES - flagsLen) = pCoder->pos - posAfterCreate + flagsLen;
*(void**)rawData = dataAfterCreate - INT_BYTES - flagsLen;
}else{
*(void**)rawData = dataStart;
*(int32_t *)(dataAfterCreate - INT_BYTES - flagsLen) = pCoder->pos - posAfterCreate + flagsLen;
*(void **)rawData = dataAfterCreate - INT_BYTES - flagsLen;
} else {
*(void **)rawData = dataStart;
}
}
tEndDecode(pCoder);
Expand All @@ -11854,18 +11854,21 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {

TAOS_CHECK_EXIT(tStartEncode(pCoder));
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)));
if (pReq->raw){
if (pReq->raw) {
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
void* data = taosArrayGetP(pReq->aSubmitTbData, i);
if (pCoder->data != NULL){
TAOS_MEMCPY(pCoder->data + pCoder->pos, data, *(uint32_t*)data + INT_BYTES);

void *data = taosArrayGetP(pReq->aSubmitTbData, i);
if (pCoder->data != NULL) {
TAOS_MEMCPY(pCoder->data + pCoder->pos, data, *(uint32_t *)data + INT_BYTES);
}
pCoder->pos += *(uint32_t*)data + INT_BYTES;
pCoder->pos += *(uint32_t *)data + INT_BYTES;
}
} else{
} else {
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)));
SSubmitTbData *pSubmitTbData = taosArrayGet(pReq->aSubmitTbData, i);
if ((pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) && pSubmitTbData->pCreateTbReq == NULL) {
pSubmitTbData->flags = 0;
}
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, pSubmitTbData));
}
}

Expand All @@ -11874,7 +11877,7 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
return code;
}

int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray *rawList) {
int32_t code = 0;

memset(pReq, 0, sizeof(*pReq));
Expand All @@ -11898,7 +11901,7 @@ int32_t tDecodeSubmitReq(SDecoder *pCoder, SSubmitReq2 *pReq, SArray* rawList) {
}

for (uint64_t i = 0; i < nSubmitTbData; i++) {
SSubmitTbData* data = taosArrayReserve(pReq->aSubmitTbData, 1);
SSubmitTbData *data = taosArrayReserve(pReq->aSubmitTbData, 1);
if (tDecodeSSubmitTbData(pCoder, data, rawList != NULL ? taosArrayReserve(rawList, 1) : NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
Expand Down Expand Up @@ -11963,7 +11966,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {
if (pReq->aSubmitTbData == NULL) return;

if (!pReq->raw){
if (!pReq->raw) {
int32_t nSubmitTbData = TARRAY_SIZE(pReq->aSubmitTbData);
SSubmitTbData *aSubmitTbData = (SSubmitTbData *)TARRAY_DATA(pReq->aSubmitTbData);

Expand Down
9 changes: 6 additions & 3 deletions source/dnode/vnode/src/vnd/vnodeSvr.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}

extern int64_t tsMaxKeyByPrecision[];

static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
int32_t code = 0;
int32_t lino = 0;
Expand Down Expand Up @@ -374,6 +375,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
}
return code;
}

static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
Expand Down Expand Up @@ -1476,16 +1478,17 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
vAlterTbRsp.pMeta = &vMetaRsp;
}

if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
if (uid == 0) {
vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
vAlterTbReq.tbName);
goto _exit;
}

SArray* tbUids = taosArrayInit(4, sizeof(int64_t));
void* p = taosArrayPush(tbUids, &uid);
SArray *tbUids = taosArrayInit(4, sizeof(int64_t));
void *p = taosArrayPush(tbUids, &uid);
TSDB_CHECK_NULL(p, code, lino, _exit, terrno);

vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);
Expand Down
22 changes: 16 additions & 6 deletions source/libs/parser/inc/parUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ extern "C" {

#include "catalog.h"
#include "os.h"
#include "parser.h"
#include "parToken.h"
#include "parser.h"
#include "query.h"

#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
Expand Down Expand Up @@ -56,6 +56,15 @@ extern "C" {
pSql += index; \
} while (0)

#define NEXT_TOKEN_WITH_PREV_VALUES(pSql, token) \
do { \
int32_t index = 0; \
token = tStrGetToken(pSql, &index, true, NULL); \
if (token.type != TK_VALUES) { \
pSql += index; \
} \
} while (0)

#define NEXT_TOKEN_WITH_PREV_EXT(pSql, token, pIgnoreComma) \
do { \
int32_t index = 0; \
Expand Down Expand Up @@ -130,13 +139,13 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
int32_t getTableTypeFromTableNode(SNode *pTable);
int32_t getTableTypeFromTableNode(SNode* pTable);

int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
int32_t getVnodeSysTableTargetName(int32_t acctId, SNode* pWhere, SName* pName);
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type);
int32_t parseTagValue(SMsgBuf* pMsgBuf, const char** pSql, uint8_t precision, SSchema* pTagSchema, SToken* pToken,
SArray* pTagName, SArray* pTagVals, STag** pTag, timezone_t tz, void *charsetCxt);
SArray* pTagName, SArray* pTagVals, STag** pTag, timezone_t tz, void* charsetCxt);
int32_t parseTbnameToken(SMsgBuf* pMsgBuf, char* tname, SToken* pToken, bool* pFoundCtbName);

int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq);
Expand All @@ -153,8 +162,8 @@ int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCac
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
int32_t reserveViewUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable, AUTH_TYPE type,
SParseMetaCache* pMetaCache);
int32_t reserveViewUserAuthInCache(int32_t acctId, const char* pUser, const char* pDb, const char* pTable,
AUTH_TYPE type, SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t reserveTableCfgInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
Expand All @@ -177,7 +186,8 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableCfg** pOutput);
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
int32_t createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint, SNode** ppSelect);
int32_t createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint,
SNode** ppSelect);
int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas);
int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, STableTSMAInfo** pTsma);

Expand Down
Loading
Loading