cancel
Showing results for 
Search instead for 
Did you mean: 

STM32F429ZI NUCLEO connect MQTT broker

QuangIoT
Associate II
I'm facing an issue connecting STM32 to the MQTT broker. The mqtt_status has 4 states: MQTT_START, MQTT_CONNECT, MQTT_SUBTOPIC, and MQTT_RUNNING.
/* States stored in the client's mqttstatus; values run consecutively from 0. */
enum mqtt_status {
    MQTT_START,     /* 0 */
    MQTT_CONNECT,   /* 1 */
    MQTT_SUBTOPIC,  /* 2 */
    MQTT_RUNNING    /* 3 */
};
This is MQTTClient.h:

 

#if !defined(__MQTT_CLIENT_C_)
#define __MQTT_CLIENT_C_

#if defined(__cplusplus)
 extern "C" {
#endif

/* Symbol import/export decoration applied to the public API declarations.
 *  - WIN32_DLL or WIN64_DLL defined: use the __declspec import/export attributes.
 *  - LINUX_SO defined: imports are plain extern, exports get default visibility.
 *  - otherwise: both macros expand to nothing.
 */
#if defined(WIN32_DLL) || defined(WIN64_DLL)
  #define DLLImport __declspec(dllimport)
  #define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
  #define DLLImport extern
  #define DLLExport  __attribute__ ((visibility ("default")))
#else
  #define DLLImport
  #define DLLExport
#endif

#include "../MQTTPacket/MQTTPacket.h"
#include "stdio.h"
#include "MQTTFreertos.h"
#include <stdbool.h>

/* MQTT_TASK is defined unconditionally on the next line, so the
 * WAIT_FOR_ACK branch below is never taken in this header as written.
 * MQTT_TASK also enables the task-mode prototypes near the end of the file.
 */
#define MQTT_TASK
#if !defined(MQTT_TASK)
#define WAIT_FOR_ACK
#endif

/* Lengths, in bytes, for the application's MQTT send and read buffers. */
#define MQTT_SENDBUF_LEN  2048
#define MQTT_READBUF_LEN  2048


/* Values held in MQTTClient.mqttstatus (changed through MQTTSetStatus()).
 * The enumerators are numbered consecutively starting at 0.
 */
enum mqtt_status {
	MQTT_START,	/* 0 */
	MQTT_CONNECT,	/* 1 */
	MQTT_SUBTOPIC,	/* 2 */
	MQTT_RUNNING	/* 3 */
};

/* Flags defined outside this header.
 * NOTE(review): confirm that first_time_connect has a definition in some
 * translation unit; otherwise any use of it fails at link time.
 */
extern bool mqtt_bMqttStatus;
extern bool first_time_connect;
#if defined(MQTTCLIENT_PLATFORM_HEADER)
/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value
 * into a string constant suitable for use with include.
 * Two macro levels are required: xstr() lets its argument be macro-expanded
 * first, and str() then stringizes the expanded text.
 */
#define xstr(s) str(s)
#define str(s) #s
#include xstr(MQTTCLIENT_PLATFORM_HEADER)
#endif

/* Largest packet identifier (65535). */
#define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */

/* Number of entries in MQTTClient.messageHandlers[]; may be overridden by
 * defining MAX_MESSAGE_HANDLERS before this header is included.
 */
#if !defined(MAX_MESSAGE_HANDLERS)
#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
#endif

/* MQTT quality-of-service levels; each enumerator's value equals its QoS number. */
enum QoS {
	QOS0 = 0,
	QOS1 = 1,
	QOS2 = 2
};

/* all failure return codes must be negative */
enum returnCode {
	BUFFER_OVERFLOW = -2,
	FAILURE = -1
	/* SUCCESS = 0 is deliberately left out of this enum.
	 * NOTE(review): presumably another header already defines SUCCESS - confirm. */
};

/* The Platform specific header must define the Network and Timer structures and functions
 * which operate on them.
 *
typedef struct Network
{
	int (*mqttread)(Network*, unsigned char* read_buffer, int, int);
	int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int);
} Network;*/

/* The Timer structure must be defined in the platform specific header,
 * and have the following functions to operate on it.
 * These are declarations only; the platform port supplies the definitions.
 * NOTE(review): by their names, TimerCountdownMS takes milliseconds and
 * TimerLeftMS returns milliseconds; TimerCountdown presumably takes
 * seconds - confirm against the platform implementation.
 */
extern void TimerInit(Timer*);
extern char TimerIsExpired(Timer*);
extern void TimerCountdownMS(Timer*, unsigned int);
extern void TimerCountdown(Timer*, unsigned int);
extern int TimerLeftMS(Timer*);

/* One application message, as passed to MQTTPublish() and delivered to
 * message handlers through MessageData.
 */
typedef struct MQTTMessage
{
    /* Quality-of-service level for the message. */
    enum QoS qos;
    /* NOTE(review): presumably the MQTT RETAIN flag - confirm. */
    unsigned char retained;
    /* NOTE(review): presumably the MQTT DUP flag - confirm. */
    unsigned char dup;
    /* NOTE(review): presumably the packet identifier - confirm who assigns it. */
    unsigned short id;
    /* Message body; payloadlen gives its length (not necessarily NUL-terminated). */
    void *payload;
    size_t payloadlen;
} MQTTMessage;

/* Argument handed to a messageHandler: a message together with the topic
 * name it belongs to.
 */
typedef struct MessageData
{
    MQTTMessage* message;
    MQTTString* topicName;
} MessageData;

/* Callback type for message handlers: receives a MessageData pointer and
 * returns nothing. */
typedef void (*messageHandler)(MessageData*);

/* State of one MQTT client.
 * NOTE(review): most members are presumably filled in by MQTTClientInit()
 * from its same-named arguments - confirm in MQTTClient.c.
 */
typedef struct MQTTClient
{
    unsigned int next_packetid,
      command_timeout_ms;
    /* Sizes of buf and readbuf respectively. */
    size_t buf_size,
      readbuf_size;
    /* Caller-provided send buffer (buf) and receive buffer (readbuf). */
    unsigned char *buf,
      *readbuf;
    unsigned int keepAliveInterval;
    char ping_outstanding;
    int isconnected;

    /* Per-subscription dispatch table; a NULL topicFilter marks a free slot. */
    struct MessageHandlers
    {
        const char* topicFilter;
        void (*fp) (MessageData*);
    } messageHandlers[MAX_MESSAGE_HANDLERS];      /* Message handlers are indexed by subscription topic */

    void (*defaultMessageHandler) (MessageData*);

    /* Network object used for all I/O. */
    Network* ipstack;
    Timer ping_timer;

    Timer cmd_timer;
    /* Current enum mqtt_status value; changed through MQTTSetStatus(). */
    int mqttstatus;
} MQTTClient;

/* Brace initializer for an MQTTClient. It supplies the first nine scalar
 * members (next_packetid through isconnected); all remaining members are
 * zero-initialized by the aggregate-initialization rules.
 */
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}


