Compare commits

...

6 commits

Author SHA1 Message Date
536344d6c4
add test to parse_topic_1
All checks were successful
Unit Tests / Run Tests (push) Successful in 11s
2026-03-25 10:40:08 +01:00
a1020b80d2
parse sensor json values 2026-03-25 10:38:15 +01:00
6c0711acd5
get mqtt reciving working 2026-03-25 10:01:45 +01:00
45b934b3ad
get real mqtt connection working 2026-03-24 19:56:55 +01:00
1f4942726e
try mutiple dummies 2026-03-24 19:35:57 +01:00
86e40f90a1
refactor config module 2026-03-24 18:56:13 +01:00
10 changed files with 215 additions and 110 deletions

View file

@ -3,4 +3,4 @@ mqtt:
client_id: "weather_portal_dev" client_id: "weather_portal_dev"
user: "homeassistant" user: "homeassistant"
# password comes from env # password comes from env
jwt_key: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod jwt_secret: apuOQZyML+8PixmVDP4nXU4M # loaded from env on prod

View file

@ -22,6 +22,7 @@ spoke_mqtt_actor = ">= 1.1.1 and < 2.0.0"
spoke_tcp = ">= 2.0.0 and < 3.0.0" spoke_tcp = ">= 2.0.0 and < 3.0.0"
gleam_erlang = ">= 1.3.0 and < 2.0.0" gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0" gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_json = ">= 3.1.0 and < 4.0.0"
[dev_dependencies] [dev_dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0" gleeunit = ">= 1.0.0 and < 2.0.0"

View file

@ -8,6 +8,7 @@ packages = [
{ name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" },
{ name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" }, { name = "glaml", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib", "yamerl"], otp_app = "glaml", source = "hex", outer_checksum = "100CA23F526AB159712A3204D200969571FC43B193736B320C1400D410DEE7AD" },
{ name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" }, { name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
{ name = "gleam_json", version = "3.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "44FDAA8847BE8FC48CA7A1C089706BD54BADCC4C45B237A992EDDF9F2CDB2836" },
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" }, { name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
{ name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" }, { name = "gleam_stdlib", version = "0.70.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "86949BF5D1F0E4AC0AB5B06F235D8A5CC11A2DFC33BF22F752156ED61CA7D0FF" },
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" }, { name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
@ -25,6 +26,7 @@ packages = [
envoy = { version = ">= 1.1.0 and < 2.0.0" } envoy = { version = ">= 1.1.0 and < 2.0.0" }
glaml = { version = ">= 3.0.2 and < 4.0.0" } glaml = { version = ">= 3.0.2 and < 4.0.0" }
gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" } gleam_erlang = { version = ">= 1.3.0 and < 2.0.0" }
gleam_json = { version = ">= 3.1.0 and < 4.0.0" }
gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } gleam_otp = { version = ">= 1.2.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" }

View file

@ -1,5 +1,6 @@
import envoy import envoy
import glaml import glaml
import gleam/option.{type Option, None, Some}
import gleam/result import gleam/result
import simplifile import simplifile
@ -9,7 +10,7 @@ pub type Config {
mqtt_user: String, mqtt_user: String,
mqtt_pw: String, mqtt_pw: String,
mqtt_client_id: String, mqtt_client_id: String,
jwt_key: String, jwt_secret: String,
) )
} }
@ -30,71 +31,59 @@ fn parse_yaml(contents: String) -> Result(glaml.Document, String) {
} }
} }
fn compile_config(doc: glaml.Document) -> Result(Config, String) { fn get_config_value(
let root = glaml.document_root(doc) root: glaml.Node,
yaml_key: Option(String),
env_var: Option(String),
) -> String {
let from_env = case env_var {
Some(var) -> envoy.get(var)
None -> Error(Nil)
}
// Extract values from YAML case from_env {
use mqtt_host_result <- result.try( Ok(val) -> val
glaml.select_sugar(root, "mqtt.host") Error(_) -> {
|> result.map_error(fn(_) { "mqtt.host not found in config.yml" }), // Fall back to YAML if env var not found
) case yaml_key {
use mqtt_user_result <- result.try( Some(key) -> {
glaml.select_sugar(root, "mqtt.user") let assert Ok(node) = glaml.select_sugar(root, key)
|> result.map_error(fn(_) { "mqtt.user not found in config.yml" }), let assert glaml.NodeStr(str) = node
) str
use mqtt_client_id_result <- result.try( }
glaml.select_sugar(root, "mqtt.client_id") None -> {
|> result.map_error(fn(_) { "mqtt.client_id not found in config.yml" }), panic as { "Config value not found in environment or YAML" }
) }
use jwt_key_result <- result.try( }
glaml.select_sugar(root, "jwt_key")
|> result.map_error(fn(_) { "jwt_key not found in config.yml" }),
)
// Extract strings from nodes
case
mqtt_host_result,
mqtt_user_result,
jwt_key_result,
mqtt_client_id_result
{
glaml.NodeStr(host),
glaml.NodeStr(user),
glaml.NodeStr(key),
glaml.NodeStr(client_id)
-> {
let yaml_config =
Config(
mqtt_host: host,
mqtt_user: user,
mqtt_pw: "placeholder",
mqtt_client_id: client_id,
jwt_key: key,
)
// Override with env vars
let final_config =
Config(
mqtt_host: envoy.get("MQTT_HOST")
|> result.unwrap(yaml_config.mqtt_host),
mqtt_user: envoy.get("MQTT_USER")
|> result.unwrap(yaml_config.mqtt_user),
mqtt_pw: envoy.get("MQTT_PW")
|> result.unwrap(yaml_config.mqtt_pw),
mqtt_client_id: envoy.get("MQTT_CLIENT_ID")
|> result.unwrap(yaml_config.mqtt_client_id),
jwt_key: envoy.get("JWT_KEY")
|> result.unwrap(yaml_config.jwt_key),
)
Ok(final_config)
} }
_, _, _, _ -> Error("Config values must be strings in config.yml")
} }
} }
pub fn load_config() -> Result(Config, String) { fn compile_config(doc: glaml.Document) -> Config {
let root = glaml.document_root(doc)
let get = fn(yaml_key: option.Option(String), env_var: option.Option(String)) -> String {
get_config_value(root, yaml_key, env_var)
}
Config(
mqtt_host: get(Some("mqtt.host"), None),
mqtt_pw: get(None, Some("MQTT_PW")),
mqtt_user: get(Some("mqtt.user"), None),
jwt_secret: get(Some("jwt_secret"), Some("JWT_SECRET")),
mqtt_client_id: get(Some("mqtt.client_id"), Some("MQTT_CLIENT_ID")),
)
}
fn get_doc() -> Result(glaml.Document, String) {
use file_content <- result.try(load_file()) use file_content <- result.try(load_file())
use doc <- result.try(parse_yaml(file_content)) use doc <- result.try(parse_yaml(file_content))
compile_config(doc) Ok(doc)
}
pub fn load_config() -> Config {
let doc = get_doc()
case doc {
Ok(doc) -> compile_config(doc)
Error(err) -> panic as { "fail to load config: " <> err }
}
} }

