Error creating queues with ThreadX

massimoperdigo
Associate III

Hello,

I’m working on an embedded project using ThreadX, and I’ve encountered what appears to be a queue memory overflow issue.

I’ve created two queues: one for data messages and one for events. Both are configured to hold 100 messages of the following structure:

typedef struct {
    ENGINE_ID origin;
    ENGINE_ID destination;
    EVENTS objectEvent;
    CMDVALUES command;
} INTERNAL_QUEUE_OBJECT;

All fields are enums or uint32_t values (implicitly 4 bytes each), so the structure size should be 16 bytes. However, the compiler optimises it down to 4 bytes total.
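
To be sure what size the compiler really emits, I was thinking of adding a compile-time check like this (just a sketch, not in my code yet):

/* Compile-time check of the real message size (C11). If the compiler
 * shrinks the enums (e.g. with -fshort-enums), this assert will fire. */
_Static_assert(sizeof(INTERNAL_QUEUE_OBJECT) == 16u,
               "INTERNAL_QUEUE_OBJECT is smaller than the assumed 16 bytes");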

Here is the queue configuration (everything is aligned to ULONGs):

#define QUEUE_SIZE 100
#define QUEUE_MESSAGE_SIZE sizeof(INTERNAL_QUEUE_OBJECT)
#define QUEUE_MESSAGE_SIZE_ULONGS ((QUEUE_MESSAGE_SIZE + sizeof(ULONG) - 1) / sizeof(ULONG))

TX_QUEUE dataQueueHandle = {0};
TX_QUEUE eventQueueHandle = {0};

ULONG dataQueueBuffer[QUEUE_SIZE * QUEUE_MESSAGE_SIZE_ULONGS];
ULONG eventQueueBuffer[QUEUE_SIZE * QUEUE_MESSAGE_SIZE_ULONGS];

tx_queue_create(&dataQueueHandle, "Data Queue", QUEUE_MESSAGE_SIZE_ULONGS, dataQueueBuffer, sizeof(dataQueueBuffer));
tx_queue_create(&eventQueueHandle, "Event Queue", QUEUE_MESSAGE_SIZE_ULONGS, eventQueueBuffer, sizeof(eventQueueBuffer));
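
For completeness, this is how I could guard the queue creation (a sketch only; my real init currently ignores the return values, and createQueuesChecked is just an illustrative helper name):

/* ThreadX message sizes are 1 to 16 ULONGs, so guard the computed width. */
_Static_assert((QUEUE_MESSAGE_SIZE_ULONGS >= 1) && (QUEUE_MESSAGE_SIZE_ULONGS <= 16),
               "Queue message size must be between 1 and 16 ULONGs");

static UINT createQueuesChecked(void)
{
  UINT status;

  status = tx_queue_create(&dataQueueHandle, "Data Queue", QUEUE_MESSAGE_SIZE_ULONGS,
                           dataQueueBuffer, sizeof(dataQueueBuffer));
  if (status != TX_SUCCESS)
  {
    SEGGER_RTT_printf(0, "Data queue creation failed: %u\n", status);
    return status;
  }

  status = tx_queue_create(&eventQueueHandle, "Event Queue", QUEUE_MESSAGE_SIZE_ULONGS,
                           eventQueueBuffer, sizeof(eventQueueBuffer));
  if (status != TX_SUCCESS)
  {
    SEGGER_RTT_printf(0, "Event queue creation failed: %u\n", status);
  }
  return status;
}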

 Additionally, I am using these wrapper functions to send and receive:

/**
 * @brief Function for handling the sending of data to a queue.
 * 
 * @param hQueue handler of the queue
 * @param origin  Engine ID that sends the data
 * @param destination  Engine ID that will receive the data
 * @param command  data
 * @return EMBL_RETCODES 
 */
EMBL_RETCODES sendQueueData(TX_QUEUE* hQueue, ENGINE_ID origin, ENGINE_ID destination, CMDVALUES command)
{
  INTERNAL_QUEUE_OBJECT ObjectSend = {0};
  UINT errCode = TX_SUCCESS;
  ULONG spaceLeft;

  tx_queue_info_get(hQueue, NULL, NULL, &spaceLeft, NULL, NULL, NULL);
  if(spaceLeft == 0)
  {
      SEGGER_RTT_printf(0, "Queue is full\n");
      return EMBL_NOMEMORY;
  }
  ObjectSend.origin = origin;
  ObjectSend.destination = destination;
  ObjectSend.command = command;
  ObjectSend.objectEvent = EVENT_NONE;

  errCode = tx_queue_send(hQueue, (void *)&ObjectSend, TICKSTOWAIT);
  
  if(errCode != TX_SUCCESS)
  {
      SEGGER_RTT_printf(0, "Queue: Error putting data to queue\n");
      return EMBL_ERROR;
  }
  return EMBL_OK;
}
 
/**
 * @brief Function for handling the reception of data from a queue.
 * 
 * @param hQueue handler of the queue
 * @param whoAmI_ENGINE  Engine ID that is receiving the data
 * @param pCommand  data
 * @return EMBL_RETCODES 
 */ 
EMBL_RETCODES receiveQueueData(TX_QUEUE* hQueue, ENGINE_ID whoAmI_ENGINE, CMDVALUES* pCommand)
{
  INTERNAL_QUEUE_OBJECT objectReceive = {0};
  UINT errCode = TX_SUCCESS;
  ULONG eventCount = 0;
  CMDVALUES Cmd = CMD_NACK;

  if(pCommand == NULL )
  {
    SEGGER_RTT_printf(0, "Queue: pCommand is NULL\n");
    return EMBL_ERROR;
  }
  
  tx_queue_info_get(hQueue, NULL, &eventCount, NULL, NULL, NULL, NULL);
  if (eventCount == 0)
  {
    //SEGGER_RTT_printf(0, "Queue: Queue is empty\n");
    return EMBL_EMPTY;
  } else if (eventCount == QUEUE_DATA_SIZE){
    SEGGER_RTT_printf(0, "Queue: Queue is full\n");
    return EMBL_NOMEMORY;
  }
  else
  {
    SEGGER_RTT_printf(0, "Queue data: Queue has items\n");
    errCode = tx_queue_receive(hQueue, (void *)&objectReceive, TICKSTOWAIT);
    if((errCode == TX_SUCCESS) && (objectReceive.destination == whoAmI_ENGINE))
    {
      SEGGER_RTT_printf(0," Received data %d  from Task%d\n",objectReceive.command,objectReceive.origin);
      Cmd = objectReceive.command;

      *pCommand  = Cmd;
      return EMBL_OK;

    }
    else if (objectReceive.destination != whoAmI_ENGINE)
    {
      tx_queue_front_send(hQueue, (void *)&objectReceive, TICKSTOWAIT);
    }
    else
    {
      SEGGER_RTT_printf(0, "Queue: Error receiving data to queue\n");
      return EMBL_ERROR;
    }
  }
  return EMBL_OK;
}


As you can see, I populate a temporary variable and then pass it to the queue. As far as I know, the structure is copied internally by ThreadX, and a different thread later receives that copy.

However, during debugging (with Ozone), I noticed that the dataQueueBuffer fills up correctly, but it never seems to free any slots, even though another thread is consuming data from the queue using tx_queue_receive().

Eventually, when the queue reaches its capacity, the system starts writing beyond the bounds of the statically allocated dataQueueBuffer, into unrelated memory that is not even adjacent to dataQueueBuffer. The same happens with the other queue.
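
In case it helps, this is roughly how I am inspecting the queue state around each receive while debugging (a sketch of the instrumentation; dumpQueueState is just an illustrative helper name):

/* Debug helper: print how many messages are enqueued and how many free
 * slots remain, e.g. right before and right after a receive. */
static void dumpQueueState(TX_QUEUE* hQueue, const char* tag)
{
  ULONG enqueued = 0;
  ULONG available = 0;

  if (tx_queue_info_get(hQueue, NULL, &enqueued, &available, NULL, NULL, NULL) == TX_SUCCESS)
  {
    SEGGER_RTT_printf(0, "%s: enqueued=%u available=%u\n", tag, (unsigned)enqueued, (unsigned)available);
  }
}

If the available count never goes back up after the consumer runs, that would match what I see in the memory view.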

Do you know what may be happening? 

Thank you for your time,

 

2 REPLIES
Saket_Om
ST Employee

Hello @massimoperdigo 

Please refer to the example Tx_Thread_MsgQueue below:

STM32CubeH5/Projects/NUCLEO-H563ZI/Applications/ThreadX/Tx_Thread_MsgQueue at main · STMicroelectronics/STM32CubeH5 · GitHub
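
The core pattern of that application is roughly the following (a condensed sketch from my side, not a copy of the project sources; the names and sizes are illustrative):

#include "tx_api.h"

#define APP_QUEUE_LENGTH     10            /* number of messages the queue can hold */
#define APP_QUEUE_MSG_ULONGS TX_1_ULONG    /* each message is one ULONG wide        */

static TX_QUEUE AppQueue;
static ULONG    AppQueueBuffer[APP_QUEUE_LENGTH * APP_QUEUE_MSG_ULONGS];

static UINT app_queue_demo(void)
{
  ULONG msg = 0x1234;
  ULONG received;
  UINT  status;

  /* Message size is given in ULONGs, buffer size in bytes. */
  status = tx_queue_create(&AppQueue, "App Queue", APP_QUEUE_MSG_ULONGS,
                           AppQueueBuffer, sizeof(AppQueueBuffer));
  if (status != TX_SUCCESS)
  {
    return status;
  }

  /* Producer side: copy one message into the queue. */
  status = tx_queue_send(&AppQueue, &msg, TX_WAIT_FOREVER);

  /* Consumer side: copy the oldest message out again. */
  status = tx_queue_receive(&AppQueue, &received, TX_WAIT_FOREVER);

  return status;
}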

To give better visibility on the answered topics, please click on "Accept as Solution" on the reply which solved your issue or answered your question.
Saket_Om

Hi Saket,

I’ve already reviewed that example.

However, in my case, I’m creating the queues statically. Everything seems to work correctly at first, but when I inspect the statically allocated buffer, I notice that the data is not erased after messages are received. Instead, the buffer appears to be continuously filled, even though the queue API reports no messages remaining after a receive operation.

Shouldn’t the statically allocated buffer be cleared automatically as messages are consumed?

Additionally, could you please provide some feedback on how queue sizing is handled in ThreadX? I’d like to know if my approach is correct.
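
For reference, this is the sizing rule I am currently applying (the names QUEUE_CAPACITY, MSG_SIZE_ULONGS and queueStorage are just illustrative); please tell me if any of these assumptions are wrong:

/* My current understanding of the sizing rules:
 *  - the message size passed to tx_queue_create() is given in ULONGs (1..16),
 *  - the buffer must hold capacity * message_size ULONGs,
 *  - the buffer size passed to tx_queue_create() is given in bytes. */
#define QUEUE_CAPACITY   100
#define MSG_SIZE_ULONGS  ((sizeof(INTERNAL_QUEUE_OBJECT) + sizeof(ULONG) - 1) / sizeof(ULONG))

static ULONG queueStorage[QUEUE_CAPACITY * MSG_SIZE_ULONGS];

/* tx_queue_create(&handle, "Queue", MSG_SIZE_ULONGS, queueStorage, sizeof(queueStorage)); */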

Thanks in advance.