/**
 * Create an MQTT client object
 * @param client - the client object to initialise
 * @param network - the Network object the client uses for I/O
 * @param command_timeout_ms - command timeout, in milliseconds
 * @param sendbuf - caller-provided buffer for outgoing packets
 * @param sendbuf_size - size of sendbuf
 * @param readbuf - caller-provided buffer for incoming packets
 * @param readbuf_size - size of readbuf
 */
DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms,
		unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size);

/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
 *  The network object must be connected to the network endpoint before calling this
 *  @param client - the client object to use
 *  @param options - connect options
 *  @return success code
 */
DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);

/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
 *  @param client - the client object to use
 *  @param topic - the topic to publish to (second argument)
 *  @param message - the message to send (third argument)
 *  @return success code
 */
DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);

/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
 *  @param client - the client object to use
 *  @param topicFilter - the topic filter to subscribe to
 *  @param qos - the requested quality of service (third argument)
 *  @param handler - callback for messages on this subscription (fourth argument)
 *  @return success code
 */
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);

/** MQTT Unsubscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
 *  @param client - the client object to use
 *  @param topicFilter - the topic filter to unsubscribe from
 *  @return success code
 */
DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);

/** MQTT Disconnect - send an MQTT disconnect packet and close the connection
 *  @param client - the client object to use
 *  @return success code
 */
DLLExport int MQTTDisconnect(MQTTClient* client);

/** MQTT Yield - MQTT background
 *  @param client - the client object to use
 *  @param time - the time, in milliseconds, to yield for
 *  @return success code
 */
