cancel
Showing results for 
Search instead for 
Did you mean: 

Issue with MQTT + altcp_tls – LwIP heap and TCP_SEG are not cleared after an unhandled error.

Oleg_Sokolov
Visitor

Hello.
I've been struggling for several weeks to fix an issue when sending data over MQTT using LwIP on an STM32F407 microcontroller.
For periodic message publishing, I'm using the following code:

void MqttPublishTask(void const * argument)
{
	err_t err;
	while(1)
	{
        xEventGroupWaitBits(mqttSendEventGroup,
                            pdFALSE,
                            pdTRUE,
                            portMAX_DELAY);

		if(mqtt_client_is_connected(&client))
		{
			cJSON *root = cJSON_CreateObject();

			for(uint8_t i = 0; i < (nodeID - 64); i++)
			{
				cJSON *tap_metrics = cJSON_CreateObject();
				cJSON_AddNumberToObject(tap_metrics, "ER", taps[i].alarmCode);
				cJSON_AddNumberToObject(tap_metrics, "C", taps[i].coldWater);
				cJSON_AddNumberToObject(tap_metrics, "H", taps[i].hotWater);
				cJSON_AddNumberToObject(tap_metrics, "E", taps[i].totalEnergy);
				cJSON_AddNumberToObject(tap_metrics, "T", taps[i].waterFlowTime);
				cJSON_AddNumberToObject(tap_metrics, "N", taps[i].numberOfUses);

				snprintf(serialString, sizeof(serialString), "%lu", taps[i].serial);
				cJSON_AddItemToObject(root, serialString, tap_metrics);
			}

			char *pub_payload = cJSON_PrintUnformatted(root);
			uint8_t qos = 0;
			uint8_t retain = 0;

			snprintf(serialString, sizeof(serialString), "%lu", OD_PERSIST_COMM.x1018_identity.serialNumber);

			err = mqtt_publish(&client, serialString, pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, 0);
			if(err != ERR_OK)
			{
			  printf("Publish err: %d\n", err);
			}

			cJSON_Delete(root);
			free(pub_payload);
		}
		osDelay(30000);
	}
}

To establish and check the connection with the broker and also for debugging purposes, I’m using the following task:

void MqttConnectTask(void const *argument)
{
	mbedtls_threading_set_alt( cmsis_os_mutex_init,
    							cmsis_os_mutex_free,
								cmsis_os_mutex_lock,
								cmsis_os_mutex_unlock );

    if (tls_config == NULL)
    {
        tls_config = altcp_tls_create_config_client(
            (const unsigned char *)mbedtls_root_certificate,
            sizeof(mbedtls_root_certificate));

        if (tls_config == NULL)
        {
            printf("Failed to create TLS config\n");
            vTaskSuspend(NULL);
        }
    }
	ip_addr_t mqtt_server_ip;

    while (netconn_gethostbyname("my.mqtt.broker.es", &mqtt_server_ip) != ERR_OK)
    {
        printf("MQTT Server DNS resolve failed\n");
        osDelay(5000);
    }

    memset(&ci, 0, sizeof(ci));
    snprintf(serialString, sizeof(serialString), "%ldc", OD_PERSIST_COMM.x1018_identity.serialNumber);
    ci.client_id  = serialString;
    ci.client_user = "username";
    ci.client_pass = "password";
    ci.keep_alive = 14;
    ci.tls_config = tls_config;

    while (1)
    {
    	if (!mqtt_client_is_connected(&client) && netif_is_link_up(&gnetif))
    	{
    		xEventGroupClearBits(mqttSendEventGroup, MQTT_SEND_EVENT_RUN_BIT);
	        mqtt_disconnect(&client);

    		if (client.conn != NULL)
    	    {
    	        altcp_abort(client.conn);
    	        client.conn = NULL;
    	    }
    	    memset(&client, 0, sizeof(client));
    	    osDelay (100);

    	    printf("Attempting MQTT connect...\n");

    	    err_t err = mqtt_client_connect(&client, &mqtt_server_ip, 8883, mqtt_connection_cb, 0, &ci);

    	    if (err != ERR_OK)
    	    {
    	        printf("mqtt_connect return %d\n", err);

    	        if (client.conn != NULL)
    	        {
    	        	altcp_abort(client.conn);
    	        	client.conn = NULL;
    	        }
    	        memset(&client, 0, sizeof(client));
    	    }
    	}
    	else if (!netif_is_link_up(&gnetif))
    	{
    		xEventGroupClearBits(mqttSendEventGroup, MQTT_SEND_EVENT_RUN_BIT);
	        mqtt_disconnect(&client);

    		if (client.conn != NULL)
    	    {
    	        altcp_abort(client.conn);
    	        client.conn = NULL;
    	    }
    	    memset(&client, 0, sizeof(client));
    	    osDelay (100);
    	}
    	else
    	{
    		xEventGroupSetBits(mqttSendEventGroup, MQTT_SEND_EVENT_RUN_BIT);
    	}

        // LwIP heap (MEM_SIZE)
        	    printf("LwIP heap: used: %u / %u bytes (max used: %u bytes)\n",
        	           lwip_stats.mem.used,
        	           lwip_stats.mem.avail,
        	           lwip_stats.mem.max);

        	    // FreeRTOS heap
        	    printf("FreeRTOS heap: current free: %u bytes, minimum ever free: %u bytes\n",
        	           xPortGetFreeHeapSize(),
        	           xPortGetMinimumEverFreeHeapSize());

        	    printf("LwIP pools (current / max / avail):\n");

        	    printf("  PBUF        : %u / %u / %u\n",
        	           lwip_stats.memp[MEMP_PBUF]->used,
        	           lwip_stats.memp[MEMP_PBUF]->max,
        	           lwip_stats.memp[MEMP_PBUF]->avail);

        	    printf("  TCP_PCB     : %u / %u / %u\n",
        	           lwip_stats.memp[MEMP_TCP_PCB]->used,
        	           lwip_stats.memp[MEMP_TCP_PCB]->max,
        	           lwip_stats.memp[MEMP_TCP_PCB]->avail);

        	    printf("  TCP_SEG     : %u / %u / %u\n",
        	           lwip_stats.memp[MEMP_TCP_SEG]->used,
        	           lwip_stats.memp[MEMP_TCP_SEG]->max,
        	           lwip_stats.memp[MEMP_TCP_SEG]->avail);

        	    printf("  ALTCP_PCB   : %u / %u / %u\n",
        	           lwip_stats.memp[MEMP_ALTCP_PCB]->used,
        	           lwip_stats.memp[MEMP_ALTCP_PCB]->max,
        	           lwip_stats.memp[MEMP_ALTCP_PCB]->avail);

                UBaseType_t stackLeft = uxTaskGetStackHighWaterMark(NULL);
                printf("Free stack space MqttConnectTask: %lu words (%lu bytes)\n", stackLeft, stackLeft * sizeof(StackType_t));


        osDelay(5000);
    }

}

 Everything works perfectly, but once every few hours the following appears in the log:

