diff --git a/config.yml b/config.yml index 17d157d..0e4a279 100644 --- a/config.yml +++ b/config.yml @@ -3,4 +3,4 @@ mqtt: client_id: "weather_portal_dev" user: "homeassistant" # password comes from env -jwt_secret: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod +jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod diff --git a/gleam.toml b/gleam.toml index 1b60035..b2e196a 100644 --- a/gleam.toml +++ b/gleam.toml @@ -22,7 +22,6 @@ spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0" spoke_tcp = ">= 2.0.0 and < 3.0.0" gleam_erlang = ">= 1.3.0 and < 2.0.0" gleam_otp = ">= 1.2.0 and < 2.0.0" -gleam_json = ">= 3.1.0 and < 4.0.0" [dev_dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index b98204c..c381e1a 100644 --- a/manifest.toml +++ b/manifest.toml @@ -8,7 +8,6 @@ packages = [ { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" }, { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, - { name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" }, { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, @@ -26,7 +25,6 @@ packages = [ envoy = { version = ">= 1.1.0 and < 2.0.0" } glaml = { version = ">= 3.0.2 and < 4.0.0" } gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" } -gleam_json = { version = ">= 3.1.0 and < 4.0.0" } gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } diff --git a/src/config.gleam b/src/config.gleam index 3a90c8a..2fecd45 100644 --- a/src/config.gleam +++ b/src/config.gleam @@ -1,6 +1,5 @@ import envoy import glaml -import gleam/option.{type Option, None, Some} import gleam/result import simplifile @@ -10,7 +9,7 @@ pub type Config { mqtt_user: String, mqtt_pw: String, mqtt_client_id: String, - jwt_secret: String, + jwt_key: String, ) } @@ -31,59 +30,71 @@ fn parse_yaml(contents: String) -> Result(glaml.Document, String) { } } -fn get_config_value( - root: glaml.Node, - yaml_key: Option(String), - env_var: Option(String), -) -> String { - let from_env = case env_var { - Some(var) -> envoy.get(var) - None -> Error(Nil) - } - - case from_env { - Ok(val) -> val - Error(_) -> { - // Fall back to YAML if env var not found - case yaml_key { - Some(key) -> { - let assert Ok(node) = glaml.select_sugar(root, key) - let assert glaml.NodeStr(str) = node - str - } - None -> { - panic as { "Config value not found in environment or YAML" } - } - } - } - } -} - -fn compile_config(doc: glaml.Document) -> Config { +fn compile_config(doc: glaml.Document) -> Result(Config, String) { let root = glaml.document_root(doc) - let get = fn(yaml_key: option.Option(String), env_var: option.Option(String)) -> String { - get_config_value(root, yaml_key, env_var) - } - Config( - mqtt_host: get(Some("mqtt.host"), None), - mqtt_pw: get(None, Some("MQTT_PW")), - mqtt_user: get(Some("mqtt.user"), None), - jwt_secret: get(Some("jwt_secret"), Some("JWT_SECRET")), - mqtt_client_id: get(Some("mqtt.client_id"), Some("MQTT_CLIENT_ID")), + // Extract values from YAML + use mqtt_host_result <- result.try( + glaml.select_sugar(root, "mqtt.host") + |> result.map_error(fn(_) { "mqtt.host not found in config.yml" }), ) + use mqtt_user_result <- result.try( + glaml.select_sugar(root, "mqtt.user") + |> result.map_error(fn(_) { "mqtt.user not found in config.yml" }), + ) + use mqtt_client_id_result <- result.try( + glaml.select_sugar(root, "mqtt.client_id") + |> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }), + ) + use jwt_key_result <- result.try( + glaml.select_sugar(root, "jwt_key") + |> result.map_error(fn(_) { "jwt_key not found in config.yml" }), + ) + + // Extract strings from nodes + case + mqtt_host_result, + mqtt_user_result, + jwt_key_result, + mqtt_client_id_result + { + glaml.NodeStr(host), + glaml.NodeStr(user), + glaml.NodeStr(key), + glaml.NodeStr(client_id) + -> { + let yaml_config = + Config( + mqtt_host: host, + mqtt_user: user, + mqtt_pw: "placeholder", + mqtt_client_id: client_id, + jwt_key: key, + ) + + // Override with env vars + let final_config = + Config( + mqtt_host: envoy.get("MQTT_HOST") + |> result.unwrap(yaml_config.mqtt_host), + mqtt_user: envoy.get("MQTT_USER") + |> result.unwrap(yaml_config.mqtt_user), + mqtt_pw: envoy.get("MQTT_PW") + |> result.unwrap(yaml_config.mqtt_pw), + mqtt_client_id: envoy.get("MQTT_CLIENT_ID") + |> result.unwrap(yaml_config.mqtt_client_id), + jwt_key: envoy.get("JWT_KEY") + |> result.unwrap(yaml_config.jwt_key), + ) + + Ok(final_config) + } + _, _, _, _ -> Error("Config values must be strings in config.yml") + } } -fn get_doc() -> Result(glaml.Document, String) { +pub fn load_config() -> Result(Config, String) { use file_content <- result.try(load_file()) use doc <- result.try(parse_yaml(file_content)) - Ok(doc) -} - -pub fn load_config() -> Config { - let doc = get_doc() - case doc { - Ok(doc) -> compile_config(doc) - Error(err) -> panic as { "fail to load config: " <> err } - } + compile_config(doc) } diff --git a/src/mqtt.gleam b/src/mqtt.gleam deleted file mode 100644 index 8358776..0000000 --- a/src/mqtt.gleam +++ /dev/null @@ -1,46 +0,0 @@ -import config -import gleam/erlang/process -import gleam/option.{None, Some} -import readings_broadcaster -import spoke/mqtt -import spoke/mqtt_actor -import spoke/tcp - -pub fn start(cfg: config.Config) -> mqtt_actor.Client { - let client_id = cfg.mqtt_client_id - let password = <> - let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) - let transport = tcp.connector_with_defaults(cfg.mqtt_host) - let connection_options = - mqtt.ConnectOptions( - client_id: client_id, - authentication: Some(auth), - transport_options: transport, - keep_alive_seconds: 120, - server_timeout_ms: 15_000, - ) - let assert Ok(started) = - mqtt_actor.build(connection_options) |> mqtt_actor.start(100) - let client = started.data - - let start_up_checker = process.new_subject() - mqtt_actor.subscribe_to_updates(client, start_up_checker) - mqtt_actor.connect(client, True, None) - let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = - process.receive(start_up_checker, 15_000) - - let assert Ok(broadcaster) = readings_broadcaster.start([]) - mqtt_actor.subscribe_to_updates(client, broadcaster) - - client -} - -pub fn subscribe(client: mqtt_actor.Client, topic: String) { - mqtt_actor.subscribe(client, [ - mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), - ]) -} - -pub fn disconnect(client: mqtt_actor.Client) { - mqtt_actor.disconnect(client) -} diff --git a/src/readings_broadcaster.gleam b/src/readings_broadcaster.gleam deleted file mode 100644 index 1c115eb..0000000 --- a/src/readings_broadcaster.gleam +++ /dev/null @@ -1,66 +0,0 @@ -import gleam/bit_array -import gleam/erlang/process -import gleam/list -import gleam/otp/actor -import sensors -import spoke/mqtt - -// pub type Message { -// MqttUpdate(update: mqtt.Update) -// } - -type State { - State(subscribers: List(process.Subject(sensors.SensorReading))) -} - -fn handle_message( - state: State, - update: mqtt.Update, -) -> actor.Next(State, mqtt.Update) { - echo update - case parse_mqtt_update(update) { - Ok(readings) -> { - // Publish to all subscribers - echo readings - list.each(state.subscribers, fn(subscriber) { - list.each(readings, fn(reading) { process.send(subscriber, reading) }) - }) - actor.continue(state) - } - Error(_) -> actor.continue(state) - } -} - -fn parse_mqtt_update( - update: mqtt.Update, -) -> Result(List(sensors.SensorReading), Nil) { - case update { - mqtt.ReceivedMessage(topic, payload, _) -> { - let assert Ok(payload_str) = bit_array.to_string(payload) - case topic { - "homeassistant/sensor/bws/node1/state1" -> - Ok(sensors.parse_topic_1(payload_str)) - _ -> Error(Nil) - } - } - _ -> Error(Nil) - } -} - -pub fn start( - subscribers: List(process.Subject(sensors.SensorReading)), -) -> Result(process.Subject(mqtt.Update), actor.StartError) { - let initial_state = State(subscribers: subscribers) - let assert Ok(started) = - actor.new(initial_state) - |> actor.on_message(handle_message) - |> actor.start - Ok(started.data) -} -// pub fn subscribe( -// reader: process.Subject(Message), -// ) -> process.Subject(sensors.SensorReading) { -// let mailbox = process.new_subject() -// process.send(reader, Subscribe(mailbox)) -// mailbox -// } diff --git a/src/sensors.gleam b/src/sensors.gleam index ba494df..0cc1df2 100644 --- a/src/sensors.gleam +++ b/src/sensors.gleam @@ -1,7 +1,5 @@ -import gleam/dynamic/decode import gleam/float import gleam/io -import gleam/json pub type Sensor { Temperature @@ -27,29 +25,6 @@ pub fn sensor_name(sensor: Sensor) -> String { } } -pub const topic_1: String = "homeassistant/sensor/bws/node1/state1" - -type Topic1Value { - Topic1Value(temp: Float, humidity: Float, pressure: Float) -} - -/// Parses a JSON string into a list of `SensorReading` values. -/// example input string: {"temp":18.84,"humidity":28.690811,"pressure":944.67} -pub fn parse_topic_1(json_string: String) -> List(SensorReading) { - let decoder = { - use temp <- decode.field("temp", decode.float) - use humidity <- decode.field("humidity", decode.float) - use pressure <- decode.field("pressure", decode.float) - decode.success(Topic1Value(temp:, humidity:, pressure:)) - } - let assert Ok(decoded_val) = json.parse(from: json_string, using: decoder) - [ - SensorReading(sensor: Temperature, value: decoded_val.temp), - SensorReading(sensor: Humidity, value: decoded_val.humidity), - SensorReading(sensor: Pressure, value: decoded_val.pressure), - ] -} - pub fn print_sensor_reading(reading: SensorReading) -> Nil { let sensor_name = sensor_name(reading.sensor) io.println(sensor_name <> ": " <> float.to_string(reading.value)) diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 943b858..2888f43 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,18 +1,55 @@ import config import gleam/erlang/process import gleam/io -import mqtt +import mqtt_dummy +import sensors pub fn main() -> Nil { - let cfg = config.load_config() - io.println("Config loaded successfully!") + case config.load_config() { + Ok(cfg) -> { + io.println("Config loaded successfully!") + io.println("MQTT Host: " <> cfg.mqtt_host) + io.println("MQTT User: " <> cfg.mqtt_user) + io.println("MQTT PW: " <> cfg.mqtt_pw) + io.println("JWT Key: " <> cfg.jwt_key) + } + Error(err) -> { + io.println("Failed to load config: " <> err) + } + } - // Start MQTT, which will forward updates to sensor_reader - let client = mqtt.start(cfg) - let assert Ok(s) = - mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1") - echo s + let assert Ok(subject) = mqtt_dummy.start() + let mailbox = mqtt_dummy.subscribe(subject) - process.sleep_forever() + // Kick off the first message + process.send_after(subject, 1000, mqtt_dummy.Proc) + + receive_and_reschedule(mailbox, subject) + // case process.receive(mailbox, 5000) { + // Ok(msg) -> { + // io.println("Got message:" <> msg) + // // process_messages(mailbox, n - 1) + // } + // Error(Nil) -> { + // io.println("Timeout waiting for message") + // } + // } Nil } + +fn receive_and_reschedule( + mailbox: process.Subject(sensors.SensorReading), + subject: process.Subject(mqtt_dummy.Message), +) -> Nil { + case process.receive(mailbox, 2000) { + Ok(msg) -> { + sensors.print_sensor_reading(msg) + process.send_after(subject, 1000, mqtt_dummy.Proc) + receive_and_reschedule(mailbox, subject) + } + Error(Nil) -> { + io.println("Timeout - waiting again...") + receive_and_reschedule(mailbox, subject) + } + } +} diff --git a/test/config_test.gleam b/test/config_test.gleam index ea18727..1ed224d 100644 --- a/test/config_test.gleam +++ b/test/config_test.gleam @@ -1,5 +1,4 @@ import config -import envoy import gleeunit import gleeunit/should @@ -9,8 +8,7 @@ pub fn main() -> Nil { // gleeunit test functions end in `_test` pub fn file_read_test() { - envoy.set("MQTT_PW", "TEST") - let cfg = config.load_config() + let assert Ok(cfg) = config.load_config() cfg.mqtt_host |> should.equal("192.168.1.11") diff --git a/test/sensors_test.gleam b/test/sensors_test.gleam index 6d02b0c..fd4ce0e 100644 --- a/test/sensors_test.gleam +++ b/test/sensors_test.gleam @@ -6,6 +6,7 @@ pub fn main() -> Nil { gleeunit.main() } +// gleeunit test functions end in `_test` pub fn file_read_test() { let reading = sensors.sensor_name(sensors.Temperature) @@ -16,15 +17,3 @@ pub fn file_read_test() { reading |> should.equal("Temperature 20.5 °C") } - -pub fn parse_topic_1_test() { - let json_string = - "{\"temp\":18.84,\"humidity\":28.690811,\"pressure\":944.67}" - let readings = sensors.parse_topic_1(json_string) - readings - |> should.equal([ - sensors.SensorReading(sensor: sensors.Temperature, value: 18.84), - sensors.SensorReading(sensor: sensors.Humidity, value: 28.690811), - sensors.SensorReading(sensor: sensors.Pressure, value: 944.67), - ]) -}