package main import ( "encoding/json" "fmt" "io" "log/slog" "net" "os" "strconv" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" ) type App struct { client mqtt.Client } func getMQTTClient() mqtt.Client { mqttHost := os.Getenv("MQTT_HOST") if mqttHost == "" { panic("MQTT_HOST environment variable not set") } mqttPort := os.Getenv("MQTT_PORT") if mqttPort == "" { panic("MQTT_PORT environment variable not set") } mqttUsername := os.Getenv("MQTT_USERNAME") if mqttUsername == "" { panic("MQTT_USERNAME environment variable not set") } mqttPassword := os.Getenv("MQTT_PASSWORD") if mqttPassword == "" { panic("MQTT_PASSWORD environment variable not set") } brokerURL := "tcp://" + mqttHost + ":" + mqttPort slog.Info("Connecting to MQTT", "broker", brokerURL) opts := mqtt.NewClientOptions(). AddBroker(brokerURL). SetClientID("event-proxy"). SetUsername(mqttUsername). SetPassword(mqttPassword) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } return client } type Message struct { ID int Data []string } func ParseMsg(msg string) (*Message, error) { parts := strings.Split(msg, ",") msgIdStr := parts[0] if !strings.HasPrefix(msgIdStr, "M") { return nil, fmt.Errorf("message must start with 'M' prefix") } msgIdStr = strings.TrimPrefix(msgIdStr, "M") msgId, err := strconv.Atoi(msgIdStr) if err != nil { return nil, err } msgData := make([]string, len(parts)-1) for i, data := range parts[1:] { msgData[i] = strings.TrimSpace(data) } message := &Message{ ID: msgId, Data: msgData, } return message, nil } type Message1DTO struct { Temperature float32 `json:"temp"` Humidity float32 `json:"humidity"` Pressure float32 `json:"pressure"` } type Message2DTO struct { PM1 float32 `json:"pm1"` PM25 float32 `json:"pm2_5"` PM10 float32 `json:"pm10"` } func (app *App) handleMsg(message *Message) { switch message.ID { case 1: temp, err := strconv.ParseFloat(message.Data[0], 32) if err != nil { slog.Error("Error parsing temperature", "error", err) return } humidity, err := strconv.ParseFloat(message.Data[2], 32) if err != nil { slog.Error("Error parsing humidity", "error", err) return } pressure, err := strconv.ParseFloat(message.Data[1], 32) if err != nil { slog.Error("Error parsing pressure", "error", err) return } dto := Message1DTO{ Temperature: float32(temp), Humidity: float32(humidity), Pressure: float32(pressure), } dtoString, err := json.Marshal(dto) if err != nil { slog.Error("Error marshaling Message1DTO", "error", err) return } topic := "homeassistant/sensor/bws/node1/state1" token := app.client.Publish(topic, 0, false, dtoString) token.Wait() slog.Info("Sent Message", "dto", dto, "topic", topic) case 2: pm1, err := strconv.ParseFloat(message.Data[0], 32) if err != nil { slog.Error("Error parsing pm1", "error", err) return } pm25, err := strconv.ParseFloat(message.Data[1], 32) if err != nil { slog.Error("Error parsing pm25", "error", err) return } pm10, err := strconv.ParseFloat(message.Data[2], 32) if err != nil { slog.Error("Error parsing pm10", "error", err) return } dto := Message2DTO{ PM1: float32(pm1), PM25: float32(pm25), PM10: float32(pm10), } dtoString, err := json.Marshal(dto) if err != nil { slog.Error("Error marshaling Message2DTO", "error", err) return } topic := "homeassistant/sensor/bws/node1/state2" token := app.client.Publish(topic, 0, false, dtoString) token.Wait() slog.Info("Sent Message", "dto", dto, "topic", topic) default: slog.Warn("Unknown message ID", "id", message.ID) } } func (app *App) runServer() { // Initialize JSON logger logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) slog.SetDefault(logger) addr := "0.0.0.0:8080" slog.Info("Starting TCP server", "address", addr) //Resolve address addr_, err := net.ResolveTCPAddr("tcp", addr) if err != nil { slog.Error("Error resolving address", "error", err) return } //Listen for incoming connections listener, err := net.ListenTCP("tcp", addr_) if err != nil { slog.Error("Error starting server", "error", err) return } defer listener.Close() for { //Accept incoming connection conn, err := listener.AcceptTCP() if err != nil { slog.Error("Error accepting connection", "error", err) continue } go app.handleConnection(conn) } } func (app *App) handleConnection(conn *net.TCPConn) { defer conn.Close() slog.Info("New connection", "remote_addr", conn.RemoteAddr().String()) buffer := make([]byte, 1024) for { //Read up to 1024 bytes n, err := conn.Read(buffer) if err != nil { if err == io.EOF { slog.Info("Connection closed by client", "remote_addr", conn.RemoteAddr().String()) } else { slog.Error("Error reading from connection", "error", err) } return } if n == 0 { return } msgStr := string(buffer[:n]) slog.Info("Received message", "message", msgStr, "bytes", n) msg, err := ParseMsg(msgStr) if err != nil { slog.Error("Error parsing message", "error", err) return } slog.Info("Parsed message", "message", msg) app.handleMsg(msg) //Echo message back // _, err = conn.Write(buffer[:n]) _, err = conn.Write([]byte("1")) if err != nil { slog.Error("Error writing to connection", "error", err) return } } } func main() { client := getMQTTClient() app := App{client: client} app.runServer() client.Disconnect(250) }