diff --git a/fsw/cfe-core/src/inc/cfe_sb.h b/fsw/cfe-core/src/inc/cfe_sb.h index 88749ee8a..aa7862e37 100644 --- a/fsw/cfe-core/src/inc/cfe_sb.h +++ b/fsw/cfe-core/src/inc/cfe_sb.h @@ -177,7 +177,10 @@ typedef uint8 *CFE_SB_MsgPayloadPtr_t; ** ** Software Zero Copy handle used in many SB APIs */ -typedef cpuaddr CFE_SB_ZeroCopyHandle_t; +typedef struct +{ + struct CFE_SB_BufferD* BufDscPtr; /* abstract descriptor reference (internal use) */ +} CFE_SB_ZeroCopyHandle_t; /** \brief Quality Of Service Type Definition ** @@ -739,14 +742,14 @@ CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, ** pointer returned by a call to #CFE_SB_ZeroCopyGetPtr, ** but never used in a call to #CFE_SB_TransmitBuffer. ** -** \param[in] BufferHandle This must be the handle supplied with the pointer -** when #CFE_SB_ZeroCopyGetPtr was called. +** \param[in] ZeroCopyHandle This must be the handle supplied with the pointer +** when #CFE_SB_ZeroCopyGetPtr was called. ** ** \return Execution status, see \ref CFEReturnCodes ** \retval #CFE_SUCCESS \copybrief CFE_SUCCESS ** \retval #CFE_SB_BUFFER_INVALID \copybrief CFE_SB_BUFFER_INVALID **/ -CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t BufferHandle); +CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle); /*****************************************************************************/ /** @@ -762,14 +765,19 @@ CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_Zero ** performance in high-rate, high-volume software bus traffic. ** ** \par Assumptions, External Events, and Notes: -** -# The pointer returned by #CFE_SB_ZeroCopyGetPtr is only good for -** one call to #CFE_SB_TransmitBuffer. -** -# Callers must not use the same SB message buffer for multiple sends. +** -# A handle returned by #CFE_SB_ZeroCopyGetPtr is "consumed" by +** a _successful_ call to #CFE_SB_TransmitBuffer. +** -# If this function returns CFE_SUCCESS, this indicates the zero copy handle is +** now owned by software bus, and is no longer owned by the calling application, +** and should not be re-used. +** -# Howver if this function fails (returns any error status) it does not change +** the state of the buffer at all, meaning the calling application still owns it. +** (a failure means the buffer is left in the same state it was before the call). ** -# Applications should be written as if #CFE_SB_ZeroCopyGetPtr is -** equivalent to a \c malloc() and #CFE_SB_TransmitBuffer is equivalent -** to a \c free(). +** equivalent to a \c malloc() and a successful call to #CFE_SB_TransmitBuffer +** is equivalent to a \c free(). ** -# Applications must not de-reference the message pointer (for reading -** or writing) after the call to #CFE_SB_TransmitBuffer. +** or writing) after a successful call to #CFE_SB_TransmitBuffer. ** -# This function will increment and apply the internally tracked ** sequence counter if set to do so. ** diff --git a/fsw/cfe-core/src/sb/cfe_sb_api.c b/fsw/cfe-core/src/sb/cfe_sb_api.c index 8a1b33a98..32740fdee 100644 --- a/fsw/cfe-core/src/sb/cfe_sb_api.c +++ b/fsw/cfe-core/src/sb/cfe_sb_api.c @@ -1393,7 +1393,6 @@ int32 CFE_SB_TransmitMsg(CFE_MSG_Message_t *MsgPtr, bool IncrementSequenceCount char FullName[(OS_MAX_API_NAME * 2)]; CFE_SB_BufferD_t *BufDscPtr; CFE_SBR_RouteId_t RouteId; - CFE_MSG_Type_t MsgType; uint16 PendingEventID; PendingEventID = 0; @@ -1401,44 +1400,66 @@ int32 CFE_SB_TransmitMsg(CFE_MSG_Message_t *MsgPtr, bool IncrementSequenceCount Status = CFE_SB_TransmitMsgValidate(MsgPtr, &MsgId, &Size, &RouteId); + CFE_SB_LockSharedData(__func__, __LINE__); + if (Status == CFE_SUCCESS && CFE_SBR_IsValidRouteId(RouteId)) { - CFE_SB_LockSharedData(__func__, __LINE__); - - /* Get buffer */ - BufDscPtr = CFE_SB_GetBufferFromPool(MsgId, Size); + /* Get buffer - note this pre-initializes the returned buffer with + * a use count of 1, which refers to this task as it fills the buffer. */ + BufDscPtr = CFE_SB_GetBufferFromPool(Size); if (BufDscPtr == NULL) { PendingEventID = CFE_SB_GET_BUF_ERR_EID; Status = CFE_SB_BUF_ALOC_ERR; } - else - { - /* For Tlm packets, increment the seq count if requested */ - CFE_MSG_GetType(MsgPtr, &MsgType); - if((MsgType == CFE_MSG_Type_Tlm) && IncrementSequenceCount) - { - CFE_SBR_IncrementSequenceCounter(RouteId); - CFE_MSG_SetSequenceCount(MsgPtr, CFE_SBR_GetSequenceCounter(RouteId)); - } - } - - CFE_SB_UnlockSharedData(__func__, __LINE__); } - if (Status == CFE_SUCCESS && BufDscPtr != NULL) + /* + * Increment the MsgSendErrorCounter only if there was a real error, + * such as a validation issue or failure to allocate a buffer. + * + * (This should NOT be done if simply no route) + */ + if (Status != CFE_SUCCESS) { - /* Copy data into buffer and transmit */ - memcpy(BufDscPtr->Buffer, MsgPtr, Size); - Status = CFE_SB_TransmitBufferFull(BufDscPtr, RouteId, MsgId); + CFE_SB_Global.HKTlmMsg.Payload.MsgSendErrorCounter++; } - if (Status != CFE_SUCCESS) + CFE_SB_UnlockSharedData(__func__, __LINE__); + + /* + * If a buffer was obtained above, then copy the content into it + * and broadcast it to all subscribers in the route. + * + * Note - if there is no route / no subscribers, the "Status" will + * be CFE_SUCCESS because CFE_SB_TransmitMsgValidate() succeeded, + * but there will be no buffer because CFE_SBR_IsValidRouteId() returned + * false. + * + * But if the desciptor is non-null it means the message is valid and + * there is a route to send it to. + */ + if (BufDscPtr != NULL) { - /* Increment error counter (inside lock) if not success */ - CFE_SB_LockSharedData(__func__, __LINE__); - CFE_SB_Global.HKTlmMsg.Payload.MsgSendErrorCounter++; - CFE_SB_UnlockSharedData(__func__, __LINE__); + /* Copy actual message content into buffer and set its metadata */ + memcpy(&BufDscPtr->Content, MsgPtr, Size); + BufDscPtr->MsgId = MsgId; + BufDscPtr->ContentSize = Size; + BufDscPtr->AutoSequence = IncrementSequenceCount; + CFE_MSG_GetType(MsgPtr, &BufDscPtr->ContentType); + + /* + * This routine will use best-effort to send to all subscribers, + * increment the buffer use count for every successful delivery, + * and send an event/increment counter for any unsucessful delivery. + */ + CFE_SB_BroadcastBufferToRoute(BufDscPtr, RouteId); + + /* + * The broadcast function consumes the buffer, so it should not be + * accessed in this function anymore + */ + BufDscPtr = NULL; } if (PendingEventID == CFE_SB_GET_BUF_ERR_EID) @@ -1615,11 +1636,8 @@ int32 CFE_SB_TransmitMsgValidate(CFE_MSG_Message_t *MsgPtr, * \param[in] BufDscPtr Pointer to the buffer description from the memory pool, * released prior to return * \param[in] RouteId Route to send to - * \param[in] MsgId Message Id that is being sent */ -int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, - CFE_SBR_RouteId_t RouteId, - CFE_SB_MsgId_t MsgId) +void CFE_SB_BroadcastBufferToRoute(CFE_SB_BufferD_t *BufDscPtr, CFE_SBR_RouteId_t RouteId) { CFE_ES_AppId_t AppId; CFE_ES_TaskId_t TskId; @@ -1631,7 +1649,6 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, char FullName[(OS_MAX_API_NAME * 2)]; char PipeName[OS_MAX_API_NAME]; - Status = CFE_SUCCESS; SBSndErr.EvtsToSnd = 0; /* get app id for loopback testing */ @@ -1643,66 +1660,75 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, /* take semaphore to prevent a task switch during processing */ CFE_SB_LockSharedData(__func__,__LINE__); - /* Send the packet to all destinations */ - for(DestPtr = CFE_SBR_GetDestListHeadPtr(RouteId); DestPtr != NULL; DestPtr = DestPtr->Next) + /* For an invalid route / no subsribers this whole logic can be skipped */ + if (CFE_SBR_IsValidRouteId(RouteId)) { - if (DestPtr->Active == CFE_SB_ACTIVE) /* destination is active */ + /* Set the seq count if requested (while locked) before actually sending */ + /* For some reason this is only done for TLM types (historical, TBD) */ + if (BufDscPtr->AutoSequence && BufDscPtr->ContentType == CFE_MSG_Type_Tlm) { - PipeDscPtr = CFE_SB_LocatePipeDescByID(DestPtr->PipeId); - } - else - { - PipeDscPtr = NULL; - } + CFE_SBR_IncrementSequenceCounter(RouteId); - if (!CFE_SB_PipeDescIsMatch(PipeDscPtr, DestPtr->PipeId)) - { - continue; + /* Write the sequence into the message header itself (overwrites whatever was there) */ + CFE_MSG_SetSequenceCount(&BufDscPtr->Content.Msg, CFE_SBR_GetSequenceCounter(RouteId)); } - if((PipeDscPtr->Opts & CFE_SB_PIPEOPTS_IGNOREMINE) != 0 && - CFE_RESOURCEID_TEST_EQUAL(PipeDscPtr->AppId, AppId)) + /* Send the packet to all destinations */ + for (DestPtr = CFE_SBR_GetDestListHeadPtr(RouteId); DestPtr != NULL; DestPtr = DestPtr->Next) { - continue; - }/* end if */ + if (DestPtr->Active == CFE_SB_ACTIVE) /* destination is active */ + { + PipeDscPtr = CFE_SB_LocatePipeDescByID(DestPtr->PipeId); + } + else + { + PipeDscPtr = NULL; + } - /* if Msg limit exceeded, log event, increment counter */ - /* and go to next destination */ - if(DestPtr->BuffCount >= DestPtr->MsgId2PipeLim) - { - SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].PipeId = DestPtr->PipeId; - SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].EventId = CFE_SB_MSGID_LIM_ERR_EID; - SBSndErr.EvtsToSnd++; - CFE_SB_Global.HKTlmMsg.Payload.MsgLimitErrorCounter++; - PipeDscPtr->SendErrors++; + if (!CFE_SB_PipeDescIsMatch(PipeDscPtr, DestPtr->PipeId)) + { + continue; + } - continue; - }/* end if */ + if ((PipeDscPtr->Opts & CFE_SB_PIPEOPTS_IGNOREMINE) != 0 && + CFE_RESOURCEID_TEST_EQUAL(PipeDscPtr->AppId, AppId)) + { + continue; + } /* end if */ - /* - ** Write the buffer descriptor to the queue of the pipe. If the write - ** failed, log info and increment the pipe's error counter. - */ - Status = OS_QueuePut(PipeDscPtr->SysQueueId, (void *)&BufDscPtr, sizeof(CFE_SB_BufferD_t *), 0); + /* if Msg limit exceeded, log event, increment counter */ + /* and go to next destination */ + if (DestPtr->BuffCount >= DestPtr->MsgId2PipeLim) + { + SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].PipeId = DestPtr->PipeId; + SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].EventId = CFE_SB_MSGID_LIM_ERR_EID; + SBSndErr.EvtsToSnd++; + CFE_SB_Global.HKTlmMsg.Payload.MsgLimitErrorCounter++; + PipeDscPtr->SendErrors++; - if (Status == OS_SUCCESS) - { - /* The queue now holds a ref to the buffer, so increment its ref count. */ - CFE_SB_IncrBufUseCnt(BufDscPtr); + continue; + } /* end if */ - DestPtr->BuffCount++; /* used for checking MsgId2PipeLimit */ - DestPtr->DestCnt++; /* used for statistics */ - ++PipeDscPtr->CurrentQueueDepth; - if (PipeDscPtr->CurrentQueueDepth >= PipeDscPtr->PeakQueueDepth) + /* + ** Write the buffer descriptor to the queue of the pipe. If the write + ** failed, log info and increment the pipe's error counter. + */ + Status = OS_QueuePut(PipeDscPtr->SysQueueId, &BufDscPtr, sizeof(BufDscPtr), 0); + + if (Status == OS_SUCCESS) { - PipeDscPtr->PeakQueueDepth = PipeDscPtr->CurrentQueueDepth; - } + /* The queue now holds a ref to the buffer, so increment its ref count. */ + CFE_SB_IncrBufUseCnt(BufDscPtr); - Status = CFE_SUCCESS; - } - else - { - if (Status == OS_QUEUE_FULL) + DestPtr->BuffCount++; /* used for checking MsgId2PipeLimit */ + DestPtr->DestCnt++; /* used for statistics */ + ++PipeDscPtr->CurrentQueueDepth; + if (PipeDscPtr->CurrentQueueDepth >= PipeDscPtr->PeakQueueDepth) + { + PipeDscPtr->PeakQueueDepth = PipeDscPtr->CurrentQueueDepth; + } + } + else if (Status == OS_QUEUE_FULL) { SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].PipeId = DestPtr->PipeId; SBSndErr.EvtBuf[SBSndErr.EvtsToSnd].EventId = CFE_SB_Q_FULL_ERR_EID; @@ -1719,12 +1745,35 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, SBSndErr.EvtsToSnd++; CFE_SB_Global.HKTlmMsg.Payload.InternalErrorCounter++; PipeDscPtr->SendErrors++; - }/*end if */ + } /*end if */ + + } /* end loop over destinations */ + } + + /* + * If any specific delivery issues occured, also increment the + * general error count before releasing the lock. + */ + if (SBSndErr.EvtsToSnd > 0) + { + CFE_SB_Global.HKTlmMsg.Payload.MsgSendErrorCounter++; + } + + /* + * Remove this from whatever list it was in + * + * If it was a singleton/new buffer this has no effect. + * If it was a zero-copy buffer this removes it from the ZeroCopyList. + */ + CFE_SB_TrackingListRemove(&BufDscPtr->Link); + + /* clear the AppID field in case it was a zero copy buffer, + * as it is no longer owned by that app after broadcasting */ + BufDscPtr->AppId = CFE_ES_APPID_UNDEFINED; + + /* track the buffer as an in-transit message */ + CFE_SB_TrackingListAdd(&CFE_SB_Global.InTransitList, &BufDscPtr->Link); - Status = CFE_STATUS_EXTERNAL_RESOURCE_FAIL; - } - } /* end loop over destinations */ - /* ** Decrement the buffer UseCount and free buffer if cnt=0. This decrement is done ** because the use cnt is initialized to 1 in CFE_SB_GetBufferFromPool. @@ -1753,7 +1802,7 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, CFE_EVS_SendEventWithAppID(CFE_SB_MSGID_LIM_ERR_EID,CFE_EVS_EventType_ERROR,CFE_SB_Global.AppId, "Msg Limit Err,MsgId 0x%x,pipe %s,sender %s", - (unsigned int)CFE_SB_MsgIdToValue(MsgId), + (unsigned int)CFE_SB_MsgIdToValue(BufDscPtr->MsgId), PipeName, CFE_SB_GetAppTskName(TskId,FullName)); /* clear the bit so the task may send this event again */ @@ -1772,7 +1821,7 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, CFE_EVS_SendEventWithAppID(CFE_SB_Q_FULL_ERR_EID,CFE_EVS_EventType_ERROR,CFE_SB_Global.AppId, "Pipe Overflow,MsgId 0x%x,pipe %s,sender %s", - (unsigned int)CFE_SB_MsgIdToValue(MsgId), + (unsigned int)CFE_SB_MsgIdToValue(BufDscPtr->MsgId), PipeName, CFE_SB_GetAppTskName(TskId,FullName)); /* clear the bit so the task may send this event again */ @@ -1788,7 +1837,7 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, CFE_EVS_SendEventWithAppID(CFE_SB_Q_WR_ERR_EID,CFE_EVS_EventType_ERROR,CFE_SB_Global.AppId, "Pipe Write Err,MsgId 0x%x,pipe %s,sender %s,stat 0x%x", - (unsigned int)CFE_SB_MsgIdToValue(MsgId), + (unsigned int)CFE_SB_MsgIdToValue(BufDscPtr->MsgId), PipeName, CFE_SB_GetAppTskName(TskId,FullName), (unsigned int)SBSndErr.EvtBuf[i].ErrStat); @@ -1798,9 +1847,6 @@ int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, }/* end if */ } - - return CFE_SUCCESS; - } #ifndef CFE_OMIT_DEPRECATED_6_8 @@ -1984,7 +2030,7 @@ int32 CFE_SB_ReceiveBuffer(CFE_SB_Buffer_t **BufPtr, * Also set the Receivers pointer to the address of the actual message * (currently this is "borrowing" the ref above, not its own ref) */ - *BufPtr = BufDscPtr->Buffer; + *BufPtr = &BufDscPtr->Content; /* get pointer to destination to be used in decrementing msg limit cnt*/ RouteId = CFE_SBR_GetRouteId(BufDscPtr->MsgId); @@ -2075,113 +2121,107 @@ int32 CFE_SB_ReceiveBuffer(CFE_SB_Buffer_t **BufPtr, CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *BufferHandle) { - int32 stat1; - CFE_ES_AppId_t AppId; - cpuaddr address = 0; - CFE_SB_ZeroCopyD_t *zcd = NULL; - CFE_SB_BufferD_t *bd = NULL; + CFE_ES_AppId_t AppId; + CFE_SB_BufferD_t *BufDscPtr; + CFE_SB_Buffer_t *BufPtr; - CFE_SB_LockSharedData(__func__,__LINE__); + AppId = CFE_ES_APPID_UNDEFINED; + BufDscPtr = NULL; + BufPtr = NULL; - /* Allocate a new zero copy descriptor from the SB memory pool.*/ - stat1 = CFE_ES_GetPoolBuf((CFE_ES_MemPoolBuf_t*)&zcd, CFE_SB_Global.Mem.PoolHdl, sizeof(CFE_SB_ZeroCopyD_t)); - if(stat1 < 0){ - CFE_SB_UnlockSharedData(__func__,__LINE__); + if (BufferHandle == NULL) + { return NULL; } - /* Add the size of a zero copy descriptor to the memory-in-use ctr and */ - /* adjust the high water mark if needed */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse+=stat1; - if(CFE_SB_Global.StatTlmMsg.Payload.MemInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse){ - CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = CFE_SB_Global.StatTlmMsg.Payload.MemInUse; - }/* end if */ + /* get callers AppId */ + if (CFE_ES_GetAppID(&AppId) == CFE_SUCCESS) + { + CFE_SB_LockSharedData(__func__,__LINE__); + + /* + * All this needs to do is get a descriptor from the pool, + * and associate that descriptor with this app ID, so it + * can be freed if this app is deleted before it uses it. + */ + BufDscPtr = CFE_SB_GetBufferFromPool(MsgSize); - /* Allocate a new buffer (from the SB memory pool) to hold the message */ - stat1 = CFE_ES_GetPoolBuf((CFE_ES_MemPoolBuf_t*)&bd, CFE_SB_Global.Mem.PoolHdl, MsgSize + sizeof(CFE_SB_BufferD_t)); - if((stat1 < 0)||(bd==NULL)){ - /*deallocate the first buffer if the second buffer creation fails*/ - stat1 = CFE_ES_PutPoolBuf(CFE_SB_Global.Mem.PoolHdl, zcd); - if(stat1 > 0){ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse-=stat1; + if (BufDscPtr != NULL) + { + /* Track the buffer as a zero-copy assigned to this app ID */ + BufDscPtr->AppId = AppId; + BufPtr = &BufDscPtr->Content; + CFE_SB_TrackingListAdd(&CFE_SB_Global.ZeroCopyList, &BufDscPtr->Link); } + CFE_SB_UnlockSharedData(__func__,__LINE__); - return NULL; } - /* Increment the number of buffers in use by one even though two buffers */ - /* were allocated. SBBuffersInUse increments on a per-message basis */ - CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse++; - if(CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse){ - CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse = CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse; - }/* end if */ - - /* Add the size of the actual buffer to the memory-in-use ctr and */ - /* adjust the high water mark if needed */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse+=stat1; - if(CFE_SB_Global.StatTlmMsg.Payload.MemInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse){ - CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = CFE_SB_Global.StatTlmMsg.Payload.MemInUse; - }/* end if */ - - /* first set ptr to actual msg buffer the same as ptr to descriptor */ - address = (cpuaddr)bd; - - /* increment actual msg buffer ptr beyond the descriptor */ - address += sizeof(CFE_SB_BufferD_t); - - /* Initialize the zero copy descriptor structure. */ - zcd->Size = MsgSize; - zcd->Buffer = (void *)address; - zcd->Next = NULL; - - /* Add this Zero Copy Descriptor to the end of the chain */ - if(CFE_SB_Global.ZeroCopyTail != NULL){ - ((CFE_SB_ZeroCopyD_t *) CFE_SB_Global.ZeroCopyTail)->Next = (void *)zcd; + if (BufPtr != NULL) + { + /* + * If a buffer was obtained, wipe it now. + * (This ensures the buffer is fully cleared at least once, + * no stale data from a prior use of the same memory) + */ + memset(BufPtr, 0, MsgSize); } - zcd->Prev = CFE_SB_Global.ZeroCopyTail; - CFE_SB_Global.ZeroCopyTail = (void *)zcd; - CFE_SB_UnlockSharedData(__func__,__LINE__); + /* Export both items (descriptor + msg buffer) to caller */ + BufferHandle->BufDscPtr = BufDscPtr; + return BufPtr; - /* get callers AppId */ - CFE_ES_GetAppID(&AppId); - zcd->AppID = AppId; - - (*BufferHandle) = (CFE_SB_ZeroCopyHandle_t) zcd; - - /* Initialize the buffer descriptor structure. */ - bd->UseCount = 1; - bd->Size = MsgSize; - bd->Buffer = (CFE_SB_Buffer_t *)address; +}/* CFE_SB_ZeroCopyGetPtr */ - return (CFE_SB_Buffer_t *)address; +/* + * Helper functions to do sanity checks on the Zero Copy handle + Buffer combo. + * + * Note in a future CFE version the API can be simplified - + * only one of these pointers is strictly needed, since they + * should refer to the same buffer descriptor object. + */ +int32 CFE_SB_ZeroCopyHandleValidate(CFE_SB_Buffer_t *BufPtr, + CFE_SB_ZeroCopyHandle_t ZeroCopyHandle) +{ + /* + * Sanity Check that the pointers are not NULL + */ + if (BufPtr == NULL || ZeroCopyHandle.BufDscPtr == NULL) + { + return CFE_SB_BAD_ARGUMENT; + } -}/* CFE_SB_ZeroCopyGetPtr */ + /* + * Check that the descriptor is actually a "zero copy" type, + * and that it refers to same actual message buffer. + */ + if (!CFE_RESOURCEID_TEST_DEFINED(ZeroCopyHandle.BufDscPtr->AppId) || + (&ZeroCopyHandle.BufDscPtr->Content != BufPtr)) + { + return CFE_SB_BUFFER_INVALID; + } + /* Basic sanity check passed */ + return CFE_SUCCESS; +} /* * Function: CFE_SB_ZeroCopyReleasePtr - See API and header file for details */ int32 CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, - CFE_SB_ZeroCopyHandle_t BufferHandle) + CFE_SB_ZeroCopyHandle_t ZeroCopyHandle) { - int32 Status; - int32 Stat2; - CFE_ES_MemPoolBuf_t BufAddr; + int32 Status; - Status = CFE_SB_ZeroCopyReleaseDesc(Ptr2Release, BufferHandle); + Status = CFE_SB_ZeroCopyHandleValidate(Ptr2Release, ZeroCopyHandle); CFE_SB_LockSharedData(__func__,__LINE__); - if(Status == CFE_SUCCESS){ - /* give the buffer back to the buffer pool */ - BufAddr = CFE_ES_MEMPOOLBUF_C((cpuaddr)Ptr2Release - sizeof(CFE_SB_BufferD_t)); - Stat2 = CFE_ES_PutPoolBuf(CFE_SB_Global.Mem.PoolHdl, BufAddr); - if(Stat2 > 0){ - /* Substract the size of the actual buffer from the Memory in use ctr */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse-=Stat2; - CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse--; - }/* end if */ + if (Status == CFE_SUCCESS) + { + /* Clear the ownership app ID and decrement use count (may also free) */ + ZeroCopyHandle.BufDscPtr->AppId = CFE_ES_APPID_UNDEFINED; + CFE_SB_DecrBufUseCnt(ZeroCopyHandle.BufDscPtr); } CFE_SB_UnlockSharedData(__func__,__LINE__); @@ -2190,69 +2230,6 @@ int32 CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, }/* end CFE_SB_ZeroCopyReleasePtr */ - -/****************************************************************************** -** Name: CFE_SB_ZeroCopyReleaseDesc -** -** Purpose: API used for releasing a zero copy descriptor (for zero copy mode -** only). -** -** Assumptions, External Events, and Notes: -** None -** -** Date Written: -** 04/25/2005 -** -** Input Arguments: -** Ptr2Release -** BufferHandle -** -** Output Arguments: -** None -** -** Return Values: -** Status -** -******************************************************************************/ -int32 CFE_SB_ZeroCopyReleaseDesc(CFE_SB_Buffer_t *Ptr2Release, - CFE_SB_ZeroCopyHandle_t BufferHandle) -{ - int32 Stat; - CFE_SB_ZeroCopyD_t *zcd = (CFE_SB_ZeroCopyD_t *) BufferHandle; - - CFE_SB_LockSharedData(__func__,__LINE__); - - Stat = CFE_ES_GetPoolBufInfo(CFE_SB_Global.Mem.PoolHdl, zcd); - - if((Ptr2Release == NULL) || (Stat < 0) || (zcd->Buffer != (void *)Ptr2Release)){ - CFE_SB_UnlockSharedData(__func__,__LINE__); - return CFE_SB_BUFFER_INVALID; - } - - /* delink the descriptor */ - if(zcd->Prev != NULL){ - ((CFE_SB_ZeroCopyD_t *) (zcd->Prev))->Next = zcd->Next; - } - if(zcd->Next != NULL){ - ((CFE_SB_ZeroCopyD_t *) (zcd->Next))->Prev = zcd->Prev; - } - if(CFE_SB_Global.ZeroCopyTail == (void *)zcd){ - CFE_SB_Global.ZeroCopyTail = zcd->Prev; - } - - /* give the descriptor back to the buffer pool */ - Stat = CFE_ES_PutPoolBuf(CFE_SB_Global.Mem.PoolHdl, zcd); - if(Stat > 0){ - /* Substract the size of the actual buffer from the Memory in use ctr */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse-=Stat; - }/* end if */ - - CFE_SB_UnlockSharedData(__func__,__LINE__); - - return CFE_SUCCESS; - -}/* end CFE_SB_ZeroCopyReleaseDesc */ - /* * Function CFE_SB_TransmitBuffer - See API and header file for details */ @@ -2261,54 +2238,53 @@ int32 CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, bool IncrementSequenceCount) { int32 Status; - CFE_MSG_Size_t Size = 0; - CFE_SB_MsgId_t MsgId = CFE_SB_INVALID_MSG_ID; CFE_SB_BufferD_t *BufDscPtr; CFE_SBR_RouteId_t RouteId; - CFE_MSG_Type_t MsgType; - /* Release zero copy handle */ - Status = CFE_SB_ZeroCopyReleaseDesc(BufPtr, ZeroCopyHandle); + Status = CFE_SB_ZeroCopyHandleValidate(BufPtr, ZeroCopyHandle); if (Status == CFE_SUCCESS) { - Status = CFE_SB_TransmitMsgValidate(&BufPtr->Msg, &MsgId, &Size, &RouteId); + /* Get actual buffer descriptor pointer from zero copy handle */ + BufDscPtr = ZeroCopyHandle.BufDscPtr; - /* Send if route exists */ + /* Validate the content and get the MsgId, store it in the descriptor */ + Status = CFE_SB_TransmitMsgValidate(&BufPtr->Msg, &BufDscPtr->MsgId, &BufDscPtr->ContentSize, &RouteId); + + /* + * Broadcast the message if validation succeeded. + * + * Note that for the case of no subscribers, the validation returns CFE_SUCCESS + * but the actual route ID may be invalid. This is OK and considered normal- + * the validation will increment the NoSubscribers count, but we should NOT + * increment the MsgSendErrorCounter here - it is not really a sending error to + * have no subscribers. CFE_SB_BroadcastBufferToRoute() will not send to + * anything if the route is not valid (benign). + */ if (Status == CFE_SUCCESS) { - /* Get buffer descriptor pointer */ - BufDscPtr = CFE_SB_GetBufferFromCaller(MsgId, BufPtr); + BufDscPtr->AutoSequence = IncrementSequenceCount; + CFE_MSG_GetType(&BufPtr->Msg, &BufDscPtr->ContentType); - if(CFE_SBR_IsValidRouteId(RouteId)) - { - /* For Tlm packets, increment the seq count if requested */ - CFE_MSG_GetType(&BufPtr->Msg, &MsgType); - if((MsgType == CFE_MSG_Type_Tlm) && IncrementSequenceCount) - { - CFE_SBR_IncrementSequenceCounter(RouteId); - CFE_MSG_SetSequenceCount(&BufPtr->Msg, CFE_SBR_GetSequenceCounter(RouteId)); - } + /* Now broadcast the message, which consumes the buffer */ + CFE_SB_BroadcastBufferToRoute(BufDscPtr, RouteId); - Status = CFE_SB_TransmitBufferFull(BufDscPtr, RouteId, MsgId); - } - else - { - /* Decrement use count if transmit buffer full not called */ - CFE_SB_LockSharedData(__func__, __LINE__); - CFE_SB_DecrBufUseCnt(BufDscPtr); - CFE_SB_UnlockSharedData(__func__, __LINE__); - } - } - else - { - /* Increment send error counter for validation failure */ - CFE_SB_LockSharedData(__func__, __LINE__); - CFE_SB_Global.HKTlmMsg.Payload.MsgSendErrorCounter++; - CFE_SB_UnlockSharedData(__func__, __LINE__); + /* + * IMPORTANT - the descriptor might be freed at any time after this, + * so the descriptor should not be accessed again after this point. + */ + BufDscPtr = NULL; } } + if (Status != CFE_SUCCESS) + { + /* Increment send error counter for validation failure */ + CFE_SB_LockSharedData(__func__, __LINE__); + CFE_SB_Global.HKTlmMsg.Payload.MsgSendErrorCounter++; + CFE_SB_UnlockSharedData(__func__, __LINE__); + } + return Status; } diff --git a/fsw/cfe-core/src/sb/cfe_sb_buf.c b/fsw/cfe-core/src/sb/cfe_sb_buf.c index 03424e1da..ab87523e9 100644 --- a/fsw/cfe-core/src/sb/cfe_sb_buf.c +++ b/fsw/cfe-core/src/sb/cfe_sb_buf.c @@ -39,6 +39,55 @@ #include "cfe_es.h" #include "cfe_error.h" +/* + * The actual message content of a SB Buffer Descriptor is the + * offset of the content member. This will be auto-aligned by + * the compiler according to the requirements of the machine. + */ +#define CFE_SB_BUFFERD_CONTENT_OFFSET (offsetof(CFE_SB_BufferD_t,Content)) + + +/****************************************************************************** + * + * Helper function to reset/clear the links on a list node (make empty) + */ +void CFE_SB_TrackingListReset(CFE_SB_BufferLink_t *Link) +{ + /* A singleton node/empty list points to itself */ + Link->Prev = Link; + Link->Next = Link; +} + +/****************************************************************************** + * + * Helper function to remove a node from a tracking list + */ +void CFE_SB_TrackingListRemove(CFE_SB_BufferLink_t *Node) +{ + /* Remove from list */ + Node->Prev->Next = Node->Next; + Node->Next->Prev = Node->Prev; + + /* The node is now a singleton */ + CFE_SB_TrackingListReset(Node); +} + +/****************************************************************************** + * + * Helper function to add a node to a tracking list + */ +void CFE_SB_TrackingListAdd(CFE_SB_BufferLink_t *List, CFE_SB_BufferLink_t *Node) +{ + /* Connect this node to the list at "prev" position (tail) */ + Node->Prev = List->Prev; + Node->Next = List; + + /* Connect list nodes to this node */ + Node->Prev->Next = Node; + Node->Next->Prev = Node; +} + + /****************************************************************************** ** Function: CFE_SB_GetBufferFromPool() ** @@ -48,82 +97,60 @@ ** by the SB to dynamically allocate memory to hold the message and a buffer ** descriptor associated with the message during the sending of a message. ** +** Note: +** This must only be invoked while holding the SB global lock +** ** Arguments: -** msgId : Message ID -** size : Size of the buffer in bytes. +** MaxMsgSize : Size of the buffer content area in bytes. ** ** Return: ** Pointer to the buffer descriptor for the new buffer, or NULL if the buffer ** could not be allocated. */ -CFE_SB_BufferD_t * CFE_SB_GetBufferFromPool(CFE_SB_MsgId_t MsgId, size_t Size) { - int32 stat1; - uint8 *address = NULL; - CFE_SB_BufferD_t *bd = NULL; +CFE_SB_BufferD_t *CFE_SB_GetBufferFromPool(size_t MaxMsgSize) +{ + int32 stat1; + size_t AllocSize; + CFE_SB_BufferD_t *bd = NULL; + + /* The allocation needs to include enough space for the descriptor object */ + AllocSize = MaxMsgSize + CFE_SB_BUFFERD_CONTENT_OFFSET; /* Allocate a new buffer descriptor from the SB memory pool.*/ - stat1 = CFE_ES_GetPoolBuf((CFE_ES_MemPoolBuf_t*)&bd, CFE_SB_Global.Mem.PoolHdl, Size + sizeof(CFE_SB_BufferD_t)); - if(stat1 < 0){ + stat1 = CFE_ES_GetPoolBuf((CFE_ES_MemPoolBuf_t *)&bd, CFE_SB_Global.Mem.PoolHdl, AllocSize); + if (stat1 < 0) + { return NULL; } /* increment the number of buffers in use and adjust the high water mark if needed */ CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse++; - if(CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse){ + if (CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse) + { CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse = CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse; - }/* end if */ + } /* end if */ /* Add the size of the actual buffer to the memory-in-use ctr and */ /* adjust the high water mark if needed */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse+=stat1; - if(CFE_SB_Global.StatTlmMsg.Payload.MemInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse){ + CFE_SB_Global.StatTlmMsg.Payload.MemInUse += AllocSize; + if (CFE_SB_Global.StatTlmMsg.Payload.MemInUse > CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse) + { CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = CFE_SB_Global.StatTlmMsg.Payload.MemInUse; - }/* end if */ - - /* first set ptr to actual msg buffer the same as ptr to descriptor */ - address = (uint8 *)bd; - - /* increment actual msg buffer ptr beyond the descriptor */ - address += sizeof(CFE_SB_BufferD_t); + } /* end if */ /* Initialize the buffer descriptor structure. */ - bd->MsgId = MsgId; - bd->UseCount = 1; - bd->Size = Size; - bd->Buffer = (CFE_SB_Buffer_t *)address; + memset(bd, 0, CFE_SB_BUFFERD_CONTENT_OFFSET); - return bd; - -}/* CFE_SB_GetBufferFromPool */ - - -/****************************************************************************** -** Function: CFE_SB_GetBufferFromCaller() -** -** Purpose: -** Request a buffer from the SB buffer pool to use as a buffer descriptor -** for and already created SB buffer. -** -** Arguments: -** msgId : Message ID -** Address : Address of the buffer -** -** Return: -** Pointer to the buffer descriptor for the suplied buffer, or NULL if the -** descriptor could not be allocated. -*/ - -CFE_SB_BufferD_t * CFE_SB_GetBufferFromCaller(CFE_SB_MsgId_t MsgId, - void *Address) { - CFE_SB_BufferD_t *bd = (CFE_SB_BufferD_t *)((cpuaddr)Address - sizeof(CFE_SB_BufferD_t)); + bd->MsgId = CFE_SB_INVALID_MSG_ID; + bd->UseCount = 1; + bd->AllocatedSize = AllocSize; - /* Initialize the MsgId in the buffer descriptor (the rest has already been initialized in this case). */ - bd->MsgId = MsgId; + CFE_SB_TrackingListReset(&bd->Link); return bd; -}/* CFE_SB_GetBufferFromCaller */ +} /* CFE_SB_GetBufferFromPool */ /****************************************************************************** @@ -134,24 +161,25 @@ CFE_SB_BufferD_t * CFE_SB_GetBufferFromCaller(CFE_SB_MsgId_t MsgId, ** One block is the memory used to store the actual message, the other block ** was used to store the buffer descriptor for the message. ** +** Note: +** This must only be invoked while holding the SB global lock +** ** Arguments: ** bd : Pointer to the buffer descriptor. ** ** Return: -** SB status +** None */ -int32 CFE_SB_ReturnBufferToPool(CFE_SB_BufferD_t *bd){ - int32 Stat; +void CFE_SB_ReturnBufferToPool(CFE_SB_BufferD_t *bd) +{ + /* Remove from any tracking list (no effect if not in a list) */ + CFE_SB_TrackingListRemove(&bd->Link); - /* give the buf descriptor back to the buf descriptor pool */ - Stat = CFE_ES_PutPoolBuf(CFE_SB_Global.Mem.PoolHdl, bd); - if(Stat > 0){ - CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse--; - /* Substract the size of a buffer descriptor from the Memory in use ctr */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse-=Stat; - }/* end if */ + --CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse; + CFE_SB_Global.StatTlmMsg.Payload.MemInUse -= bd->AllocatedSize; - return CFE_SUCCESS; + /* finally give the buf descriptor back to the buf descriptor pool */ + CFE_ES_PutPoolBuf(CFE_SB_Global.Mem.PoolHdl, bd); }/* end CFE_SB_ReturnBufferToPool */ @@ -166,6 +194,8 @@ int32 CFE_SB_ReturnBufferToPool(CFE_SB_BufferD_t *bd){ ** UseCount is a variable in the CFE_SB_BufferD_t and is used only to ** determine when a buffer may be returned to the memory pool. ** +** This must only be invoked while holding the SB global lock +** ** Arguments: ** bd : Pointer to the buffer descriptor. ** @@ -195,6 +225,8 @@ void CFE_SB_IncrBufUseCnt(CFE_SB_BufferD_t *bd) ** UseCount is a variable in the CFE_SB_BufferD_t and is used only to ** determine when a buffer may be returned to the memory pool. ** +** This must only be invoked while holding the SB global lock +** ** Arguments: ** bd : Pointer to the buffer descriptor. ** @@ -224,6 +256,9 @@ void CFE_SB_DecrBufUseCnt(CFE_SB_BufferD_t *bd) ** Purpose: ** This function gets a destination descriptor from the SB memory pool. ** +** Note: +** This must only be invoked while holding the SB global lock +** ** Arguments: ** None ** @@ -259,6 +294,9 @@ CFE_SB_DestinationD_t *CFE_SB_GetDestinationBlk(void) ** Purpose: ** This function returns a destination descriptor to the SB memory pool. ** +** Note: +** This must only be invoked while holding the SB global lock +** ** Arguments: ** None ** diff --git a/fsw/cfe-core/src/sb/cfe_sb_init.c b/fsw/cfe-core/src/sb/cfe_sb_init.c index 97a4339d4..561f88b48 100644 --- a/fsw/cfe-core/src/sb/cfe_sb_init.c +++ b/fsw/cfe-core/src/sb/cfe_sb_init.c @@ -118,8 +118,6 @@ int32 CFE_SB_EarlyInit (void) { CFE_SB_ValueToMsgId(CFE_SB_STATS_TLM_MID), sizeof(CFE_SB_Global.StatTlmMsg)); - CFE_SB_Global.ZeroCopyTail = NULL; - return Stat; }/* end CFE_SB_EarlyInit */ @@ -156,6 +154,12 @@ int32 CFE_SB_InitBuffers(void) { (unsigned long)CFE_SB_Global.Mem.Partition.Data,CFE_PLATFORM_SB_BUF_MEMORY_BYTES,(unsigned int)Stat); return Stat; } + + /* + * Initialize the buffer tracking lists to be empty + */ + CFE_SB_TrackingListReset(&CFE_SB_Global.InTransitList); + CFE_SB_TrackingListReset(&CFE_SB_Global.ZeroCopyList); return CFE_SUCCESS; diff --git a/fsw/cfe-core/src/sb/cfe_sb_priv.c b/fsw/cfe-core/src/sb/cfe_sb_priv.c index 2a67924aa..047fd0c81 100644 --- a/fsw/cfe-core/src/sb/cfe_sb_priv.c +++ b/fsw/cfe-core/src/sb/cfe_sb_priv.c @@ -535,20 +535,49 @@ void CFE_SB_RemoveDestNode(CFE_SBR_RouteId_t RouteId, CFE_SB_DestinationD_t *Nod ******************************************************************************/ int32 CFE_SB_ZeroCopyReleaseAppId(CFE_ES_AppId_t AppId) { - CFE_SB_ZeroCopyD_t *prev = NULL; - CFE_SB_ZeroCopyD_t *zcd = (CFE_SB_ZeroCopyD_t *) (CFE_SB_Global.ZeroCopyTail); + CFE_SB_BufferLink_t *NextLink; - while(zcd != NULL){ - prev = (CFE_SB_ZeroCopyD_t *) (zcd->Prev); - if( CFE_RESOURCEID_TEST_EQUAL(zcd->AppID, AppId) ) + /* Using a union type permits promotion of CFE_SB_BufferLink_t* to CFE_SB_BufferD_t* + * without violating aliasing rules or alignment rules. Note that the first element + * of CFE_SB_BufferD_t is a CFE_SB_BufferLink_t, which makes this possible. */ + union LocalBufDesc + { + CFE_SB_BufferLink_t As_Link; + CFE_SB_BufferD_t As_Dsc; + } *BufPtr; + + /* + * First go through the "ZeroCopy" tracking list and find all nodes + * with a matching AppID. This needs to be done while locked to + * prevent other tasks from changing the list at the same time. + */ + if (CFE_RESOURCEID_TEST_DEFINED(AppId)) + { + CFE_SB_LockSharedData(__func__, __LINE__); + + /* Get start of list */ + NextLink = CFE_SB_TrackingListGetNext(&CFE_SB_Global.ZeroCopyList); + while(!CFE_SB_TrackingListIsEnd(&CFE_SB_Global.ZeroCopyList, NextLink)) { - CFE_SB_ZeroCopyReleasePtr((CFE_SB_Buffer_t *) zcd->Buffer, (CFE_SB_ZeroCopyHandle_t) zcd); + /* Get buffer descriptor pointer */ + BufPtr = (union LocalBufDesc*)NextLink; + + /* Read the next link now in case this node gets moved */ + NextLink = CFE_SB_TrackingListGetNext(NextLink); + + /* Check if it is a zero-copy buffer owned by this app */ + if (CFE_RESOURCEID_TEST_EQUAL(BufPtr->As_Dsc.AppId, AppId)) + { + /* If so, decrement the use count as the app has now gone away */ + CFE_SB_DecrBufUseCnt(&BufPtr->As_Dsc); + } } - zcd = prev; + + CFE_SB_UnlockSharedData(__func__, __LINE__); } return CFE_SUCCESS; -}/* end CFE_SB_ZeroCopyReleasePtr */ +}/* end CFE_SB_ZeroCopyReleaseAppId */ /*****************************************************************************/ diff --git a/fsw/cfe-core/src/sb/cfe_sb_priv.h b/fsw/cfe-core/src/sb/cfe_sb_priv.h index d01ab0b5d..de6e5096f 100644 --- a/fsw/cfe-core/src/sb/cfe_sb_priv.h +++ b/fsw/cfe-core/src/sb/cfe_sb_priv.h @@ -98,6 +98,17 @@ ** Type Definitions */ + +/** + * \brief Basic linked list structure allowing all buffer descriptors to be tracked. + */ +typedef struct CFE_SB_BufferLink +{ + struct CFE_SB_BufferLink *Next; + struct CFE_SB_BufferLink *Prev; + +} CFE_SB_BufferLink_t; + /****************************************************************************** ** Typedef: CFE_SB_BufferD_t ** @@ -108,32 +119,45 @@ ** Note: Changing the size of this structure may require the memory pool ** block sizes to change. */ +typedef struct CFE_SB_BufferD +{ + CFE_SB_BufferLink_t Link; /**< Links for inclusion in the tracking lists */ + + /** + * Actual MsgId of the content, cached here to avoid repeat + * calls into CFE_MSG API during traversal/delivery of the message. + * + * MsgId is set for buffers which contain actual data in transit. AppId is unset + * while in transit, as it may be sent to multiple apps. + * + * During zero copy buffer initial allocation, the MsgId is not known at this time + * and should be set to the invalid msg ID. + */ + CFE_SB_MsgId_t MsgId; + + /** + * Current owner of the buffer, if owned by a single app. + * + * This is used to track "zero copy" buffer allocations - this will be set to + * the AppID that initally allocated it, before it is used to transmit a message. + * + * When the message is in transit, it may be queued to multiple applictions, + * so this is unset. + */ + CFE_ES_AppId_t AppId; + + size_t AllocatedSize; /**< Total size of this descriptor (including descriptor itself) */ + size_t ContentSize; /**< Actual size of message content currently stored in the buffer */ + CFE_MSG_Type_t ContentType; /**< Type of message content currently stored in the buffer */ + + bool AutoSequence; /**< If message should get its sequence number assigned from the route */ + + uint16 UseCount; /**< Number of active references to this buffer in the system */ + + CFE_SB_Buffer_t Content; /* Variably sized content field, Keep last */ -typedef struct { - CFE_SB_MsgId_t MsgId; - uint16 UseCount; - size_t Size; - CFE_SB_Buffer_t *Buffer; } CFE_SB_BufferD_t; -/****************************************************************************** -** Typedef: CFE_SB_ZeroCopyD_t -** -** Purpose: -** This structure defines a ZERO COPY BUFFER DESCRIPTOR used to specify -** the buffer provided to a requestor. -** -** Note: Changing the size of this structure may require the memory pool -** block sizes to change. -*/ - -typedef struct { - CFE_ES_AppId_t AppID; - size_t Size; - void *Buffer; - void *Next; - void *Prev; -} CFE_SB_ZeroCopyD_t; /****************************************************************************** ** Typedef: CFE_SB_PipeD_t @@ -219,7 +243,6 @@ typedef struct uint32 SubscriptionReporting; CFE_ES_AppId_t AppId; uint32 StopRecurseFlags[OS_MAX_TASKS]; - void *ZeroCopyTail; CFE_SB_PipeD_t PipeTbl[CFE_PLATFORM_SB_MAX_PIPES]; CFE_SB_HousekeepingTlm_t HKTlmMsg; CFE_SB_StatsTlm_t StatTlmMsg; @@ -231,6 +254,13 @@ typedef struct CFE_ResourceId_t LastPipeId; CFE_SB_BackgroundFileStateInfo_t BackgroundFile; + + /* A list of buffers currently in-transit, owned by SB */ + CFE_SB_BufferLink_t InTransitList; + + /* A list of buffers currently issued to apps for zero-copy */ + CFE_SB_BufferLink_t ZeroCopyList; + } CFE_SB_Global_t; @@ -273,13 +303,10 @@ void CFE_SB_UnlockSharedData(const char *FuncName, int32 LineNumber); void CFE_SB_ReleaseBuffer (CFE_SB_BufferD_t *bd, CFE_SB_DestinationD_t *dest); int32 CFE_SB_WriteQueue(CFE_SB_PipeD_t *pd,uint32 TskId, const CFE_SB_BufferD_t *bd,CFE_SB_MsgId_t MsgId ); -int32 CFE_SB_ReturnBufferToPool(CFE_SB_BufferD_t *bd); void CFE_SB_ProcessCmdPipePkt(CFE_SB_Buffer_t *SBBufPtr); void CFE_SB_ResetCounters(void); void CFE_SB_SetMsgSeqCnt(CFE_MSG_Message_t *MsgPtr,uint32 Count); char *CFE_SB_GetAppTskName(CFE_ES_TaskId_t TaskId, char* FullName); -CFE_SB_BufferD_t *CFE_SB_GetBufferFromPool(CFE_SB_MsgId_t MsgId, size_t Size); -CFE_SB_BufferD_t *CFE_SB_GetBufferFromCaller(CFE_SB_MsgId_t MsgId, void *Address); int32 CFE_SB_DeletePipeWithAppId(CFE_SB_PipeId_t PipeId,CFE_ES_AppId_t AppId); int32 CFE_SB_DeletePipeFull(CFE_SB_PipeId_t PipeId,CFE_ES_AppId_t AppId); int32 CFE_SB_SubscribeFull(CFE_SB_MsgId_t MsgId, @@ -293,14 +320,10 @@ int32 CFE_SB_UnsubscribeWithAppId(CFE_SB_MsgId_t MsgId, CFE_SB_PipeId_t PipeId, int32 CFE_SB_UnsubscribeFull(CFE_SB_MsgId_t MsgId, CFE_SB_PipeId_t PipeId, uint8 Scope, CFE_ES_AppId_t AppId); -int32 CFE_SB_TransmitBufferFull(CFE_SB_BufferD_t *BufDscPtr, - CFE_SBR_RouteId_t RouteId, - CFE_SB_MsgId_t MsgId); int32 CFE_SB_TransmitMsgValidate(CFE_MSG_Message_t *MsgPtr, CFE_SB_MsgId_t *MsgIdPtr, CFE_MSG_Size_t *SizePtr, CFE_SBR_RouteId_t *RouteIdPtr); -int32 CFE_SB_ZeroCopyReleaseDesc(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t BufferHandle); int32 CFE_SB_ZeroCopyReleaseAppId(CFE_ES_AppId_t AppId); void CFE_SB_IncrBufUseCnt(CFE_SB_BufferD_t *bd); void CFE_SB_DecrBufUseCnt(CFE_SB_BufferD_t *bd); @@ -314,6 +337,110 @@ void CFE_SB_FinishSendEvent(CFE_ES_TaskId_t TaskId, uint32 Bit); CFE_SB_DestinationD_t *CFE_SB_GetDestinationBlk(void); int32 CFE_SB_PutDestinationBlk(CFE_SB_DestinationD_t *Dest); +/** + * \brief For SB buffer tracking, get first/next position in a list + */ +static inline CFE_SB_BufferLink_t *CFE_SB_TrackingListGetNext(CFE_SB_BufferLink_t *Node) +{ + return Node->Next; +} + +/** + * \brief For SB buffer tracking, checks if this current position represents the end of the list + */ +static inline bool CFE_SB_TrackingListIsEnd(CFE_SB_BufferLink_t *List, CFE_SB_BufferLink_t *Node) +{ + /* Normally list nodes should never have NULL, buf if they do, do not follow it */ + return (Node == NULL || Node == List); +} + +/** + * \brief For SB buffer tracking, reset link state to default + * + * This turns the node into a singleton/lone object (not in a list) + * or resets the head link to be empty. + */ +void CFE_SB_TrackingListReset(CFE_SB_BufferLink_t *Link); + +/** + * \brief For SB buffer tracking, removes a node from a tracking list + * + * Extracts a single node from whatever list it is in. After this the + * node becomes a singleton owned by the caller. It may be put into + * another list or freed. + */ +void CFE_SB_TrackingListRemove(CFE_SB_BufferLink_t *Node); + +/** + * \brief For SB buffer tracking, adds a node to a tracking list + * + * Extracts a single node from the list its in. After this the + * node becomes a singleton owned by the caller. It must put it + * in another list or free it. + */ +void CFE_SB_TrackingListAdd(CFE_SB_BufferLink_t *List, CFE_SB_BufferLink_t *Node); + + +/** + * \brief Allocates a new buffer descriptor from the SB memory pool. + * + * \param[in] MaxMsgSize Maximum message content size that the buffer must be capable of holding + * \returns Pointer to buffer descriptor, or NULL on failure. + */ +CFE_SB_BufferD_t *CFE_SB_GetBufferFromPool(size_t MaxMsgSize); + +/** + * \brief Returns a buffer to SB memory pool + * + * \param[in] Pointer to descriptor to return + */ +void CFE_SB_ReturnBufferToPool(CFE_SB_BufferD_t *bd); + +/** + * \brief Broadcast a SB buffer descriptor to all destinations in route + * + * Internal routine that implements the logic of transmitting a message buffer + * to all destinations subscribed in the SB route. + * + * As this function will broadcast the message to any number of destinations (0-many), + * and some may be successful and some may fail, the status cannot be expressed + * in any single error code, so this does not return any status. + * + * Instead, this routine handles all potential outcomes on its own, and does + * not expect the caller to handle any delivery issues. Also note that the general + * design pattern of the software bus is a "send and forget" model where the sender does + * not know (or care) what entities are subscribed to the data being generated. + * + * - For any undeliverable destination (limit, OSAL error, etc), a proper event is generated. + * - For any successful queueing, the buffer use count is incremented + * + * The caller is expected to hold a reference (use count) of the buffer prior to invoking + * this routine, representing itself, which is then consumed by this routine. + * + * \note _This call will "consume" the buffer by decrementing the buffer use count_ after + * broadcasting the message to all subscribed pipes. + * + * The caller should not access the buffer again after calling this function, as it may + * be deallocated at any time. If the caller wishes to continue accessing the buffer, + * it should explicitly increment the use count before calling this, which will prevent + * deallocation. + * + * \param[in] BufDscPtr Pointer to the buffer descriptor to broadcast + * \param[in] RouteId Route to send to + */ +void CFE_SB_BroadcastBufferToRoute(CFE_SB_BufferD_t *BufDscPtr, CFE_SBR_RouteId_t RouteId); + +/** + * \brief Perform basic sanity check on the Zero Copy handle + * + * \param[in] BufPtr pointer to the content buffer + * \param[in] ZeroCopyHandle Zero copy handle to check + * + * \returns CFE_SUCCESS if validation passed, or error code. + */ +int32 CFE_SB_ZeroCopyHandleValidate(CFE_SB_Buffer_t *BufPtr, + CFE_SB_ZeroCopyHandle_t ZeroCopyHandle); + /** * \brief Add a destination node * diff --git a/fsw/cfe-core/unit-test/sb_UT.c b/fsw/cfe-core/unit-test/sb_UT.c index e6afc7715..2f2cd8346 100644 --- a/fsw/cfe-core/unit-test/sb_UT.c +++ b/fsw/cfe-core/unit-test/sb_UT.c @@ -2739,9 +2739,10 @@ void Test_TransmitMsg_API(void) SB_UT_ADD_SUBTEST(Test_TransmitMsg_ZeroCopyGetPtr); SB_UT_ADD_SUBTEST(Test_TransmitBuffer_IncrementSeqCnt); SB_UT_ADD_SUBTEST(Test_TransmitBuffer_NoIncrement); + SB_UT_ADD_SUBTEST(Test_TransmitMsg_ZeroCopyHandleValidate); SB_UT_ADD_SUBTEST(Test_TransmitMsg_ZeroCopyReleasePtr); SB_UT_ADD_SUBTEST(Test_TransmitMsg_DisabledDestination); - SB_UT_ADD_SUBTEST(Test_TransmitBufferFull); + SB_UT_ADD_SUBTEST(Test_BroadcastBufferToRoute); SB_UT_ADD_SUBTEST(Test_TransmitMsgValidate_MaxMsgSizePlusOne); SB_UT_ADD_SUBTEST(Test_TransmitMsgValidate_NoSubscribers); } /* end Test_TransmitMsg_API */ @@ -3049,42 +3050,30 @@ void Test_TransmitMsg_ZeroCopyGetPtr(void) { CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl; uint16 MsgSize = 10; + uint32 MemUse; /* Have GetPoolBuf stub return error on its next call (buf descriptor * allocation failed) */ UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBuf), 1, CFE_ES_ERR_MEM_BLOCK_SIZE); - ASSERT_TRUE((cpuaddr) CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) == (cpuaddr) NULL); - - /* Have GetPoolBuf stub return error on its second call (actual buffer - * allocation failed) - */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBuf), 2, CFE_ES_ERR_MEM_BLOCK_SIZE); - ASSERT_TRUE((cpuaddr) CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) == (cpuaddr) NULL); - - /* Have GetPoolBuf stub return error on its second call (null buffer - * returned and error returning the memory to the buffer) - */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBuf), 2, -1); - UT_SetDeferredRetcode(UT_KEY(CFE_ES_PutPoolBuf), 1, 0); - ASSERT_TRUE((cpuaddr) CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) == (cpuaddr) NULL); + ASSERT_TRUE(CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) == NULL); EVTCNT(0); /* Increase the peak memory and buffers in use above the expected values in * order to exercise branch paths */ - CFE_SB_Global.StatTlmMsg.Payload.MemInUse = 0; - CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = sizeof(CFE_SB_ZeroCopyD_t) + MsgSize + - sizeof(CFE_SB_BufferD_t) + 1; + + /* predict memory use for a given descriptor (this needs to match what impl does) */ + MemUse = MsgSize + offsetof(CFE_SB_BufferD_t, Content); + CFE_SB_Global.StatTlmMsg.Payload.MemInUse = 0; + CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = MemUse + 10; CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse = CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse + 2; - ASSERT_TRUE((cpuaddr) CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) != (cpuaddr) NULL); + ASSERT_TRUE(CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl) != NULL); - ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse, sizeof(CFE_SB_ZeroCopyD_t) + MsgSize + - sizeof(CFE_SB_BufferD_t) + 1); - ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.MemInUse, sizeof(CFE_SB_ZeroCopyD_t) + MsgSize + - sizeof(CFE_SB_BufferD_t)); + ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse, MemUse + 10); /* unchanged */ + ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.MemInUse, MemUse); /* predicted value */ ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.PeakSBBuffersInUse, CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse + 1); @@ -3092,6 +3081,48 @@ void Test_TransmitMsg_ZeroCopyGetPtr(void) } /* end Test_TransmitMsg_ZeroCopyGetPtr */ +void Test_TransmitMsg_ZeroCopyHandleValidate(void) +{ + CFE_SB_Buffer_t *SendPtr; + CFE_SB_ZeroCopyHandle_t GoodZeroCpyBufHndl; + CFE_SB_ZeroCopyHandle_t BadZeroCpyBufHndl; + CFE_SB_ZeroCopyHandle_t NullZeroCpyBufHndl; + CFE_SB_BufferD_t TestBufDsc; + + /* Create a real/valid Zero Copy handle via the API */ + SendPtr = CFE_SB_ZeroCopyGetPtr(sizeof(SB_UT_Test_Tlm_t), &GoodZeroCpyBufHndl); + if (SendPtr == NULL) + { + UtAssert_Failed("Unexpected NULL pointer returned from ZeroCopyGetPtr"); + } + + /* The NULL handle is just zero */ + NullZeroCpyBufHndl = (CFE_SB_ZeroCopyHandle_t) { 0 }; + + /* Create an invalid Zero Copy handle that is not NULL but refers to a + * descriptor which is NOT from CFE_SB_ZeroCopyGetPtr(). */ + memset(&TestBufDsc, 0, sizeof(TestBufDsc)); + BadZeroCpyBufHndl = (CFE_SB_ZeroCopyHandle_t) { &TestBufDsc }; + + /* Good buffer pointer + Null Handle => BAD_ARGUMENT */ + ASSERT_EQ(CFE_SB_ZeroCopyHandleValidate(SendPtr, NullZeroCpyBufHndl), CFE_SB_BAD_ARGUMENT); + + /* Bad buffer pointer + Good Handle => BAD_ARGUMENT */ + ASSERT_EQ(CFE_SB_ZeroCopyHandleValidate(NULL, GoodZeroCpyBufHndl), CFE_SB_BAD_ARGUMENT); + + /* Good buffer pointer + Non Zero-Copy Handle => CFE_SB_BUFFER_INVALID */ + ASSERT_EQ(CFE_SB_ZeroCopyHandleValidate(SendPtr, BadZeroCpyBufHndl), CFE_SB_BUFFER_INVALID); + + /* Mismatched buffer pointer + Good Handle => CFE_SB_BUFFER_INVALID */ + ASSERT_EQ(CFE_SB_ZeroCopyHandleValidate(SendPtr+1, GoodZeroCpyBufHndl), CFE_SB_BUFFER_INVALID); + + /* Good buffer pointer + Good Handle => SUCCESS */ + ASSERT_EQ(CFE_SB_ZeroCopyHandleValidate(SendPtr, GoodZeroCpyBufHndl), CFE_SUCCESS); + + /* Clean-up */ + CFE_SB_ZeroCopyReleasePtr(SendPtr, GoodZeroCpyBufHndl); +} + /* ** Test successfully sending a message in zero copy mode (telemetry source ** sequence count increments) @@ -3103,7 +3134,7 @@ void Test_TransmitBuffer_IncrementSeqCnt(void) CFE_SB_PipeId_t PipeId; CFE_SB_MsgId_t MsgId = SB_UT_TLM_MID; uint32 PipeDepth = 10; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl = 0; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl; CFE_MSG_SequenceCount_t SeqCnt; CFE_MSG_Size_t Size = sizeof(SB_UT_Test_Tlm_t); CFE_MSG_Type_t Type = CFE_MSG_Type_Tlm; @@ -3115,16 +3146,13 @@ void Test_TransmitBuffer_IncrementSeqCnt(void) SETUP(CFE_SB_Subscribe(MsgId, PipeId)); + /* Create a real/valid Zero Copy handle via the API */ SendPtr = CFE_SB_ZeroCopyGetPtr(sizeof(SB_UT_Test_Tlm_t), &ZeroCpyBufHndl); if (SendPtr == NULL) { UtAssert_Failed("Unexpected NULL pointer returned from ZeroCopyGetPtr"); } - /* Test response to a get pool information error */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBufInfo), 1, -1); - ASSERT_EQ(CFE_SB_TransmitBuffer(SendPtr, ZeroCpyBufHndl, true), CFE_SB_BUFFER_INVALID); - UT_SetDataBuffer(UT_KEY(CFE_MSG_GetMsgId), &MsgId, sizeof(MsgId), false); UT_SetDataBuffer(UT_KEY(CFE_MSG_GetSize), &Size, sizeof(Size), false); UT_SetDataBuffer(UT_KEY(CFE_MSG_GetType), &Type, sizeof(Type), false); @@ -3157,7 +3185,7 @@ void Test_TransmitBuffer_NoIncrement(void) CFE_SB_PipeId_t PipeId; CFE_SB_MsgId_t MsgId = SB_UT_TLM_MID; uint32 PipeDepth = 10; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl = 0; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl; CFE_MSG_SequenceCount_t SeqCnt = 22; CFE_MSG_Size_t Size = sizeof(SB_UT_Test_Tlm_t); CFE_MSG_Type_t Type = CFE_MSG_Type_Tlm; @@ -3175,10 +3203,6 @@ void Test_TransmitBuffer_NoIncrement(void) UtAssert_Failed("Unexpected NULL pointer returned from ZeroCopyGetPtr"); } - /* Test response to a get pool information error */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBufInfo), 1, -1); - ASSERT_EQ(CFE_SB_TransmitBuffer(SendPtr, ZeroCpyBufHndl, false), CFE_SB_BUFFER_INVALID); - UT_SetDataBuffer(UT_KEY(CFE_MSG_GetMsgId), &MsgId, sizeof(MsgId), false); UT_SetDataBuffer(UT_KEY(CFE_MSG_GetSize), &Size, sizeof(Size), false); UT_SetDataBuffer(UT_KEY(CFE_MSG_GetType), &Type, sizeof(Type), false); @@ -3206,9 +3230,9 @@ void Test_TransmitMsg_ZeroCopyReleasePtr(void) CFE_SB_Buffer_t *ZeroCpyMsgPtr1 = NULL; CFE_SB_Buffer_t *ZeroCpyMsgPtr2 = NULL; CFE_SB_Buffer_t *ZeroCpyMsgPtr3 = NULL; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl1 = 0; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl2 = 0; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl3 = 0; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl1; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl2; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl3; uint16 MsgSize = 10; ZeroCpyMsgPtr1 = CFE_SB_ZeroCopyGetPtr(MsgSize, &ZeroCpyBufHndl1); @@ -3217,27 +3241,14 @@ void Test_TransmitMsg_ZeroCopyReleasePtr(void) SETUP(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr2, ZeroCpyBufHndl2)); /* Test response to an invalid buffer */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_GetPoolBufInfo), 1, -1); ASSERT_EQ(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr2, ZeroCpyBufHndl2), CFE_SB_BUFFER_INVALID); /* Test response to a null message pointer */ - ASSERT_EQ(CFE_SB_ZeroCopyReleasePtr(NULL, ZeroCpyBufHndl2), CFE_SB_BUFFER_INVALID); + ASSERT_EQ(CFE_SB_ZeroCopyReleasePtr(NULL, ZeroCpyBufHndl3), CFE_SB_BAD_ARGUMENT); /* Test response to an invalid message pointer */ - ASSERT_EQ(CFE_SB_ZeroCopyReleasePtr((CFE_SB_Buffer_t *) 0x1234, - ZeroCpyBufHndl2), CFE_SB_BUFFER_INVALID); - - /* Test path when return the descriptor to the pool fails in - * CFE_SB_ZeroCopyReleaseDesc - */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_PutPoolBuf), 1, -1); - ASSERT(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr2, ZeroCpyBufHndl2)); - - /* Test path when return the buffer to the pool fails in - * CFE_SB_ZeroCopyReleasePtr - */ - UT_SetDeferredRetcode(UT_KEY(CFE_ES_PutPoolBuf), 2, -1); - ASSERT(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr2, ZeroCpyBufHndl2)); + ASSERT_EQ(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr1, + ZeroCpyBufHndl3), CFE_SB_BUFFER_INVALID); /* Test successful release of the second buffer */ ASSERT(CFE_SB_ZeroCopyReleasePtr(ZeroCpyMsgPtr3, ZeroCpyBufHndl3)); @@ -3287,31 +3298,34 @@ void Test_TransmitMsg_DisabledDestination(void) } /* end Test_TransmitMsg_DisabledDestination */ /* -** Test successful CFE_SB_TransmitBufferFull +** Test successful CFE_SB_BroadcastBufferToRoute */ -void Test_TransmitBufferFull(void) +void Test_BroadcastBufferToRoute(void) { CFE_SB_PipeId_t PipeId; CFE_SB_MsgId_t MsgId = SB_UT_TLM_MID; CFE_SB_BufferD_t SBBufD; - CFE_SB_Buffer_t MsgBuf; int32 PipeDepth; CFE_SBR_RouteId_t RouteId; - SBBufD.Buffer = &MsgBuf; + memset(&SBBufD, 0, sizeof(SBBufD)); + SBBufD.MsgId = MsgId; + CFE_SB_TrackingListReset(&SBBufD.Link); + PipeDepth = 2; SETUP(CFE_SB_CreatePipe(&PipeId, PipeDepth, "TestPipe")); SETUP(CFE_SB_Subscribe(MsgId, PipeId)); RouteId = CFE_SBR_GetRouteId(MsgId); - ASSERT(CFE_SB_TransmitBufferFull(&SBBufD, RouteId, MsgId)); + /* No return from this function - it handles all errors */ + CFE_SB_BroadcastBufferToRoute(&SBBufD, RouteId); EVTCNT(2); TEARDOWN(CFE_SB_DeletePipe(PipeId)); -} /* end Test_TransmitBufferFull */ +} /* end Test_BroadcastBufferToRoute */ /* ** Test response to sending a message with the message size larger than allowed @@ -3520,31 +3534,54 @@ void Test_ReceiveBuffer_PendForever(void) void Test_CleanupApp_API(void) { CFE_SB_PipeId_t PipeId; - CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl = 0; - uint16 PipeDepth = 50; + CFE_SB_ZeroCopyHandle_t ZeroCpyBufHndl; + uint16 PipeDepth = 10; CFE_ES_AppId_t AppID; + CFE_ES_AppId_t AppID2; + + /* + * Reset global descriptor list + */ + CFE_SB_InitBuffers(); CFE_ES_GetAppID(&AppID); + AppID2 = CFE_ES_APPID_C(CFE_ResourceId_FromInteger(2)); SETUP(CFE_SB_CreatePipe(&PipeId, PipeDepth, "TestPipe")); - CFE_SB_ZeroCopyGetPtr(PipeDepth, &ZeroCpyBufHndl); - CFE_SB_ZeroCopyGetPtr(PipeDepth, &ZeroCpyBufHndl); + CFE_SB_ZeroCopyGetPtr(10, &ZeroCpyBufHndl); + + /* Mimic a different app ID getting a buffer */ + UT_SetAppID(AppID2); + CFE_SB_ZeroCopyGetPtr(10, &ZeroCpyBufHndl); + + /* Original app gets a second buffer */ + UT_SetAppID(AppID); + CFE_SB_ZeroCopyGetPtr(10, &ZeroCpyBufHndl); /* Set second application ID to provide complete branch path coverage */ CFE_SB_Global.PipeTbl[1].PipeId = SB_UT_PIPEID_1; CFE_SB_Global.PipeTbl[1].AppId = AppID; - ASSERT_TRUE(CFE_SB_Global.ZeroCopyTail != NULL); - /* Attempt with a bad application ID first in order to get full branch path * coverage in CFE_SB_ZeroCopyReleaseAppId */ CFE_SB_CleanUpApp(CFE_ES_APPID_UNDEFINED); + /* This should have freed no buffers */ + UtAssert_STUB_COUNT(CFE_ES_PutPoolBuf, 0); + /* Attempt again with a valid application ID */ CFE_SB_CleanUpApp(AppID); - ASSERT_TRUE(CFE_SB_Global.ZeroCopyTail == NULL); + /* This should have freed 2 out of the 3 buffers - + * the ones which were gotten by this app. */ + UtAssert_STUB_COUNT(CFE_ES_PutPoolBuf, 2); + + /* Clean up the second App */ + CFE_SB_CleanUpApp(AppID2); + + /* This should have freed the last buffer */ + UtAssert_STUB_COUNT(CFE_ES_PutPoolBuf, 3); EVTCNT(2); @@ -3843,13 +3880,17 @@ void Test_CFE_SB_Buffers(void) CFE_SB_Global.StatTlmMsg.Payload.MemInUse = 0; CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse = sizeof(CFE_SB_BufferD_t) * 4; - bd = CFE_SB_GetBufferFromPool(SB_UT_FIRST_VALID_MID, 0); + bd = CFE_SB_GetBufferFromPool(0); ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.PeakMemInUse, sizeof(CFE_SB_BufferD_t) * 4); EVTCNT(0); - ExpRtn = CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse; + /* + * If returning to the pool fails SB still isn't going to use the buffer anymore, + * so it shouldn't be tracked as "in use" - it is lost. + */ + ExpRtn = CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse - 1; UT_SetDeferredRetcode(UT_KEY(CFE_ES_PutPoolBuf), 1, -1); CFE_SB_ReturnBufferToPool(bd); ASSERT_EQ(CFE_SB_Global.StatTlmMsg.Payload.SBBuffersInUse, ExpRtn); diff --git a/fsw/cfe-core/unit-test/sb_UT.h b/fsw/cfe-core/unit-test/sb_UT.h index 2f09ea386..99dc04cf2 100644 --- a/fsw/cfe-core/unit-test/sb_UT.h +++ b/fsw/cfe-core/unit-test/sb_UT.h @@ -1936,6 +1936,21 @@ void Test_TransmitMsg_ZeroCopyGetPtr(void); ******************************************************************************/ void Test_TransmitBuffer_IncrementSeqCnt(void); +/*****************************************************************************/ +/** +** \brief Test the zero-copy handle validation performed by CFE_SB_ZeroCopyHandleValidate() +** +** \par Description +** Exercises the validation checks within this function +** +** \par Assumptions, External Events, and Notes: +** None +** +** \returns +** This function does not return a value. +******************************************************************************/ +void Test_TransmitMsg_ZeroCopyHandleValidate(void); + /*****************************************************************************/ /** ** \brief Test successfully sending a message in zero copy mode (telemetry @@ -1987,10 +2002,10 @@ void Test_TransmitMsg_DisabledDestination(void); /*****************************************************************************/ /** -** \brief Test successful CFE_SB_TransmitBufferFull +** \brief Test CFE_SB_BroadcastBufferToRoute ** ** \par Description -** This function tests successfully sending a message with the metadata. +** This function tests broadcasting a message buffer with the metadata. ** ** \par Assumptions, External Events, and Notes: ** None @@ -1998,7 +2013,7 @@ void Test_TransmitMsg_DisabledDestination(void); ** \returns ** This function does not return a value. ******************************************************************************/ -void Test_TransmitBufferFull(void); +void Test_BroadcastBufferToRoute(void); /*****************************************************************************/ /**