From af4f5a850c1a521fcef160e65ac829cf33a12d1a Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Tue, 24 Mar 2026 09:47:29 +0100 Subject: [PATCH] create an mqtt_dummy that sends string every second --- config.yml | 1 + gleam.toml | 5 +++++ manifest.toml | 15 +++++++++++++ src/config.gleam | 30 +++++++++++++++++++++---- src/mqtt_dummy.gleam | 48 ++++++++++++++++++++++++++++++++++++++++ src/weather_portal.gleam | 38 +++++++++++++++++++++++++++++++ 6 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 src/mqtt_dummy.gleam diff --git a/config.yml b/config.yml index d5c7d7c..0e4a279 100644 --- a/config.yml +++ b/config.yml @@ -1,5 +1,6 @@ mqtt: host: "192.168.1.11" + client_id: "weather_portal_dev" user: "homeassistant" # password comes from env jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod diff --git a/gleam.toml b/gleam.toml index 4e7d401..b2e196a 100644 --- a/gleam.toml +++ b/gleam.toml @@ -17,6 +17,11 @@ gleam_stdlib = ">= 0.44.0 and < 2.0.0" simplifile = ">= 2.4.0 and < 3.0.0" glaml = ">= 3.0.2 and < 4.0.0" envoy = ">= 1.1.0 and < 2.0.0" +spoke_mqtt = ">= 1.0.0 and < 2.0.0" +spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0" +spoke_tcp = ">= 2.0.0 and < 3.0.0" +gleam_erlang = ">= 1.3.0 and < 2.0.0" +gleam_otp = ">= 1.2.0 and < 2.0.0" [dev_dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 891a587..c381e1a 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,18 +2,33 @@ # You typically do not need to edit this file packages = [ + { name = "drift", version = "1.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "drift", source = "hex", outer_checksum = "30FB0312FF93ABC3A71791B4B8C06B357BFC09E8185CB5CA94B3CCFCEF09D54B" }, + { name = "drift_actor", version = "2.0.1", build_tools = ["gleam"], requirements = ["drift", "gleam_erlang", "gleam_otp", "gleam_stdlib"], otp_app = "drift_actor", source = "hex", outer_checksum = "6C15D215F3C8A26AE8531CB1ED2EFA16A8816D4946D9E91765D477D66F24BE90" }, { name = "envoy", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "envoy", source = "hex", outer_checksum = "850DA9D29D2E5987735872A2B5C81035146D7FE19EFC486129E44440D03FD832" }, { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" }, + { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, + { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, + { name = "mug", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "mug", source = "hex", outer_checksum = "C01279D98E40371DA23461774B63F0E3581B8F1396049D881B0C7EB32799D93F" }, { name = "simplifile", version = "2.4.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "7C18AFA4FED0B4CE1FA5B0B4BAC1FA1744427054EA993565F6F3F82E5453170D" }, + { name = "spoke_core", version = "1.0.1", build_tools = ["gleam"], requirements = ["drift", "gleam_stdlib", "spoke_mqtt", "spoke_packet"], otp_app = "spoke_core", source = "hex", outer_checksum = "B4092B5D0912936E3504AF903324D283D537C5126ED7FF50D8D12597E69224F1" }, + { name = "spoke_mqtt", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "spoke_packet"], otp_app = "spoke_mqtt", source = "hex", outer_checksum = "44480FEE8F416839CE6F9C43938A84604F802B0FA318E1C0A9434AD02DB0D5F4" }, + { name = "spoke_mqtt_actor", version = "1.1.1", build_tools = ["gleam"], requirements = ["drift", "drift_actor", "gleam_erlang", "gleam_otp", "gleam_stdlib", "spoke_core", "spoke_mqtt"], otp_app = "spoke_mqtt_actor", source = "hex", outer_checksum = "26FF4FB00796EF41323F14498AD2F901F571470B221E8B811C9852F07419A7A9" }, + { name = "spoke_packet", version = "1.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "spoke_packet", source = "hex", outer_checksum = "DE74578D34FB8E52B2CE18D7266C3DFC28DE3E88E9AEF415121FA9EB67D57EC3" }, + { name = "spoke_tcp", version = "2.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib", "mug", "spoke_core", "spoke_mqtt_actor"], otp_app = "spoke_tcp", source = "hex", outer_checksum = "B2CFDF5658D995200B5085E5FE224457E64CA3B3C3F89C335F65B14431B67AE2" }, { name = "yamerl", version = "0.10.0", build_tools = ["rebar3"], requirements = [], otp_app = "yamerl", source = "hex", outer_checksum = "346ADB2963F1051DC837A2364E4ACF6EB7D80097C0F53CBDC3046EC8EC4B4E6E" }, ] [requirements] envoy = { version = ">= 1.1.0 and < 2.0.0" } glaml = { version = ">= 3.0.2 and < 4.0.0" } +gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" } +gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } simplifile = { version = ">= 2.4.0 and < 3.0.0" } +spoke_mqtt = { version = ">= 1.0.0 and < 2.0.0" } +spoke_mqtt_actor = { version = ">= 1.1.1 and < 2.0.0" } +spoke_tcp = { version = ">= 2.0.0 and < 3.0.0" } diff --git a/src/config.gleam b/src/config.gleam index e60aa98..2fecd45 100644 --- a/src/config.gleam +++ b/src/config.gleam @@ -4,7 +4,13 @@ import gleam/result import simplifile pub type Config { - Config(mqtt_host: String, mqtt_user: String, mqtt_pw: String, jwt_key: String) + Config( + mqtt_host: String, + mqtt_user: String, + mqtt_pw: String, + mqtt_client_id: String, + jwt_key: String, + ) } fn load_file() -> Result(String, String) { @@ -36,19 +42,33 @@ fn compile_config(doc: glaml.Document) -> Result(Config, String) { glaml.select_sugar(root, "mqtt.user") |> result.map_error(fn(_) { "mqtt.user not found in config.yml" }), ) + use mqtt_client_id_result <- result.try( + glaml.select_sugar(root, "mqtt.client_id") + |> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }), + ) use jwt_key_result <- result.try( glaml.select_sugar(root, "jwt_key") |> result.map_error(fn(_) { "jwt_key not found in config.yml" }), ) // Extract strings from nodes - case mqtt_host_result, mqtt_user_result, jwt_key_result { - glaml.NodeStr(host), glaml.NodeStr(user), glaml.NodeStr(key) -> { + case + mqtt_host_result, + mqtt_user_result, + jwt_key_result, + mqtt_client_id_result + { + glaml.NodeStr(host), + glaml.NodeStr(user), + glaml.NodeStr(key), + glaml.NodeStr(client_id) + -> { let yaml_config = Config( mqtt_host: host, mqtt_user: user, mqtt_pw: "placeholder", + mqtt_client_id: client_id, jwt_key: key, ) @@ -61,13 +81,15 @@ fn compile_config(doc: glaml.Document) -> Result(Config, String) { |> result.unwrap(yaml_config.mqtt_user), mqtt_pw: envoy.get("MQTT_PW") |> result.unwrap(yaml_config.mqtt_pw), + mqtt_client_id: envoy.get("MQTT_CLIENT_ID") + |> result.unwrap(yaml_config.mqtt_client_id), jwt_key: envoy.get("JWT_KEY") |> result.unwrap(yaml_config.jwt_key), ) Ok(final_config) } - _, _, _ -> Error("Config values must be strings in config.yml") + _, _, _, _ -> Error("Config values must be strings in config.yml") } } diff --git a/src/mqtt_dummy.gleam b/src/mqtt_dummy.gleam new file mode 100644 index 0000000..5a3fdd7 --- /dev/null +++ b/src/mqtt_dummy.gleam @@ -0,0 +1,48 @@ +import gleam/erlang/process +import gleam/list +import gleam/otp/actor + +pub type Subscriber = + process.Subject(String) + +pub type Message { + Subscribe(subscriber: Subscriber) + Proc +} + +pub type Subscribers = + List(Subscriber) + +fn handle_message( + subscribers: Subscribers, + message: Message, +) -> actor.Next(Subscribers, Message) { + case message { + Subscribe(subscriber) -> { + let new_subscribers = [subscriber, ..subscribers] + actor.continue(new_subscribers) + } + Proc -> { + list.each(subscribers, fn(subscriber) { + process.send(subscriber, "Hello from dummy mqtt action") + }) + actor.continue(subscribers) + } + } +} + +// Subscribe to receive ticks from the dummy actor +pub fn subscribe(actor_ref: process.Subject(Message)) -> Subscriber { + let mailbox = process.new_subject() + process.send(actor_ref, Subscribe(mailbox)) + mailbox +} + +// Start the actor +pub fn start() -> Result(process.Subject(Message), actor.StartError) { + let assert Ok(started) = + actor.new([]) + |> actor.on_message(handle_message) + |> actor.start + Ok(started.data) +} diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 623775b..1627003 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,5 +1,7 @@ import config +import gleam/erlang/process import gleam/io +import mqtt_dummy pub fn main() -> Nil { case config.load_config() { @@ -14,4 +16,40 @@ pub fn main() -> Nil { io.println("Failed to load config: " <> err) } } + + let assert Ok(subject) = mqtt_dummy.start() + let mailbox = mqtt_dummy.subscribe(subject) + + // Kick off the first message + process.send_after(subject, 1000, mqtt_dummy.Proc) + + receive_and_reschedule(mailbox, subject) + // case process.receive(mailbox, 5000) { + // Ok(msg) -> { + // io.println("Got message:" <> msg) + // // process_messages(mailbox, n - 1) + // } + // Error(Nil) -> { + // io.println("Timeout waiting for message") + // } + // } + Nil +} + +fn receive_and_reschedule( + mailbox: process.Subject(String), + subject: process.Subject(mqtt_dummy.Message), +) -> Nil { + case process.receive(mailbox, 2000) { + Ok(msg) -> { + io.println("Got message: " <> msg) + // Reschedule the next Proc message + process.send_after(subject, 1000, mqtt_dummy.Proc) + receive_and_reschedule(mailbox, subject) + } + Error(Nil) -> { + io.println("Timeout - waiting again...") + receive_and_reschedule(mailbox, subject) + } + } }