Golang: Using Paho Mqtt

In this post I’ll be creating a shell to publish messages on a particular topic using the Mosquitto MQTT broker, and another application to subscribe to a topic and print incoming messages on the terminal.

For this we need to have mosquitto and mosquitto-server installed. On Fedora you can install them with the following command:

$ sudo dnf install mosquitto mosquitto-server

To install the Go package for MQTT:

$ go get github.com/eclipse/paho.mqtt.golang

First we will create a tool to publish messages on a given topic.

First import the package.

import "github.com/eclipse/paho.mqtt.golang"

Then we need to create a new MQTT client.

// Start from empty options and point them at the broker
// (protocol, IP and port).
opts := MQTT.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")

// Give this client its ID.
opts.SetClientID("Device-pub")

// Build the client from the options.
c := MQTT.NewClient(opts)

Connect to the broker and obtain the token.

token := c.Connect(); 

Once the client is connected to the broker, you can now publish messages on a particular topic.

message := "hello this is the trial message"
// Publish also returns a token; wait on it so a failed publish is not
// silently ignored.
if token := c.Publish("some_topic", 0, false, message); token.Wait() && token.Error() != nil {
        panic(token.Error())
}

Once that is done, subscribe to a particular topic.

c.Subscribe("some_topic", 0, nil); 

You can now receive/listen to the messages published on the topic named some_topic.

Now let’s build a tool to subscribe to a topic and receive the messages published on that topic.

package main

import (
        "fmt"
        "os"
        "strings"
        "sync/atomic"
        "time"

        //import the Paho Go MQTT library
        MQTT "github.com/eclipse/paho.mqtt.golang"
)

// done is set to 1 by the message handler once a "bye" message arrives.
// The handler is called by the MQTT client library rather than by main, which
// polls this value, so every access goes through sync/atomic to avoid a data
// race.
var done int32

// f is the default message handler. It prints the topic and payload of every
// non-empty message and requests shutdown when the message is "bye".
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
        payload := msg.Payload()

        // Ignore the line terminator so that "bye", "bye\n" and "bye\r\n" are
        // treated alike, whichever tool published the message.
        text := strings.TrimRight(string(payload), "\r\n")
        if text == "" {
                return
        }

        fmt.Printf("TOPIC: %s\n", msg.Topic())
        fmt.Printf("MSG: %s\n", payload)

        if text == "bye" {
                fmt.Println("exitting")
                atomic.StoreInt32(&done, 1)
        }
}

// main connects to the local broker, subscribes to "some_topic" and prints
// incoming messages until a "bye" message is received.
func main() {
        // Configure the broker address, the client ID and the handler used for
        // messages on subscriptions that have no callback of their own.
        opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
        opts.SetClientID("Device-sub")
        opts.SetDefaultPublishHandler(f)

        // Create the client and connect; Connect is asynchronous, so wait on
        // the token before checking for an error.
        c := MQTT.NewClient(opts)
        if token := c.Connect(); token.Wait() && token.Error() != nil {
                fmt.Println(token.Error())
                os.Exit(1)
        }

        // Subscribe to some_topic at QoS 0 and wait for the subscription to be
        // confirmed. The nil callback routes messages to the default handler f.
        if token := c.Subscribe("some_topic", 0, nil); token.Wait() && token.Error() != nil {
                fmt.Println(token.Error())
                c.Disconnect(250)
                os.Exit(1)
        }

        // Poll once a second until the handler has seen "bye".
        for atomic.LoadInt32(&done) == 0 {
                time.Sleep(1 * time.Second)
        }

        // Unsubscribe from some_topic; disconnect even if that fails.
        if token := c.Unsubscribe("some_topic"); token.Wait() && token.Error() != nil {
                fmt.Println(token.Error())
                c.Disconnect(250)
                os.Exit(1)
        }

        c.Disconnect(250)
}

Now that we have built a tool to receive messages over MQTT, let’s build a tool to publish messages.

package  main

import (
    "bufio"
    "fmt"
    MQTT "github.com/eclipse/paho.mqtt.golang"
    "os"
    "strings"
    "time"
)

// main runs an interactive shell that publishes every non-empty line typed by
// the user to "some_topic". The shell ends when the user enters "bye" or when
// standard input is closed.
func main() {
        // Configure the broker address and the client ID.
        opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
        opts.SetClientID("Device-pub")

        c := MQTT.NewClient(opts)

        // Try to connect at most 10 times, one second apart, and give up if
        // the broker is still unreachable rather than publishing on a client
        // that never connected.
        connected := false
        for i := 0; i < 10; i++ {
                token := c.Connect()
                if token.Wait() && token.Error() == nil {
                        connected = true
                        break
                }
                fmt.Println(token.Error())
                time.Sleep(1 * time.Second)
        }
        if !connected {
                fmt.Println("could not connect to the broker")
                os.Exit(1)
        }

        // No subscription is made here: this tool only publishes.

        // Use one reader for the whole session. A bufio.Reader may read ahead,
        // so creating a new one per line could drop input already buffered.
        reader := bufio.NewReader(os.Stdin)

        for {
                fmt.Print(">> ")
                message, err := reader.ReadString('\n')

                // Strip the line terminator only for the emptiness and "bye"
                // checks; the text is published exactly as it was read.
                line := strings.TrimRight(message, "\r\n")
                if line != "" {
                        token := c.Publish("some_topic", 0, false, message)
                        // Wait for every message, including the final "bye",
                        // so it is sent before the client disconnects.
                        if token.Wait() && token.Error() != nil {
                                fmt.Println(token.Error())
                        }
                }

                if err != nil {
                        // ReadString reports an error once input ends (for
                        // example on Ctrl-D); stop instead of looping forever.
                        fmt.Println(err)
                        break
                }
                if line == "bye" {
                        break
                }
        }

        c.Disconnect(250)
}