get event proxy working on local including mqtt message sending

This commit is contained in:
Travis Shears 2025-12-24 21:34:34 +01:00
parent 0897210754
commit aa5a4eda05
Signed by: travisshears
GPG key ID: CB9BF1910F3F7469
6 changed files with 218 additions and 3 deletions

View file

@ -1,6 +1,7 @@
package main
import (
"encoding/json"
"fmt"
"io"
"log/slog"
@ -8,8 +9,33 @@ import (
"os"
"strconv"
"strings"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type App struct {
client mqtt.Client
}
func getMQTTClient() mqtt.Client {
// TODO: get port and host from env vars
opts := mqtt.NewClientOptions().
AddBroker("tcp://localhost:5883").
SetClientID("event-proxy")
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
// // Publish a message
// token := client.Publish("homeassistant/sensor/bws/node1/state1", 0, false, `{"temp": 23.5, "humidity": 45, "pressure": 1013}`)
// token.Wait()
// client.Disconnect(250)
}
type Message struct {
ID int
Data []string
@ -37,7 +63,86 @@ func ParseMsg(msg string) (*Message, error) {
return message, nil
}
func main() {
type Message1DTO struct {
Temperature float32 `json:"temperature"`
Humidity float32 `json:"humidity"`
Pressure float32 `json:"pressure"`
}
type Message2DTO struct {
PM1 float32 `json:"pm1"`
PM25 float32 `json:"pm25"`
PM10 float32 `json:"pm10"`
}
func (app *App) handleMsg(message *Message) {
switch message.ID {
case 1:
temp, err := strconv.ParseFloat(message.Data[0], 32)
if err != nil {
slog.Error("Error parsing temperature", "error", err)
return
}
humidity, err := strconv.ParseFloat(message.Data[1], 32)
if err != nil {
slog.Error("Error parsing humidity", "error", err)
return
}
pressure, err := strconv.ParseFloat(message.Data[2], 32)
if err != nil {
slog.Error("Error parsing pressure", "error", err)
return
}
dto := Message1DTO{
Temperature: float32(temp),
Humidity: float32(humidity),
Pressure: float32(pressure),
}
dtoString, err := json.Marshal(dto)
if err != nil {
slog.Error("Error marshaling Message1DTO", "error", err)
return
}
topic := "homeassistant/sensor/bws/node1/state1"
token := app.client.Publish(topic, 0, false, dtoString)
token.Wait()
slog.Info("Sent Message", "dto", dto, "topic", topic)
case 2:
pm1, err := strconv.ParseFloat(message.Data[0], 32)
if err != nil {
slog.Error("Error parsing pm1", "error", err)
return
}
pm25, err := strconv.ParseFloat(message.Data[1], 32)
if err != nil {
slog.Error("Error parsing pm25", "error", err)
return
}
pm10, err := strconv.ParseFloat(message.Data[2], 32)
if err != nil {
slog.Error("Error parsing pm10", "error", err)
return
}
dto := Message2DTO{
PM1: float32(pm1),
PM25: float32(pm25),
PM10: float32(pm10),
}
dtoString, err := json.Marshal(dto)
if err != nil {
slog.Error("Error marshaling Message2DTO", "error", err)
return
}
topic := "homeassistant/sensor/bws/node1/state2"
token := app.client.Publish(topic, 0, false, dtoString)
token.Wait()
slog.Info("Sent Message", "dto", dto, "topic", topic)
default:
slog.Warn("Unknown message ID", "id", message.ID)
}
}
func (app *App) runServer() {
// Initialize JSON logger
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
slog.SetDefault(logger)
@ -67,11 +172,11 @@ func main() {
slog.Error("Error accepting connection", "error", err)
continue
}
go handleConnection(conn)
go app.handleConnection(conn)
}
}
func handleConnection(conn *net.TCPConn) {
func (app *App) handleConnection(conn *net.TCPConn) {
defer conn.Close()
slog.Info("New connection", "remote_addr", conn.RemoteAddr().String())
@ -98,6 +203,7 @@ func handleConnection(conn *net.TCPConn) {
return
}
slog.Info("Parsed message", "message", msg)
app.handleMsg(msg)
//Echo message back
// _, err = conn.Write(buffer[:n])
@ -108,3 +214,10 @@ func handleConnection(conn *net.TCPConn) {
}
}
}
func main() {
client := getMQTTClient()
app := App{client: client}
app.runServer()
client.Disconnect(250)
}