FreeRtos memory management in interrupt routine

Posted on July 28, 2015 at 08:48

Hello there,

I have a question regarding the HAL interrupt routine and ST's FreeRTOS layer. Here is my CAN RX routine:

void HAL_CAN_RxCpltCallback(CAN_HandleTypeDef* CanHandle)
{
    /* Queue the address of the HAL receive buffer (osMessagePut takes a uint32_t) */
    osMessagePut(g_canRxMessageQueue, (uint32_t)CanHandle->pRxMsg, 0);
    /* Receive */
    if(HAL_CAN_Receive_IT(CanHandle, CAN_FIFO0) != HAL_OK)
    {
        /* Reception Error */
        return;
    }
}

I enqueue a pointer to a previously defined CanRxMsgTypeDef struct. I then retrieve it in another task:

void task_CanRxQueue(void const * argument)
{
    osEvent status;
    while(1)
    {
        status = osMessageGet(g_canRxMessageQueue, osWaitForever);
        CanRxMsgTypeDef* msgRcv = (CanRxMsgTypeDef*)status.value.p;
    }
}

But this approach is not good: my queue is effectively only one item long, because every queued pointer refers to the same object and each incoming frame overwrites it.

My question is, is it safe and efficient to:

use malloc in the interrupt routine to create a new CanRxMsgTypeDef object,

copy content to it,

put to queue,

dequeue in another task,

free.

Is there a better way of achieving CAN msg queueing?

#stm32 #can #rtos
9 REPLIES 9
jpeacock
Associate II
Posted on July 28, 2015 at 13:56

Since you are using FreeRTOS create a receive queue when the CAN driver is initialized.  On a CAN RX interrupt write the message (small, only 16 bytes) to the queue.  You can merge both CAN FIFOs to one queue; if you need prioritization write the message to the front of the queue for the high priority FIFO.

Your CAN protocol stack task blocks with a read to the same queue.  The task wakes up when a message arrives.  After processing the message the task blocks with another read to the queue. 

This decouples the CAN protocol stack from incoming message arrival frequency.  Since the queue is a static allocation you don't have to worry about memory.  Calling malloc inside an interrupt is generally a (very) bad practice because allocating memory is not a deterministic process.  That is, if the allocator has to search for or merge free blocks during the malloc, your interrupt response time is unpredictable and may cause data loss.

This is how I implemented CANopen on an F4.  Look at the timer service too, it has a lot of nice features you'll need for timed CAN services like heartbeats and safety objects.

You can use the same design for outgoing messages too.  Write outgoing CAN messages to a queue which is emptied when CAN TX interrupts occur.  It's a little more complicated because you have the edge case where you have to prime the TX interrupt if there's no prior data pending.
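The "prime the TX interrupt" edge case can be sketched in plain C as follows. This is only a model under stated assumptions: `can_frame_t`, `can_send`, `can_tx_complete_isr` and the `hw_*` stand-ins are hypothetical names, not HAL or FreeRTOS APIs, and on a real target the shared state would have to be protected against the TX interrupt (for example with a critical section).

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define TX_QUEUE_LEN 8u

/* Hypothetical frame type used only for this sketch. */
typedef struct {
    uint32_t id;
    uint8_t  len;
    uint8_t  data[8];
} can_frame_t;

static can_frame_t tx_queue[TX_QUEUE_LEN];
static size_t tx_head, tx_tail, tx_count;
static bool   tx_busy;            /* true while the "hardware" owns a frame */

/* Stand-ins for the CAN peripheral: they only record what was sent. */
static can_frame_t hw_last_sent;
static unsigned    hw_sent_count;

static void hw_start_transmit(const can_frame_t *f)
{
    hw_last_sent = *f;
    hw_sent_count++;
}

/* Task side. Returns false when the software queue is full. */
bool can_send(const can_frame_t *f)
{
    if (!tx_busy) {
        /* Nothing pending: no TX interrupt will ever fire on its own,
           so start the transmission directly (the "priming" step). */
        tx_busy = true;
        hw_start_transmit(f);
        return true;
    }
    if (tx_count == TX_QUEUE_LEN) {
        return false;
    }
    tx_queue[tx_tail] = *f;       /* copied by value */
    tx_tail = (tx_tail + 1u) % TX_QUEUE_LEN;
    tx_count++;
    return true;
}

/* Called from the TX-complete interrupt: send the next queued frame,
   or mark the transmitter idle when the queue is empty. */
void can_tx_complete_isr(void)
{
    if (tx_count == 0u) {
        tx_busy = false;
        return;
    }
    hw_start_transmit(&tx_queue[tx_head]);
    tx_head = (tx_head + 1u) % TX_QUEUE_LEN;
    tx_count--;
}
```

The first frame goes straight to the hardware; later frames wait in the queue and are drained one per TX-complete interrupt.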

FreeRTOS is designed to do exactly what you are asking so make use of the features.  The queue mechanism is fast and very reliable.

  Jack Peacock

Posted on July 28, 2015 at 14:03

Thank you for the answer. I think we haven't understood each other. I am using the FreeRTOS queue mechanism. The question is, how can I store CAN messages in it without using malloc? Let's say 2 messages arrive before I am able to read any of them. Without malloc, every new message will overwrite the previous one. And I need the whole CanRxMsgTypeDef struct in the queue.

Also, the ST RTOS layer seems to allow only 32-bit items in the queue. For anything bigger, I need to use pointers.

This is how I have it configured now:

Inserting into the queue in the interrupt routine:

void HAL_CAN_RxCpltCallback(CAN_HandleTypeDef* CanHandle)
{
    CanRxMsgTypeDef* item = malloc(sizeof(CanRxMsgTypeDef));
    if (item != NULL)
    {
        *item = *CanHandle->pRxMsg; // pRxMsg is overwritten on each routine call
        // osMessagePut returns osOK on success
        if (osOK != osMessagePut(g_canRxMessageQueue, (uint32_t)item, 0))
        {
            // error msg; free the item, since nobody will dequeue it
            asm("NOP");
            free(item);
        }
    }
    /* Receive */
    if(HAL_CAN_Receive_IT(CanHandle, CAN_FIFO0) != HAL_OK)
    {
        /* Reception Error */
        return;
    }
}

Taking from the queue (another task):

void task_CanRxQueue(void const * argument)
{
    osEvent retEvent;
    while(1)
    {
        retEvent = osMessageGet(g_canRxMessageQueue, osWaitForever);
        if (retEvent.status == osEventMessage)
        {
            CanRxMsgTypeDef* msgRcv = (CanRxMsgTypeDef*)retEvent.value.p;
            uint32_t opCode = can_getOpCode(msgRcv->ExtId);
            // content
            free(msgRcv);
        }
        else
        {
            // error message
        }
    }
}

jpeacock
Associate II
Posted on July 28, 2015 at 17:15

In FreeRTOS, when you create a queue you specify the message size and the number of messages buffered in the queue. You don't need malloc; FreeRTOS allocates the queue storage itself when the queue is created.

From the API:

QueueHandle_t xQueueCreate 
( 
UBaseType_t uxQueueLength, 
UBaseType_t uxItemSize 
); 
Creates a new queue instance. This allocates the storage required by the new queue and returns a handle for the queue. 
Parameters: 
uxQueueLength The maximum number of items that the queue can contain. 
uxItemSize The number of bytes each item in the queue will require. Items are queued by copy, not by reference, so this is the number of bytes that will be copied for each posted item. Each item on the queue must be the same size. 
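
To make "queued by copy, not by reference" concrete, here is a small plain C model of such a queue. It is a sketch of the semantics only, not FreeRTOS code: `copy_queue_t`, `copy_queue_send`, `copy_queue_receive` and `can_msg_t` are hypothetical names, and there is no blocking or interrupt protection.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical message type standing in for the CAN RX struct. */
typedef struct {
    uint32_t ext_id;
    uint8_t  dlc;
    uint8_t  data[8];
} can_msg_t;

#define QUEUE_LEN 4u

/* Storage for QUEUE_LEN items is reserved once, up front. */
typedef struct {
    uint8_t storage[QUEUE_LEN * sizeof(can_msg_t)];
    size_t  head;    /* index of the oldest item */
    size_t  count;   /* number of items currently stored */
} copy_queue_t;

/* Copies sizeof(can_msg_t) bytes into the queue's own storage. */
bool copy_queue_send(copy_queue_t *q, const can_msg_t *item)
{
    if (q->count == QUEUE_LEN) {
        return false;
    }
    size_t slot = (q->head + q->count) % QUEUE_LEN;
    memcpy(&q->storage[slot * sizeof(can_msg_t)], item, sizeof *item);
    q->count++;
    return true;
}

/* Copies the oldest item out of the queue into *out. */
bool copy_queue_receive(copy_queue_t *q, can_msg_t *out)
{
    if (q->count == 0u) {
        return false;
    }
    memcpy(out, &q->storage[q->head * sizeof(can_msg_t)], sizeof *out);
    q->head = (q->head + 1u) % QUEUE_LEN;
    q->count--;
    return true;
}
```

Because each send copies the bytes, the sender can reuse its single receive buffer immediately; frames already in the queue are not affected by later writes to that buffer.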

How do you create the CAN RX queue? Data is actually copied into the queue buffer, not passed as a reference.

  Jack Peacock

Posted on July 28, 2015 at 17:56

Everything you say is correct, but you are referring to the bare FreeRTOS library, not the one provided with the STM32 HAL lib (that one has one more level of "abstraction"):

For example, this is the function responsible for inserting into a queue. Please note that I am restricted to items 4 bytes in size. It can be either an int variable or a pointer, really:

/**
* @brief Put a Message to a Queue.
* @param  queue_id  message queue ID obtained with \ref osMessageCreate.
* @param  info      message information.
* @param  millisec  timeout value or 0 in case of no time-out.
* @retval status code that indicates the execution status of the function.
* @note   MUST REMAIN UNCHANGED: \b osMessagePut shall be consistent in every CMSIS-RTOS.
*/
osStatus osMessagePut (osMessageQId queue_id, uint32_t info, uint32_t millisec)
{
  portBASE_TYPE taskWoken = pdFALSE;
  TickType_t ticks;

  ticks = millisec / portTICK_PERIOD_MS;
  if (ticks == 0) {
    ticks = 1;
  }

  if (inHandlerMode()) {
    if (xQueueSendFromISR(queue_id, &info, &taskWoken) != pdTRUE) {
      return osErrorOS;
    }
    portEND_SWITCHING_ISR(taskWoken);
  }
  else {
    if (xQueueSend(queue_id, &info, ticks) != pdTRUE) {
      return osErrorOS;
    }
  }

  return osOK;
}

Could you please comment on this? Do I understand it correctly?

jpeacock
Associate II
Posted on July 28, 2015 at 19:23

Apparently this is far more complex using the HAL.  Create a memory pool of fixed length messages.  Create the message queue with the same number of entries.  You will have to manage the pool yourself, posting the address of the pool entry to the queue after copying the RX message into the pool entry.  Treat the memory pool as a FIFO with an insert and remove set of pointers.  Insert in the RX interrupt handler, remove at the protocol stack task.  Lot of duplication of effort by not directly calling FreeRTOS APIs.
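
A minimal plain C sketch of such a pool, assuming slots are filled and released strictly in FIFO order. The names `can_msg_t`, `pool_put` and `pool_release` are hypothetical; the address returned by `pool_put` is what would be posted to the 32-bit message queue. On a real target the shared counter would need protection between the interrupt and the task, which is left out here.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define POOL_LEN 4u   /* same number of entries as the message queue */

/* Hypothetical message type standing in for the CAN RX struct. */
typedef struct {
    uint32_t ext_id;
    uint8_t  dlc;
    uint8_t  data[8];
} can_msg_t;

static can_msg_t pool[POOL_LEN];
static size_t pool_insert;   /* next slot the RX interrupt fills */
static size_t pool_remove;   /* oldest slot still owned by the task */
static size_t pool_used;     /* slots currently in flight */

/* Interrupt side: copy the received frame into the next free slot.
   Returns the slot address to post to the queue, or NULL when full. */
can_msg_t *pool_put(const can_msg_t *rx)
{
    if (pool_used == POOL_LEN) {
        return NULL;          /* pool exhausted: the frame is dropped */
    }
    can_msg_t *slot = &pool[pool_insert];
    *slot = *rx;
    pool_insert = (pool_insert + 1u) % POOL_LEN;
    pool_used++;
    return slot;
}

/* Task side: give a slot back after processing. Only the oldest
   slot may be released, which keeps the pool a strict FIFO. */
bool pool_release(const can_msg_t *slot)
{
    if (pool_used == 0u || slot != &pool[pool_remove]) {
        return false;
    }
    pool_remove = (pool_remove + 1u) % POOL_LEN;
    pool_used--;
    return true;
}
```

The RX callback would call `pool_put()` and post the returned pointer; the task would process the message it dequeues and then call `pool_release()` on it.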

From what I see the HAL (actually CMSIS-RTOS) only uses 32 bit values for queues, a good argument not to use the HAL.  Not ST's problem, ARM specifies the CMSIS specs based on their RTX OS.  Maybe RTX doesn't support variable length queue messages?

  Jack Peacock

Posted on July 28, 2015 at 22:24

When I read the RTOS manual, it said that for larger data structs, pointers should be stored in the queue instead of the data structs themselves. That makes sense. The CAN RX type is a big data struct.

I understand it this way: the CMSIS layer forces the user to use pointers if the data struct for an item is bigger than 4 bytes. Don't you think?

Before I used FreeRTOS I was queueing the way you have described it: I had a pool of fixed-length messages. I have thought of doing it this way again with FreeRTOS, but that would mean double queueing at some point. That's why I hoped that malloc would be well optimised.
Barry.Richard
Associate III
Posted on July 29, 2015 at 19:27

First - I would definitely not recommend using a malloc() in an interrupt, especially if it's using the C library malloc() rather than one of the FreeRTOS memory allocation schemes (http://www.freertos.org/a00111.html).  Even when using a FreeRTOS malloc the call to xTaskResumeAll() contained within the implementation is not really interrupt safe, but probably ok on a Cortex-M.

I'm a FreeRTOS guy, so am biased, but generally the FreeRTOS API is designed to be easy to use, and in my humble opinion you are seeing how that works in practical scenarios.

In this case, personally I would not use a queue at all, but write the incoming messages to a thread safe circular buffer.  The StreamBuffer implementation used in the FreeRTOS+TCP TCP/IP stack (http://www.freertos.org/FreeRTOS-Plus/FreeRTOS_Plus_TCP/index.html) is an example of such a buffer.  Once the data is written to the circular RAM buffer (which is fast to do) you can unblock a task using a task notification (http://www.freertos.org/RTOS-task-notifications.html), which is also fast to do and has practically no RAM overhead.

If the messages are of fixed length, then all the easier.  If the messages are of differing length then write the message length to the circular buffer before the actual message.
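
The length-prefix idea can be sketched in plain C as below. This is not the FreeRTOS+TCP StreamBuffer, only an illustration with hypothetical names (`ring_t`, `ring_write_msg`, `ring_read_msg`), a one-byte length prefix, and no interrupt protection or task notification.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define RING_SIZE 32u

typedef struct {
    uint8_t buf[RING_SIZE];
    size_t  head;   /* read index */
    size_t  tail;   /* write index */
    size_t  used;   /* bytes currently stored */
} ring_t;

static void ring_push_byte(ring_t *r, uint8_t b)
{
    r->buf[r->tail] = b;
    r->tail = (r->tail + 1u) % RING_SIZE;
    r->used++;
}

static uint8_t ring_pop_byte(ring_t *r)
{
    uint8_t b = r->buf[r->head];
    r->head = (r->head + 1u) % RING_SIZE;
    r->used--;
    return b;
}

/* Writer side: store one length byte followed by the payload.
   All or nothing: fails without writing if there is not enough room. */
bool ring_write_msg(ring_t *r, const uint8_t *msg, uint8_t len)
{
    if ((size_t)len + 1u > RING_SIZE - r->used) {
        return false;
    }
    ring_push_byte(r, len);
    for (uint8_t i = 0; i < len; i++) {
        ring_push_byte(r, msg[i]);
    }
    return true;
}

/* Reader side: returns the message length, or -1 if the buffer is
   empty or the caller's buffer (capacity cap) is too small. */
int ring_read_msg(ring_t *r, uint8_t *out, size_t cap)
{
    if (r->used == 0u) {
        return -1;
    }
    uint8_t len = r->buf[r->head];   /* peek at the length prefix */
    if ((size_t)len > cap) {
        return -1;
    }
    (void)ring_pop_byte(r);          /* consume the prefix */
    for (uint8_t i = 0; i < len; i++) {
        out[i] = ring_pop_byte(r);
    }
    return (int)len;
}
```

With fixed-length messages the prefix byte can be dropped and the reader simply consumes a constant number of bytes per message.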

Hope that helps, although it is slightly adjacent to your direct question.

jpeacock
Associate II
Posted on July 29, 2015 at 19:52

A circular buffer only works if the messages are processed in sequence.  Unlike TCP/IP some CAN stacks have a dual track message path where a higher priority message (often safety related) arriving after a low priority is processed out of order.  With a single bus the STM32 has a dual CAN FIFO to handle this, and if redundant buses are used (as in dual CAN STM32s) the complexity expands to four FIFOs (but still two priority levels depending on how redundancy is handled).  This is fundamentally different from handling TCP/IP, where messages are always routed to listeners in order of arrival and, if there are two ports, can be interleaved by the port mapper.

Being able to insert at the head of the queue ensures the high priority message is processed first, regardless of arrival order.  An alternative would be a circular queue for each priority level but that increases overhead to determine which queue has to be serviced first.
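
Head insertion can be illustrated with a small plain C queue. The names `msgq_t`, `msgq_push_back`, `msgq_push_front` and `msgq_pop` are hypothetical and only model the behaviour; in FreeRTOS itself the corresponding calls are `xQueueSendToFront()` and `xQueueSendToFrontFromISR()`.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define Q_LEN 8u

/* Hypothetical, trimmed-down message type for this sketch. */
typedef struct {
    uint32_t id;
} can_msg_t;

typedef struct {
    can_msg_t items[Q_LEN];
    size_t    head;    /* index of the item that will be popped next */
    size_t    count;   /* number of items stored */
} msgq_t;

/* Normal priority: append behind everything already waiting. */
bool msgq_push_back(msgq_t *q, can_msg_t m)
{
    if (q->count == Q_LEN) {
        return false;
    }
    q->items[(q->head + q->count) % Q_LEN] = m;
    q->count++;
    return true;
}

/* High priority: step the head back one slot and store there,
   so this message is the next one popped. */
bool msgq_push_front(msgq_t *q, can_msg_t m)
{
    if (q->count == Q_LEN) {
        return false;
    }
    q->head = (q->head + Q_LEN - 1u) % Q_LEN;
    q->items[q->head] = m;
    q->count++;
    return true;
}

bool msgq_pop(msgq_t *q, can_msg_t *out)
{
    if (q->count == 0u) {
        return false;
    }
    *out = q->items[q->head];
    q->head = (q->head + 1u) % Q_LEN;
    q->count--;
    return true;
}
```

One consequence of this scheme is that several high priority messages pushed to the front before the task runs come out in reverse order of arrival among themselves.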

  Jack Peacock
Posted on July 30, 2015 at 08:33

I have removed the dynamic allocation and created a static array for storing incoming messages, with the same number of entries as my queue. Thank you for the hints.

What is the actual benefit of getting rid of the queue and replacing it with the circular buffer you described? I mean, I am more or less using a circular buffer now: that's my static memory array. I put items in there and their pointers are placed into the queue.