diff --git a/main.go b/main.go index 67732d7..77a3afb 100644 --- a/main.go +++ b/main.go @@ -2,25 +2,78 @@ package main import ( - "net/http" "fmt" + "net/http" - "go.wasmcloud.dev/component/net/wasihttp" + logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging" + "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer" "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/handler" + "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types" + "github.com/bytecodealliance/wasm-tools-go/cm" + "go.wasmcloud.dev/component/net/wasihttp" ) +type messagingConsumerAdapter struct { + Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string]) +} + +// NOTE(lxf): this is overridden in tests +var messagingConsumer = &messagingConsumerAdapter{ + Publish: consumer.Publish, +} + + func init() { - wasihttp.HandleFunc(handleRequest) + wasihttp.HandleFunc(handleHttp) handler.Exports.HandleMessage = handleMessage } -func handleRequest(w http.ResponseWriter, r *http.Request) { +func handleHttp(w http.ResponseWriter, r *http.Request) { // get body as string - handleHttp(r.FormValue("data")) + if handleRequest(r.FormValue("data")) { + fmt.Fprintf(w, "Message sent!\n") + } else { + fmt.Fprintf(w, "Error\n") + } // send response - fmt.Fprintf(w, "Message sent!\n") +} + +func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string]{ + + logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject) + + arg := cm.LiftString[string, *uint8, uint8](msg.Body.Data(), uint8(msg.Body.Len())) + + if handleRequest(arg) { + cm.OK[cm.Result[string, struct{}, string]](struct{}{}) + } + + return cm.Err[cm.Result[string, struct{}, string]]("Couldn't send message to topic") +} + +func handleRequest(arg string) bool { + + // TODO implement the logic to get the destination topic from the config + // dest_topic := config.GetAll() + dest_topic := "streaming" + + result := exec_task(arg) + + // Send reply + reply := types.BrokerMessage{ + Subject: dest_topic, + Body: cm.ToList([]byte(result)), + ReplyTo: cm.None[string](), + } + res := messagingConsumer.Publish(reply) + if res.IsErr() { + logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err()) + return false + } + + return true } // Since we don't run this program like a CLI, the `main` function is empty. Instead, diff --git a/messaging.go b/messaging.go deleted file mode 100644 index 3da97d2..0000000 --- a/messaging.go +++ /dev/null @@ -1,65 +0,0 @@ -package main - -import ( - "github.com/bytecodealliance/wasm-tools-go/cm" - "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer" - "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types" - logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging" - //config "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/config/runtime" -) - -type messagingConsumerAdapter struct { - Publish func(msg types.BrokerMessage) (result cm.Result[string, struct{}, string]) -} - -// NOTE(lxf): this is overridden in tests -var messagingConsumer = &messagingConsumerAdapter{ - Publish: consumer.Publish, -} - -func handleHttp(httpData string) { - - // TODO implement the logic to get the destination topic from the config - // dest_topic := config.GetAll() - dest_topic := "streaming" - - // generate random message - data := []byte(httpData) - - // Send reply - reply := types.BrokerMessage{ - Subject: dest_topic, - Body: cm.ToList(data), - ReplyTo: cm.None[string](), - } - - res := messagingConsumer.Publish(reply) - - if res.IsErr() { - logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err()) - } - -} - -func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string] { - - logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject) - - // TODO implement the logic to get the destination topic from the config - // dest_topic := config.GetAll() - dest_topic := "streaming" - - // Send reply - reply := types.BrokerMessage{ - Subject: dest_topic, - Body: msg.Body, - ReplyTo: cm.None[string](), - } - res := messagingConsumer.Publish(reply) - if res.IsErr() { - logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err()) - return res - } - - return cm.OK[cm.Result[string, struct{}, string]](struct{}{}) -} diff --git a/task.go b/task.go new file mode 100644 index 0000000..b778dc4 --- /dev/null +++ b/task.go @@ -0,0 +1,12 @@ +package main + +import ( + +) + +func exec_task(arg string) string{ + + response := "" + arg + + return response +} \ No newline at end of file