moved logic to main and add task file template
This commit is contained in:
parent
2d0467e13e
commit
cc93b40612
65
main.go
65
main.go
@ -2,25 +2,78 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"go.wasmcloud.dev/component/net/wasihttp"
|
||||
logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging"
|
||||
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer"
|
||||
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/handler"
|
||||
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types"
|
||||
"github.com/bytecodealliance/wasm-tools-go/cm"
|
||||
"go.wasmcloud.dev/component/net/wasihttp"
|
||||
)
|
||||
|
||||
type messagingConsumerAdapter struct {
|
||||
Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string])
|
||||
}
|
||||
|
||||
// NOTE(lxf): this is overridden in tests
|
||||
var messagingConsumer = &messagingConsumerAdapter{
|
||||
Publish: consumer.Publish,
|
||||
}
|
||||
|
||||
|
||||
func init() {
|
||||
wasihttp.HandleFunc(handleRequest)
|
||||
wasihttp.HandleFunc(handleHttp)
|
||||
handler.Exports.HandleMessage = handleMessage
|
||||
}
|
||||
|
||||
func handleRequest(w http.ResponseWriter, r *http.Request) {
|
||||
func handleHttp(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// get body as string
|
||||
handleHttp(r.FormValue("data"))
|
||||
if handleRequest(r.FormValue("data")) {
|
||||
fmt.Fprintf(w, "Message sent!\n")
|
||||
} else {
|
||||
fmt.Fprintf(w, "Error\n")
|
||||
}
|
||||
|
||||
// send response
|
||||
fmt.Fprintf(w, "Message sent!\n")
|
||||
}
|
||||
|
||||
func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string]{
|
||||
|
||||
logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject)
|
||||
|
||||
arg := cm.LiftString[string, *uint8, uint8](msg.Body.Data(), uint8(msg.Body.Len()))
|
||||
|
||||
if handleRequest(arg) {
|
||||
cm.OK[cm.Result[string, struct{}, string]](struct{}{})
|
||||
}
|
||||
|
||||
return cm.Err[cm.Result[string, struct{}, string]]("Couldn't send message to topic")
|
||||
}
|
||||
|
||||
func handleRequest(arg string) bool {
|
||||
|
||||
// TODO implement the logic to get the destination topic from the config
|
||||
// dest_topic := config.GetAll()
|
||||
dest_topic := "streaming"
|
||||
|
||||
result := exec_task(arg)
|
||||
|
||||
// Send reply
|
||||
reply := types.BrokerMessage{
|
||||
Subject: dest_topic,
|
||||
Body: cm.ToList([]byte(result)),
|
||||
ReplyTo: cm.None[string](),
|
||||
}
|
||||
res := messagingConsumer.Publish(reply)
|
||||
if res.IsErr() {
|
||||
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Since we don't run this program like a CLI, the `main` function is empty. Instead,
|
||||
|
65
messaging.go
65
messaging.go
@ -1,65 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/bytecodealliance/wasm-tools-go/cm"
|
||||
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer"
|
||||
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types"
|
||||
logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging"
|
||||
//config "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/config/runtime"
|
||||
)
|
||||
|
||||
type messagingConsumerAdapter struct {
|
||||
Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string])
|
||||
}
|
||||
|
||||
// NOTE(lxf): this is overridden in tests
|
||||
var messagingConsumer = &messagingConsumerAdapter{
|
||||
Publish: consumer.Publish,
|
||||
}
|
||||
|
||||
func handleHttp(httpData string) {
|
||||
|
||||
// TODO implement the logic to get the destination topic from the config
|
||||
// dest_topic := config.GetAll()
|
||||
dest_topic := "streaming"
|
||||
|
||||
// generate random message
|
||||
data := []byte(httpData)
|
||||
|
||||
// Send reply
|
||||
reply := types.BrokerMessage{
|
||||
Subject: dest_topic,
|
||||
Body: cm.ToList(data),
|
||||
ReplyTo: cm.None[string](),
|
||||
}
|
||||
|
||||
res := messagingConsumer.Publish(reply)
|
||||
|
||||
if res.IsErr() {
|
||||
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string] {
|
||||
|
||||
logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject)
|
||||
|
||||
// TODO implement the logic to get the destination topic from the config
|
||||
// dest_topic := config.GetAll()
|
||||
dest_topic := "streaming"
|
||||
|
||||
// Send reply
|
||||
reply := types.BrokerMessage{
|
||||
Subject: dest_topic,
|
||||
Body: msg.Body,
|
||||
ReplyTo: cm.None[string](),
|
||||
}
|
||||
res := messagingConsumer.Publish(reply)
|
||||
if res.IsErr() {
|
||||
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
|
||||
return res
|
||||
}
|
||||
|
||||
return cm.OK[cm.Result[string, struct{}, string]](struct{}{})
|
||||
}
|
Loading…
Reference in New Issue
Block a user