From 6c0711acd57a68fe963f1e56a04a353ec8fbc341 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Wed, 25 Mar 2026 08:52:34 +0100 Subject: [PATCH] get mqtt reciving working --- src/mqtt.gleam | 20 ++++++------ src/readings_broadcaster.gleam | 56 ++++++++++++++++++++++++++++++++++ src/sensors.gleam | 2 ++ src/weather_portal.gleam | 54 ++++---------------------------- 4 files changed, 75 insertions(+), 57 deletions(-) create mode 100644 src/readings_broadcaster.gleam diff --git a/src/mqtt.gleam b/src/mqtt.gleam index 96fb968..8358776 100644 --- a/src/mqtt.gleam +++ b/src/mqtt.gleam @@ -1,14 +1,12 @@ import config import gleam/erlang/process import gleam/option.{None, Some} +import readings_broadcaster import spoke/mqtt import spoke/mqtt_actor import spoke/tcp -pub fn start( - cfg: config.Config, - updates_subject: process.Subject(mqtt.Update), -) -> mqtt_actor.Client { +pub fn start(cfg: config.Config) -> mqtt_actor.Client { let client_id = cfg.mqtt_client_id let password = <> let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) @@ -18,18 +16,22 @@ pub fn start( client_id: client_id, authentication: Some(auth), transport_options: transport, - keep_alive_seconds: 15, - server_timeout_ms: 5000, + keep_alive_seconds: 120, + server_timeout_ms: 15_000, ) let assert Ok(started) = mqtt_actor.build(connection_options) |> mqtt_actor.start(100) let client = started.data - mqtt_actor.subscribe_to_updates(client, updates_subject) + let start_up_checker = process.new_subject() + mqtt_actor.subscribe_to_updates(client, start_up_checker) mqtt_actor.connect(client, True, None) - let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = - process.receive(updates_subject, 5000) + process.receive(start_up_checker, 15_000) + + let assert Ok(broadcaster) = readings_broadcaster.start([]) + mqtt_actor.subscribe_to_updates(client, broadcaster) + client } diff --git a/src/readings_broadcaster.gleam b/src/readings_broadcaster.gleam new file mode 100644 index 0000000..840e8e5 --- /dev/null +++ b/src/readings_broadcaster.gleam @@ -0,0 +1,56 @@ +import gleam/erlang/process +import gleam/io +import gleam/list +import gleam/otp/actor +import sensors +import spoke/mqtt + +// pub type Message { +// MqttUpdate(update: mqtt.Update) +// } + +type State { + State(subscribers: List(process.Subject(sensors.SensorReading))) +} + +fn handle_message( + state: State, + update: mqtt.Update, +) -> actor.Next(State, mqtt.Update) { + case parse_mqtt_update(update) { + Ok(reading) -> { + // Publish to all subscribers + list.each(state.subscribers, fn(subscriber) { + process.send(subscriber, reading) + }) + actor.continue(state) + } + Error(_) -> actor.continue(state) + } +} + +fn parse_mqtt_update(update: mqtt.Update) -> Result(sensors.SensorReading, Nil) { + io.println("GOT MQTT MESSAGE!") + echo update + // TODO: Implement parsing logic based on your MQTT topic/payload structure + // For now, return an error + Error(Nil) +} + +pub fn start( + subscribers: List(process.Subject(sensors.SensorReading)), +) -> Result(process.Subject(mqtt.Update), actor.StartError) { + let initial_state = State(subscribers: subscribers) + let assert Ok(started) = + actor.new(initial_state) + |> actor.on_message(handle_message) + |> actor.start + Ok(started.data) +} +// pub fn subscribe( +// reader: process.Subject(Message), +// ) -> process.Subject(sensors.SensorReading) { +// let mailbox = process.new_subject() +// process.send(reader, Subscribe(mailbox)) +// mailbox +// } diff --git a/src/sensors.gleam b/src/sensors.gleam index 0cc1df2..9e05cde 100644 --- a/src/sensors.gleam +++ b/src/sensors.gleam @@ -25,6 +25,8 @@ pub fn sensor_name(sensor: Sensor) -> String { } } +// homeassistant/sensor/bws/node1/state1 + pub fn print_sensor_reading(reading: SensorReading) -> Nil { let sensor_name = sensor_name(reading.sensor) io.println(sensor_name <> ": " <> float.to_string(reading.value)) diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 6d78660..943b858 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -2,59 +2,17 @@ import config import gleam/erlang/process import gleam/io import mqtt -import mqtt_dummy -import sensors pub fn main() -> Nil { let cfg = config.load_config() io.println("Config loaded successfully!") - let mqtt_updates = process.new_subject() - let client = mqtt.start(cfg, mqtt_updates) + // Start MQTT, which will forward updates to sensor_reader + let client = mqtt.start(cfg) + let assert Ok(s) = + mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1") + echo s - let assert Ok(dummy_one) = mqtt_dummy.start() - let assert Ok(dummy_two) = mqtt_dummy.start() - process.send_after(dummy_one, 1000, mqtt_dummy.Proc) - process.send_after(dummy_two, 1000, mqtt_dummy.Proc) - loop(LoopState( - dummy_one_receive_subject: mqtt_dummy.subscribe(dummy_one), - dummy_one_send_subject: dummy_one, - dummy_two_receive_subject: mqtt_dummy.subscribe(dummy_two), - dummy_two_send_subject: dummy_two, - )) + process.sleep_forever() Nil } - -type LoopState { - LoopState( - dummy_one_receive_subject: process.Subject(sensors.SensorReading), - dummy_one_send_subject: process.Subject(mqtt_dummy.Message), - dummy_two_receive_subject: process.Subject(sensors.SensorReading), - dummy_two_send_subject: process.Subject(mqtt_dummy.Message), - ) -} - -fn loop(state: LoopState) -> Nil { - case process.receive(state.dummy_one_receive_subject, 1500) { - Ok(msg) -> { - sensors.print_sensor_reading(msg) - process.send_after(state.dummy_one_send_subject, 1000, mqtt_dummy.Proc) - Nil - } - Error(Nil) -> { - io.println("Timeout on dummy one") - } - } - - case process.receive(state.dummy_two_receive_subject, 1500) { - Ok(msg) -> { - sensors.print_sensor_reading(msg) - process.send_after(state.dummy_two_send_subject, 1000, mqtt_dummy.Proc) - Nil - } - Error(Nil) -> { - io.println("Timeout on dummy two..") - } - } - loop(state) -}