get mqtt reciving working
This commit is contained in:
parent
45b934b3ad
commit
6c0711acd5
4 changed files with 75 additions and 57 deletions
56
src/readings_broadcaster.gleam
Normal file
56
src/readings_broadcaster.gleam
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
import gleam/erlang/process
|
||||
import gleam/io
|
||||
import gleam/list
|
||||
import gleam/otp/actor
|
||||
import sensors
|
||||
import spoke/mqtt
|
||||
|
||||
// pub type Message {
|
||||
// MqttUpdate(update: mqtt.Update)
|
||||
// }
|
||||
|
||||
type State {
|
||||
State(subscribers: List(process.Subject(sensors.SensorReading)))
|
||||
}
|
||||
|
||||
fn handle_message(
|
||||
state: State,
|
||||
update: mqtt.Update,
|
||||
) -> actor.Next(State, mqtt.Update) {
|
||||
case parse_mqtt_update(update) {
|
||||
Ok(reading) -> {
|
||||
// Publish to all subscribers
|
||||
list.each(state.subscribers, fn(subscriber) {
|
||||
process.send(subscriber, reading)
|
||||
})
|
||||
actor.continue(state)
|
||||
}
|
||||
Error(_) -> actor.continue(state)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_mqtt_update(update: mqtt.Update) -> Result(sensors.SensorReading, Nil) {
|
||||
io.println("GOT MQTT MESSAGE!")
|
||||
echo update
|
||||
// TODO: Implement parsing logic based on your MQTT topic/payload structure
|
||||
// For now, return an error
|
||||
Error(Nil)
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
subscribers: List(process.Subject(sensors.SensorReading)),
|
||||
) -> Result(process.Subject(mqtt.Update), actor.StartError) {
|
||||
let initial_state = State(subscribers: subscribers)
|
||||
let assert Ok(started) =
|
||||
actor.new(initial_state)
|
||||
|> actor.on_message(handle_message)
|
||||
|> actor.start
|
||||
Ok(started.data)
|
||||
}
|
||||
// pub fn subscribe(
|
||||
// reader: process.Subject(Message),
|
||||
// ) -> process.Subject(sensors.SensorReading) {
|
||||
// let mailbox = process.new_subject()
|
||||
// process.send(reader, Subscribe(mailbox))
|
||||
// mailbox
|
||||
// }
|
||||
Loading…
Add table
Add a link
Reference in a new issue