46
src/mqtt.gleam Normal file
View file

@ -0,0 +1,46 @@
import config
import gleam/erlang/process
import gleam/option.{None, Some}
import readings_broadcaster
import spoke/mqtt
import spoke/mqtt_actor
import spoke/tcp
pub fn start(cfg: config.Config) -> mqtt_actor.Client {
let client_id = cfg.mqtt_client_id
let password = <<cfg.mqtt_pw:utf8>>
let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password))
let transport = tcp.connector_with_defaults(cfg.mqtt_host)
let connection_options =
mqtt.ConnectOptions(
client_id: client_id,
authentication: Some(auth),
transport_options: transport,
keep_alive_seconds: 120,
server_timeout_ms: 15_000,
)
let assert Ok(started) =
mqtt_actor.build(connection_options) |> mqtt_actor.start(100)
let client = started.data
let start_up_checker = process.new_subject()
mqtt_actor.subscribe_to_updates(client, start_up_checker)
mqtt_actor.connect(client, True, None)
let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) =
process.receive(start_up_checker, 15_000)
let assert Ok(broadcaster) = readings_broadcaster.start([])
mqtt_actor.subscribe_to_updates(client, broadcaster)
client
}
pub fn subscribe(client: mqtt_actor.Client, topic: String) {
mqtt_actor.subscribe(client, [
mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce),
])
}
pub fn disconnect(client: mqtt_actor.Client) {
mqtt_actor.disconnect(client)
}

