wasm-nats-stream-client/messaging.go

36 lines
1.1 KiB
Go
Raw Normal View History

2024-12-05 23:36:10 +00:00
package main
import (
"github.com/bytecodealliance/wasm-tools-go/cm"
"gitea.rebus.ninja/lore/go-nats-client/gen/wasmcloud/messaging/consumer"
"gitea.rebus.ninja/lore/go-nats-client/gen/wasmcloud/messaging/types"
2024-12-06 11:55:15 +00:00
logger "gitea.rebus.ninja/lore/go-nats-client/gen/wasi/logging/logging"
2024-12-05 23:36:10 +00:00
)
type messagingConsumerAdapter struct {
Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string])
}
// NOTE(lxf): this is overridden in tests
var messagingConsumer = &messagingConsumerAdapter{
Publish: consumer.Publish,
}
func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string] {
2024-12-06 11:55:15 +00:00
logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject)
2024-12-05 23:36:10 +00:00
2024-12-06 00:29:36 +00:00
reply := types.BrokerMessage{
Subject: msg.Subject + ".reply",
Body: msg.Body,
ReplyTo: cm.None[string](),
}
res := messagingConsumer.Publish(reply)
if res.IsErr() {
2024-12-06 11:55:15 +00:00
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
2024-12-06 00:29:36 +00:00
return res
2024-12-05 23:36:10 +00:00
}
return cm.OK[cm.Result[string, struct{}, string]](struct{}{})
}