LwIP heap: used: 19708 / 36864 bytes (max used: 24452 bytes)
FreeRTOS heap: current free: 12672 bytes, minimum ever free: 10424 bytes
LwIP pools (current / max / avail):
  PBUF        : 0 / 0 / 16
  TCP_PCB     : 1 / 2 / 5
  TCP_SEG     : 0 / 3 / 16
  ALTCP_PCB   : 2 / 2 / 5
Free stack space MqttConnectTask: 652 words (2608 bytes)

Assertion "unhandled error" failed at line 1079 in ../Middlewares/Third_Party/LwIP/src/apps/altcp_tls/altcp_tls_mbedtls.c

LwIP heap: used: 22300 / 36864 bytes (max used: 24452 bytes)
FreeRTOS heap: current free: 12672 bytes, minimum ever free: 10424 bytes
LwIP pools (current / max / avail):
  PBUF        : 0 / 0 / 16
  TCP_PCB     : 1 / 2 / 5
  TCP_SEG     : 4 / 5 / 16
  ALTCP_PCB   : 2 / 2 / 5
Free stack space MqttConnectTask: 652 words (2608 bytes)

That is, before the message "Assertion "unhandled error" failed at line 1079" appears, there are 0 TCP_SEG in use and 19708 bytes used from the LwIP heap.
After the error, I see 4 TCP_SEG in use and 22300 bytes used from the heap — and this memory is never freed.

After 5, 10, or 20 hours, the same error happens again, and these values increase further. This continues until all available TCP_SEG or heap memory is exhausted, at which point the task simply hangs and stops the data transfer.

At line 1079 in altcp_tls_mbedtls.c, there are only these two lines:

 
LWIP_ASSERT("unhandled error", 0);
return ERR_VAL;

This error is not handled by LwIP in any way, so the memory is not freed, leading to a memory leak.

I tried disconnecting and re-establishing the connection, replacing the lines above with the following:

LWIP_DEBUGF(ALTCP_MBEDTLS_DEBUG, ("mbedtls_ssl_write failed: %d\n", ret));
if (conn) 
{ 
altcp_abort(conn); 
} 
return ERR_ABRT;
—but it had no effect.

Has anyone encountered a similar issue?
There are many reports about MQTT + mbedTLS hanging after some time, but I haven’t found any working solution yet.

0 REPLIES 0