DLLExport int MQTTYield(MQTTClient* client, int time);

//static int readPacket(MQTTClient* c, Timer* timer);
//static int sendPacket(MQTTClient* c, int length, Timer* timer);
#if defined(MQTT_TASK)
/* Task-mode API, only declared when MQTT_TASK is defined. */

/* Set the client's status; mqttstatus is an enum mqtt_status value. */
void MQTTSetStatus(MQTTClient* c, int mqttstatus);
/* Task-mode processing entry point taking the socket read set, the connect
 * options, a message handler, the broker address and a topic.
 * NOTE(review): presumably drives the START -> CONNECT -> SUBTOPIC -> RUNNING
 * state machine - confirm in MQTTClient.c. */
int MQTTDataHandle(MQTTClient* c, fd_set *readfd, MQTTPacket_connectData *connectData, messageHandler messageHandler, char* address, char* topic);
/* Variant of MQTTDataHandle() that also takes the broker port and an output
 * flag bResub.
 * NOTE(review): bResub is presumably set when a re-subscription happened after
 * a reconnect - confirm in MQTTClient.c. */
int MQTTDataHandle_custom(MQTTClient* c, fd_set *readfd, MQTTPacket_connectData *connectData, messageHandler messageHandler, char* address, int port, char* topic, bool *bResub);
#endif

#if defined(__cplusplus)
     }
#endif

#endif

 

In mqtt_app.c, publishing is gated on the check (client.mqttstatus == MQTT_RUNNING), so nothing is sent until the client reaches that state. However, mqttstatus only ever reaches MQTT_CONNECT and then falls back to MQTT_START instead of progressing to MQTT_RUNNING.
This is mqtt_app.c

 

#include "mqtt_app.h"

/* Timeout, in seconds, for the select() call in the MQTT task loop
 * (assigned to timeout.tv_sec). */
#define MQTT_SELECT_TIMEOUT 1

/* Topic names. When BROKER_TEST is defined, app_mqtt_init_task() clears these
 * initial values and loads the test defaults instead. */
static char strSubscribeTopic[MQTT_MAXIMUM_TOPIC_LENGTH] = "/from/app/to/device";
static char strPublishTopic[MQTT_MAXIMUM_TOPIC_LENGTH] = "/from/device/to/app";
static char strAllertTopic[MQTT_MAXIMUM_TOPIC_LENGTH] = "/from/device/to/be";
/* Last received payload and topic, filled in by app_mqtt_message_cmd(). */
static char strRxMessage[MQTT_MAXIMUM_MESSAGE_LENGTH] = {0};
static char strRxTopic[MQTT_MAXIMUM_TOPIC_LENGTH];
/* NOTE(review): not referenced in the visible part of this file. */
static char strSubscribeTopic_Endpoint_Link[MQTT_MAXIMUM_TOPIC_LENGTH] = {0};
static char strPublishTopic_Endpoint_Link[MQTT_MAXIMUM_TOPIC_LENGTH] = {0};

/* Handle of the MQTT task, written by xTaskCreate() in app_mqtt_init(). */
TaskHandle_t xMqttHandle = NULL;
/* NOTE(review): never assigned in the visible part of this file. */
TaskHandle_t xMqttRx = NULL;
/* Built-in broker settings, used by app_mqtt_init_task() when BROKER_TEST is
 * defined. */
#define BROKER_TEST
#ifdef BROKER_TEST
/* The URL scheme selects the transport: the URL parser maps "mqtt:" to plain
 * TCP (useTls = 0) and "mqtts" to TLS (useTls = 1). Port 8883 is the broker's
 * TLS port, so the scheme must be "mqtts://"; with "mqtt://" the client talks
 * plain TCP to a TLS listener and the MQTT CONNECT never succeeds.
 * NOTE(review): TLS additionally needs a working TLS stack and a trusted
 * root certificate configured in the Network layer - confirm.
 */
char* strUrlDefault = "mqtts://5d68878698ac4607b82fcf50649cc7a3.s1.eu.hivemq.cloud:8883";
/* NOTE(review): credentials are hard-coded in source; move them to
 * provisioned storage before release. */
char* strUsernameDefault = "SmartHome";
char* strPassWordDefault = "Sm@rtH0me!2020";
char* strCmdTopicDefault = "CmdTopicTest";
char* strContentTopicDefault = "ContentTopicTest";

