only producer configuration

This commit is contained in:
Lorenzo Venerandi 2025-01-07 00:17:58 +01:00
parent cc93b40612
commit 95d1b2b560
3 changed files with 19 additions and 116 deletions

71
main.go
View File

@ -2,15 +2,12 @@
package main package main
import ( import (
"fmt"
"net/http"
logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging" logger "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasi/logging/logging"
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer" "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/consumer"
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/handler"
"gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types" "gitea.rebus.ninja/lore/wasm-nats-producer-client/gen/wasmcloud/messaging/types"
"github.com/bytecodealliance/wasm-tools-go/cm" "github.com/bytecodealliance/wasm-tools-go/cm"
"go.wasmcloud.dev/component/net/wasihttp" "time"
) )
type messagingConsumerAdapter struct { type messagingConsumerAdapter struct {
@ -24,56 +21,30 @@ var messagingConsumer = &messagingConsumerAdapter{
func init() { func init() {
wasihttp.HandleFunc(handleHttp) sendMessage()
handler.Exports.HandleMessage = handleMessage
} }
func handleHttp(w http.ResponseWriter, r *http.Request) { func sendMessage() {
// get body as string for {
if handleRequest(r.FormValue("data")) {
fmt.Fprintf(w, "Message sent!\n") dest_topic := "streaming"
} else {
fmt.Fprintf(w, "Error\n") message := "test"
// Send reply
reply := types.BrokerMessage{
Subject: dest_topic,
Body: cm.ToList([]byte(message)),
ReplyTo: cm.None[string](),
}
res := messagingConsumer.Publish(reply)
if res.IsErr() {
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
}
time.Sleep(10 * time.Second)
} }
// send response
}
func handleMessage(msg types.BrokerMessage) cm.Result[string, struct{}, string]{
logger.Log(logger.LevelInfo,"MessageHandler", "Received message on subject" + msg.Subject)
arg := cm.LiftString[string, *uint8, uint8](msg.Body.Data(), uint8(msg.Body.Len()))
if handleRequest(arg) {
cm.OK[cm.Result[string, struct{}, string]](struct{}{})
}
return cm.Err[cm.Result[string, struct{}, string]]("Couldn't send message to topic")
}
func handleRequest(arg string) bool {
// TODO implement the logic to get the destination topic from the config
// dest_topic := config.GetAll()
dest_topic := "streaming"
result := exec_task(arg)
// Send reply
reply := types.BrokerMessage{
Subject: dest_topic,
Body: cm.ToList([]byte(result)),
ReplyTo: cm.None[string](),
}
res := messagingConsumer.Publish(reply)
if res.IsErr() {
logger.Log(logger.LevelError, "MessageHandler", "Failed to send reply, error: " + *res.Err())
return false
}
return true
} }
// Since we don't run this program like a CLI, the `main` function is empty. Instead, // Since we don't run this program like a CLI, the `main` function is empty. Instead,

12
task.go
View File

@ -1,12 +0,0 @@
package main
import (
)
func exec_task(arg string) string{
response := "" + arg
return response
}

View File

@ -37,9 +37,6 @@ spec:
weight: 100 weight: 100
requirements: requirements:
host-type: edge host-type: edge
# Establish a unidirectional link to the messaging capability provider (powered by NATS),
# so the `echo` component can make use of messaging interface functionality
# (i.e. making interacting with the messaging system, in this case NATS)
- type: link - type: link
properties: properties:
target: nats-producer target: nats-producer
@ -52,60 +49,7 @@ spec:
type: capability type: capability
properties: properties:
image: ghcr.io/wasmcloud/messaging-nats:0.23.1 image: ghcr.io/wasmcloud/messaging-nats:0.23.1
## To configure OTEL integration for this provider specifically, uncomment the lines below
# config:
# - name: otel
# properties:
# otel_exporter_otlp_endpoint: "http://all-in-one:4318"
# otel_exporter_otlp_traces_endpoint: "http://traces-backend/v1/traces"
# otel_exporter_otlp_metrics_endpoint: "http://metrics-backend/v1/metrics"
# otel_exporter_otlp_logs_endpoint: "http://logs-backend/v1/logs"
traits: traits:
# Since the `nats` capability provider calls an component to handle messages
# coming over subscriptions, this provider needs a unidirectional link to the
# component that wil be called.
#
# Here we link the `nats` provider (the "source"), to the `echo` component (the "target"),
# so that so the provider can deliver messages to the component (by invoking the wasmcloud:messaging/handler interface) .
- type: link
properties:
target: go_data_producer
namespace: wasmcloud
package: messaging
interfaces: [handler]
source_config:
- name: simple-subscription
properties:
subscriptions: producer
- type: spreadscaler
properties:
instances: 1
spread:
- name: cloud
weight: 0
requirements:
host-type: cloud
- name: edge
weight: 100
requirements:
host-type: edge
- name: httpserver
type: capability
properties:
image: ghcr.io/wasmcloud/http-server:0.23.2
traits:
# Link to Echo, and inform it to listen on port 8000
# on the local machine
- type: link
properties:
target: go_data_producer
namespace: wasmcloud
package: http
interfaces: [incoming-handler]
source_config:
- name: default-http
properties:
address: 0.0.0.0:8000
- type: spreadscaler - type: spreadscaler
properties: properties:
instances: 1 instances: 1