diff --git a/config.yml b/config.yml index 0e4a279..17d157d 100644 --- a/config.yml +++ b/config.yml @@ -3,4 +3,4 @@ mqtt: client_id: "weather_portal_dev" user: "homeassistant" # password comes from env -jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod +jwt_secret: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod diff --git a/gleam.toml b/gleam.toml index b2e196a..1b60035 100644 --- a/gleam.toml +++ b/gleam.toml @@ -22,6 +22,7 @@ spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0" spoke_tcp = ">= 2.0.0 and < 3.0.0" gleam_erlang = ">= 1.3.0 and < 2.0.0" gleam_otp = ">= 1.2.0 and < 2.0.0" +gleam_json = ">= 3.1.0 and < 4.0.0" [dev_dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index c381e1a..b98204c 100644 --- a/manifest.toml +++ b/manifest.toml @@ -8,6 +8,7 @@ packages = [ { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" }, { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, + { name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" }, { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, @@ -25,6 +26,7 @@ packages = [ envoy = { version = ">= 1.1.0 and < 2.0.0" } glaml = { version = ">= 3.0.2 and < 4.0.0" } gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" } +gleam_json = { version = ">= 3.1.0 and < 4.0.0" } gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } diff --git a/src/config.gleam b/src/config.gleam index 2fecd45..3a90c8a 100644 --- a/src/config.gleam +++ b/src/config.gleam @@ -1,5 +1,6 @@ import envoy import glaml +import gleam/option.{type Option, None, Some} import gleam/result import simplifile @@ -9,7 +10,7 @@ pub type Config { mqtt_user: String, mqtt_pw: String, mqtt_client_id: String, - jwt_key: String, + jwt_secret: String, ) } @@ -30,71 +31,59 @@ fn parse_yaml(contents: String) -> Result(glaml.Document, String) { } } -fn compile_config(doc: glaml.Document) -> Result(Config, String) { - let root = glaml.document_root(doc) +fn get_config_value( + root: glaml.Node, + yaml_key: Option(String), + env_var: Option(String), +) -> String { + let from_env = case env_var { + Some(var) -> envoy.get(var) + None -> Error(Nil) + } - // Extract values from YAML - use mqtt_host_result <- result.try( - glaml.select_sugar(root, "mqtt.host") - |> result.map_error(fn(_) { "mqtt.host not found in config.yml" }), - ) - use mqtt_user_result <- result.try( - glaml.select_sugar(root, "mqtt.user") - |> result.map_error(fn(_) { "mqtt.user not found in config.yml" }), - ) - use mqtt_client_id_result <- result.try( - glaml.select_sugar(root, "mqtt.client_id") - |> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }), - ) - use jwt_key_result <- result.try( - glaml.select_sugar(root, "jwt_key") - |> result.map_error(fn(_) { "jwt_key not found in config.yml" }), - ) - - // Extract strings from nodes - case - mqtt_host_result, - mqtt_user_result, - jwt_key_result, - mqtt_client_id_result - { - glaml.NodeStr(host), - glaml.NodeStr(user), - glaml.NodeStr(key), - glaml.NodeStr(client_id) - -> { - let yaml_config = - Config( - mqtt_host: host, - mqtt_user: user, - mqtt_pw: "placeholder", - mqtt_client_id: client_id, - jwt_key: key, - ) - - // Override with env vars - let final_config = - Config( - mqtt_host: envoy.get("MQTT_HOST") - |> result.unwrap(yaml_config.mqtt_host), - mqtt_user: envoy.get("MQTT_USER") - |> result.unwrap(yaml_config.mqtt_user), - mqtt_pw: envoy.get("MQTT_PW") - |> result.unwrap(yaml_config.mqtt_pw), - mqtt_client_id: envoy.get("MQTT_CLIENT_ID") - |> result.unwrap(yaml_config.mqtt_client_id), - jwt_key: envoy.get("JWT_KEY") - |> result.unwrap(yaml_config.jwt_key), - ) - - Ok(final_config) + case from_env { + Ok(val) -> val + Error(_) -> { + // Fall back to YAML if env var not found + case yaml_key { + Some(key) -> { + let assert Ok(node) = glaml.select_sugar(root, key) + let assert glaml.NodeStr(str) = node + str + } + None -> { + panic as { "Config value not found in environment or YAML" } + } + } } - _, _, _, _ -> Error("Config values must be strings in config.yml") } } -pub fn load_config() -> Result(Config, String) { +fn compile_config(doc: glaml.Document) -> Config { + let root = glaml.document_root(doc) + let get = fn(yaml_key: option.Option(String), env_var: option.Option(String)) -> String { + get_config_value(root, yaml_key, env_var) + } + + Config( + mqtt_host: get(Some("mqtt.host"), None), + mqtt_pw: get(None, Some("MQTT_PW")), + mqtt_user: get(Some("mqtt.user"), None), + jwt_secret: get(Some("jwt_secret"), Some("JWT_SECRET")), + mqtt_client_id: get(Some("mqtt.client_id"), Some("MQTT_CLIENT_ID")), + ) +} + +fn get_doc() -> Result(glaml.Document, String) { use file_content <- result.try(load_file()) use doc <- result.try(parse_yaml(file_content)) - compile_config(doc) + Ok(doc) +} + +pub fn load_config() -> Config { + let doc = get_doc() + case doc { + Ok(doc) -> compile_config(doc) + Error(err) -> panic as { "fail to load config: " <> err } + } } diff --git a/src/mqtt.gleam b/src/mqtt.gleam new file mode 100644 index 0000000..8358776 --- /dev/null +++ b/src/mqtt.gleam @@ -0,0 +1,46 @@ +import config +import gleam/erlang/process +import gleam/option.{None, Some} +import readings_broadcaster +import spoke/mqtt +import spoke/mqtt_actor +import spoke/tcp + +pub fn start(cfg: config.Config) -> mqtt_actor.Client { + let client_id = cfg.mqtt_client_id + let password = <> + let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) + let transport = tcp.connector_with_defaults(cfg.mqtt_host) + let connection_options = + mqtt.ConnectOptions( + client_id: client_id, + authentication: Some(auth), + transport_options: transport, + keep_alive_seconds: 120, + server_timeout_ms: 15_000, + ) + let assert Ok(started) = + mqtt_actor.build(connection_options) |> mqtt_actor.start(100) + let client = started.data + + let start_up_checker = process.new_subject() + mqtt_actor.subscribe_to_updates(client, start_up_checker) + mqtt_actor.connect(client, True, None) + let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = + process.receive(start_up_checker, 15_000) + + let assert Ok(broadcaster) = readings_broadcaster.start([]) + mqtt_actor.subscribe_to_updates(client, broadcaster) + + client +} + +pub fn subscribe(client: mqtt_actor.Client, topic: String) { + mqtt_actor.subscribe(client, [ + mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), + ]) +} + +pub fn disconnect(client: mqtt_actor.Client) { + mqtt_actor.disconnect(client) +} diff --git a/src/readings_broadcaster.gleam b/src/readings_broadcaster.gleam new file mode 100644 index 0000000..1c115eb --- /dev/null +++ b/src/readings_broadcaster.gleam @@ -0,0 +1,66 @@ +import gleam/bit_array +import gleam/erlang/process +import gleam/list +import gleam/otp/actor +import sensors +import spoke/mqtt + +// pub type Message { +// MqttUpdate(update: mqtt.Update) +// } + +type State { + State(subscribers: List(process.Subject(sensors.SensorReading))) +} + +fn handle_message( + state: State, + update: mqtt.Update, +) -> actor.Next(State, mqtt.Update) { + echo update + case parse_mqtt_update(update) { + Ok(readings) -> { + // Publish to all subscribers + echo readings + list.each(state.subscribers, fn(subscriber) { + list.each(readings, fn(reading) { process.send(subscriber, reading) }) + }) + actor.continue(state) + } + Error(_) -> actor.continue(state) + } +} + +fn parse_mqtt_update( + update: mqtt.Update, +) -> Result(List(sensors.SensorReading), Nil) { + case update { + mqtt.ReceivedMessage(topic, payload, _) -> { + let assert Ok(payload_str) = bit_array.to_string(payload) + case topic { + "homeassistant/sensor/bws/node1/state1" -> + Ok(sensors.parse_topic_1(payload_str)) + _ -> Error(Nil) + } + } + _ -> Error(Nil) + } +} + +pub fn start( + subscribers: List(process.Subject(sensors.SensorReading)), +) -> Result(process.Subject(mqtt.Update), actor.StartError) { + let initial_state = State(subscribers: subscribers) + let assert Ok(started) = + actor.new(initial_state) + |> actor.on_message(handle_message) + |> actor.start + Ok(started.data) +} +// pub fn subscribe( +// reader: process.Subject(Message), +// ) -> process.Subject(sensors.SensorReading) { +// let mailbox = process.new_subject() +// process.send(reader, Subscribe(mailbox)) +// mailbox +// } diff --git a/src/sensors.gleam b/src/sensors.gleam index 0cc1df2..ba494df 100644 --- a/src/sensors.gleam +++ b/src/sensors.gleam @@ -1,5 +1,7 @@ +import gleam/dynamic/decode import gleam/float import gleam/io +import gleam/json pub type Sensor { Temperature @@ -25,6 +27,29 @@ pub fn sensor_name(sensor: Sensor) -> String { } } +pub const topic_1: String = "homeassistant/sensor/bws/node1/state1" + +type Topic1Value { + Topic1Value(temp: Float, humidity: Float, pressure: Float) +} + +/// Parses a JSON string into a list of `SensorReading` values. +/// example input string: {"temp":18.84,"humidity":28.690811,"pressure":944.67} +pub fn parse_topic_1(json_string: String) -> List(SensorReading) { + let decoder = { + use temp <- decode.field("temp", decode.float) + use humidity <- decode.field("humidity", decode.float) + use pressure <- decode.field("pressure", decode.float) + decode.success(Topic1Value(temp:, humidity:, pressure:)) + } + let assert Ok(decoded_val) = json.parse(from: json_string, using: decoder) + [ + SensorReading(sensor: Temperature, value: decoded_val.temp), + SensorReading(sensor: Humidity, value: decoded_val.humidity), + SensorReading(sensor: Pressure, value: decoded_val.pressure), + ] +} + pub fn print_sensor_reading(reading: SensorReading) -> Nil { let sensor_name = sensor_name(reading.sensor) io.println(sensor_name <> ": " <> float.to_string(reading.value)) diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 2888f43..943b858 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,55 +1,18 @@ import config import gleam/erlang/process import gleam/io -import mqtt_dummy -import sensors +import mqtt pub fn main() -> Nil { - case config.load_config() { - Ok(cfg) -> { - io.println("Config loaded successfully!") - io.println("MQTT Host: " <> cfg.mqtt_host) - io.println("MQTT User: " <> cfg.mqtt_user) - io.println("MQTT PW: " <> cfg.mqtt_pw) - io.println("JWT Key: " <> cfg.jwt_key) - } - Error(err) -> { - io.println("Failed to load config: " <> err) - } - } + let cfg = config.load_config() + io.println("Config loaded successfully!") - let assert Ok(subject) = mqtt_dummy.start() - let mailbox = mqtt_dummy.subscribe(subject) + // Start MQTT, which will forward updates to sensor_reader + let client = mqtt.start(cfg) + let assert Ok(s) = + mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1") + echo s - // Kick off the first message - process.send_after(subject, 1000, mqtt_dummy.Proc) - - receive_and_reschedule(mailbox, subject) - // case process.receive(mailbox, 5000) { - // Ok(msg) -> { - // io.println("Got message:" <> msg) - // // process_messages(mailbox, n - 1) - // } - // Error(Nil) -> { - // io.println("Timeout waiting for message") - // } - // } + process.sleep_forever() Nil } - -fn receive_and_reschedule( - mailbox: process.Subject(sensors.SensorReading), - subject: process.Subject(mqtt_dummy.Message), -) -> Nil { - case process.receive(mailbox, 2000) { - Ok(msg) -> { - sensors.print_sensor_reading(msg) - process.send_after(subject, 1000, mqtt_dummy.Proc) - receive_and_reschedule(mailbox, subject) - } - Error(Nil) -> { - io.println("Timeout - waiting again...") - receive_and_reschedule(mailbox, subject) - } - } -} diff --git a/test/config_test.gleam b/test/config_test.gleam index 1ed224d..ea18727 100644 --- a/test/config_test.gleam +++ b/test/config_test.gleam @@ -1,4 +1,5 @@ import config +import envoy import gleeunit import gleeunit/should @@ -8,7 +9,8 @@ pub fn main() -> Nil { // gleeunit test functions end in `_test` pub fn file_read_test() { - let assert Ok(cfg) = config.load_config() + envoy.set("MQTT_PW", "TEST") + let cfg = config.load_config() cfg.mqtt_host |> should.equal("192.168.1.11") diff --git a/test/sensors_test.gleam b/test/sensors_test.gleam index fd4ce0e..6d02b0c 100644 --- a/test/sensors_test.gleam +++ b/test/sensors_test.gleam @@ -6,7 +6,6 @@ pub fn main() -> Nil { gleeunit.main() } -// gleeunit test functions end in `_test` pub fn file_read_test() { let reading = sensors.sensor_name(sensors.Temperature) @@ -17,3 +16,15 @@ pub fn file_read_test() { reading |> should.equal("Temperature 20.5 °C") } + +pub fn parse_topic_1_test() { + let json_string = + "{\"temp\":18.84,\"humidity\":28.690811,\"pressure\":944.67}" + let readings = sensors.parse_topic_1(json_string) + readings + |> should.equal([ + sensors.SensorReading(sensor: sensors.Temperature, value: 18.84), + sensors.SensorReading(sensor: sensors.Humidity, value: 28.690811), + sensors.SensorReading(sensor: sensors.Pressure, value: 944.67), + ]) +}