View file

@ -0,0 +1,66 @@
import gleam/bit_array
import gleam/erlang/process
import gleam/list
import gleam/otp/actor
import sensors
import spoke/mqtt
// pub type Message {
// MqttUpdate(update: mqtt.Update)
// }
type State {
State(subscribers: List(process.Subject(sensors.SensorReading)))
}
fn handle_message(
state: State,
update: mqtt.Update,
) -> actor.Next(State, mqtt.Update) {
echo update
case parse_mqtt_update(update) {
Ok(readings) -> {
// Publish to all subscribers
echo readings
list.each(state.subscribers, fn(subscriber) {
list.each(readings, fn(reading) { process.send(subscriber, reading) })
})
actor.continue(state)
}
Error(_) -> actor.continue(state)
}
}
fn parse_mqtt_update(
update: mqtt.Update,
) -> Result(List(sensors.SensorReading), Nil) {
case update {
mqtt.ReceivedMessage(topic, payload, _) -> {
let assert Ok(payload_str) = bit_array.to_string(payload)
case topic {
"homeassistant/sensor/bws/node1/state1" ->
Ok(sensors.parse_topic_1(payload_str))
_ -> Error(Nil)
}
}
_ -> Error(Nil)
}
}
pub fn start(
subscribers: List(process.Subject(sensors.SensorReading)),
) -> Result(process.Subject(mqtt.Update), actor.StartError) {
let initial_state = State(subscribers: subscribers)
let assert Ok(started) =
actor.new(initial_state)
|> actor.on_message(handle_message)
|> actor.start
Ok(started.data)
}
// pub fn subscribe(
// reader: process.Subject(Message),
// ) -> process.Subject(sensors.SensorReading) {
// let mailbox = process.new_subject()
// process.send(reader, Subscribe(mailbox))
// mailbox
// }

View file

