2022-08-24 11:37 PM
mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
const struct mqtt_connect_client_info_t *client_info)
{
err_t err;
size_t len;
u16_t client_id_length;
/* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
if (client->conn_state != TCP_DISCONNECTED) {
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
return ERR_ISCONN;
}
/* Wipe clean */
memset(client, 0, sizeof(mqtt_client_t));
client->connect_arg = arg;
client->connect_cb = cb;
client->keep_alive = client_info->keep_alive;
mqtt_init_requests(client->req_list);
/* Build connect message */
if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
flags |= MQTT_CONNECT_FLAG_WILL;
flags |= (client_info->will_qos & 3) << 3;
if (client_info->will_retain) {
flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
}
len = strlen(client_info->will_topic);
LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
will_topic_len = (u8_t)len;
len = strlen(client_info->will_msg);
LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
will_msg_len = (u8_t)len;
len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
remaining_length = (u16_t)len;
}
/* Don't complicate things, always connect using clean session */
flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
len = strlen(client_info->client_id);
LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
client_id_length = (u16_t)len;
len = remaining_length + 2 + client_id_length;
LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
remaining_length = (u16_t)len;
if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
return ERR_MEM;
}
client->conn = tcp_new();
if (client->conn == NULL) {
return ERR_MEM;
}
/* Set arg pointer for callbacks */
tcp_arg(client->conn, client);
/* Any local address, pick random local port number */
err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
if (err != ERR_OK) {
LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
goto tcp_fail;
}
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
/* Connect to server */
err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
if (err != ERR_OK) {
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
goto tcp_fail;
}
/* Set error callback */
tcp_err(client->conn, mqtt_tcp_err_cb);
client->conn_state = TCP_CONNECTING;
/* Append fixed header */
mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
/* Append Protocol string */
mqtt_output_append_string(&client->output, "MQTT", 4);
/* Append Protocol level */
mqtt_output_append_u8(&client->output, 4);
/* Append connect flags */
mqtt_output_append_u8(&client->output, flags);
/* Append keep-alive */
mqtt_output_append_u16(&client->output, client_info->keep_alive);
/* Append client id */
mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
/* Append will message if used */
if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
}
return ERR_OK;
tcp_fail:
tcp_abort(client->conn);
client->conn = NULL;
return err;
}
err_t
mqtt_publish(mqtt_client_t *client,const char *topic, const void *payload, uint16_t payload_length, u8_t qos, u8_t retain,
mqtt_request_cb_t cb, void *arg)
{
struct mqtt_request_t *r;
u16_t pkt_id;
size_t topic_strlen;
size_t total_len;
u16_t topic_len;
u16_t remaining_length;
LWIP_ASSERT("mqtt_publish: client != NULL", client);
LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
topic_strlen = strlen(topic);
LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
topic_len = (u16_t)topic_strlen;
total_len = 2 + topic_len + payload_length;
LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
remaining_length = (u16_t)total_len;
LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
if (qos > 0) {
remaining_length += 2;
/* Generate pkt_id id for QoS1 and 2 */
pkt_id = msg_generate_packet_id(client);
} else {
/* Use reserved value pkt_id 0 for QoS 0 in request handle */
pkt_id = 0;
}
r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
if (r == NULL) {
return ERR_MEM;
}
if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
mqtt_delete_request(r);
return ERR_MEM;
}
/* Append fixed header */
mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
/* Append Topic */
mqtt_output_append_string(&client->output, topic, topic_len);
/* Append packet if for QoS 1 and 2*/
if (qos > 0) {
mqtt_output_append_u16(&client->output, pkt_id);
}
/* Append optional publish payload */
if ((payload != NULL) && (payload_length > 0)) {
mqtt_output_append_buf(&client->output, payload, payload_length);
}
mqtt_append_request(&client->pend_req_queue, r);
mqtt_output_send(&client->output, client->conn);
return ERR_OK;
}
2022-10-26 06:50 AM
Hi,
There's an MQTT application example working on STM32F429ZI-Nucleo based on NetXDuo available here and which could help :
Regards
Mahdy