wasm-nats-producer-client/messaging.go

60 lines
1.5 KiB
Go
Raw Normal View History

2024-12-14 01:15:08 +00:00
package main
import (
	"strconv"
	"time"

	//config "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/config/runtime"
	logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging"
	"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer"
	"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types"
	"github.com/bytecodealliance/wasm-tools-go/cm"
)
// messagingConsumerAdapter holds the publish operation as a function-valued
// field, so the function that actually gets called can be chosen when the
// adapter is constructed.
type messagingConsumerAdapter struct {
	// Publish takes the broker message to send and returns a result that
	// carries a string error on failure.
	Publish func(msg types.BrokerMessage) cm.Result[string, struct{}, string]
}
// messagingConsumer is the package-level adapter through which messages are
// published. By default its Publish field is wired to consumer.Publish.
//
// NOTE(lxf): this is overridden in tests
var messagingConsumer = &messagingConsumerAdapter{Publish: consumer.Publish}
func loop() cm.Result[string, struct{}, string] {
// TODO implement the logic to get the destination topic from the config
// dest_topic := config.GetAll()
dest_topic := "test.reply"
2024-12-14 01:26:48 +00:00
counter := 0
2024-12-14 01:15:08 +00:00
// TASK
2024-12-14 01:26:48 +00:00
for {
2024-12-14 01:15:08 +00:00
// generate random message
2024-12-14 01:26:48 +00:00
data := []byte("Hello, World! " + string(counter))
2024-12-14 01:15:08 +00:00
// Send reply
reply := types.BrokerMessage{
Subject: dest_topic,
Body: cm.ToList(data),
ReplyTo: cm.None[string](),
}
res := messagingConsumer.Publish(reply)
if res.IsErr() {
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
return res
}
2024-12-14 01:26:48 +00:00
counter++
if counter == 1000 {
break
}
// sleep for 1 second
time.Sleep(1 * time.Second)
2024-12-14 01:15:08 +00:00
}
return cm.OK[cm.Result[string, struct{}, string]](struct{}{})
}