@ -1,5 +1,7 @@
import gleam/dynamic/decode
import gleam/float import gleam/float
import gleam/io import gleam/io
import gleam/json
pub type Sensor { pub type Sensor {
Temperature Temperature
@ -25,6 +27,29 @@ pub fn sensor_name(sensor: Sensor) -> String {
} }
} }
pub const topic_1: String = "homeassistant/sensor/bws/node1/state1"
type Topic1Value {
Topic1Value(temp: Float, humidity: Float, pressure: Float)
}
/// Parses a JSON string into a list of `SensorReading` values.
/// example input string: {"temp":18.84,"humidity":28.690811,"pressure":944.67}
pub fn parse_topic_1(json_string: String) -> List(SensorReading) {
let decoder = {
use temp <- decode.field("temp", decode.float)
use humidity <- decode.field("humidity", decode.float)
use pressure <- decode.field("pressure", decode.float)
decode.success(Topic1Value(temp:, humidity:, pressure:))
}
let assert Ok(decoded_val) = json.parse(from: json_string, using: decoder)
[
SensorReading(sensor: Temperature, value: decoded_val.temp),
SensorReading(sensor: Humidity, value: decoded_val.humidity),
SensorReading(sensor: Pressure, value: decoded_val.pressure),
]
}
pub fn print_sensor_reading(reading: SensorReading) -> Nil { pub fn print_sensor_reading(reading: SensorReading) -> Nil {
let sensor_name = sensor_name(reading.sensor) let sensor_name = sensor_name(reading.sensor)
io.println(sensor_name <> ": " <> float.to_string(reading.value)) io.println(sensor_name <> ": " <> float.to_string(reading.value))

View file

@ -1,55 +1,18 @@
import config import config
import gleam/erlang/process import gleam/erlang/process
import gleam/io import gleam/io
import mqtt_dummy import mqtt
import sensors
pub fn main() -> Nil { pub fn main() -> Nil {
case config.load_config() { let cfg = config.load_config()
Ok(cfg) -> { io.println("Config loaded successfully!")
io.println("Config loaded successfully!")
io.println("MQTT Host: " <> cfg.mqtt_host)
io.println("MQTT User: " <> cfg.mqtt_user)
io.println("MQTT PW: " <> cfg.mqtt_pw)
io.println("JWT Key: " <> cfg.jwt_key)
}
Error(err) -> {
io.println("Failed to load config: " <> err)
}
}
let assert Ok(subject) = mqtt_dummy.start() // Start MQTT, which will forward updates to sensor_reader
let mailbox = mqtt_dummy.subscribe(subject) let client = mqtt.start(cfg)
let assert Ok(s) =
mqtt.subscribe(client, "homeassistant/sensor/bws/node1/state1")
echo s
// Kick off the first message process.sleep_forever()
process.send_after(subject, 1000, mqtt_dummy.Proc)
receive_and_reschedule(mailbox, subject)
// case process.receive(mailbox, 5000) {
// Ok(msg) -> {
// io.println("Got message:" <> msg)
// // process_messages(mailbox, n - 1)
// }
// Error(Nil) -> {
// io.println("Timeout waiting for message")
// }
// }
Nil Nil
} }
fn receive_and_reschedule(
mailbox: process.Subject(sensors.SensorReading),
subject: process.Subject(mqtt_dummy.Message),
) -> Nil {
case process.receive(mailbox, 2000) {
Ok(msg) -> {
sensors.print_sensor_reading(msg)
process.send_after(subject, 1000, mqtt_dummy.Proc)
receive_and_reschedule(mailbox, subject)
}
Error(Nil) -> {
io.println("Timeout - waiting again...")
receive_and_reschedule(mailbox, subject)
}
}
}

View file

@ -1,4 +1,5 @@
import config import config
import envoy
import gleeunit import gleeunit
import gleeunit/should import gleeunit/should
@ -8,7 +9,8 @@ pub fn main() -> Nil {
// gleeunit test functions end in `_test` // gleeunit test functions end in `_test`
pub fn file_read_test() { pub fn file_read_test() {
let assert Ok(cfg) = config.load_config() envoy.set("MQTT_PW", "TEST")
let cfg = config.load_config()
cfg.mqtt_host cfg.mqtt_host
|> should.equal("192.168.1.11") |> should.equal("192.168.1.11")

View file

@ -6,7 +6,6 @@ pub fn main() -> Nil {
gleeunit.main() gleeunit.main()
} }
// gleeunit test functions end in `_test`
pub fn file_read_test() { pub fn file_read_test() {
let reading = let reading =
sensors.sensor_name(sensors.Temperature) sensors.sensor_name(sensors.Temperature)
@ -17,3 +16,15 @@ pub fn file_read_test() {
reading reading
|> should.equal("Temperature 20.5 °C") |> should.equal("Temperature 20.5 °C")
} }
pub fn parse_topic_1_test() {
let json_string =
"{\"temp\":18.84,\"humidity\":28.690811,\"pressure\":944.67}"
let readings = sensors.parse_topic_1(json_string)
readings
|> should.equal([
sensors.SensorReading(sensor: sensors.Temperature, value: 18.84),
sensors.SensorReading(sensor: sensors.Humidity, value: 28.690811),
sensors.SensorReading(sensor: sensors.Pressure, value: 944.67),
])
}