weather-portal/src/readings_broadcaster.gleam

66 lines
1.6 KiB
Gleam

import gleam/bit_array
import gleam/erlang/process
import gleam/list
import gleam/otp/actor
import sensors
import spoke/mqtt
// pub type Message {
// MqttUpdate(update: mqtt.Update)
// }
type State {
State(subscribers: List(process.Subject(sensors.SensorReading)))
}
fn handle_message(
state: State,
update: mqtt.Update,
) -> actor.Next(State, mqtt.Update) {
echo update
case parse_mqtt_update(update) {
Ok(readings) -> {
// Publish to all subscribers
echo readings
list.each(state.subscribers, fn(subscriber) {
list.each(readings, fn(reading) { process.send(subscriber, reading) })
})
actor.continue(state)
}
Error(_) -> actor.continue(state)
}
}
fn parse_mqtt_update(
update: mqtt.Update,
) -> Result(List(sensors.SensorReading), Nil) {
case update {
mqtt.ReceivedMessage(topic, payload, _) -> {
let assert Ok(payload_str) = bit_array.to_string(payload)
case topic {
"homeassistant/sensor/bws/node1/state1" ->
Ok(sensors.parse_topic_1(payload_str))
_ -> Error(Nil)
}
}
_ -> Error(Nil)
}
}
pub fn start(
subscribers: List(process.Subject(sensors.SensorReading)),
) -> Result(process.Subject(mqtt.Update), actor.StartError) {
let initial_state = State(subscribers: subscribers)
let assert Ok(started) =
actor.new(initial_state)
|> actor.on_message(handle_message)
|> actor.start
Ok(started.data)
}
// pub fn subscribe(
// reader: process.Subject(Message),
// ) -> process.Subject(sensors.SensorReading) {
// let mailbox = process.new_subject()
// process.send(reader, Subscribe(mailbox))
// mailbox
// }