66 lines
1.9 KiB
Go
66 lines
1.9 KiB
Go
package main
|
|
|
|
import (
|
|
"github.com/bytecodealliance/wasm-tools-go/cm"
|
|
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer"
|
|
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types"
|
|
logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging"
|
|
//config "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/config/runtime"
|
|
)
|
|
|
|
type messagingConsumerAdapter struct {
|
|
Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string])
|
|
}
|
|
|
|
// NOTE(lxf): this is overridden in tests
|
|
var messagingConsumer = &messagingConsumerAdapter{
|
|
Publish: consumer.Publish,
|
|
}
|
|
|
|
func handleHttp(httpData string) {
|
|
|
|
// TODO implement the logic to get the destination topic from the config
|
|
// dest_topic := config.GetAll()
|
|
dest_topic := "streaming"
|
|
|
|
// generate random message
|
|
data := []byte(httpData)
|
|
|
|
// Send reply
|
|
reply := types.BrokerMessage{
|
|
Subject: dest_topic,
|
|
Body: cm.ToList(data),
|
|
ReplyTo: cm.None[string](),
|
|
}
|
|
|
|
res := messagingConsumer.Publish(reply)
|
|
|
|
if res.IsErr() {
|
|
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
|
|
}
|
|
|
|
}
|
|
|
|
func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string] {
|
|
|
|
logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject)
|
|
|
|
// TODO implement the logic to get the destination topic from the config
|
|
// dest_topic := config.GetAll()
|
|
dest_topic := "streaming"
|
|
|
|
// Send reply
|
|
reply := types.BrokerMessage{
|
|
Subject: dest_topic,
|
|
Body: msg.Body,
|
|
ReplyTo: cm.None[string](),
|
|
}
|
|
res := messagingConsumer.Publish(reply)
|
|
if res.IsErr() {
|
|
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
|
|
return res
|
|
}
|
|
|
|
return cm.OK[cm.Result[string, struct{}, string]](struct{}{})
|
|
}
|