STM32 MQTT

WilliamVR
Associate III

Hi, 

I'm using an STM32F407VGTx and trying to run MQTT on this microcontroller.


I first made a test program on a NUCLEO dev kit (also a F4 controller) and it worked.
Now I get an error and it won't connect to my PC, which is the broker. I have tried the following things:

- I have made sure my firewall doesn't block ports 1883 and 8883.

- I can ping the controller.

- I made an extra init flag to make sure that the init code runs first before trying to subscribe or connect to anything.

 

I got the following error first: ERR_RTE = -4 which is a routing problem.

But this has changed and now I get:

Assertion "mqtt_sub_unsub: client != NULL" failed at line 1183 in ../Middlewares/Third_Party/LwIP/src/apps/mqtt/mqtt.c

(so I'm not sure if the previous error is fixed)
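
For reading the numeric codes that show up in logs like the one above, a small lookup helper can turn an lwIP error value into its name. This is a minimal sketch: `lwip_err_name` is a hypothetical helper (not part of lwIP), and the table assumes the `err_enum_t` values from lwIP 2.x's `lwip/err.h`, which is worth checking against the copy in the project's Middlewares folder.

```c
#include <stddef.h>

/* Names indexed by -err. Assumption: values follow err_enum_t in lwIP 2.x
 * (lwip/err.h); verify against the header shipped with your project. */
static const char *const lwip_err_names[] = {
    "ERR_OK",         /*   0 */
    "ERR_MEM",        /*  -1 out of memory */
    "ERR_BUF",        /*  -2 buffer error */
    "ERR_TIMEOUT",    /*  -3 */
    "ERR_RTE",        /*  -4 routing problem */
    "ERR_INPROGRESS", /*  -5 */
    "ERR_VAL",        /*  -6 illegal value */
    "ERR_WOULDBLOCK", /*  -7 */
    "ERR_USE",        /*  -8 address in use */
    "ERR_ALREADY",    /*  -9 */
    "ERR_ISCONN",     /* -10 */
    "ERR_CONN",       /* -11 not connected */
    "ERR_IF",         /* -12 low-level netif error */
    "ERR_ABRT",       /* -13 */
    "ERR_RST",        /* -14 */
    "ERR_CLSD",       /* -15 */
    "ERR_ARG"         /* -16 illegal argument */
};

/* Hypothetical helper: returns a printable name, or "ERR_?" if out of range. */
const char *lwip_err_name(int err)
{
    size_t count = sizeof(lwip_err_names) / sizeof(lwip_err_names[0]);

    if (err > 0 || err <= -(int)count) {
        return "ERR_?";
    }
    return lwip_err_names[-err];
}
```

On the target, the result could be passed straight to the existing debug print instead of the raw number.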

 

I'm not immediately sure what the problem could be. For more context: I'm using FreeRTOS and have given the task a stack size of 256 words. I have included the code from the task. Does anyone have any ideas?

 

Kind regards, 

William

ACCEPTED SOLUTION

Well, I'm not sure, but I think that the MQTT software that comes with lwIP in STM32CubeIDE is not thread-safe and should not be used in combination with FreeRTOS. My time for working on this project is up, but I think a possible solution is to switch to coreMQTT and its necessary dependencies.

 

I don't know if anyone from ST can confirm this.
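
To illustrate what "not thread-safe" can mean here: lwIP documents its raw API, which the bundled MQTT client is built on, as something to call either from the tcpip thread or with the core lock held. The sketch below shows the locking pattern in isolation. It is not the poster's code and not an lwIP API: `core_lock`, `core_unlock` and `call_with_core_locked` are hypothetical stand-ins so the example builds off-target. On the device, the assumption is that they would map to `LOCK_TCPIP_CORE()` and `UNLOCK_TCPIP_CORE()` with `LWIP_TCPIP_CORE_LOCKING` enabled.

```c
#include <stddef.h>

/* Stand-ins so the sketch builds off-target. On the device these would be
 * lwIP's LOCK_TCPIP_CORE() / UNLOCK_TCPIP_CORE() (assumption: usable only
 * when LWIP_TCPIP_CORE_LOCKING is enabled in lwipopts.h). */
int core_lock_depth = 0;
void core_lock(void)   { core_lock_depth++; }
void core_unlock(void) { core_lock_depth--; }

/* Any raw-API call (for example a publish) packaged as function + context. */
typedef int (*raw_api_call_t)(void *ctx);

/* Hypothetical wrapper: run one raw-API call with the core lock held. */
int call_with_core_locked(raw_api_call_t fn, void *ctx)
{
    int rc;

    core_lock();
    rc = fn(ctx);
    core_unlock();
    return rc;
}

/* Example call: records whether the lock was held while it ran. */
int record_lock_state(void *ctx)
{
    *(int *)ctx = core_lock_depth;
    return 0;
}
```

A task that publishes would then route each MQTT call through such a wrapper rather than calling the client directly from its own thread context. Whether this is enough for the setup in this thread is not confirmed.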


6 REPLIES
Guillaume K
ST Employee

Hello

In the lwIP initialisation, after adding a network interface with netif_add(), do you set it as the default interface?

netif_set_default(&netif);

 

Hi, I use the default generated code, and netif_set_default is present:

void MX_LWIP_Init(void)
{
  /* IP addresses initialization */
  IP_ADDRESS[0] = 192;
  IP_ADDRESS[1] = 168;
  IP_ADDRESS[2] = 0;
  IP_ADDRESS[3] = 2;
  NETMASK_ADDRESS[0] = 255;
  NETMASK_ADDRESS[1] = 255;
  NETMASK_ADDRESS[2] = 255;
  NETMASK_ADDRESS[3] = 0;
  GATEWAY_ADDRESS[0] = 192;
  GATEWAY_ADDRESS[1] = 168;
  GATEWAY_ADDRESS[2] = 0;
  GATEWAY_ADDRESS[3] = 1;

/* USER CODE BEGIN IP_ADDRESSES */
/* USER CODE END IP_ADDRESSES */

  /* Initialize the LwIP stack with RTOS */
  tcpip_init( NULL, NULL );

  /* IP addresses initialization without DHCP (IPv4) */
  IP4_ADDR(&ipaddr, IP_ADDRESS[0], IP_ADDRESS[1], IP_ADDRESS[2], IP_ADDRESS[3]);
  IP4_ADDR(&netmask, NETMASK_ADDRESS[0], NETMASK_ADDRESS[1] , NETMASK_ADDRESS[2], NETMASK_ADDRESS[3]);
  IP4_ADDR(&gw, GATEWAY_ADDRESS[0], GATEWAY_ADDRESS[1], GATEWAY_ADDRESS[2], GATEWAY_ADDRESS[3]);

  /* add the network interface (IPv4/IPv6) with RTOS */
  netif_add(&gnetif, &ipaddr, &netmask, &gw, NULL, &ethernetif_init, &tcpip_input);

  /* Registers the default network interface */
  netif_set_default(&gnetif); //<- here

  /* We must always bring the network interface up connection or not... */
  netif_set_up(&gnetif);

  /* Set the link callback function, this function is called on change of link status*/
  netif_set_link_callback(&gnetif, ethernet_link_status_updated);

  /* Create the Ethernet link handler thread */
/* USER CODE BEGIN H7_OS_THREAD_DEF_CREATE_CMSIS_RTOS_V1 */
  osThreadDef(EthLink, ethernet_link_thread, osPriorityBelowNormal, 0, configMINIMAL_STACK_SIZE *2);
  osThreadCreate (osThread(EthLink), &gnetif);
/* USER CODE END H7_OS_THREAD_DEF_CREATE_CMSIS_RTOS_V1 */

/* USER CODE BEGIN 3 */

/* USER CODE END 3 */
}

 

WilliamVR
Associate III

I haven't fixed the problem, but I suspect that the FreeRTOS task doesn't get enough memory allocated. I have seen it subscribe without any errors when I commented out a large piece of code from another task. I will investigate further and leave my findings here.

WilliamVR
Associate III

OK,

I have gotten some things to work. The problem was that my task stack size wasn't big enough. I couldn't make it bigger because I had set my TOTAL_HEAP_SIZE too big. The problem I still face is that I can publish data but not receive data.
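
As a side note on the sizes involved: FreeRTOS stack depths are given in words, not bytes, and a word is 4 bytes on Cortex-M, so the 256-word stack from the first post is 1024 bytes. The sketch below only does that arithmetic and a rough budget check. The function names are hypothetical and the figures in the usage note are illustrative, not taken from this project.

```c
#include <stddef.h>

#define STACK_WORD_BYTES 4u /* sizeof(StackType_t) on Cortex-M; assumption for this sketch */

/* Convert a FreeRTOS stack depth (in words) to bytes. */
size_t stack_bytes(size_t depth_words)
{
    return depth_words * STACK_WORD_BYTES;
}

/* Returns 1 if all task stacks fit in the RTOS heap with `reserve` bytes
 * left over for queues, mutexes and task control blocks; 0 otherwise. */
int stacks_fit_in_heap(const size_t *depth_words, size_t task_count,
                       size_t heap_bytes, size_t reserve)
{
    size_t total = reserve;

    for (size_t i = 0; i < task_count; i++) {
        total += stack_bytes(depth_words[i]);
    }
    return total <= heap_bytes;
}
```

For example, three tasks of 256, 1024 and 128 words need 5632 bytes of stack, which fits a 15360-byte heap with a 2048-byte reserve but not a 6000-byte one. On the target, FreeRTOS's `uxTaskGetStackHighWaterMark()` can report how close a running task has come to exhausting its stack.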

If anyone sees this, this is my code at the moment:

 

/*
 * MQTT_Task.c
 *
 *  Created on: Mar 21, 2024
 *      Author: William Van Raemdonck
 */
#include "main.h"
#include "./TasksInc/MQTT_Task.h"
#include "./TasksInc/IO_Task.h"
#include "./configIRIS.h"
#include "lwip/apps/mqtt.h"
#include "lwip.h"
#include "lwip/ip.h"
#include "lwip/ip_addr.h"
#include <stdio.h>	// snprintf, sscanf
#include <stdlib.h>	// atoi
#include <string.h>
#include "stm32f4xx_hal.h"


char Topic[] = "/IRIS";

// Digital Inputs
const char* digitalInputs[] = {
		"/digital/input/digitalInput_0",
		"/digital/input/digitalInput_1",
		"/digital/input/digitalInput_2",
		"/digital/input/digitalInput_3",
		"/digital/input/digitalInput_4",
		"/digital/input/digitalInput_5",
		"/digital/input/digitalInput_6",
		"/digital/input/digitalInput_7",
		"/digital/input/digitalInput_8",
		"/digital/input/digitalInput_9",
		"/digital/input/digitalInput_10",
		"/digital/input/digitalInput_11",
		"/digital/input/digitalInput_12",
		"/digital/input/digitalInput_13",
		"/digital/input/digitalInput_14",
		"/digital/input/digitalInput_15",
		"/digital/input/digitalInput_16",
		"/digital/input/digitalInput_17",
		"/digital/input/digitalInput_18",
		"/digital/input/digitalInput_19",
		"/digital/input/digitalInput_20",
		"/digital/input/digitalInput_21",
		"/digital/input/digitalInput_22",
		"/digital/input/digitalInput_23"
};

// Analog Inputs
const char* analogInputs[] = {
		"/analogue/input/adc_0-50V_0",
		"/analogue/input/adc_0-50V_1",
		"/analogue/input/adc_0-50V_2",
		"/analogue/input/adc_0-50V_3",
		"/analogue/input/adc_0-50V_4",
		"/analogue/input/adc_0-50V_5",
		"/analogue/input/adc_0-50V_6",
		"/analogue/input/adc_0-50V_7"
};

// Digital Outputs
const char* digitalOutputsHighSide[] = {
		"/digital/output/highside_0",
		"/digital/output/highside_1",
		"/digital/output/highside_2",
		"/digital/output/highside_3",
		"/digital/output/highside_4",
		"/digital/output/highside_5",
		"/digital/output/highside_6",
		"/digital/output/highside_7"
};

const char* digitalOutputsOpenCollector[] = {
		"/digital/output/oc_0",
		"/digital/output/oc_1",
		"/digital/output/oc_2",
		"/digital/output/oc_3",
		"/digital/output/oc_4",
		"/digital/output/oc_5",
		"/digital/output/oc_6",
		"/digital/output/oc_7"
};

const char* digitalOutputsRelais[] = {
		"/digital/output/relais_0",
		"/digital/output/relais_1",
		"/digital/output/relais_2",
		"/digital/output/relais_3",
		"/digital/output/relais_4",
		"/digital/output/relais_5",
		"/digital/output/relais_6",
		"/digital/output/relais_7"
};

mqtt_client_t *client;

extern osMutexId UART_MutexHandle;

ip4_addr_t ipAddrBroker;
uint8_t IP_ADDRESS_BROKER[4];
char MQTTusername[64];
char MQTTpassword[64];
char MQTTid[64];

char debugMQTT[128];

char* receivedMqttCommand;
char receivedDA[20];
char receivedIO[20];
char receivedIndex[20];

extern osMessageQId IO_OUTPUT_PIN_QUEUEHandle;
extern osMessageQId IO_OUTPUT_DATA_QUEUEHandle;

//CONFIG struct
extern IRISConfigstr configIRIS;

int initFlagMQTT = 0;

void MQTT_Init(void){
	//TODO
	IP_ADDRESS_BROKER[0] = 192;	//configIRIS.MQTTbroker[0]
	IP_ADDRESS_BROKER[1] = 168;	//configIRIS.MQTTbroker[1]
	IP_ADDRESS_BROKER[2] = 0;	//configIRIS.MQTTbroker[2]
	IP_ADDRESS_BROKER[3] = 1;	//configIRIS.MQTTbroker[3]

	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);

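	client = mqtt_client_new();

	if(client != NULL) {
		osPrintf("MQTT", "new client made");
		example_do_connect(client);
		initFlagMQTT = 1;	// only flag init as done once a client exists
	}
	else {
		osPrintf("MQTT", "mqtt_client_new failed");
	}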
}

void MQTT_Subscribe(void){
	char fullTopic[50] = "";

	osPrintf("MQTT", "subscribed to topics:");

	//	SUBSCRIBE to all available outputs
	//	OC
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsOpenCollector[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsOpenCollector[i]);
	}

	//	HSD
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsHighSide[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsHighSide[i]);

	}

	//	REL
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsRelais[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsRelais[i]);

	}
}

void example_do_connect(mqtt_client_t *client)
{
	struct mqtt_connect_client_info_t ci;
	err_t err;

	/* Setup an empty client info structure */
	memset(&ci, 0, sizeof(ci));

	/* Minimal amount of information required is client identifier, so set it here */
	ci.client_id = "IRIS";
	ci.client_user = "username";
	ci.client_pass = "password";

	strcpy(MQTTusername, ci.client_user);
	strcpy(MQTTpassword, ci.client_pass);
	strcpy(MQTTid, ci.client_id);

	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);

	/* Initiate client and connect to server, if this fails immediately an error code is returned
     otherwise mqtt_connection_cb will be called with connection result after attempting
     to establish a connection with the server.
     For now MQTT version 3.1.1 is always used */

	err = mqtt_client_connect(client, &ipAddrBroker, 1883, mqtt_connection_cb, 0, &ci);

	/* For now just print the result code if something goes wrong */
	if(err != ERR_OK) {
		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connect return: %d", err);
		osPrintf("MQTT", debugMQTT);
	}
}

static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
	err_t err;
	if(status == MQTT_CONNECT_ACCEPTED) {
		osPrintf("MQTT", "mqtt_connection_cb: Successfully connected\r\n");

		/* Setup callback for incoming publish requests */
		mqtt_set_inpub_callback(client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, arg);

		/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
		err = mqtt_subscribe(client, "subtopic", 1, mqtt_sub_request_cb, arg);

		//subscribe to own topics
		MQTT_Subscribe();

		if(err != ERR_OK) {
			snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_subscribe return: %d", err);
			osPrintf("MQTT", debugMQTT);
		}
	}
	else
	{
		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connection_cb: Disconnected, reason: %d", status);
		osPrintf("MQTT", debugMQTT);

		/* Its more nice to be connected, so try to reconnect */
		example_do_connect(client);
	}
}

static void mqtt_sub_request_cb(void *arg, err_t result)
{
	/* Just print the result code here for simplicity,
     normal behaviour would be to take some action if subscribe fails like
     notifying user, retry subscribe or disconnect from server */

	snprintf(debugMQTT, sizeof(debugMQTT), "Subscribe result: %d (0 = ERR_OK)", result);
	osPrintf("MQTT", debugMQTT);
}


//-----------------------------------------------------------------------------------
//	INCOMING


static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
	char inpub_type[20];
	int inpub_pin = 0;
	int index = 0;
	char *parts[4]; // Array to store extracted parts

	//DEBUG
	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish at topic: %s", topic);
	osPrintf("MQTT", debugMQTT);

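	//extract pin number and type
	//work on a copy: topic is const and strtok() modifies its input
	char topicCopy[64];
	strncpy(topicCopy, topic, sizeof(topicCopy) - 1);
	topicCopy[sizeof(topicCopy) - 1] = '\0';
	receivedMqttCommand = strtok(topicCopy, "/");
	while (receivedMqttCommand != NULL && index < 4) {
		parts[index++] = receivedMqttCommand;
		receivedMqttCommand = strtok(NULL, "/");
	}
	if (index < 4 || sscanf(parts[3], "%19[^_]_%d", inpub_type, &inpub_pin) != 2) {
		osPrintf("ERROR", "MQTT - unexpected topic format");
		return;	// parts[3] would be uninitialised for shorter topics
	}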


	if(strcmp(parts[0], &Topic[1]) != 0){
		osPrintf("debug", "received MQTT packet not for this device or wrong topic");
	}
	else{
		snprintf(debugMQTT, sizeof(debugMQTT), "type: %s, pin: %d", inpub_type, inpub_pin);
		osPrintf("MQTT", debugMQTT);

		uint8_t offset = 0;
		if(strcmp(inpub_type, "highside") == 0){
			offset = HIGHSIDE_0;
		}
		else if(strcmp(inpub_type, "oc") == 0){
			offset = OPEN_COLLECTOR_0;
		}
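		else if(strcmp(inpub_type, "relais") == 0){	// topics are named "relais_N", not "relay_N"
			offset = RELAY_0;
		}
		else{
			osPrintf("ERROR", "MQTT - Not a valid topic");
			return;	// don't queue a pin for an unknown output type
		}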

		osMessagePut(IO_OUTPUT_PIN_QUEUEHandle, offset + inpub_pin, HAL_MAX_DELAY);

		//osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, inpub_type, HAL_MAX_DELAY);
	}
}

static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish payload with length: %d with flags: %d", len, (unsigned int)flags);
	osPrintf("MQTT", debugMQTT);

	if(flags & MQTT_DATA_FLAG_LAST) {
		/* Last fragment of payload received (or whole part if payload fits receive buffer
       See MQTT_VAR_HEADER_BUFFER_LEN)  */
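		uint8_t dataNew = 0;
		char payload[16];
		u16_t n = (len < sizeof(payload)) ? len : (u16_t)(sizeof(payload) - 1);

		/* data is not NUL-terminated: copy and terminate before %s / atoi */
		memcpy(payload, data, n);
		payload[n] = '\0';

		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_incoming_data_cb: %s", payload);
		osPrintf("MQTT", debugMQTT);

		dataNew = (uint8_t)atoi(payload);
		osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, dataNew, HAL_MAX_DELAY);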

	} else {
		/* Handle fragmented payload, store in buffer, write to file or whatever */
	}
}

void example_publish(mqtt_client_t *client, void *arg, const char* subTopic)
{
	const char *pub_payload= "1";	//tmp
	err_t err;
	u8_t qos = 2; /* 0 1 or 2, see MQTT specification */
	u8_t retain = 0; /* No don't retain such crappy payload... */

	char fullTopic[50];

	/* snprintf alone builds the topic and cannot overflow fullTopic */
	snprintf(fullTopic, sizeof(fullTopic), "%s%s", Topic, subTopic);


	err = mqtt_publish(client, fullTopic, pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, arg);

	//strcat(fullTopic, "\r\n");
	//osPrintf("MQTT", "TOPIC:" );
	//osPrintf("MQTT", fullTopic);

	if(err != ERR_OK) {
		if(err != ERR_MEM){
			snprintf(debugMQTT, sizeof(debugMQTT), "Publish err: %d", err);
			osPrintf("MQTT", debugMQTT);
		}
	}
}

/* Called when publish is complete either with success or failure */
static void mqtt_pub_request_cb(void *arg, err_t result)
{
	if(result != ERR_OK) {
		snprintf(debugMQTT, sizeof(debugMQTT), "Publish result: %d", result);
		osPrintf("MQTT",debugMQTT);
	}
	else{
		osPrintf("MQTT", "Publish result: Success!");
	}
}


void MQTT_Task(void){
	example_publish(client,	NULL, digitalInputs[0]);
}
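
The topic handling in `mqtt_incoming_publish_cb` above can be tried off-target. The following is a minimal sketch of the same split, written so that it works on a private copy (since `strtok()` modifies its input and the callback's `topic` is `const`) and rejects topics that do not have exactly four levels, such as the plain `subtopic` subscription. `parse_output_topic` is a hypothetical helper name, and the device prefix `IRIS` is hard-coded for the example.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: split "/IRIS/digital/output/oc_3" into type "oc" and
 * pin 3. Returns 0 on success, -1 if the topic has the wrong shape. */
int parse_output_topic(const char *topic, char type[20], int *pin)
{
    char copy[64];
    char *parts[4];
    char *tok;
    int count = 0;

    if (topic == NULL || type == NULL || pin == NULL) {
        return -1;
    }
    if (strlen(topic) >= sizeof(copy)) {
        return -1;                      /* would not fit the private copy */
    }
    strcpy(copy, topic);

    tok = strtok(copy, "/");
    while (tok != NULL && count < 4) {
        parts[count++] = tok;
        tok = strtok(NULL, "/");
    }
    if (count != 4 || tok != NULL) {
        return -1;                      /* too few or too many levels */
    }
    if (strcmp(parts[0], "IRIS") != 0) {
        return -1;                      /* not addressed to this device */
    }
    /* "%19[^_]" reads up to 19 characters before the underscore. */
    if (sscanf(parts[3], "%19[^_]_%d", type, pin) != 2) {
        return -1;
    }
    return 0;
}
```

Note that `strtok()` keeps internal state, so a helper like this should not be called from two threads at once.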

WilliamVR
Associate III

So the receiving was fixed!
It turns out my RAM allocation wasn't quite right.

But now there is a different problem.
After publishing data for a while, it stops sending and returns ERR_MEM.
This is because the TCP send buffer for lwIP fills up and overflows.
I don't know if there is a way to detect this.
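
On detecting the condition: the `ERR_MEM` return value from `mqtt_publish()` is itself the signal, and the publish callback fires when a request completes. One possible way to stay below the limit is to count outstanding publishes and hold off when the count reaches the client's capacity. The sketch below shows only that bookkeeping. The function names are hypothetical, and the limit of 4 is an assumption meant to mirror lwIP's default `MQTT_REQ_MAX_IN_FLIGHT` (see `mqtt_opts.h` in the project for the actual value).

```c
#define MAX_IN_FLIGHT 4 /* assumption: mirrors lwIP's default MQTT_REQ_MAX_IN_FLIGHT */

static int in_flight = 0; /* publishes sent but not yet completed */

/* Call before publishing. Returns 1 and reserves a slot if there is room,
 * 0 if the caller should wait and retry later. */
int publish_slot_acquire(void)
{
    if (in_flight >= MAX_IN_FLIGHT) {
        return 0;
    }
    in_flight++;
    return 1;
}

/* Call from the publish-complete callback (success or failure), and also
 * when the publish call itself returns an error, to free the slot. */
void publish_slot_release(void)
{
    if (in_flight > 0) {
        in_flight--;
    }
}

/* Number of publishes currently outstanding. */
int publish_slots_in_use(void)
{
    return in_flight;
}
```

On the target the counter is touched from both the publishing task and the lwIP callback context, so it would need to be protected, for example with a critical section. With QoS 2, each publish also stays outstanding until the full handshake finishes, so slots are held longer than with QoS 0 or 1.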

[Attached image: WilliamVR_0-1717140375877.png]

This is what Wireshark sees. After this retransmission it stops working. It handles other retransmissions just fine.
This is my code at the moment.

All help is welcome!


/*
 * MQTT_Task.c
 *
 *  Created on: Mar 21, 2024
 *      Author: William Van Raemdonck
 */
#include "main.h"
#include "./TasksInc/MQTT_Task.h"
#include "./TasksInc/IO_Task.h"
#include "./configIRIS.h"
#include "lwip/apps/mqtt.h"
#include "lwip.h"
#include "lwip/ip.h"
#include "lwip/ip_addr.h"
#include <stdio.h>	// snprintf, sscanf
#include <stdlib.h>	// atoi
#include <string.h>
#include "stm32f4xx_hal.h"


char Topic[] = "/IRIS";

// Digital Inputs
const char* digitalInputs[] = {
		"/digital/input/digitalInput_0",
		"/digital/input/digitalInput_1",
		"/digital/input/digitalInput_2",
		"/digital/input/digitalInput_3",
		"/digital/input/digitalInput_4",
		"/digital/input/digitalInput_5",
		"/digital/input/digitalInput_6",
		"/digital/input/digitalInput_7",
		"/digital/input/digitalInput_8",
		"/digital/input/digitalInput_9",
		"/digital/input/digitalInput_10",
		"/digital/input/digitalInput_11",
		"/digital/input/digitalInput_12",
		"/digital/input/digitalInput_13",
		"/digital/input/digitalInput_14",
		"/digital/input/digitalInput_15",
		"/digital/input/digitalInput_16",
		"/digital/input/digitalInput_17",
		"/digital/input/digitalInput_18",
		"/digital/input/digitalInput_19",
		"/digital/input/digitalInput_20",
		"/digital/input/digitalInput_21",
		"/digital/input/digitalInput_22",
		"/digital/input/digitalInput_23"
};

// Analog Inputs
const char* analogInputs[] = {
		"/analogue/input/adc_0-50V_0",
		"/analogue/input/adc_0-50V_1",
		"/analogue/input/adc_0-50V_2",
		"/analogue/input/adc_0-50V_3",
		"/analogue/input/adc_0-50V_4",
		"/analogue/input/adc_0-50V_5",
		"/analogue/input/adc_0-50V_6",
		"/analogue/input/adc_0-50V_7"
};

// Digital Outputs
const char* digitalOutputsHighSide[] = {
		"/digital/output/highside_0",
		"/digital/output/highside_1",
		"/digital/output/highside_2",
		"/digital/output/highside_3",
		"/digital/output/highside_4",
		"/digital/output/highside_5",
		"/digital/output/highside_6",
		"/digital/output/highside_7"
};

const char* digitalOutputsOpenCollector[] = {
		"/digital/output/oc_0",
		"/digital/output/oc_1",
		"/digital/output/oc_2",
		"/digital/output/oc_3",
		"/digital/output/oc_4",
		"/digital/output/oc_5",
		"/digital/output/oc_6",
		"/digital/output/oc_7"
};

const char* digitalOutputsRelais[] = {
		"/digital/output/relais_0",
		"/digital/output/relais_1",
		"/digital/output/relais_2",
		"/digital/output/relais_3",
		"/digital/output/relais_4",
		"/digital/output/relais_5",
		"/digital/output/relais_6",
		"/digital/output/relais_7"
};

mqtt_client_t *client;

extern osMutexId UART_MutexHandle;

ip4_addr_t ipAddrBroker;
uint8_t IP_ADDRESS_BROKER[4];
char MQTTusername[64];
char MQTTpassword[64];
char MQTTid[64];

char debugMQTT[128];

char* receivedMqttCommand;
char receivedDA[20];
char receivedIO[20];
char receivedIndex[20];

extern osMessageQId IO_OUTPUT_PIN_QUEUEHandle;
extern osMessageQId IO_OUTPUT_DATA_QUEUEHandle;

//CONFIG struct
extern IRISConfigstr configIRIS;

//IO
extern uint8_t digitalInputsFlash[24];
extern uint8_t openCollectorFlash[8];
extern uint8_t highsideDriverFlash[8];
extern uint8_t relaisFlash[8];
extern float ADCInputFlash[8];

int initFlagMQTT = 0;

void MQTT_Init(void){
	//TODO
	IP_ADDRESS_BROKER[0] = 192;	//configIRIS.MQTTbroker[0]
	IP_ADDRESS_BROKER[1] = 168;	//configIRIS.MQTTbroker[1]
	IP_ADDRESS_BROKER[2] = 0;	//configIRIS.MQTTbroker[2]
	IP_ADDRESS_BROKER[3] = 1;	//configIRIS.MQTTbroker[3]

	configIRIS.MQTTbroker[0] = 192;
	configIRIS.MQTTbroker[1] = 168;
	configIRIS.MQTTbroker[2] = 0;
	configIRIS.MQTTbroker[3] = 1;

	configIRIS.MQTTport = 1883;

	strcpy(configIRIS.MQTTtopic, "/IRIS");


	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);

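	client = mqtt_client_new();

	if(client != NULL) {
		osPrintf("MQTT", "new client made");
		example_do_connect(client);
		initFlagMQTT = 1;	// only flag init as done once a client exists
	}
	else {
		osPrintf("MQTT", "mqtt_client_new failed");
	}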
}

void MQTT_Subscribe(void){
	char fullTopic[50] = "";

	osPrintf("MQTT", "subscribed to topics:");

	//	SUBSCRIBE to all available outputs
	//	OC
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsOpenCollector[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsOpenCollector[i]);
	}

	//	HSD
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsHighSide[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsHighSide[i]);

	}

	//	REL
	for (int i = 0; i < 8; i++) {
		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsRelais[i]);
		mqtt_subscribe(client, fullTopic, 0,0, NULL);
		osPrintf("MQTT-subscribe", digitalOutputsRelais[i]);

	}
}

void example_do_connect(mqtt_client_t *client)
{
	struct mqtt_connect_client_info_t ci;
	err_t err;

	/* Setup an empty client info structure */
	memset(&ci, 0, sizeof(ci));

	/* Minimal amount of information required is client identifier, so set it here */
	ci.client_id = "IRIS";
	ci.client_user = "username";
	ci.client_pass = "password";

	strcpy(MQTTusername, ci.client_user);
	strcpy(MQTTpassword, ci.client_pass);
	strcpy(MQTTid, ci.client_id);

	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);

	/* Initiate client and connect to server, if this fails immediately an error code is returned
     otherwise mqtt_connection_cb will be called with connection result after attempting
     to establish a connection with the server.
     For now MQTT version 3.1.1 is always used */

	err = mqtt_client_connect(client, &ipAddrBroker, 1883, mqtt_connection_cb, 0, &ci);

	/* For now just print the result code if something goes wrong */
	if(err != ERR_OK) {
		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connect return: %d", err);
		osPrintf("MQTT", debugMQTT);
	}
}

static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
	err_t err;
	if(status == MQTT_CONNECT_ACCEPTED) {
		osPrintf("MQTT", "mqtt_connection_cb: Successfully connected\r\n");

		/* Setup callback for incoming publish requests */
		mqtt_set_inpub_callback(client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, arg);

		/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
		err = mqtt_subscribe(client, "subtopic", 1, mqtt_sub_request_cb, arg);

		//subscribe to own topics
		MQTT_Subscribe();

		if(err != ERR_OK) {
			snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_subscribe return: %d", err);
			osPrintf("MQTT", debugMQTT);
		}
	}
	else
	{
		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connection_cb: Disconnected, reason: %d", status);
		osPrintf("MQTT", debugMQTT);

		/* Its more nice to be connected, so try to reconnect */
		example_do_connect(client);
	}
}

static void mqtt_sub_request_cb(void *arg, err_t result)
{
	/* Just print the result code here for simplicity,
     normal behaviour would be to take some action if subscribe fails like
     notifying user, retry subscribe or disconnect from server */

	snprintf(debugMQTT, sizeof(debugMQTT), "Subscribe result: %d (0 = ERR_OK)", result);
	osPrintf("MQTT", debugMQTT);
}


//-----------------------------------------------------------------------------------
//	INCOMING


static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
	char inpub_type[20];
	int inpub_pin = 0;
	int index = 0;
	char *parts[4]; // Array to store extracted parts

	//DEBUG
	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish at topic: %s", topic);
	osPrintf("MQTT", debugMQTT);

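	//extract pin number and type
	//work on a copy: topic is const and strtok() modifies its input
	char topicCopy[64];
	strncpy(topicCopy, topic, sizeof(topicCopy) - 1);
	topicCopy[sizeof(topicCopy) - 1] = '\0';
	receivedMqttCommand = strtok(topicCopy, "/");
	while (receivedMqttCommand != NULL && index < 4) {
		parts[index++] = receivedMqttCommand;
		receivedMqttCommand = strtok(NULL, "/");
	}
	if (index < 4 || sscanf(parts[3], "%19[^_]_%d", inpub_type, &inpub_pin) != 2) {
		osPrintf("ERROR", "MQTT - unexpected topic format");
		return;	// parts[3] would be uninitialised for shorter topics
	}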


	if(strcmp(parts[0], &Topic[1]) != 0){
		osPrintf("debug", "received MQTT packet not for this device or wrong topic");
	}
	else{
		snprintf(debugMQTT, sizeof(debugMQTT), "type: %s, pin: %d", inpub_type, inpub_pin);
		osPrintf("MQTT", debugMQTT);

		uint8_t offset = 0;
		if(strcmp(inpub_type, "highside") == 0){
			offset = HIGHSIDE_0;
		}
		else if(strcmp(inpub_type, "oc") == 0){
			offset = OPEN_COLLECTOR_0;
		}
		else if(strcmp(inpub_type, "relais") == 0){
			offset = RELAY_0;
		}
		else{
			osPrintf("ERROR", "MQTT - Not a valid topic");
			return;	// don't queue a pin for an unknown output type
		}

		osMessagePut(IO_OUTPUT_PIN_QUEUEHandle, offset + inpub_pin, HAL_MAX_DELAY);

		//osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, inpub_type, HAL_MAX_DELAY);
	}
}

static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish payload with length: %d with flags: %d", len, (unsigned int)flags);
	osPrintf("MQTT", debugMQTT);

	if(flags & MQTT_DATA_FLAG_LAST) {
		/* Last fragment of payload received (or whole part if payload fits receive buffer
       See MQTT_VAR_HEADER_BUFFER_LEN)  */
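		uint8_t dataNew = 0;
		char payload[16];
		u16_t n = (len < sizeof(payload)) ? len : (u16_t)(sizeof(payload) - 1);

		/* data is not NUL-terminated: copy and terminate before %s / atoi */
		memcpy(payload, data, n);
		payload[n] = '\0';

		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_incoming_data_cb: %s", payload);
		osPrintf("MQTT", debugMQTT);

		dataNew = (uint8_t)atoi(payload);
		osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, dataNew, HAL_MAX_DELAY);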

	} else {
		/* Handle fragmented payload, store in buffer, write to file or whatever */
	}
}

void example_publish(mqtt_client_t *client, void *arg, const char* subTopic)
{
	const char *pub_payload= "1";	//tmp
	err_t err;
	u8_t qos = 2; /* 0 1 or 2, see MQTT specification */
	u8_t retain = 0; /* No don't retain such crappy payload... */

	char fullTopic[50];

	/* snprintf alone builds the topic and cannot overflow fullTopic */
	snprintf(fullTopic, sizeof(fullTopic), "%s%s", Topic, subTopic);

	snprintf(debugMQTT, sizeof(debugMQTT), "Publish at %s", fullTopic);
	osPrintf("MQTT",debugMQTT);
	err = mqtt_publish(client, fullTopic, pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, arg);

	if(err != ERR_OK) {
		snprintf(debugMQTT, sizeof(debugMQTT), "Publish err: %d", err);
		osPrintf("MQTT", debugMQTT);

		if(err == ERR_MEM){
			osPrintf("MQTT", "oops, buffer full");
		}
	}
}

/* Called when publish is complete either with success or failure */
static void mqtt_pub_request_cb(void *arg, err_t result)
{
	if(result != ERR_OK) {
		snprintf(debugMQTT, sizeof(debugMQTT), "Publish result: %d", result);
		osPrintf("MQTT",debugMQTT);
	}
	else{
		osPrintf("MQTT", "Publish result: Success!");
	}
}


void MQTT_Task(void){
	//MQTT_REQ_MAX_IN_FLIGHT
	if(initFlagMQTT){
		//digital inputs
		for (int i = 0; i < 24; i++) {
			example_publish(client,	NULL, digitalInputs[i]);
			osDelay(500);	// yields to other tasks; HAL_Delay busy-waits
		}

		//analog inputs
		for (int i = 0; i < 8; i++) {
			example_publish(client,	NULL, analogInputs[i]);
			osDelay(500);
		}
	}

}
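
The payload handed to `mqtt_incoming_data_cb` above comes with an explicit length and is not guaranteed to be NUL-terminated, so passing it straight to `%s` or `atoi()` can read past the end. Below is a minimal, off-target sketch of a length-aware conversion. `payload_to_u8` is a hypothetical helper name, and the 0..255 range is an assumption based on the `uint8_t dataNew` variable in the listing.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: convert a length-delimited payload such as "1" or
 * "255" to a value in 0..255. Returns 0 on success, -1 if the payload is
 * empty, too long, non-numeric or out of range. */
int payload_to_u8(const unsigned char *data, size_t len, unsigned char *out)
{
    char buf[8];
    char *end;
    long value;

    if (data == NULL || out == NULL || len == 0 || len >= sizeof(buf)) {
        return -1;
    }

    memcpy(buf, data, len); /* the payload is not NUL-terminated ... */
    buf[len] = '\0';        /* ... so terminate the private copy */

    value = strtol(buf, &end, 10);
    if (end == buf || *end != '\0' || value < 0 || value > 255) {
        return -1;
    }

    *out = (unsigned char)value;
    return 0;
}
```

Unlike `atoi()`, `strtol()` makes it possible to tell a payload of "0" apart from one that is not a number at all.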
