Compare commits
6 commits
113384b6ce
...
536344d6c4
| Author | SHA1 | Date | |
|---|---|---|---|
| 536344d6c4 | |||
| a1020b80d2 | |||
| 6c0711acd5 | |||
| 45b934b3ad | |||
| 1f4942726e | |||
| 86e40f90a1 |
10 changed files with 215 additions and 110 deletions
|
|
@ -3,4 +3,4 @@ mqtt:
|
|||
client_id: "weather_portal_dev"
|
||||
user: "homeassistant"
|
||||
# password comes from env
|
||||
jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod
|
||||
jwt_secret: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0"
|
|||
spoke_tcp = ">= 2.0.0 and < 3.0.0"
|
||||
gleam_erlang = ">= 1.3.0 and < 2.0.0"
|
||||
gleam_otp = ">= 1.2.0 and < 2.0.0"
|
||||
gleam_json = ">= 3.1.0 and < 4.0.0"
|
||||
|
||||
[dev_dependencies]
|
||||
gleeunit = ">= 1.0.0 and < 2.0.0"
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ packages = [
|
|||
{ name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" },
|
||||
{ name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" },
|
||||
{ name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
|
||||
{ name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" },
|
||||
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
|
||||
{ name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" },
|
||||
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
|
||||
|
|
@ -25,6 +26,7 @@ packages = [
|
|||
envoy = { version = ">= 1.1.0 and < 2.0.0" }
|
||||
glaml = { version = ">= 3.0.2 and < 4.0.0" }
|
||||
gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" }
|
||||
gleam_json = { version = ">= 3.1.0 and < 4.0.0" }
|
||||
gleam_otp = { version = ">= 1.2.0 and < 2.0.0" }
|
||||
gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
|
||||
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
|
||||
|
|
|
|||
111
src/config.gleam
111
src/config.gleam
|
|
@ -1,5 +1,6 @@
|
|||
import envoy
|
||||
import glaml
|
||||
import gleam/option.{type Option, None, Some}
|
||||
import gleam/result
|
||||
import simplifile
|
||||
|
||||
|
|
@ -9,7 +10,7 @@ pub type Config {
|
|||
mqtt_user: String,
|
||||
mqtt_pw: String,
|
||||
mqtt_client_id: String,
|
||||
jwt_key: String,
|
||||
jwt_secret: String,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -30,71 +31,59 @@ fn parse_yaml(contents: String) -> Result(glaml.Document, String) {
|
|||
}
|
||||
}
|
||||
|
||||
fn compile_config(doc: glaml.Document) -> Result(Config, String) {
|
||||
let root = glaml.document_root(doc)
|
||||
fn get_config_value(
|
||||
root: glaml.Node,
|
||||
yaml_key: Option(String),
|
||||
env_var: Option(String),
|
||||
) -> String {
|
||||
let from_env = case env_var {
|
||||
Some(var) -> envoy.get(var)
|
||||
None -> Error(Nil)
|
||||
}
|
||||
|
||||
// Extract values from YAML
|
||||
use mqtt_host_result <- result.try(
|
||||
glaml.select_sugar(root, "mqtt.host")
|
||||
|> result.map_error(fn(_) { "mqtt.host not found in config.yml" }),
|
||||
)
|
||||
use mqtt_user_result <- result.try(
|
||||
glaml.select_sugar(root, "mqtt.user")
|
||||
|> result.map_error(fn(_) { "mqtt.user not found in config.yml" }),
|
||||
)
|
||||
use mqtt_client_id_result <- result.try(
|
||||
glaml.select_sugar(root, "mqtt.client_id")
|
||||
|> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }),
|
||||
)
|
||||
use jwt_key_result <- result.try(
|
||||
glaml.select_sugar(root, "jwt_key")
|
||||
|> result.map_error(fn(_) { "jwt_key not found in config.yml" }),
|
||||
)
|
||||
|
||||
// Extract strings from nodes
|
||||
case
|
||||
mqtt_host_result,
|
||||
mqtt_user_result,
|
||||
jwt_key_result,
|
||||
mqtt_client_id_result
|
||||
{
|
||||
glaml.NodeStr(host),
|
||||
glaml.NodeStr(user),
|
||||
glaml.NodeStr(key),
|
||||
glaml.NodeStr(client_id)
|
||||
-> {
|
||||
let yaml_config =
|
||||
Config(
|
||||
mqtt_host: host,
|
||||
mqtt_user: user,
|
||||
mqtt_pw: "placeholder",
|
||||
mqtt_client_id: client_id,
|
||||
jwt_key: key,
|
||||
)
|
||||
|
||||
// Override with env vars
|
||||
let final_config =
|
||||
Config(
|
||||
mqtt_host: envoy.get("MQTT_HOST")
|
||||
|> result.unwrap(yaml_config.mqtt_host),
|
||||
mqtt_user: envoy.get("MQTT_USER")
|
||||
|> result.unwrap(yaml_config.mqtt_user),
|
||||
mqtt_pw: envoy.get("MQTT_PW")
|
||||
|> result.unwrap(yaml_config.mqtt_pw),
|
||||
mqtt_client_id: envoy.get("MQTT_CLIENT_ID")
|
||||
|> result.unwrap(yaml_config.mqtt_client_id),
|
||||
jwt_key: envoy.get("JWT_KEY")
|
||||
|> result.unwrap(yaml_config.jwt_key),
|
||||
)
|
||||
|
||||
Ok(final_config)
|
||||
case from_env {
|
||||
Ok(val) -> val
|
||||
Error(_) -> {
|
||||
// Fall back to YAML if env var not found
|
||||
case yaml_key {
|
||||
Some(key) -> {
|
||||
let assert Ok(node) = glaml.select_sugar(root, key)
|
||||
let assert glaml.NodeStr(str) = node
|
||||
str
|
||||
}
|
||||
None -> {
|
||||
panic as { "Config value not found in environment or YAML" }
|
||||
}
|
||||
}
|
||||
}
|
||||
_, _, _, _ -> Error("Config values must be strings in config.yml")
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_config() -> Result(Config, String) {
|
||||
fn compile_config(doc: glaml.Document) -> Config {
|
||||
let root = glaml.document_root(doc)
|
||||
let get = fn(yaml_key: option.Option(String), env_var: option.Option(String)) -> String {
|
||||
get_config_value(root, yaml_key, env_var)
|
||||
}
|
||||
|
||||
Config(
|
||||
mqtt_host: get(Some("mqtt.host"), None),
|
||||
mqtt_pw: get(None, Some("MQTT_PW")),
|
||||
mqtt_user: get(Some("mqtt.user"), None),
|
||||
jwt_secret: get(Some("jwt_secret"), Some("JWT_SECRET")),
|
||||
mqtt_client_id: get(Some("mqtt.client_id"), Some("MQTT_CLIENT_ID")),
|
||||
)
|
||||
}
|
||||
|
||||
fn get_doc() -> Result(glaml.Document, String) {
|
||||
use file_content <- result.try(load_file())
|
||||
use doc <- result.try(parse_yaml(file_content))
|
||||
compile_config(doc)
|
||||
Ok(doc)
|
||||
}
|
||||
|
||||
pub fn load_config() -> Config {
|
||||
let doc = get_doc()
|
||||
case doc {
|
||||
Ok(doc) -> compile_config(doc)
|
||||
Error(err) -> panic as { "fail to load config: " <> err }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
46
src/mqtt.gleam
Normal file
46
src/mqtt.gleam
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
import config
|
||||
import gleam/erlang/process
|
||||
import gleam/option.{None, Some}
|
||||
import readings_broadcaster
|
||||
import spoke/mqtt
|
||||
import spoke/mqtt_actor
|
||||
import spoke/tcp
|
||||
|
||||
pub fn start(cfg: config.Config) -> mqtt_actor.Client {
|
||||
let client_id = cfg.mqtt_client_id
|
||||
let password = <<cfg.mqtt_pw:utf8>>
|
||||
let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password))
|
||||
let transport = tcp.connector_with_defaults(cfg.mqtt_host)
|
||||
let connection_options =
|
||||
mqtt.ConnectOptions(
|
||||
client_id: client_id,
|
||||
authentication: Some(auth),
|
||||
transport_options: transport,
|
||||
keep_alive_seconds: 120,
|
||||
server_timeout_ms: 15_000,
|
||||
)
|
||||
let assert Ok(started) =
|
||||
mqtt_actor.build(connection_options) |> mqtt_actor.start(100)
|
||||
let client = started.data
|
||||
|
||||
let start_up_checker = process.new_subject()
|
||||
mqtt_actor.subscribe_to_updates(client, start_up_checker)
|
||||
mqtt_actor.connect(client, True, None)
|
||||
let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) =
|
||||
process.receive(start_up_checker, 15_000)
|
||||
|
||||
let assert Ok(broadcaster) = readings_broadcaster.start([])
|
||||
mqtt_actor.subscribe_to_updates(client, broadcaster)
|
||||
|
||||
client
|
||||
}
|
||||
|
||||
pub fn subscribe(client: mqtt_actor.Client, topic: String) {
|
||||
mqtt_actor.subscribe(client, [
|
||||
mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce),
|
||||
])
|
||||
}
|
||||
|
||||
pub fn disconnect(client: mqtt_actor.Client) {
|
||||
mqtt_actor.disconnect(client)
|
||||
}
|
||||
66
src/readings_broadcaster.gleam
Normal file
66
src/readings_broadcaster.gleam
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
import gleam/bit_array
|
||||
import gleam/erlang/process
|
||||
import gleam/list
|
||||
import gleam/otp/actor
|
||||
import sensors
|
||||
import spoke/mqtt
|
||||
|
||||
// pub type Message {
|
||||
// MqttUpdate(update: mqtt.Update)
|
||||
// }
|
||||
|
||||
type State {
|
||||
State(subscribers: List(process.Subject(sensors.SensorReading)))
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
state: State,
|
||||
update: mqtt.Update,
|
||||
) -> actor.Next(State, mqtt.Update) {
|
||||
echo update
|
||||
case parse_mqtt_update(update) {
|
||||
Ok(readings) -> {
|
||||
// Publish to all subscribers
|
||||
echo readings
|
||||
list.each(state.subscribers, fn(subscriber) {
|
||||
list.each(readings, fn(reading) { process.send(subscriber, reading) })
|
||||
})
|
||||
actor.continue(state)
|
||||
}
|
||||
Error(_) -> actor.continue(state)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_mqtt_update(
|
||||
update: mqtt.Update,
|
||||
) -> Result(List(sensors.SensorReading), Nil) {
|
||||
case update {
|
||||
mqtt.ReceivedMessage(topic, payload, _) -> {
|
||||
let assert Ok(payload_str) = bit_array.to_string(payload)
|
||||
case topic {
|
||||
"homeassistant/sensor/bws/node1/state1" ->
|
||||
Ok(sensors.parse_topic_1(payload_str))
|
||||
_ -> Error(Nil)
|
||||
}
|
||||
}
|
||||
_ -> Error(Nil)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
subscribers: List(process.Subject(sensors.SensorReading)),
|
||||
) -> Result(process.Subject(mqtt.Update), actor.StartError) {
|
||||
let initial_state = State(subscribers: subscribers)
|
||||
let assert Ok(started) =
|
||||
actor.new(initial_state)
|
||||
|> actor.on_message(handle_message)
|
||||
|> actor.start
|
||||
Ok(started.data)
|
||||
}
|
||||
// pub fn subscribe(
|
||||
// reader: process.Subject(Message),
|
||||
// ) -> process.Subject(sensors.SensorReading) {
|
||||
// let mailbox = process.new_subject()
|
||||
// process.send(reader, Subscribe(mailbox))
|
||||
// mailbox
|
||||
// }
|
||||
|
|
@ -1,5 +1,7 @@
|
|||
import gleam/dynamic/decode
|
||||
import gleam/float
|
||||
import gleam/io
|
||||
import gleam/json
|
||||
|
||||
pub type Sensor {
|
||||
Temperature
|
||||
|
|
@ -25,6 +27,29 @@ pub fn sensor_name(sensor: Sensor) -> String {
|
|||
}
|
||||
}
|
||||
|
||||
pub const topic_1: String = "homeassistant/sensor/bws/node1/state1"
|
||||
|
||||
type Topic1Value {
|
||||
Topic1Value(temp: Float, humidity: Float, pressure: Float)
|
||||
}
|
||||
|
||||
/// Parses a JSON string into a list of `SensorReading` values.
|
||||
/// example input string: {"temp":18.84,"humidity":28.690811,"pressure":944.67}
|
||||
pub fn parse_topic_1(json_string: String) -> List(SensorReading) {
|
||||
let decoder = {
|
||||
use temp <- decode.field("temp", decode.float)
|
||||
use humidity <- decode.field("humidity", decode.float)
|
||||
use pressure <- decode.field("pressure", decode.float)
|
||||
decode.success(Topic1Value(temp:, humidity:, pressure:))
|
||||
}
|
||||
let assert Ok(decoded_val) = json.parse(from: json_string, using: decoder)
|
||||
[
|
||||
SensorReading(sensor: Temperature, value: decoded_val.temp),
|
||||
SensorReading(sensor: Humidity, value: decoded_val.humidity),
|
||||
SensorReading(sensor: Pressure, value: decoded_val.pressure),
|
||||
]
|
||||
}
|
||||
|
||||
pub fn print_sensor_reading(reading: SensorReading) -> Nil {
|
||||
let sensor_name = sensor_name(reading.sensor)
|
||||
io.println(sensor_name <> ": " <> float.to_string(reading.value))
|
||||
|
|
|
|||
|
|
@ -1,55 +1,18 @@
|
|||
import config
|
||||
import gleam/erlang/process
|
||||
import gleam/io
|
||||
import mqtt_dummy
|
||||
import sensors
|
||||
import mqtt
|
||||
|
||||
pub fn main() -> Nil {
|
||||
case config.load_config() {
|
||||
Ok(cfg) -> {
|
||||
io.println("Config loaded successfully!")
|
||||
io.println("MQTT Host: " <> cfg.mqtt_host)
|
||||
io.println("MQTT User: " <> cfg.mqtt_user)
|
||||
io.println("MQTT PW: " <> cfg.mqtt_pw)
|
||||
io.println("JWT Key: " <> cfg.jwt_key)
|
||||
}
|
||||
Error(err) -> {
|
||||
io.println("Failed to load config: " <> err)
|
||||
}
|
||||
}
|
||||
let cfg = config.load_config()
|
||||
io.println("Config loaded successfully!")
|
||||
|
||||
let assert Ok(subject) = mqtt_dummy.start()
|
||||
let mailbox = mqtt_dummy.subscribe(subject)
|
||||
// Start MQTT, which will forward updates to sensor_reader
|
||||
let client = mqtt.start(cfg)
|
||||
let assert Ok(s) =
|
||||
mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1")
|
||||
echo s
|
||||
|
||||
// Kick off the first message
|
||||
process.send_after(subject, 1000, mqtt_dummy.Proc)
|
||||
|
||||
receive_and_reschedule(mailbox, subject)
|
||||
// case process.receive(mailbox, 5000) {
|
||||
// Ok(msg) -> {
|
||||
// io.println("Got message:" <> msg)
|
||||
// // process_messages(mailbox, n - 1)
|
||||
// }
|
||||
// Error(Nil) -> {
|
||||
// io.println("Timeout waiting for message")
|
||||
// }
|
||||
// }
|
||||
process.sleep_forever()
|
||||
Nil
|
||||
}
|
||||
|
||||
fn receive_and_reschedule(
|
||||
mailbox: process.Subject(sensors.SensorReading),
|
||||
subject: process.Subject(mqtt_dummy.Message),
|
||||
) -> Nil {
|
||||
case process.receive(mailbox, 2000) {
|
||||
Ok(msg) -> {
|
||||
sensors.print_sensor_reading(msg)
|
||||
process.send_after(subject, 1000, mqtt_dummy.Proc)
|
||||
receive_and_reschedule(mailbox, subject)
|
||||
}
|
||||
Error(Nil) -> {
|
||||
io.println("Timeout - waiting again...")
|
||||
receive_and_reschedule(mailbox, subject)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import config
|
||||
import envoy
|
||||
import gleeunit
|
||||
import gleeunit/should
|
||||
|
||||
|
|
@ -8,7 +9,8 @@ pub fn main() -> Nil {
|
|||
|
||||
// gleeunit test functions end in `_test`
|
||||
pub fn file_read_test() {
|
||||
let assert Ok(cfg) = config.load_config()
|
||||
envoy.set("MQTT_PW", "TEST")
|
||||
let cfg = config.load_config()
|
||||
|
||||
cfg.mqtt_host
|
||||
|> should.equal("192.168.1.11")
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ pub fn main() -> Nil {
|
|||
gleeunit.main()
|
||||
}
|
||||
|
||||
// gleeunit test functions end in `_test`
|
||||
pub fn file_read_test() {
|
||||
let reading =
|
||||
sensors.sensor_name(sensors.Temperature)
|
||||
|
|
@ -17,3 +16,15 @@ pub fn file_read_test() {
|
|||
reading
|
||||
|> should.equal("Temperature 20.5 °C")
|
||||
}
|
||||
|
||||
pub fn parse_topic_1_test() {
|
||||
let json_string =
|
||||
"{\"temp\":18.84,\"humidity\":28.690811,\"pressure\":944.67}"
|
||||
let readings = sensors.parse_topic_1(json_string)
|
||||
readings
|
||||
|> should.equal([
|
||||
sensors.SensorReading(sensor: sensors.Temperature, value: 18.84),
|
||||
sensors.SensorReading(sensor: sensors.Humidity, value: 28.690811),
|
||||
sensors.SensorReading(sensor: sensors.Pressure, value: 944.67),
|
||||
])
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue