Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype a MQTT Source #7207

Closed
matzew opened this issue Aug 25, 2023 · 23 comments · Fixed by #7919
Closed

Prototype a MQTT Source #7207

matzew opened this issue Aug 25, 2023 · 23 comments · Fixed by #7919
Assignees
Labels
good first issue Denotes an issue ready for a new contributor, according to the "help wanted" guidelines. help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. kind/feature-request kind/good-first-issue Denotes an issue ready for a new contributor. triage/accepted Issues which should be fixed (post-triage)

Comments

@matzew
Copy link
Member

matzew commented Aug 25, 2023

Problem

We do not have a MQTT Source in Knative Eventing, for transforming messages from MQTT brokers to Cloudevents and ingest them via HTTP into the Knative Eventing system.

With "edge" / "iot" computing there is a growing number of "devices" that emit MQTT message to brokers. In order to process those message in Knative Eventing, we need a Source to read from those MQTT brokers

POC Implementation

Instead of directly creating a fully fledge controller, we can go a much simple approach and create an image, that can be used inside the ContainerSource, like we did for the websockets protocol. See:

This would give us a quick win, and a good starting point for developers in the need to transform MQTT messages to cloudevents, into the Knative eventing sytstem

Persona:
Developers

Exit Criteria
A binary/image that can be used via the Containersource CRD

Time Estimate (optional):
2-3

Additional context (optional)

@matzew matzew added kind/feature-request help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. kind/good-first-issue Denotes an issue ready for a new contributor. good first issue Denotes an issue ready for a new contributor, according to the "help wanted" guidelines. labels Aug 25, 2023
@rhuss
Copy link
Contributor

rhuss commented Aug 25, 2023

I really love the idea and this is definitely a start in the right direction. I guess such a ContainerSource could also be used in a SinkBinding (i'm always confused when to a use ContainerSource and when a SinkBinding 😬 ). A next step then could be a fully fledged MqttSource as this has the advantage of a typed configuration for e.g. the connection parameters.

@g1rjeevan
Copy link
Contributor

I would like to work and contribute on this.

@g1rjeevan
Copy link
Contributor

/assign

@matzew
Copy link
Member Author

matzew commented Aug 28, 2023

@g1rjeevan sounds good! Feel free to join our slack instance for further discussions

@prakrit55
Copy link
Contributor

hey @g1rjeevan, are you still on the ticket..........I want to give it a try

@Leo6Leo
Copy link
Member

Leo6Leo commented Oct 30, 2023

Are there any updates on this issue @g1rjeevan? Please let me know if you are still working on it and still need more time within the next 24 hours, otherwise we will give @prakrit55 a chance to try! Thanks :)

@Leo6Leo
Copy link
Member

Leo6Leo commented Nov 2, 2023

/unassign
@prakrit55 Do you want to give it a shot?

@prakrit55
Copy link
Contributor

Hey @Leo6Leo, thanks for mentioning. Yes, it wd love to work on it.

@Leo6Leo
Copy link
Member

Leo6Leo commented Nov 2, 2023

/assign @prakrit55

@Leo6Leo
Copy link
Member

Leo6Leo commented Nov 13, 2023

@prakrit55 are there any questions about this Issue?

@prakrit55
Copy link
Contributor

Hey @Leo6Leo, sorry for the delay. Where could I find the main for containersource ??

@Leo6Leo
Copy link
Member

Leo6Leo commented Nov 22, 2023

Hey @prakrit55, thanks for working on this issue.

Our final goal is to create a containersource that can emit the MQTT event. Something similar to this, see how the heartbeats source is implemented.

So the code should be in a package under cmd/mqttsource and the ContainerSource YAML should be under config/tools/mqttsource.

The final goal should be:

  • Configure mqttsource in namespace with a ConfigMap
  • Apply the ContainerSource YAML ko apply -n <x> -f config/tools/mqttsource
  • And hope it will work

@prakrit55
Copy link
Contributor

hey @Leo6Leo, should we configure it with only http or https with tls ??

@Leo6Leo
Copy link
Member

Leo6Leo commented Nov 28, 2023

Hey @prakrit55, follow up upon our conversation on slack regarding this question, I have consulted with tech lead @pierDipi , and do you want to give a try to both http and https? :))) Please let us know if you have any questions!

Here is a code snippet that might be helpful to you when you are working on the TLS part.

