From 1f4942726eefec55a3172ca8f5e7b7f3c9ef2575 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Tue, 24 Mar 2026 19:34:40 +0100 Subject: [PATCH] try mutiple dummies --- src/mqtt.gleam | 47 +++++++++++++++++++++++++++++ src/weather_portal.gleam | 65 ++++++++++++++++++++++++---------------- 2 files changed, 86 insertions(+), 26 deletions(-) create mode 100644 src/mqtt.gleam diff --git a/src/mqtt.gleam b/src/mqtt.gleam new file mode 100644 index 0000000..e9e63b6 --- /dev/null +++ b/src/mqtt.gleam @@ -0,0 +1,47 @@ +import config +import gleam/erlang/process +import gleam/int +import gleam/io +import gleam/option.{None} +import gleam/string +import spoke/mqtt +import spoke/mqtt_actor +import spoke/tcp + +pub fn start(cfg: config.Config) { + let client_id = cfg.mqtt_client_id + let topic = "spoke-test" + + let assert Ok(started) = + tcp.connector_with_defaults(cfg.mqtt_host) + |> mqtt.connect_with_id(client_id) + |> mqtt_actor.build() + |> mqtt_actor.start(100) + let client = started.data + + let updates = process.new_subject() + mqtt_actor.subscribe_to_updates(client, updates) + mqtt_actor.connect(client, True, None) + + let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = + process.receive(updates, 5000) + + let assert Ok(_) = + mqtt_actor.subscribe(client, [ + mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), + ]) + + let message = + mqtt.PublishData( + topic, + <<"Hello from spoke!">>, + mqtt.AtLeastOnce, + retain: False, + ) + mqtt_actor.publish(client, message) + + let message = process.receive(updates, 1000) + io.println(string.inspect(message)) + + mqtt_actor.disconnect(client) +} diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 629dc7c..0e34e98 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,45 +1,58 @@ import config import gleam/erlang/process import gleam/io + +// import mqtt import mqtt_dummy import sensors pub fn main() -> Nil { - let cfg = config.load_config() + let _cfg = config.load_config() io.println("Config loaded successfully!") - let assert Ok(subject) = mqtt_dummy.start() - let mailbox = mqtt_dummy.subscribe(subject) - - // Kick off the first message - process.send_after(subject, 1000, mqtt_dummy.Proc) - - receive_and_reschedule(mailbox, subject) - // case process.receive(mailbox, 5000) { - // Ok(msg) -> { - // io.println("Got message:" <> msg) - // // process_messages(mailbox, n - 1) - // } - // Error(Nil) -> { - // io.println("Timeout waiting for message") - // } - // } + let assert Ok(dummy_one) = mqtt_dummy.start() + let assert Ok(dummy_two) = mqtt_dummy.start() + process.send_after(dummy_one, 1000, mqtt_dummy.Proc) + process.send_after(dummy_two, 1000, mqtt_dummy.Proc) + loop(LoopState( + dummy_one_receive_subject: mqtt_dummy.subscribe(dummy_one), + dummy_one_send_subject: dummy_one, + dummy_two_receive_subject: mqtt_dummy.subscribe(dummy_two), + dummy_two_send_subject: dummy_two, + )) Nil } -fn receive_and_reschedule( - mailbox: process.Subject(sensors.SensorReading), - subject: process.Subject(mqtt_dummy.Message), -) -> Nil { - case process.receive(mailbox, 2000) { +type LoopState { + LoopState( + dummy_one_receive_subject: process.Subject(sensors.SensorReading), + dummy_one_send_subject: process.Subject(mqtt_dummy.Message), + dummy_two_receive_subject: process.Subject(sensors.SensorReading), + dummy_two_send_subject: process.Subject(mqtt_dummy.Message), + ) +} + +fn loop(state: LoopState) -> Nil { + case process.receive(state.dummy_one_receive_subject, 1500) { Ok(msg) -> { sensors.print_sensor_reading(msg) - process.send_after(subject, 1000, mqtt_dummy.Proc) - receive_and_reschedule(mailbox, subject) + process.send_after(state.dummy_one_send_subject, 1000, mqtt_dummy.Proc) + Nil } Error(Nil) -> { - io.println("Timeout - waiting again...") - receive_and_reschedule(mailbox, subject) + io.println("Timeout on dummy one") } } + + case process.receive(state.dummy_two_receive_subject, 1500) { + Ok(msg) -> { + sensors.print_sensor_reading(msg) + process.send_after(state.dummy_two_send_subject, 1000, mqtt_dummy.Proc) + Nil + } + Error(Nil) -> { + io.println("Timeout on dummy two..") + } + } + loop(state) }