diff --git a/src/mqtt.gleam b/src/mqtt.gleam index e9e63b6..96fb968 100644 --- a/src/mqtt.gleam +++ b/src/mqtt.gleam @@ -1,47 +1,44 @@ import config import gleam/erlang/process -import gleam/int -import gleam/io -import gleam/option.{None} -import gleam/string +import gleam/option.{None, Some} import spoke/mqtt import spoke/mqtt_actor import spoke/tcp -pub fn start(cfg: config.Config) { +pub fn start( + cfg: config.Config, + updates_subject: process.Subject(mqtt.Update), +) -> mqtt_actor.Client { let client_id = cfg.mqtt_client_id - let topic = "spoke-test" - + let password = <> + let auth = mqtt.AuthDetails(username: cfg.mqtt_user, password: Some(password)) + let transport = tcp.connector_with_defaults(cfg.mqtt_host) + let connection_options = + mqtt.ConnectOptions( + client_id: client_id, + authentication: Some(auth), + transport_options: transport, + keep_alive_seconds: 15, + server_timeout_ms: 5000, + ) let assert Ok(started) = - tcp.connector_with_defaults(cfg.mqtt_host) - |> mqtt.connect_with_id(client_id) - |> mqtt_actor.build() - |> mqtt_actor.start(100) + mqtt_actor.build(connection_options) |> mqtt_actor.start(100) let client = started.data - let updates = process.new_subject() - mqtt_actor.subscribe_to_updates(client, updates) + mqtt_actor.subscribe_to_updates(client, updates_subject) mqtt_actor.connect(client, True, None) let assert Ok(mqtt.ConnectionStateChanged(mqtt.ConnectAccepted(_))) = - process.receive(updates, 5000) + process.receive(updates_subject, 5000) + client +} - let assert Ok(_) = - mqtt_actor.subscribe(client, [ - mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), - ]) - - let message = - mqtt.PublishData( - topic, - <<"Hello from spoke!">>, - mqtt.AtLeastOnce, - retain: False, - ) - mqtt_actor.publish(client, message) - - let message = process.receive(updates, 1000) - io.println(string.inspect(message)) +pub fn subscribe(client: mqtt_actor.Client, topic: String) { + mqtt_actor.subscribe(client, [ + mqtt.SubscribeRequest(topic, mqtt.ExactlyOnce), + ]) +} +pub fn disconnect(client: mqtt_actor.Client) { mqtt_actor.disconnect(client) } diff --git a/src/weather_portal.gleam b/src/weather_portal.gleam index 0e34e98..6d78660 100644 --- a/src/weather_portal.gleam +++ b/src/weather_portal.gleam @@ -1,15 +1,17 @@ import config import gleam/erlang/process import gleam/io - -// import mqtt +import mqtt import mqtt_dummy import sensors pub fn main() -> Nil { - let _cfg = config.load_config() + let cfg = config.load_config() io.println("Config loaded successfully!") + let mqtt_updates = process.new_subject() + let client = mqtt.start(cfg, mqtt_updates) + let assert Ok(dummy_one) = mqtt_dummy.start() let assert Ok(dummy_two) = mqtt_dummy.start() process.send_after(dummy_one, 1000, mqtt_dummy.Proc)