opts := make([]cehttp.Option, 0, 1)
opts = append(opts, cloudevents.WithTarget(sink))
if eventingtls.IsHttpsSink(sink) {
clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = &cacerts
httpTransport := http.DefaultTransport.(*http.Transport).Clone()
httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
log.Fatalf("Failed to get TLS Client Config: %v", err)
}
transport := &ochttp.Transport{
Base: httpTransport,
Propagation: tracecontextb3.TraceContextEgress,
}
opts = append(opts, cehttp.WithRoundTripper(transport))
}
c, err := client.NewClientHTTP(opts, nil)

@octonawish-akcodes
Copy link
Contributor

/assign

@prakrit55 prakrit55 removed their assignment Jan 16, 2024
@octonawish-akcodes octonawish-akcodes removed their assignment Jan 17, 2024
@Rahul-Kumar-prog
Copy link
Contributor

hey @Leo6Leo this issue seems pretty interesting for me and I want to give a go.

I am assigning this to myself.

@Rahul-Kumar-prog
Copy link
Contributor

/assign

Copy link

This issue is stale because it has been open for 90 days with no
activity. It will automatically close after 30 more days of
inactivity. Reopen the issue with /reopen. Mark the issue as
fresh by adding the comment /remove-lifecycle stale.

@github-actions github-actions bot added the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Apr 18, 2024
@creydr
Copy link
Member

creydr commented Apr 18, 2024

/remove-lifecycle stale

@knative-prow knative-prow bot removed the lifecycle/stale Denotes an issue or PR has remained open with no activity and has become stale. label Apr 18, 2024
@creydr creydr added the triage/accepted Issues which should be fixed (post-triage) label Apr 18, 2024
@ctmphuongg
Copy link
Contributor

Hi! I'm interested in working on this issue if it's not worked on now.

@creydr
Copy link
Member

creydr commented May 6, 2024

Welcome @ctmphuongg,
The other PR related to this seems to be closed. So feel free to go ahead and /assign it to you in case you want to pick this

@ctmphuongg
Copy link
Contributor

/assign

@Leo6Leo
Copy link
Member

Leo6Leo commented May 14, 2024

@ctmphuongg As you described the process that you tested cloud-event's sample MQTT receiver:

Copy the file main.go in folder receiver, installing all packages

  1. Start a MQTT broker as they mentioned in the README: docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
  2. Keep the mqttsource running by go build the file and run ./main
  3. Send a message to the same topic, using mosquitto_pub -t “test-topic” -m “test”
  4. The thing is I don’t receive any message in the terminal where I started the mqttsource, but when using 5 mosquitto_sub to the same topic “test-topic”, it still received all the messages. So, I’m not sure if that sample there is working or not

The reason why there is no message is received is because you are passing the data as string, without specify any cloudevent properties.

According to CloudEvent Spec for MQTT,

For MQTT 5.0, the MQTT PUBLISH message's Content Type property MUST be set to the media type of an event format. For MQTT 3.1.1, the media type of the JSON event format is always implied.

And according to the example here indicated:

------------------ PUBLISH -------------------

Topic Name: mytopic
Content Type: application/cloudevents+json; charset=utf-8

------------------ payload -------------------

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
	"time" : 2018-04-05T03:56;24Z,
	"id" : 1234-1234-1234,
	"source" : "/mycontext/subcontext",
	"datacontenttype" : "application/json; charset=utf-8",

    ... further attributes omitted ...

    "data" : {
        ... application data ...
    }
}

-----------------------------------------------

you will need to carefully craft the message (payload) of your data, otherwise cloudevent go-sdk will fail the event validation, and will not invoke the Invoker function.

To conclude, the correct mosquitto_pub command should be:

mosquitto_pub -t 'test/topic' -m '{"specversion" : "1.0","type" :"com.example.someevent","id" : "1234-1234-1234","source" : "/mycontext/subcontext","data":{"msg":"hello world!"}}' -D PUBLISH user-property Content-Type application/cloudevents+json; charset=utf-8

Hope this helps, and please let me know if you have any questions!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
good first issue Denotes an issue ready for a new contributor, according to the "help wanted" guidelines. help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. kind/feature-request kind/good-first-issue Denotes an issue ready for a new contributor. triage/accepted Issues which should be fixed (post-triage)
Projects
None yet
Development

Successfully merging a pull request may close this issue.

9 participants