From 86e40f90a1e70f94a2e49d15d937b8c37959d209 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Tue, 24 Mar 2026 11:50:04 +0100 Subject: [PATCH 1/6] refactor config module --- config.yml | 2 +- src/config.gleam | 115 ++++++++++++++++++--------------------- src/weather_portal.gleam | 14 +---- test/config_test.gleam | 4 +- test/sensors_test.gleam | 1 - 5 files changed, 60 insertions(+), 76 deletions(-) diff --git a/config.yml b/config.yml index 0e4a279..17d157d 100644 --- a/config.yml +++ b/config.yml @@ -3,4 +3,4 @@ mqtt: client_id: "weather_portal_dev" user: "homeassistant" # password comes from env -jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod +jwt_secret: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod diff --git a/src/config.gleam b/src/config.gleam index 2fecd45..9891a17 100644 --- a/src/config.gleam +++ b/src/config.gleam @@ -1,5 +1,10 @@ import envoy import glaml +import gleam/dict +import gleam/erlang/node +import gleam/io +import gleam/list +import gleam/option.{type Option, None, Some} import gleam/result import simplifile @@ -9,7 +14,7 @@ pub type Config { mqtt_user: String, mqtt_pw: String, mqtt_client_id: String, - jwt_key: String, + jwt_secret: String, ) } @@ -30,71 +35,59 @@ fn parse_yaml(contents: String) -> Result(glaml.Document, String) { } } -fn compile_config(doc: glaml.Document) -> Result(Config, String) { - let root = glaml.document_root(doc) +fn get_config_value( + root: glaml.Node, + yaml_key: Option(String), + env_var: Option(String), +) -> String { + let from_env = case env_var { + Some(var) -> envoy.get(var) + None -> Error(Nil) + } - // Extract values from YAML - use mqtt_host_result <- result.try( - glaml.select_sugar(root, "mqtt.host") - |> result.map_error(fn(_) { "mqtt.host not found in config.yml" }), - ) - use mqtt_user_result <- result.try( - glaml.select_sugar(root, "mqtt.user") - |> result.map_error(fn(_) { "mqtt.user not found in config.yml" }), - ) - use mqtt_client_id_result <- result.try( - glaml.select_sugar(root, "mqtt.client_id") - |> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }), - ) - use jwt_key_result <- result.try( - glaml.select_sugar(root, "jwt_key") - |> result.map_error(fn(_) { "jwt_key not found in config.yml" }), - ) - - // Extract strings from nodes - case - mqtt_host_result, - mqtt_user_result, - jwt_key_result, - mqtt_client_id_result - { - glaml.NodeStr(host), - glaml.NodeStr(user), - glaml.NodeStr(key), - glaml.NodeStr(client_id) - -> { - let yaml_config = - Config( - mqtt_host: host, - mqtt_user: user, - mqtt_pw: "placeholder", - mqtt_client_id: client_id, - jwt_key: key, - ) - - // Override with env vars - let final_config = - Config( - mqtt_host: envoy.get("MQTT_HOST") - |> result.unwrap(yaml_config.mqtt_host), - mqtt_user: envoy.get("MQTT_USER") - |> result.unwrap(yaml_config.mqtt_user), - mqtt_pw: envoy.get("MQTT_PW") - |> result.unwrap(yaml_config.mqtt_pw), - mqtt_client_id: envoy.get("MQTT_CLIENT_ID") - |> result.unwrap(yaml_config.mqtt_client_id), - jwt_key: envoy.get("JWT_KEY") - |> result.unwrap(yaml_config.jwt_key), - ) - - Ok(final_config) + case from_env { + Ok(val) -> val + Error(_) -> { + // Fall back to YAML if env var not found + case yaml_key { + Some(key) -> { + let assert Ok(node) = glaml.select_sugar(root, key) + let assert glaml.NodeStr(str) = node + str + } + None -> { + panic as { "Config value not found in environment or YAML" } + } + } } - _, _, _, _ -> Error("Config values must be strings in config.yml") } } -pub fn load_config() -> Result(Config, String) { +fn compile_config(doc: glaml.Document) -> Config { + let root = glaml.document_root(doc) + let get = fn(yaml_key: option.Option(String), env_var: option.Option(String)) -> String { + get_config_value(root, yaml_key, env_var) + } + + Config( + mqtt_host: get(Some("mqtt.host"), None), + mqtt_pw: get(None, Some("MQTT_PW")), + mqtt_user: get(Some("mqtt.user"), None), + jwt_secret: get(Some("jwt_secret"), Some("JWT_SECRET")), + mqtt_client_id: get(Some("mqtt.client_id"), Some("MQTT_CLIENT_ID")), + ) +} + +fn get_doc() -> Result(glaml.Document, String) { use file_content <- result.try(load_file()) use doc <- result.try(parse_yaml(file_content)) - compile_config(doc) + Ok(doc) +} + +pub fn load_config() -> Config { + let doc = get_doc() + case doc { + Ok(doc) -> compile_config(doc) + Error(err) -> panic as { "fail to load config: " <> err } + } } diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 2888f43..629dc7c 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -5,18 +5,8 @@ import mqtt_dummy import sensors pub fn main() -> Nil { - case config.load_config() { - Ok(cfg) -> { - io.println("Config loaded successfully!") - io.println("MQTT Host: " <> cfg.mqtt_host) - io.println("MQTT User: " <> cfg.mqtt_user) - io.println("MQTT PW: " <> cfg.mqtt_pw) - io.println("JWT Key: " <> cfg.jwt_key) - } - Error(err) -> { - io.println("Failed to load config: " <> err) - } - } + let cfg = config.load_config() + io.println("Config loaded successfully!") let assert Ok(subject) = mqtt_dummy.start() let mailbox = mqtt_dummy.subscribe(subject) diff --git a/test/config_test.gleam b/test/config_test.gleam index 1ed224d..ea18727 100644 --- a/test/config_test.gleam +++ b/test/config_test.gleam @@ -1,4 +1,5 @@ import config +import envoy import gleeunit import gleeunit/should @@ -8,7 +9,8 @@ pub fn main() -> Nil { // gleeunit test functions end in `_test` pub fn file_read_test() { - let assert Ok(cfg) = config.load_config() + envoy.set("MQTT_PW", "TEST") + let cfg = config.load_config() cfg.mqtt_host |> should.equal("192.168.1.11") diff --git a/test/sensors_test.gleam b/test/sensors_test.gleam index fd4ce0e..f9f9d97 100644 --- a/test/sensors_test.gleam +++ b/test/sensors_test.gleam @@ -6,7 +6,6 @@ pub fn main() -> Nil { gleeunit.main() } -// gleeunit test functions end in `_test` pub fn file_read_test() { let reading = sensors.sensor_name(sensors.Temperature) From 1f4942726eefec55a3172ca8f5e7b7f3c9ef2575 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Tue, 24 Mar 2026 19:34:40 +0100 Subject: [PATCH 2/6] try mutiple dummies --- src/mqtt.gleam | 47 +++++++++++++++++++++++++++++ src/weather_portal.gleam | 65 ++++++++++++++++++++++++---------------- 2 files changed, 86 insertions(+), 26 deletions(-) create mode 100644 src/mqtt.gleam diff --git a/src/mqtt.gleam b/src/mqtt.gleam new file mode 100644 index 0000000..e9e63b6 --- /dev/null +++ b/src/mqtt.gleam @@ -0,0 +1,47 @@ +import config +import gleam/erlang/process +import gleam/int +import gleam/io +import gleam/option.{None} +import gleam/string +import spoke/mqtt +import spoke/mqtt_actor +import spoke/tcp + +pub fn start(cfg: config.Config) { + let client_id = cfg.mqtt_client_id + let topic = "spoke-test" + + let assert Ok(started) = + tcp.connector_with_defaults(cfg.mqtt_host) + |> mqtt.connect_with_id(client_id) + |> mqtt_actor.build() + |> mqtt_actor.start(100) + let client = started.data + + let updates = process.new_subject() + mqtt_actor.subscribe_to_updates(client, updates) + mqtt_actor.connect(client, True, None) + + let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = + process.receive(updates, 5000) + + let assert Ok(_) = + mqtt_actor.subscribe(client, [ + mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), + ]) + + let message = + mqtt.PublishData( + topic, + <<"Hello from spoke!">>, + mqtt.AtLeastOnce, + retain: False, + ) + mqtt_actor.publish(client, message) + + let message = process.receive(updates, 1000) + io.println(string.inspect(message)) + + mqtt_actor.disconnect(client) +} diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 629dc7c..0e34e98 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,45 +1,58 @@ import config import gleam/erlang/process import gleam/io + +// import mqtt import mqtt_dummy import sensors pub fn main() -> Nil { - let cfg = config.load_config() + let _cfg = config.load_config() io.println("Config loaded successfully!") - let assert Ok(subject) = mqtt_dummy.start() - let mailbox = mqtt_dummy.subscribe(subject) - - // Kick off the first message - process.send_after(subject, 1000, mqtt_dummy.Proc) - - receive_and_reschedule(mailbox, subject) - // case process.receive(mailbox, 5000) { - // Ok(msg) -> { - // io.println("Got message:" <> msg) - // // process_messages(mailbox, n - 1) - // } - // Error(Nil) -> { - // io.println("Timeout waiting for message") - // } - // } + let assert Ok(dummy_one) = mqtt_dummy.start() + let assert Ok(dummy_two) = mqtt_dummy.start() + process.send_after(dummy_one, 1000, mqtt_dummy.Proc) + process.send_after(dummy_two, 1000, mqtt_dummy.Proc) + loop(LoopState( + dummy_one_receive_subject: mqtt_dummy.subscribe(dummy_one), + dummy_one_send_subject: dummy_one, + dummy_two_receive_subject: mqtt_dummy.subscribe(dummy_two), + dummy_two_send_subject: dummy_two, + )) Nil } -fn receive_and_reschedule( - mailbox: process.Subject(sensors.SensorReading), - subject: process.Subject(mqtt_dummy.Message), -) -> Nil { - case process.receive(mailbox, 2000) { +type LoopState { + LoopState( + dummy_one_receive_subject: process.Subject(sensors.SensorReading), + dummy_one_send_subject: process.Subject(mqtt_dummy.Message), + dummy_two_receive_subject: process.Subject(sensors.SensorReading), + dummy_two_send_subject: process.Subject(mqtt_dummy.Message), + ) +} + +fn loop(state: LoopState) -> Nil { + case process.receive(state.dummy_one_receive_subject, 1500) { Ok(msg) -> { sensors.print_sensor_reading(msg) - process.send_after(subject, 1000, mqtt_dummy.Proc) - receive_and_reschedule(mailbox, subject) + process.send_after(state.dummy_one_send_subject, 1000, mqtt_dummy.Proc) + Nil } Error(Nil) -> { - io.println("Timeout - waiting again...") - receive_and_reschedule(mailbox, subject) + io.println("Timeout on dummy one") } } + + case process.receive(state.dummy_two_receive_subject, 1500) { + Ok(msg) -> { + sensors.print_sensor_reading(msg) + process.send_after(state.dummy_two_send_subject, 1000, mqtt_dummy.Proc) + Nil + } + Error(Nil) -> { + io.println("Timeout on dummy two..") + } + } + loop(state) } From 45b934b3adaa3f698068fe56eb26df407cefa5cf Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Tue, 24 Mar 2026 19:43:52 +0100 Subject: [PATCH 3/6] get real mqtt connection working --- src/mqtt.gleam | 57 +++++++++++++++++++--------------------- src/weather_portal.gleam | 8 +++--- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/src/mqtt.gleam b/src/mqtt.gleam index e9e63b6..96fb968 100644 --- a/src/mqtt.gleam +++ b/src/mqtt.gleam @@ -1,47 +1,44 @@ import config import gleam/erlang/process -import gleam/int -import gleam/io -import gleam/option.{None} -import gleam/string +import gleam/option.{None, Some} import spoke/mqtt import spoke/mqtt_actor import spoke/tcp -pub fn start(cfg: config.Config) { +pub fn start( + cfg: config.Config, + updates_subject: process.Subject(mqtt.Update), +) -> mqtt_actor.Client { let client_id = cfg.mqtt_client_id - let topic = "spoke-test" - + let password = <> + let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) + let transport = tcp.connector_with_defaults(cfg.mqtt_host) + let connection_options = + mqtt.ConnectOptions( + client_id: client_id, + authentication: Some(auth), + transport_options: transport, + keep_alive_seconds: 15, + server_timeout_ms: 5000, + ) let assert Ok(started) = - tcp.connector_with_defaults(cfg.mqtt_host) - |> mqtt.connect_with_id(client_id) - |> mqtt_actor.build() - |> mqtt_actor.start(100) + mqtt_actor.build(connection_options) |> mqtt_actor.start(100) let client = started.data - let updates = process.new_subject() - mqtt_actor.subscribe_to_updates(client, updates) + mqtt_actor.subscribe_to_updates(client, updates_subject) mqtt_actor.connect(client, True, None) let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = - process.receive(updates, 5000) + process.receive(updates_subject, 5000) + client +} - let assert Ok(_) = - mqtt_actor.subscribe(client, [ - mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), - ]) - - let message = - mqtt.PublishData( - topic, - <<"Hello from spoke!">>, - mqtt.AtLeastOnce, - retain: False, - ) - mqtt_actor.publish(client, message) - - let message = process.receive(updates, 1000) - io.println(string.inspect(message)) +pub fn subscribe(client: mqtt_actor.Client, topic: String) { + mqtt_actor.subscribe(client, [ + mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), + ]) +} +pub fn disconnect(client: mqtt_actor.Client) { mqtt_actor.disconnect(client) } diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 0e34e98..6d78660 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,15 +1,17 @@ import config import gleam/erlang/process import gleam/io - -// import mqtt +import mqtt import mqtt_dummy import sensors pub fn main() -> Nil { - let _cfg = config.load_config() + let cfg = config.load_config() io.println("Config loaded successfully!") + let mqtt_updates = process.new_subject() + let client = mqtt.start(cfg, mqtt_updates) + let assert Ok(dummy_one) = mqtt_dummy.start() let assert Ok(dummy_two) = mqtt_dummy.start() process.send_after(dummy_one, 1000, mqtt_dummy.Proc) From 6c0711acd57a68fe963f1e56a04a353ec8fbc341 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Wed, 25 Mar 2026 08:52:34 +0100 Subject: [PATCH 4/6] get mqtt reciving working --- src/mqtt.gleam | 20 ++++++------ src/readings_broadcaster.gleam | 56 ++++++++++++++++++++++++++++++++++ src/sensors.gleam | 2 ++ src/weather_portal.gleam | 54 ++++---------------------------- 4 files changed, 75 insertions(+), 57 deletions(-) create mode 100644 src/readings_broadcaster.gleam diff --git a/src/mqtt.gleam b/src/mqtt.gleam index 96fb968..8358776 100644 --- a/src/mqtt.gleam +++ b/src/mqtt.gleam @@ -1,14 +1,12 @@ import config import gleam/erlang/process import gleam/option.{None, Some} +import readings_broadcaster import spoke/mqtt import spoke/mqtt_actor import spoke/tcp -pub fn start( - cfg: config.Config, - updates_subject: process.Subject(mqtt.Update), -) -> mqtt_actor.Client { +pub fn start(cfg: config.Config) -> mqtt_actor.Client { let client_id = cfg.mqtt_client_id let password = <> let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) @@ -18,18 +16,22 @@ pub fn start( client_id: client_id, authentication: Some(auth), transport_options: transport, - keep_alive_seconds: 15, - server_timeout_ms: 5000, + keep_alive_seconds: 120, + server_timeout_ms: 15_000, ) let assert Ok(started) = mqtt_actor.build(connection_options) |> mqtt_actor.start(100) let client = started.data - mqtt_actor.subscribe_to_updates(client, updates_subject) + let start_up_checker = process.new_subject() + mqtt_actor.subscribe_to_updates(client, start_up_checker) mqtt_actor.connect(client, True, None) - let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = - process.receive(updates_subject, 5000) + process.receive(start_up_checker, 15_000) + + let assert Ok(broadcaster) = readings_broadcaster.start([]) + mqtt_actor.subscribe_to_updates(client, broadcaster) + client } diff --git a/src/readings_broadcaster.gleam b/src/readings_broadcaster.gleam new file mode 100644 index 0000000..840e8e5 --- /dev/null +++ b/src/readings_broadcaster.gleam @@ -0,0 +1,56 @@ +import gleam/erlang/process +import gleam/io +import gleam/list +import gleam/otp/actor +import sensors +import spoke/mqtt + +// pub type Message { +// MqttUpdate(update: mqtt.Update) +// } + +type State { + State(subscribers: List(process.Subject(sensors.SensorReading))) +} + +fn handle_message( + state: State, + update: mqtt.Update, +) -> actor.Next(State, mqtt.Update) { + case parse_mqtt_update(update) { + Ok(reading) -> { + // Publish to all subscribers + list.each(state.subscribers, fn(subscriber) { + process.send(subscriber, reading) + }) + actor.continue(state) + } + Error(_) -> actor.continue(state) + } +} + +fn parse_mqtt_update(update: mqtt.Update) -> Result(sensors.SensorReading, Nil) { + io.println("GOT MQTT MESSAGE!") + echo update + // TODO: Implement parsing logic based on your MQTT topic/payload structure + // For now, return an error + Error(Nil) +} + +pub fn start( + subscribers: List(process.Subject(sensors.SensorReading)), +) -> Result(process.Subject(mqtt.Update), actor.StartError) { + let initial_state = State(subscribers: subscribers) + let assert Ok(started) = + actor.new(initial_state) + |> actor.on_message(handle_message) + |> actor.start + Ok(started.data) +} +// pub fn subscribe( +// reader: process.Subject(Message), +// ) -> process.Subject(sensors.SensorReading) { +// let mailbox = process.new_subject() +// process.send(reader, Subscribe(mailbox)) +// mailbox +// } diff --git a/src/sensors.gleam b/src/sensors.gleam index 0cc1df2..9e05cde 100644 --- a/src/sensors.gleam +++ b/src/sensors.gleam @@ -25,6 +25,8 @@ pub fn sensor_name(sensor: Sensor) -> String { } } +// homeassistant/sensor/bws/node1/state1 + pub fn print_sensor_reading(reading: SensorReading) -> Nil { let sensor_name = sensor_name(reading.sensor) io.println(sensor_name <> ": " <> float.to_string(reading.value)) diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 6d78660..943b858 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -2,59 +2,17 @@ import config import gleam/erlang/process import gleam/io import mqtt -import mqtt_dummy -import sensors pub fn main() -> Nil { let cfg = config.load_config() io.println("Config loaded successfully!") - let mqtt_updates = process.new_subject() - let client = mqtt.start(cfg, mqtt_updates) + // Start MQTT, which will forward updates to sensor_reader + let client = mqtt.start(cfg) + let assert Ok(s) = + mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1") + echo s - let assert Ok(dummy_one) = mqtt_dummy.start() - let assert Ok(dummy_two) = mqtt_dummy.start() - process.send_after(dummy_one, 1000, mqtt_dummy.Proc) - process.send_after(dummy_two, 1000, mqtt_dummy.Proc) - loop(LoopState( - dummy_one_receive_subject: mqtt_dummy.subscribe(dummy_one), - dummy_one_send_subject: dummy_one, - dummy_two_receive_subject: mqtt_dummy.subscribe(dummy_two), - dummy_two_send_subject: dummy_two, - )) + process.sleep_forever() Nil } - -type LoopState { - LoopState( - dummy_one_receive_subject: process.Subject(sensors.SensorReading), - dummy_one_send_subject: process.Subject(mqtt_dummy.Message), - dummy_two_receive_subject: process.Subject(sensors.SensorReading), - dummy_two_send_subject: process.Subject(mqtt_dummy.Message), - ) -} - -fn loop(state: LoopState) -> Nil { - case process.receive(state.dummy_one_receive_subject, 1500) { - Ok(msg) -> { - sensors.print_sensor_reading(msg) - process.send_after(state.dummy_one_send_subject, 1000, mqtt_dummy.Proc) - Nil - } - Error(Nil) -> { - io.println("Timeout on dummy one") - } - } - - case process.receive(state.dummy_two_receive_subject, 1500) { - Ok(msg) -> { - sensors.print_sensor_reading(msg) - process.send_after(state.dummy_two_send_subject, 1000, mqtt_dummy.Proc) - Nil - } - Error(Nil) -> { - io.println("Timeout on dummy two..") - } - } - loop(state) -} From a1020b80d2af4aa0dc361d6a6793397262b2cf36 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Wed, 25 Mar 2026 10:04:40 +0100 Subject: [PATCH 5/6] parse sensor json values --- gleam.toml | 1 + manifest.toml | 2 ++ src/config.gleam | 4 ---- src/readings_broadcaster.gleam | 28 +++++++++++++++++++--------- src/sensors.gleam | 25 ++++++++++++++++++++++++- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/gleam.toml b/gleam.toml index b2e196a..1b60035 100644 --- a/gleam.toml +++ b/gleam.toml @@ -22,6 +22,7 @@ spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0" spoke_tcp = ">= 2.0.0 and < 3.0.0" gleam_erlang = ">= 1.3.0 and < 2.0.0" gleam_otp = ">= 1.2.0 and < 2.0.0" +gleam_json = ">= 3.1.0 and < 4.0.0" [dev_dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index c381e1a..b98204c 100644 --- a/manifest.toml +++ b/manifest.toml @@ -8,6 +8,7 @@ packages = [ { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" }, { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, + { name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" }, { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, @@ -25,6 +26,7 @@ packages = [ envoy = { version = ">= 1.1.0 and < 2.0.0" } glaml = { version = ">= 3.0.2 and < 4.0.0" } gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" } +gleam_json = { version = ">= 3.1.0 and < 4.0.0" } gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } diff --git a/src/config.gleam b/src/config.gleam index 9891a17..3a90c8a 100644 --- a/src/config.gleam +++ b/src/config.gleam @@ -1,9 +1,5 @@ import envoy import glaml -import gleam/dict -import gleam/erlang/node -import gleam/io -import gleam/list import gleam/option.{type Option, None, Some} import gleam/result import simplifile diff --git a/src/readings_broadcaster.gleam b/src/readings_broadcaster.gleam index 840e8e5..1c115eb 100644 --- a/src/readings_broadcaster.gleam +++ b/src/readings_broadcaster.gleam @@ -1,5 +1,5 @@ +import gleam/bit_array import gleam/erlang/process -import gleam/io import gleam/list import gleam/otp/actor import sensors @@ -17,11 +17,13 @@ fn handle_message( state: State, update: mqtt.Update, ) -> actor.Next(State, mqtt.Update) { + echo update case parse_mqtt_update(update) { - Ok(reading) -> { + Ok(readings) -> { // Publish to all subscribers + echo readings list.each(state.subscribers, fn(subscriber) { - process.send(subscriber, reading) + list.each(readings, fn(reading) { process.send(subscriber, reading) }) }) actor.continue(state) } @@ -29,12 +31,20 @@ fn handle_message( } } -fn parse_mqtt_update(update: mqtt.Update) -> Result(sensors.SensorReading, Nil) { - io.println("GOT MQTT MESSAGE!") - echo update - // TODO: Implement parsing logic based on your MQTT topic/payload structure - // For now, return an error - Error(Nil) +fn parse_mqtt_update( + update: mqtt.Update, +) -> Result(List(sensors.SensorReading), Nil) { + case update { + mqtt.ReceivedMessage(topic, payload, _) -> { + let assert Ok(payload_str) = bit_array.to_string(payload) + case topic { + "homeassistant/sensor/bws/node1/state1" -> + Ok(sensors.parse_topic_1(payload_str)) + _ -> Error(Nil) + } + } + _ -> Error(Nil) + } } pub fn start( diff --git a/src/sensors.gleam b/src/sensors.gleam index 9e05cde..ba494df 100644 --- a/src/sensors.gleam +++ b/src/sensors.gleam @@ -1,5 +1,7 @@ +import gleam/dynamic/decode import gleam/float import gleam/io +import gleam/json pub type Sensor { Temperature @@ -25,7 +27,28 @@ pub fn sensor_name(sensor: Sensor) -> String { } } -// homeassistant/sensor/bws/node1/state1 +pub const topic_1: String = "homeassistant/sensor/bws/node1/state1" + +type Topic1Value { + Topic1Value(temp: Float, humidity: Float, pressure: Float) +} + +/// Parses a JSON string into a list of `SensorReading` values. +/// example input string: {"temp":18.84,"humidity":28.690811,"pressure":944.67} +pub fn parse_topic_1(json_string: String) -> List(SensorReading) { + let decoder = { + use temp <- decode.field("temp", decode.float) + use humidity <- decode.field("humidity", decode.float) + use pressure <- decode.field("pressure", decode.float) + decode.success(Topic1Value(temp:, humidity:, pressure:)) + } + let assert Ok(decoded_val) = json.parse(from: json_string, using: decoder) + [ + SensorReading(sensor: Temperature, value: decoded_val.temp), + SensorReading(sensor: Humidity, value: decoded_val.humidity), + SensorReading(sensor: Pressure, value: decoded_val.pressure), + ] +} pub fn print_sensor_reading(reading: SensorReading) -> Nil { let sensor_name = sensor_name(reading.sensor) From 536344d6c4dcab063c8f3bf7b146d25a50f5aee8 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Wed, 25 Mar 2026 10:39:55 +0100 Subject: [PATCH 6/6] add test to parse_topic_1 --- test/sensors_test.gleam | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/sensors_test.gleam b/test/sensors_test.gleam index f9f9d97..6d02b0c 100644 --- a/test/sensors_test.gleam +++ b/test/sensors_test.gleam @@ -16,3 +16,15 @@ pub fn file_read_test() { reading |> should.equal("Temperature 20.5 °C") } + +pub fn parse_topic_1_test() { + let json_string = + "{\"temp\":18.84,\"humidity\":28.690811,\"pressure\":944.67}" + let readings = sensors.parse_topic_1(json_string) + readings + |> should.equal([ + sensors.SensorReading(sensor: sensors.Temperature, value: 18.84), + sensors.SensorReading(sensor: sensors.Humidity, value: 28.690811), + sensors.SensorReading(sensor: sensors.Pressure, value: 944.67), + ]) +}