get basic mqtt pub working
This commit is contained in:
parent
d70d551638
commit
924630b1d9
1 changed files with 15 additions and 164 deletions
179
mqtt_client.c
179
mqtt_client.c
|
|
@ -74,23 +74,13 @@ typedef struct {
|
||||||
// At most once (QoS 0)
|
// At most once (QoS 0)
|
||||||
// At least once (QoS 1)
|
// At least once (QoS 1)
|
||||||
// Exactly once (QoS 2)
|
// Exactly once (QoS 2)
|
||||||
#define MQTT_SUBSCRIBE_QOS 1
|
|
||||||
#define MQTT_PUBLISH_QOS 1
|
#define MQTT_PUBLISH_QOS 1
|
||||||
#define MQTT_PUBLISH_RETAIN 0
|
#define MQTT_PUBLISH_RETAIN 0
|
||||||
|
#define MQTT_DEVICE_NAME "pico_test_001"
|
||||||
// topic used for last will and testament
|
|
||||||
#define MQTT_WILL_TOPIC "/online"
|
|
||||||
#define MQTT_WILL_MSG "0"
|
|
||||||
#define MQTT_WILL_QOS 1
|
|
||||||
|
|
||||||
#ifndef MQTT_DEVICE_NAME
|
|
||||||
#define MQTT_DEVICE_NAME "pico"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// Set to 1 to add the client name to topics, to support multiple devices using the same server
|
// Set to 1 to add the client name to topics, to support multiple devices using the same server
|
||||||
#ifndef MQTT_UNIQUE_TOPIC
|
//
|
||||||
#define MQTT_UNIQUE_TOPIC 0
|
#define MQTT_TOPIC "bws/test1/state"
|
||||||
#endif
|
|
||||||
|
|
||||||
/* References for this implementation:
|
/* References for this implementation:
|
||||||
* raspberry-pi-pico-c-sdk.pdf, Section '4.1.1. hardware_adc'
|
* raspberry-pi-pico-c-sdk.pdf, Section '4.1.1. hardware_adc'
|
||||||
|
|
@ -118,38 +108,16 @@ static void pub_request_cb(__unused void *arg, err_t err) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *full_topic(MQTT_CLIENT_DATA_T *state, const char *name) {
|
|
||||||
#if MQTT_UNIQUE_TOPIC
|
|
||||||
static char full_topic[MQTT_TOPIC_LEN];
|
|
||||||
snprintf(full_topic, sizeof(full_topic), "/%s%s", state->mqtt_client_info.client_id, name);
|
|
||||||
return full_topic;
|
|
||||||
#else
|
|
||||||
return name;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
static void control_led(MQTT_CLIENT_DATA_T *state, bool on) {
|
|
||||||
// Publish state on /state topic and on/off led board
|
|
||||||
const char* message = on ? "On" : "Off";
|
|
||||||
if (on)
|
|
||||||
cyw43_arch_gpio_put(CYW43_WL_GPIO_LED_PIN, 1);
|
|
||||||
else
|
|
||||||
cyw43_arch_gpio_put(CYW43_WL_GPIO_LED_PIN, 0);
|
|
||||||
|
|
||||||
mqtt_publish(state->mqtt_client_inst, full_topic(state, "/led/state"), message, strlen(message), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void publish_temperature(MQTT_CLIENT_DATA_T *state) {
|
static void publish_temperature(MQTT_CLIENT_DATA_T *state) {
|
||||||
static float old_temperature;
|
static float old_temperature;
|
||||||
const char *temperature_key = full_topic(state, "/temperature");
|
|
||||||
float temperature = read_onboard_temperature(TEMPERATURE_UNITS);
|
float temperature = read_onboard_temperature(TEMPERATURE_UNITS);
|
||||||
if (temperature != old_temperature) {
|
if (temperature != old_temperature) {
|
||||||
old_temperature = temperature;
|
old_temperature = temperature;
|
||||||
// Publish temperature on /temperature topic
|
// Publish temperature on /temperature topic
|
||||||
char temp_str[16];
|
char temp_str[16];
|
||||||
snprintf(temp_str, sizeof(temp_str), "%.2f", temperature);
|
snprintf(temp_str, sizeof(temp_str), "%.2f", temperature);
|
||||||
INFO_printf("Publishing %s to %s\n", temp_str, temperature_key);
|
INFO_printf("Publishing Temp %s\n", temp_str);
|
||||||
mqtt_publish(state->mqtt_client_inst, temperature_key, temp_str, strlen(temp_str), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
|
mqtt_publish(state->mqtt_client_inst, MQTT_TOPIC, temp_str, strlen(temp_str), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,49 +143,6 @@ static void unsub_request_cb(void *arg, err_t err) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sub_unsub_topics(MQTT_CLIENT_DATA_T* state, bool sub) {
|
|
||||||
mqtt_request_cb_t cb = sub ? sub_request_cb : unsub_request_cb;
|
|
||||||
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/led"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
|
|
||||||
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/print"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
|
|
||||||
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/ping"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
|
|
||||||
mqtt_sub_unsub(state->mqtt_client_inst, full_topic(state, "/exit"), MQTT_SUBSCRIBE_QOS, cb, state, sub);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags) {
|
|
||||||
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)arg;
|
|
||||||
#if MQTT_UNIQUE_TOPIC
|
|
||||||
const char *basic_topic = state->topic + strlen(state->mqtt_client_info.client_id) + 1;
|
|
||||||
#else
|
|
||||||
const char *basic_topic = state->topic;
|
|
||||||
#endif
|
|
||||||
strncpy(state->data, (const char *)data, len);
|
|
||||||
state->len = len;
|
|
||||||
state->data[len] = '\0';
|
|
||||||
|
|
||||||
DEBUG_printf("Topic: %s, Message: %s\n", state->topic, state->data);
|
|
||||||
if (strcmp(basic_topic, "/led") == 0)
|
|
||||||
{
|
|
||||||
if (lwip_stricmp((const char *)state->data, "On") == 0 || strcmp((const char *)state->data, "1") == 0)
|
|
||||||
control_led(state, true);
|
|
||||||
else if (lwip_stricmp((const char *)state->data, "Off") == 0 || strcmp((const char *)state->data, "0") == 0)
|
|
||||||
control_led(state, false);
|
|
||||||
} else if (strcmp(basic_topic, "/print") == 0) {
|
|
||||||
INFO_printf("%.*s\n", len, data);
|
|
||||||
} else if (strcmp(basic_topic, "/ping") == 0) {
|
|
||||||
char buf[11];
|
|
||||||
snprintf(buf, sizeof(buf), "%u", to_ms_since_boot(get_absolute_time()) / 1000);
|
|
||||||
mqtt_publish(state->mqtt_client_inst, full_topic(state, "/uptime"), buf, strlen(buf), MQTT_PUBLISH_QOS, MQTT_PUBLISH_RETAIN, pub_request_cb, state);
|
|
||||||
} else if (strcmp(basic_topic, "/exit") == 0) {
|
|
||||||
state->stop_client = true; // stop the client when ALL subscriptions are stopped
|
|
||||||
sub_unsub_topics(state, false); // unsubscribe
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len) {
|
|
||||||
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)arg;
|
|
||||||
strncpy(state->topic, topic, sizeof(state->topic));
|
|
||||||
}
|
|
||||||
|
|
||||||
static void temperature_worker_fn(async_context_t *context, async_at_time_worker_t *worker) {
|
static void temperature_worker_fn(async_context_t *context, async_at_time_worker_t *worker) {
|
||||||
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)worker->user_data;
|
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)worker->user_data;
|
||||||
publish_temperature(state);
|
publish_temperature(state);
|
||||||
|
|
@ -229,12 +154,6 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
|
||||||
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)arg;
|
MQTT_CLIENT_DATA_T* state = (MQTT_CLIENT_DATA_T*)arg;
|
||||||
if (status == MQTT_CONNECT_ACCEPTED) {
|
if (status == MQTT_CONNECT_ACCEPTED) {
|
||||||
state->connect_done = true;
|
state->connect_done = true;
|
||||||
sub_unsub_topics(state, true); // subscribe;
|
|
||||||
|
|
||||||
// indicate online
|
|
||||||
if (state->mqtt_client_info.will_topic) {
|
|
||||||
mqtt_publish(state->mqtt_client_inst, state->mqtt_client_info.will_topic, "1", 1, MQTT_WILL_QOS, true, pub_request_cb, state);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish temperature every 10 sec if it's changed
|
// Publish temperature every 10 sec if it's changed
|
||||||
temperature_worker.user_data = state;
|
temperature_worker.user_data = state;
|
||||||
|
|
@ -250,14 +169,7 @@ static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection
|
||||||
}
|
}
|
||||||
|
|
||||||
static void start_client(MQTT_CLIENT_DATA_T *state) {
|
static void start_client(MQTT_CLIENT_DATA_T *state) {
|
||||||
#if LWIP_ALTCP && LWIP_ALTCP_TLS
|
|
||||||
const int port = MQTT_TLS_PORT;
|
|
||||||
INFO_printf("Using TLS\n");
|
|
||||||
#else
|
|
||||||
const int port = MQTT_PORT;
|
const int port = MQTT_PORT;
|
||||||
INFO_printf("Warning: Not using TLS\n");
|
|
||||||
#endif
|
|
||||||
|
|
||||||
state->mqtt_client_inst = mqtt_client_new();
|
state->mqtt_client_inst = mqtt_client_new();
|
||||||
if (!state->mqtt_client_inst) {
|
if (!state->mqtt_client_inst) {
|
||||||
panic("MQTT client instance creation error");
|
panic("MQTT client instance creation error");
|
||||||
|
|
@ -269,25 +181,9 @@ static void start_client(MQTT_CLIENT_DATA_T *state) {
|
||||||
if (mqtt_client_connect(state->mqtt_client_inst, &state->mqtt_server_address, port, mqtt_connection_cb, state, &state->mqtt_client_info) != ERR_OK) {
|
if (mqtt_client_connect(state->mqtt_client_inst, &state->mqtt_server_address, port, mqtt_connection_cb, state, &state->mqtt_client_info) != ERR_OK) {
|
||||||
panic("MQTT broker connection error");
|
panic("MQTT broker connection error");
|
||||||
}
|
}
|
||||||
#if LWIP_ALTCP && LWIP_ALTCP_TLS
|
|
||||||
// This is important for MBEDTLS_SSL_SERVER_NAME_INDICATION
|
|
||||||
mbedtls_ssl_set_hostname(altcp_tls_context(state->mqtt_client_inst->conn), MQTT_SERVER);
|
|
||||||
#endif
|
|
||||||
mqtt_set_inpub_callback(state->mqtt_client_inst, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, state);
|
|
||||||
cyw43_arch_lwip_end();
|
cyw43_arch_lwip_end();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call back with a DNS result
|
|
||||||
static void dns_found(const char *hostname, const ip_addr_t *ipaddr, void *arg) {
|
|
||||||
MQTT_CLIENT_DATA_T *state = (MQTT_CLIENT_DATA_T*)arg;
|
|
||||||
if (ipaddr) {
|
|
||||||
state->mqtt_server_address = *ipaddr;
|
|
||||||
start_client(state);
|
|
||||||
} else {
|
|
||||||
panic("dns request failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(void) {
|
int main(void) {
|
||||||
stdio_init_all();
|
stdio_init_all();
|
||||||
INFO_printf("mqtt client starting\n");
|
INFO_printf("mqtt client starting\n");
|
||||||
|
|
@ -302,52 +198,18 @@ int main(void) {
|
||||||
panic("Failed to inizialize CYW43");
|
panic("Failed to inizialize CYW43");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use board unique id
|
state.mqtt_client_info.client_id = MQTT_DEVICE_NAME;
|
||||||
char unique_id_buf[5];
|
|
||||||
pico_get_unique_board_id_string(unique_id_buf, sizeof(unique_id_buf));
|
|
||||||
for(int i=0; i < sizeof(unique_id_buf) - 1; i++) {
|
|
||||||
unique_id_buf[i] = tolower(unique_id_buf[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate a unique name, e.g. pico1234
|
|
||||||
char client_id_buf[sizeof(MQTT_DEVICE_NAME) + sizeof(unique_id_buf) - 1];
|
|
||||||
memcpy(&client_id_buf[0], MQTT_DEVICE_NAME, sizeof(MQTT_DEVICE_NAME) - 1);
|
|
||||||
memcpy(&client_id_buf[sizeof(MQTT_DEVICE_NAME) - 1], unique_id_buf, sizeof(unique_id_buf) - 1);
|
|
||||||
client_id_buf[sizeof(client_id_buf) - 1] = 0;
|
|
||||||
INFO_printf("Device name %s\n", client_id_buf);
|
|
||||||
|
|
||||||
state.mqtt_client_info.client_id = client_id_buf;
|
|
||||||
state.mqtt_client_info.keep_alive = MQTT_KEEP_ALIVE_S; // Keep alive in sec
|
state.mqtt_client_info.keep_alive = MQTT_KEEP_ALIVE_S; // Keep alive in sec
|
||||||
#if defined(MQTT_USERNAME) && defined(MQTT_PASSWORD)
|
|
||||||
state.mqtt_client_info.client_user = MQTT_USERNAME;
|
state.mqtt_client_info.client_user = MQTT_USERNAME;
|
||||||
state.mqtt_client_info.client_pass = MQTT_PASSWORD;
|
state.mqtt_client_info.client_pass = MQTT_PASSWORD;
|
||||||
#else
|
ip_addr_t ip_addr;
|
||||||
state.mqtt_client_info.client_user = NULL;
|
// Initialize the IP address
|
||||||
state.mqtt_client_info.client_pass = NULL;
|
if (ipaddr_aton(MQTT_SERVER, &ip_addr)) {
|
||||||
#endif
|
printf("IP address is valid and initialized.\n");
|
||||||
static char will_topic[MQTT_TOPIC_LEN];
|
} else {
|
||||||
strncpy(will_topic, full_topic(&state, MQTT_WILL_TOPIC), sizeof(will_topic));
|
printf("Invalid IP address format.\n");
|
||||||
state.mqtt_client_info.will_topic = will_topic;
|
}
|
||||||
state.mqtt_client_info.will_msg = MQTT_WILL_MSG;
|
state.mqtt_server_address = ip_addr;
|
||||||
state.mqtt_client_info.will_qos = MQTT_WILL_QOS;
|
|
||||||
state.mqtt_client_info.will_retain = true;
|
|
||||||
#if LWIP_ALTCP && LWIP_ALTCP_TLS
|
|
||||||
// TLS enabled
|
|
||||||
#ifdef MQTT_CERT_INC
|
|
||||||
static const uint8_t ca_cert[] = TLS_ROOT_CERT;
|
|
||||||
static const uint8_t client_key[] = TLS_CLIENT_KEY;
|
|
||||||
static const uint8_t client_cert[] = TLS_CLIENT_CERT;
|
|
||||||
// This confirms the indentity of the server and the client
|
|
||||||
state.mqtt_client_info.tls_config = altcp_tls_create_config_client_2wayauth(ca_cert, sizeof(ca_cert),
|
|
||||||
client_key, sizeof(client_key), NULL, 0, client_cert, sizeof(client_cert));
|
|
||||||
#if ALTCP_MBEDTLS_AUTHMODE != MBEDTLS_SSL_VERIFY_REQUIRED
|
|
||||||
WARN_printf("Warning: tls without verification is insecure\n");
|
|
||||||
#endif
|
|
||||||
#else
|
|
||||||
state->client_info.tls_config = altcp_tls_create_config_client(NULL, 0);
|
|
||||||
WARN_printf("Warning: tls without a certificate is insecure\n");
|
|
||||||
#endif
|
|
||||||
#endif
|
|
||||||
|
|
||||||
cyw43_arch_enable_sta_mode();
|
cyw43_arch_enable_sta_mode();
|
||||||
if (cyw43_arch_wifi_connect_timeout_ms(WIFI_SSID, WIFI_PASSWORD, CYW43_AUTH_WPA2_AES_PSK, 30000)) {
|
if (cyw43_arch_wifi_connect_timeout_ms(WIFI_SSID, WIFI_PASSWORD, CYW43_AUTH_WPA2_AES_PSK, 30000)) {
|
||||||
|
|
@ -355,18 +217,7 @@ int main(void) {
|
||||||
}
|
}
|
||||||
INFO_printf("\nConnected to Wifi\n");
|
INFO_printf("\nConnected to Wifi\n");
|
||||||
|
|
||||||
// We are not in a callback so locking is needed when calling lwip
|
start_client(&state);
|
||||||
// Make a DNS request for the MQTT server IP address
|
|
||||||
cyw43_arch_lwip_begin();
|
|
||||||
int err = dns_gethostbyname(MQTT_SERVER, &state.mqtt_server_address, dns_found, &state);
|
|
||||||
cyw43_arch_lwip_end();
|
|
||||||
|
|
||||||
if (err == ERR_OK) {
|
|
||||||
// We have the address, just start the client
|
|
||||||
start_client(&state);
|
|
||||||
} else if (err != ERR_INPROGRESS) { // ERR_INPROGRESS means expect a callback
|
|
||||||
panic("dns request failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!state.connect_done || mqtt_client_is_connected(state.mqtt_client_inst)) {
|
while (!state.connect_done || mqtt_client_is_connected(state.mqtt_client_inst)) {
|
||||||
cyw43_arch_poll();
|
cyw43_arch_poll();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue