In this post I’ll be creating a shell to publish messages on a particular topic using the Mosquitto
MQTT broker, and another application to subscribe to a topic and print incoming messages on the terminal.
For this we need to have mosquitto
and mosquitto-server
installed.
On Fedora you can install them with the following command:
$ sudo dnf install mosquitto mosquitto-server
To install the Go package for MQTT:
$ go get github.com/eclipse/paho.mqtt.golang
First we will create a tool to publish messages on a given topic.
Start by importing the package.
import "github.com/eclipse/paho.mqtt.golang"
Then we need to create a new MQTT client.
// set the protocol, ip and port of the broker.
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
// set the id to the client.
opts.SetClientID("Device-pub")
// create a new client.
c := MQTT.NewClient(opts)
Connect to the broker and obtain the token.
token := c.Connect();
Once the client is connected to the broker, you can now publish messages on a particular topic.
message := "hello this is the trial message"
c.Publish("some_topic", 0, false, message)
Once that is done, subscribe to a particular topic.
c.Subscribe("some_topic", 0, nil);
You can now receive/listen to the messages published on the topic named some_topic
.
Now let’s build a tool to subscribe to a topic and receive the messages published on that topic.
package main
import (
"fmt"
//import the Paho Go MQTT library
MQTT "github.com/eclipse/paho.mqtt.golang"
"os"
"strings"
"time"
)
// topic is the MQTT topic this tool listens on.
const topic = "some_topic"

// disconnectQuiesce is how long Disconnect is asked to wait for outstanding
// work before closing the connection.
const disconnectQuiesce = 250 * time.Millisecond

// done is signalled by the message handler once a "bye" message arrives.
// The handler is invoked by the MQTT client library rather than by main, so
// a channel is used instead of a shared bool that main would have to poll
// without synchronisation. The buffer of one lets the handler signal without
// ever blocking.
var done = make(chan struct{}, 1)

// f is the default message handler. It prints the topic and payload of every
// non-blank message and signals done when the message is "bye".
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
	// Drop the line terminator the publisher shell leaves on the payload so
	// the message is printed on a single line.
	text := strings.TrimRight(string(msg.Payload()), "\r\n")
	word := strings.TrimSpace(text)
	if word == "" {
		return
	}

	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", text)

	if word == "bye" {
		fmt.Println("exitting")
		// Non-blocking send: a second "bye" arriving before main has shut
		// down must not stall the handler.
		select {
		case done <- struct{}{}:
		default:
		}
	}
}

// main connects to the local broker, subscribes to topic, prints incoming
// messages until a "bye" message is received, then unsubscribes and
// disconnects. It exits with status 1 if any MQTT operation fails.
func main() {
	// Broker address, client id and the handler for incoming messages.
	opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
	opts.SetClientID("Device-sub")
	opts.SetDefaultPublishHandler(f)

	c := MQTT.NewClient(opts)
	quiesceMs := uint(disconnectQuiesce / time.Millisecond)

	if token := c.Connect(); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	// Subscribe at QoS 0. A nil callback routes messages to the default
	// publish handler set above.
	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		c.Disconnect(quiesceMs)
		os.Exit(1)
	}

	// Block until the handler reports that "bye" was received.
	<-done

	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		c.Disconnect(quiesceMs)
		os.Exit(1)
	}
	c.Disconnect(quiesceMs)
}
Now that we have built a tool to receive messages over MQTT, let’s build a tool to publish messages.
package main
import (
"bufio"
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
"os"
"strings"
"time"
)
// main runs an interactive shell: every non-blank line typed by the user is
// published to "some_topic" until the user enters `bye` or stdin is closed.
// It exits with status 1 if the broker cannot be reached or an MQTT
// subscribe/unsubscribe call fails.
func main() {
	const (
		topic       = "some_topic"
		maxAttempts = 10
		quiesceMs   = 250
	)

	// Broker address and client id.
	opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
	opts.SetClientID("Device-pub")
	c := MQTT.NewClient(opts)

	// Try to connect up to maxAttempts times, one second apart, and give up
	// instead of carrying on with an unconnected client.
	var connErr error
	for i := 0; i < maxAttempts; i++ {
		token := c.Connect()
		token.Wait()
		if connErr = token.Error(); connErr == nil {
			break
		}
		fmt.Println(connErr)
		time.Sleep(1 * time.Second)
	}
	if connErr != nil {
		os.Exit(1)
	}

	// Subscribe to the topic at QoS 0 and wait for the subscription to be
	// confirmed.
	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		c.Disconnect(quiesceMs)
		os.Exit(1)
	}

	// One reader for the whole session: creating a new bufio.Reader per line
	// would throw away input it had already buffered (e.g. piped stdin).
	reader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print(">> ")
		message, err := reader.ReadString('\n')
		trimmed := strings.TrimSpace(message)

		if trimmed != "" {
			// Publish the line exactly as read and wait for it to be sent
			// before doing anything else, so the final `bye` is not lost.
			token := c.Publish(topic, 0, false, message)
			token.Wait()
			if token.Error() != nil {
				fmt.Println(token.Error())
			}
		}

		if err != nil {
			// Includes EOF: stop instead of looping forever on a closed stdin.
			fmt.Println(err)
			break
		}
		if trimmed == "bye" {
			break
		}
	}

	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		c.Disconnect(quiesceMs)
		os.Exit(1)
	}
	c.Disconnect(quiesceMs)
}