#endif

/* Broker URL and credentials in use; loaded from the BROKER_TEST defaults in
 * app_mqtt_init_task(). */
static char strBrokerUrl[100] = {0};
static char strBrokerUser[100] = {0};
static char strBrokerPass[100] = {0};

/* Cleared by app_mqtt_publish() when a publish fails. */
bool mqtt_bMqttStatus = false;
/* NOTE(review): the next three are not referenced in the visible part of this file. */
bool g_bMqttPause = false;
bool mqtt_bMqttRefresh = false;
uint64_t g_LastestMessageTimestamp = 0;

/* Queue that carries the length of each received payload.
 * NOTE(review): no xQueueCreate() for this handle is visible in this file;
 * confirm it is created before app_mqtt_message_cmd() can run. */
static xQueueHandle mqttRxQueue = NULL;

static void task_mqtt_rx();

char mqtt_cLwipHostName[15] = {'V','C','N'};

/* The single MQTT client instance and its network object. */
static MQTTClient client;
static Network network;
static bool bMqttQueueCreated = false;
bool bFirstConnect = true;

/* Broker host, port and TLS flag; the initial port and TLS values are
 * overwritten when the broker URL is parsed in app_mqtt_init_task(). */
static char strAddress[MQTT_MAXIMUM_URL_LENGTH] = {0};
static MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
static int iPort = 8884;
static unsigned char iUseTls = 1;
/* Packet buffers handed to MQTTClientInit(). */
static unsigned char sendbuf[MQTT_SENDBUF_LEN] = {0};
static unsigned char readbuf[MQTT_READBUF_LEN] = {0};
/* Output flag of MQTTDataHandle_custom(); checked and cleared by the task loop. */
static bool bResub = false;

/**
 * Split a broker URL of the form "mqtt://host[:port]" or "mqtts://host[:port]".
 *
 * @param src    NUL-terminated URL to parse.
 * @param url    Output buffer that receives the NUL-terminated host part. The
 *               caller must provide room for the host plus the terminator
 *               (at most strlen(src) bytes are written).
 * @param port   Receives the port number; 0 when the URL carries no port or
 *               the port text does not start with a digit.
 * @param useTls Receives 0 for "mqtt://" and 1 for "mqtts://".
 * @return true when the URL was recognised and split; false for a NULL
 *         argument, an unknown scheme or an empty host. On false, *url and
 *         *port are left untouched.
 */
static bool app_mqtt_handle_url(const char *src, char *url, int *port, uint8_t *useTls){
	static const char plainScheme[] = "mqtt://";
	static const char tlsScheme[] = "mqtts://";
	const char *host;
	const char *hostEnd;
	char porttemp[10] = {0};
	size_t hostLen;
	size_t portLen = 0;

	if (src == NULL || url == NULL || port == NULL || useTls == NULL){
		return false;
	}

	/* strncmp stops at the terminator, so a src shorter than the scheme is
	 * rejected without reading past its end. */
	if (strncmp(src, plainScheme, sizeof(plainScheme) - 1) == 0){
		*useTls = 0;
		host = src + (sizeof(plainScheme) - 1);
	}
	else if (strncmp(src, tlsScheme, sizeof(tlsScheme) - 1) == 0){
		*useTls = 1;
		host = src + (sizeof(tlsScheme) - 1);
	}
	else{
		return false;
	}

	/* The host runs up to the first ':' or, without a port, to the end. */
	hostEnd = strchr(host, ':');
	if (hostEnd == NULL){
		hostEnd = host + strlen(host);
	}
	hostLen = (size_t)(hostEnd - host);
	if (hostLen == 0){
		return false;
	}
	memcpy(url, host, hostLen);
	/* Terminate explicitly instead of relying on a pre-zeroed buffer. */
	url[hostLen] = '\0';

	if (*hostEnd == ':'){
		/* Copy at most sizeof(porttemp)-1 characters so an overlong port
		 * field cannot overflow the local buffer. */
		const char *p = hostEnd + 1;
		while (*p != '\0' && portLen < sizeof(porttemp) - 1){
			porttemp[portLen++] = *p++;
		}
	}
	/* atoi stops at the first non-digit, so trailing blanks are tolerated. */
	*port = atoi(porttemp);
	return true;
}

