229 lines
5.2 KiB
Go
229 lines
5.2 KiB
Go
package main
|
|
|
|
import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)
|
|
|
|
type App struct {
|
|
client mqtt.Client
|
|
}
|
|
|
|
func getMQTTClient() mqtt.Client {
|
|
mqttHost := os.Getenv("MQTT_HOST")
|
|
if mqttHost == "" {
|
|
panic("MQTT_HOST environment variable not set")
|
|
}
|
|
mqttPort := os.Getenv("MQTT_PORT")
|
|
if mqttPort == "" {
|
|
panic("MQTT_PORT environment variable not set")
|
|
}
|
|
|
|
brokerURL := "tcp://" + mqttHost + ":" + mqttPort
|
|
slog.Info("Connecting to MQTT", "broker", brokerURL)
|
|
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(brokerURL).
|
|
SetClientID("event-proxy")
|
|
|
|
client := mqtt.NewClient(opts)
|
|
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
panic(token.Error())
|
|
}
|
|
return client
|
|
}
|
|
|
|
// Message is a decoded wire message: a numeric ID followed by zero or more
// whitespace-trimmed data fields.
type Message struct {
	ID   int
	Data []string
}

// ParseMsg decodes a comma-separated message of the form "M{id},{field},...".
//
// The first token must be the letter 'M' followed by a base-10 integer; every
// following token becomes one entry of Data. Surrounding whitespace, including
// a trailing line ending, is stripped from every token, so an ID-only message
// that arrives with a trailing newline still parses.
//
// It returns an error if the 'M' prefix is missing or the ID is not an
// integer. On success Data is non-nil and holds one entry per field (it is
// empty when the message carries no fields).
func ParseMsg(msg string) (*Message, error) {
	parts := strings.Split(msg, ",")

	// Split always yields at least one element, so parts[0] is safe even for
	// an empty input.
	idToken := strings.TrimSpace(parts[0])
	if !strings.HasPrefix(idToken, "M") {
		return nil, fmt.Errorf("message must start with 'M' prefix")
	}
	id, err := strconv.Atoi(strings.TrimPrefix(idToken, "M"))
	if err != nil {
		return nil, fmt.Errorf("parsing message ID %q: %w", idToken, err)
	}

	data := make([]string, len(parts)-1)
	for i, field := range parts[1:] {
		data[i] = strings.TrimSpace(field)
	}
	return &Message{ID: id, Data: data}, nil
}
|
|
|
|
// Message1DTO is the JSON payload handleMsg publishes for messages with ID 1.
// The values are forwarded as received; no unit conversion is applied.
type Message1DTO struct {
	Temperature float32 `json:"temperature"`
	Humidity    float32 `json:"humidity"`
	Pressure    float32 `json:"pressure"`
}
|
|
|
|
// Message2DTO is the JSON payload handleMsg publishes for messages with ID 2.
// The values are forwarded as received; no unit conversion is applied.
//
// NOTE(review): the fields are presumably PM1.0 / PM2.5 / PM10
// particulate-matter readings — confirm against the sending device.
type Message2DTO struct {
	PM1  float32 `json:"pm1"`
	PM25 float32 `json:"pm25"`
	PM10 float32 `json:"pm10"`
}
|
|
|
|
func (app *App) handleMsg(message *Message) {
|
|
switch message.ID {
|
|
case 1:
|
|
temp, err := strconv.ParseFloat(message.Data[0], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing temperature", "error", err)
|
|
return
|
|
}
|
|
humidity, err := strconv.ParseFloat(message.Data[1], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing humidity", "error", err)
|
|
return
|
|
}
|
|
pressure, err := strconv.ParseFloat(message.Data[2], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing pressure", "error", err)
|
|
return
|
|
}
|
|
dto := Message1DTO{
|
|
Temperature: float32(temp),
|
|
Humidity: float32(humidity),
|
|
Pressure: float32(pressure),
|
|
}
|
|
dtoString, err := json.Marshal(dto)
|
|
if err != nil {
|
|
slog.Error("Error marshaling Message1DTO", "error", err)
|
|
return
|
|
}
|
|
topic := "homeassistant/sensor/bws/node1/state1"
|
|
token := app.client.Publish(topic, 0, false, dtoString)
|
|
token.Wait()
|
|
slog.Info("Sent Message", "dto", dto, "topic", topic)
|
|
case 2:
|
|
pm1, err := strconv.ParseFloat(message.Data[0], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing pm1", "error", err)
|
|
return
|
|
}
|
|
pm25, err := strconv.ParseFloat(message.Data[1], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing pm25", "error", err)
|
|
return
|
|
}
|
|
pm10, err := strconv.ParseFloat(message.Data[2], 32)
|
|
if err != nil {
|
|
slog.Error("Error parsing pm10", "error", err)
|
|
return
|
|
}
|
|
dto := Message2DTO{
|
|
PM1: float32(pm1),
|
|
PM25: float32(pm25),
|
|
PM10: float32(pm10),
|
|
}
|
|
dtoString, err := json.Marshal(dto)
|
|
if err != nil {
|
|
slog.Error("Error marshaling Message2DTO", "error", err)
|
|
return
|
|
}
|
|
topic := "homeassistant/sensor/bws/node1/state2"
|
|
token := app.client.Publish(topic, 0, false, dtoString)
|
|
token.Wait()
|
|
slog.Info("Sent Message", "dto", dto, "topic", topic)
|
|
default:
|
|
slog.Warn("Unknown message ID", "id", message.ID)
|
|
}
|
|
}
|
|
|
|
func (app *App) runServer() {
|
|
// Initialize JSON logger
|
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
|
slog.SetDefault(logger)
|
|
|
|
addr := "0.0.0.0:8080"
|
|
slog.Info("Starting TCP server", "address", addr)
|
|
|
|
//Resolve address
|
|
addr_, err := net.ResolveTCPAddr("tcp", addr)
|
|
if err != nil {
|
|
slog.Error("Error resolving address", "error", err)
|
|
return
|
|
}
|
|
|
|
//Listen for incoming connections
|
|
listener, err := net.ListenTCP("tcp", addr_)
|
|
if err != nil {
|
|
slog.Error("Error starting server", "error", err)
|
|
return
|
|
}
|
|
defer listener.Close()
|
|
|
|
for {
|
|
//Accept incoming connection
|
|
conn, err := listener.AcceptTCP()
|
|
if err != nil {
|
|
slog.Error("Error accepting connection", "error", err)
|
|
continue
|
|
}
|
|
go app.handleConnection(conn)
|
|
}
|
|
}
|
|
|
|
func (app *App) handleConnection(conn *net.TCPConn) {
|
|
defer conn.Close()
|
|
slog.Info("New connection", "remote_addr", conn.RemoteAddr().String())
|
|
|
|
buffer := make([]byte, 1024)
|
|
for {
|
|
//Read up to 1024 bytes
|
|
n, err := conn.Read(buffer)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
slog.Info("Connection closed by client", "remote_addr", conn.RemoteAddr().String())
|
|
} else {
|
|
slog.Error("Error reading from connection", "error", err)
|
|
}
|
|
return
|
|
}
|
|
if n == 0 {
|
|
return
|
|
}
|
|
msgStr := string(buffer[:n])
|
|
slog.Info("Received message", "message", msgStr, "bytes", n)
|
|
msg, err := ParseMsg(msgStr)
|
|
if err != nil {
|
|
slog.Error("Error parsing message", "error", err)
|
|
return
|
|
}
|
|
slog.Info("Parsed message", "message", msg)
|
|
app.handleMsg(msg)
|
|
|
|
//Echo message back
|
|
// _, err = conn.Write(buffer[:n])
|
|
_, err = conn.Write([]byte("1"))
|
|
if err != nil {
|
|
slog.Error("Error writing to connection", "error", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
client := getMQTTClient()
|
|
app := App{client: client}
|
|
app.runServer()
|
|
client.Disconnect(250)
|
|
}
|