From aa5a4eda051874d88f6221023d18690bc73179eb Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Wed, 24 Dec 2025 21:34:34 +0100 Subject: [PATCH] get event proxy working on local including mqtt message sending --- event_proxy/docker-compose.yml | 17 +++ event_proxy/go.mod | 7 ++ event_proxy/go.sum | 8 ++ .../home_assistant_configs/node1_config.json | 63 ++++++++++ event_proxy/main.go | 119 +++++++++++++++++- event_proxy/mosquitto.conf | 7 ++ 6 files changed, 218 insertions(+), 3 deletions(-) create mode 100644 event_proxy/docker-compose.yml create mode 100644 event_proxy/go.sum create mode 100644 event_proxy/home_assistant_configs/node1_config.json create mode 100644 event_proxy/mosquitto.conf diff --git a/event_proxy/docker-compose.yml b/event_proxy/docker-compose.yml new file mode 100644 index 0000000..ef5fec3 --- /dev/null +++ b/event_proxy/docker-compose.yml @@ -0,0 +1,17 @@ +services: + mqtt: + image: eclipse-mosquitto:latest + container_name: mqtt-dev + ports: + - "5883:1883" + - "9001:9001" + volumes: + - ./mosquitto.conf:/mosquitto/config/mosquitto.conf + - mosquitto_data:/mosquitto/data + - mosquitto_logs:/mosquitto/log + environment: + - TZ=UTC + +volumes: + mosquitto_data: + mosquitto_logs: diff --git a/event_proxy/go.mod b/event_proxy/go.mod index 6ca9105..5e8e747 100644 --- a/event_proxy/go.mod +++ b/event_proxy/go.mod @@ -1,3 +1,10 @@ module event_proxy go 1.25.0 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect +) diff --git a/event_proxy/go.sum b/event_proxy/go.sum new file mode 100644 index 0000000..9abe94c --- /dev/null +++ b/event_proxy/go.sum @@ -0,0 +1,8 @@ +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= diff --git a/event_proxy/home_assistant_configs/node1_config.json b/event_proxy/home_assistant_configs/node1_config.json new file mode 100644 index 0000000..ef47b69 --- /dev/null +++ b/event_proxy/home_assistant_configs/node1_config.json @@ -0,0 +1,63 @@ +{ + "dev": { + "ids": "bws_node1_001", + "name": "Node1", + "mf": "diy", + "sw": "1.0", + "hw": "1.0" + }, + "cmps": { + "node1_temp": { + "p": "sensor", + "device_class": "temperature", + "unit_of_measurement": "°C", + "value_template": "{{ value_json.temp}}", + "state_topic": "homeassistant/sensor/bws/node1/state1", + "unique_id": "bws_node1_temp_001" + }, + "node1_humidity": { + "p": "sensor", + "device_class": "humidity", + "unit_of_measurement": "%", + "value_template": "{{ value_json.humidity}}", + "state_topic": "homeassistant/sensor/bws/node1/state1", + "unique_id": "bws_node1_humidity_001" + }, + "node1_pressure": { + "p": "sensor", + "device_class": "atmospheric_pressure", + "unit_of_measurement": "hPa", + "value_template": "{{ value_json.pressure}}", + "state_topic": "homeassistant/sensor/bws/node1/state1", + "unique_id": "bws_node1_pressure_001" + }, + "node1_pm1": { + "p": "sensor", + "device_class": "pm1", + "unit_of_measurement": "µg/m³", + "value_template": "{{ value_json.pm1}}", + "state_topic": "homeassistant/sensor/bws/node1/state2", + "unique_id": "bws_node1_pm1_001" + }, + "node1_pm25": { + "p": "sensor", + "device_class": "pm25", + "unit_of_measurement": "µg/m³", + "value_template": "{{ value_json.pm2_5}}", + "state_topic": "homeassistant/sensor/bws/node1/state2", + "unique_id": "bws_node1_pm25_001" + }, + "node1_pm10": { + "p": "sensor", + "device_class": "pm10", + "unit_of_measurement": "µg/m³", + "value_template": "{{ value_json.pm10}}", + "state_topic": "homeassistant/sensor/bws/node1/state2", + "unique_id": "bws_node1_pm10_001" + } + }, + "o": { + "name": "diy_pico_w" + }, + "qos": 1 +} diff --git a/event_proxy/main.go b/event_proxy/main.go index e20def8..97bb6ec 100644 --- a/event_proxy/main.go +++ b/event_proxy/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "io" "log/slog" @@ -8,8 +9,33 @@ import ( "os" "strconv" "strings" + + mqtt "github.com/eclipse/paho.mqtt.golang" ) +type App struct { + client mqtt.Client +} + +func getMQTTClient() mqtt.Client { + // TODO: get port and host from env vars + opts := mqtt.NewClientOptions(). + AddBroker("tcp://localhost:5883"). + SetClientID("event-proxy") + + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + return client + + // // Publish a message + // token := client.Publish("homeassistant/sensor/bws/node1/state1", 0, false, `{"temp": 23.5, "humidity": 45, "pressure": 1013}`) + // token.Wait() + // client.Disconnect(250) +} + type Message struct { ID int Data []string @@ -37,7 +63,86 @@ func ParseMsg(msg string) (*Message, error) { return message, nil } -func main() { +type Message1DTO struct { + Temperature float32 `json:"temperature"` + Humidity float32 `json:"humidity"` + Pressure float32 `json:"pressure"` +} + +type Message2DTO struct { + PM1 float32 `json:"pm1"` + PM25 float32 `json:"pm25"` + PM10 float32 `json:"pm10"` +} + +func (app *App) handleMsg(message *Message) { + switch message.ID { + case 1: + temp, err := strconv.ParseFloat(message.Data[0], 32) + if err != nil { + slog.Error("Error parsing temperature", "error", err) + return + } + humidity, err := strconv.ParseFloat(message.Data[1], 32) + if err != nil { + slog.Error("Error parsing humidity", "error", err) + return + } + pressure, err := strconv.ParseFloat(message.Data[2], 32) + if err != nil { + slog.Error("Error parsing pressure", "error", err) + return + } + dto := Message1DTO{ + Temperature: float32(temp), + Humidity: float32(humidity), + Pressure: float32(pressure), + } + dtoString, err := json.Marshal(dto) + if err != nil { + slog.Error("Error marshaling Message1DTO", "error", err) + return + } + topic := "homeassistant/sensor/bws/node1/state1" + token := app.client.Publish(topic, 0, false, dtoString) + token.Wait() + slog.Info("Sent Message", "dto", dto, "topic", topic) + case 2: + pm1, err := strconv.ParseFloat(message.Data[0], 32) + if err != nil { + slog.Error("Error parsing pm1", "error", err) + return + } + pm25, err := strconv.ParseFloat(message.Data[1], 32) + if err != nil { + slog.Error("Error parsing pm25", "error", err) + return + } + pm10, err := strconv.ParseFloat(message.Data[2], 32) + if err != nil { + slog.Error("Error parsing pm10", "error", err) + return + } + dto := Message2DTO{ + PM1: float32(pm1), + PM25: float32(pm25), + PM10: float32(pm10), + } + dtoString, err := json.Marshal(dto) + if err != nil { + slog.Error("Error marshaling Message2DTO", "error", err) + return + } + topic := "homeassistant/sensor/bws/node1/state2" + token := app.client.Publish(topic, 0, false, dtoString) + token.Wait() + slog.Info("Sent Message", "dto", dto, "topic", topic) + default: + slog.Warn("Unknown message ID", "id", message.ID) + } +} + +func (app *App) runServer() { // Initialize JSON logger logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) @@ -67,11 +172,11 @@ func main() { slog.Error("Error accepting connection", "error", err) continue } - go handleConnection(conn) + go app.handleConnection(conn) } } -func handleConnection(conn *net.TCPConn) { +func (app *App) handleConnection(conn *net.TCPConn) { defer conn.Close() slog.Info("New connection", "remote_addr", conn.RemoteAddr().String()) @@ -98,6 +203,7 @@ func handleConnection(conn *net.TCPConn) { return } slog.Info("Parsed message", "message", msg) + app.handleMsg(msg) //Echo message back // _, err = conn.Write(buffer[:n]) @@ -108,3 +214,10 @@ func handleConnection(conn *net.TCPConn) { } } } + +func main() { + client := getMQTTClient() + app := App{client: client} + app.runServer() + client.Disconnect(250) +} diff --git a/event_proxy/mosquitto.conf b/event_proxy/mosquitto.conf new file mode 100644 index 0000000..6213483 --- /dev/null +++ b/event_proxy/mosquitto.conf @@ -0,0 +1,7 @@ +listener 1883 +protocol mqtt + +listener 9001 +protocol websockets + +allow_anonymous true