/**
 * Message handler for the command topic.
 *
 * Copies the received topic into strRxTopic and the payload into
 * strRxMessage, then posts the payload length to mqttRxQueue. Messages whose
 * payload or topic would not fit (leaving room for a terminator) are dropped
 * silently.
 *
 * @param data received message and topic name; both pointers are dereferenced
 *             without a NULL check.
 */
static void app_mqtt_message_cmd(MessageData* data)
{
        if(data->message->payloadlen > (MQTT_MAXIMUM_MESSAGE_LENGTH-1) || data->topicName->lenstring.len > (MQTT_MAXIMUM_TOPIC_LENGTH-1)){
          return;
        }
        memcpy(strRxTopic,data->topicName->lenstring.data,data->topicName->lenstring.len);
        /* Terminate explicitly: neither source is NUL-terminated, and without
         * this the tail of an earlier, longer message would remain visible.
         * The length check above guarantees the index is in range. */
        strRxTopic[data->topicName->lenstring.len] = '\0';
        memcpy(strRxMessage,data->message->payload,data->message->payloadlen);
        strRxMessage[data->message->payloadlen] = '\0';

        /* Sending to a queue that was never created would dereference NULL. */
        if (mqttRxQueue == NULL){
          LREP_ERROR(__FUNCTION__, "xQueue send failed");
          return;
        }

        uint16_t datalen = data->message->payloadlen;
        /* Wait up to 10 ms for room in the queue. */
        if (xQueueSend(mqttRxQueue,&datalen,10/portTICK_RATE_MS) != pdTRUE){
          LREP_ERROR(__FUNCTION__, "xQueue send failed");
        }
        return;
}

/**
 * Publish a NUL-terminated string at QoS 1 on the given topic.
 *
 * Nothing is sent unless the client is in MQTT_RUNNING. When MQTTPublish()
 * reports an error, the connection is torn down: mqtt_bMqttStatus is
 * cleared, the client is put back to MQTT_START and the network is
 * disconnected.
 *
 * @param topic topic to publish to
 * @param data  NUL-terminated payload
 * @return true when the message was published, false otherwise
 */
static bool app_mqtt_publish(char *topic, char *data)
{
	MQTTMessage message;
	int rc;

	message.qos = QOS1;
	message.retained = 0;
	message.payload = data;

	printf("MQTT Status: %d\n", client.mqttstatus);

	/* Not connected and subscribed yet: nothing to do. */
	if (client.mqttstatus != MQTT_RUNNING) {
		return false;
	}

	message.payloadlen = strlen(data);
	rc = MQTTPublish(&client, topic, &message);
	if (rc == 0) {
		return true;
	}

	/* Publish failed: report it and force the state machine to start over. */
	mqtt_printf(MQTT_INFO, "Return code from MQTT publish is %d\n", rc);
	mqtt_bMqttStatus = false;
	MQTTSetStatus(&client, MQTT_START);
	client.ipstack->disconnect(client.ipstack);
	return false;
}

/**
 * Subscribe to a topic at QoS 1 and make sure a handler is registered for it.
 *
 * Sends the subscription for `topic` (the previous code always subscribed to
 * strSubscribeTopic, ignoring the argument), then stores `topic` and the
 * handler in the first free slot of client.messageHandlers[] unless an entry
 * for the same topic text already exists. If the table is full, the handler
 * is not registered.
 *
 * @param topic          topic filter; the pointer itself is stored, so the
 *                       string must stay valid while the entry is in use.
 *                       NULL is ignored.
 * @param messageHandler callback for messages arriving on the topic
 */
void app_mqtt_subscribe(char *topic, messageHandler messageHandler){
      int i;

      if (topic == NULL){
          return;
      }

      /* The result is not checked, as before; the handler is registered
       * regardless of the outcome. */
      MQTTSubscribe(&client,topic,QOS1,messageHandler);

      /* Compare by content so an equal topic held in another buffer is not
       * registered twice. */
      for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
      {
              const char *registered = client.messageHandlers[i].topicFilter;
              if (registered != NULL && strcmp(registered, topic) == 0)
              {
                      return;
              }
      }

      /* Not registered yet: take the first free slot. */
      for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
      {
              if (client.messageHandlers[i].topicFilter == NULL)
              {
                      client.messageHandlers[i].topicFilter = topic;
                      client.messageHandlers[i].fp = messageHandler;
                      break;
              }
      }
}



/**
 * Body of the MQTT task (started by app_mqtt_init()); never returns.
 *
 * 1. With BROKER_TEST defined, loads the built-in broker URL, credentials and
 *    topics into the working buffers.
 * 2. Parses the broker URL into host, port and TLS flag and fills connectData.
 * 3. Initialises the network and the MQTT client.
 * 4. Loops forever: waits until the network has an address, polls the socket
 *    with select(), and calls MQTTDataHandle_custom().
 *
 * NOTE(review): FreeRTOS task functions are declared as void f(void *); this
 * one takes no parameter. Confirm the prototype matches what xTaskCreate()
 * expects on this port.
 */
void app_mqtt_init_task(void){

#ifdef BROKER_TEST
    /* Bounded, always-terminated copies: an overlong default is truncated
     * instead of overrunning the destination. */
    if (strUrlDefault){
        snprintf(strBrokerUrl, sizeof(strBrokerUrl), "%s", strUrlDefault);
    }
    if (strUsernameDefault){
        snprintf(strBrokerUser, sizeof(strBrokerUser), "%s", strUsernameDefault);
    }
    /* Guard on the password pointer itself (the old code tested the
     * username pointer here). */
    if (strPassWordDefault){
        snprintf(strBrokerPass, sizeof(strBrokerPass), "%s", strPassWordDefault);
    }
    /* Clear the whole buffers, not just the length of the current text. */
    memset(strSubscribeTopic,0,sizeof(strSubscribeTopic));
    memset(strPublishTopic,0,sizeof(strPublishTopic));
    memset(strAllertTopic,0,sizeof(strAllertTopic));
    if (strCmdTopicDefault){
        snprintf(strSubscribeTopic, sizeof(strSubscribeTopic), "%s", strCmdTopicDefault);
    }
    if (strContentTopicDefault){
        snprintf(strPublishTopic, sizeof(strPublishTopic), "%s", strContentTopicDefault);
    }
    /* The alert topic has no test default and stays empty. */
#endif /*BROKER_TEST*/

  /* cliId lives on this task's stack; that is safe only because the function
   * never returns while connectData still points at it. */
  char cliId[40] = {0};
  int iMqtt_rc = 0;

  snprintf(cliId, sizeof(cliId), "%s", "VinhTran");

  /* Split the URL into host (strAddress), port and TLS flag. */
  app_mqtt_handle_url(strBrokerUrl,strAddress,&iPort,&iUseTls);
  connectData.MQTTVersion = 3;
  connectData.clientID.cstring = cliId;
  connectData.clientID.lenstring.data = cliId;
  connectData.clientID.lenstring.len = strlen(cliId);
  LREP(__FUNCTION__,"client ID: '%s'",connectData.clientID.cstring);

  connectData.username.cstring = strBrokerUser;
  connectData.username.lenstring.data = strBrokerUser;
  connectData.username.lenstring.len = strlen(strBrokerUser);

  connectData.password.cstring = strBrokerPass;
  connectData.password.lenstring.data = strBrokerPass;
  connectData.password.lenstring.len = strlen(strBrokerPass);
  LREP(__FUNCTION__,"Broker url: '%s' port: %d",strAddress,iPort);
  LREP(__FUNCTION__,"Broker user name: '%s'",strBrokerUser);
  /* NOTE(review): this writes the broker password to the log in clear text. */
  LREP(__FUNCTION__,"Broker pass word: '%s'",strBrokerPass);
  LREP(__FUNCTION__,"Broker sub topic: '%s'",strSubscribeTopic);
  LREP(__FUNCTION__,"Broker pub topic: '%s'",strPublishTopic);
  LREP(__FUNCTION__,"Broker alert topic: '%s'",strAllertTopic);

  NetworkInit(&network);
  /* The TLS flag comes from the URL scheme parsed above. */
  network.use_ssl = iUseTls;
  MQTTClientInit(&client, &network, 5000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));

  while (1){
    /* Block until DHCP has completed. */
    while (app_ethernet_dhcp_ready() == false){
      osDelay(100);
      mqtt_printf(MQTT_INFO, "Wait Wi-Fi to be connected.");
    }

    printf("MQTT Status: %d\n", client.mqttstatus);

    fd_set read_fds;
    fd_set except_fds;
    struct timeval timeout;

    FD_ZERO(&read_fds);
    FD_ZERO(&except_fds);
    timeout.tv_sec = MQTT_SELECT_TIMEOUT;
    timeout.tv_usec = 0;

    /* Poll the socket only once one is open; an exceptional condition on it
     * sends the state machine back to MQTT_START. */
    if(network.my_socket>=0){
      FD_SET(network.my_socket, &read_fds);
      FD_SET(network.my_socket, &except_fds);
      FreeRTOS_Select(network.my_socket + 1,&read_fds,NULL,&except_fds,&timeout);
      if(FD_ISSET(network.my_socket, &except_fds)){
        mqtt_printf(MQTT_INFO,"except_fds is set");
        MQTTSetStatus(&client,MQTT_START);
      }
    }

    /* NOTE(review): the return code is stored but not examined; logging it
     * would show why the client falls back from MQTT_CONNECT. */
    iMqtt_rc = MQTTDataHandle_custom(&client, &read_fds, &connectData, app_mqtt_message_cmd, strAddress, iPort, strSubscribeTopic, &bResub);

    if (bResub){

      bResub = false;
      LREP_WARNING(__FUNCTION__,"MQTT is reconnect -> resub topic!");
    }
    osDelay(10);
  }
}



/**
 * Start the MQTT task.
 *
 * Creates a task named "MQTTTask" running app_mqtt_init_task() and stores its
 * handle in xMqttHandle. The result of xTaskCreate() is not checked, so the
 * "init done" message is logged even if task creation failed.
 */
void app_mqtt_init(void)
{
    /* Task creation parameters, named instead of passed as bare literals.
     * FreeRTOS counts the stack depth in words, not bytes. */
    enum {
        MQTT_TASK_STACK_DEPTH = 4096,
        MQTT_TASK_PRIORITY = 3
    };

    xTaskCreate(app_mqtt_init_task, "MQTTTask", MQTT_TASK_STACK_DEPTH, NULL,
                MQTT_TASK_PRIORITY, &xMqttHandle);

    LREP_WARNING(__FUNCTION__,"Mqtt init done!");
}

 

I would like to understand the cause and how to fix this issue.

5 REPLIES 5

@QuangIoT wrote:
I'm facing an issue connecting STM32 to the MQTT broker

What MQTT broker?

How are you trying to connect: Ethernet? WiFi? Other?

Are you sure that the basic IP connection is working?

Are you sure that your login/connection credentials & settings are correct?

 


@QuangIoT wrote:
 
the mqttstatus only reaches MQTT_CONNECT and then returns to MQTT_START instead of progressing to MQTT_RUNNING.

Sounds like the connection is failing

I'm using HiveMQ with Ethernet.
I'm certain it's working because I can ping that IP address.
When I log the status, it shows the connection state, but it doesn't reach the running state to send/receive data.

 

Screenshot 2024-10-24 152953.png


@QuangIoT wrote:

I'm using HiveMQ with Ethernet.


So have you resolved this question yet: https://community.st.com/t5/stm32-mcus-embedded-software/send-a-message-to-hivemq/td-p/733865 ?

 


@QuangIoT wrote:

I'm certain it's working because I can ping that IP address.


So that's the basic IP connection.

Successful ping is certainly necessary - but may not be sufficient.

 


@QuangIoT wrote:

When I log the status, it shows the connection state, but it doesn't reach the running state


So look at what's necessary to get from 'connected' to 'running', and see where that's failing.

Is the connection dropping?

I have solved the previous issue, but with this one, I still don't know why it can't reach the running state. I also haven't found a solution for this problem yet.


@QuangIoT wrote:

I have solved the previous issue.


Then please mark the solution in that thread:

https://community.st.com/t5/community-guidelines/help-others-to-solve-their-issues/ta-p/575256

 


@QuangIoT wrote:

I still don't know why it can't reach the running state.


Again, look at what's necessary to get from 'connected' to 'running', and see where that's failing.

Is the connection dropping?

Have you used Wireshark or similar to see what's happening on the Ethernet?

https://www.google.com/search?q=Wireshark+mqtt+debugging 

Can you enable extra logging at the client?

Are there logs available at the Broker end?