Technical

Building a Basic MQTT System: Creating a Communication Platform with Python and Mosquitto

OS Information and packages version

  • CentOS 7.x or Ubuntu 20.04
  • python 3.8.x or up
  • mosquitto 1.6.10 or up
  • paho-mqtt 1.6.1 or up

In this project, we will build a basic MQTT system using Python and Mosquitto on a Macbook Air M1. We will use UTM to set up multiple Ubuntu 20.04 servers to achieve this.

First, we will implement a socket server and socket client using Python to create a real-time communication system. Next, we will establish an MQTT broker using Mosquitto on one of the Ubuntu servers. This will allow different clients to send messages through the broker.

Finally, we will use the paho-mqtt module to create an MQTT subscriber on another Ubuntu server. We will subscribe to a specific topic and handle messages sent to that topic. We will repeat this process for additional subscribers.

This project provides an excellent opportunity to learn about MQTT, Python, Mosquitto, and UTM. It will also provide hands-on experience with setting up and managing multiple Ubuntu servers on a Macbook Air M1. The resulting MQTT system can be used as a foundation for more advanced IoT applications, and the skills and knowledge gained from this project can be applied to various other projects and use cases.

I. Introduction

MQTT (Message Queuing Telemetry Transport) is a lightweight communication protocol designed specifically for IoT (Internet of Things) applications. It enables reliable communication in low bandwidth and unstable network environments. MQTT is based on a publish/subscribe pattern, where messages are differentiated by topics. Publishers publish messages to specific topics, and all subscribers that have subscribed to that topic will receive the messages. This allows for multi-party communication, and enables filtering of topics to only receive messages of interest.

MQTT is essential for IoT applications because IoT involves a large number of devices and sensors that need to communicate and exchange information with each other. As these devices and sensors typically have lower computing and storage capabilities, a lightweight communication protocol is needed, and MQTT meets that requirement. MQTT also supports a variety of network protocols, such as TCP, UDP, Bluetooth, ZigBee, etc., enabling cross-platform communication. Furthermore, MQTT’s low-power consumption makes it an ideal communication protocol for IoT devices. In summary, MQTT is designed to provide a reliable and efficient communication solution for IoT applications.

II. Setting up the Ubuntu 20.04 Servers with UTM

Installing UTM and creating multiple Ubuntu 20.04 virtual machines Configuring the network settings for the virtual machines

III. Implementing the Socket Server and Socket Client

Creating a simple socket server and socket client with Python Configuring the socket server to send a test message every 5 seconds

socket_server.py

import  datetime, logging, os, socket, time
from logging.handlers import TimedRotatingFileHandler
from config.config import *

# init log
if not os.path.exists(LOG_PATH):
    os.mkdir(LOG_PATH)

log_file_name = "socket_server_" + datetime.datetime.now().strftime("%Y-%m-%d") + ".log"
log_file_path = os.path.join(LOG_PATH, log_file_name)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = TimedRotatingFileHandler(log_file_path, when='midnight', interval=1, backupCount=60)
handler.setLevel(logging.DEBUG)
handler.suffix = '%Y%m%d'
handler.namer = lambda name: name.replace(".log", "")  # replace .log
# file size 20MB
handler.maxBytes = 20 * 1024 * 1024
handler.setFormatter(formatter)
logger.addHandler(handler)
# Create a Socket object
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Bind the IP address and Port number
server_socket.bind((SOCKET_HOST, SOCKET_PORT))

# Listen for client connections
server_socket.listen()
logger.info("Socket server is up and waiting for client connections...")

print("Socket server is up and waiting for client connections...")

while True:
    # Wait for client connection
    client_socket, addr = server_socket.accept()
    logger.debug("Connected to client {0}".format(addr))
    print("Connected to client {0}".format(addr))

    # Define the number of messages to send
    count = 0

    # Send a message with timestamp and message count every 5 seconds
    while True:
        try:
            # Get current timestamp
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

            # Increment message count
            count += 1

            # Construct message
            message = f"Message {count} sent at {timestamp} | "

            # Send message
            client_socket.send(message.encode('utf-8'))
            logger.debug(message + "to client {0}".format(addr))
            # Send message every 5 seconds
            time.sleep(5)
        except:
            break

    # Close the socket connection
    client_socket.close()

socket_client.py

import asyncio, datetime, logging, os, socket
import paho.mqtt.client as mqtt
from logging.handlers import TimedRotatingFileHandler
from config.config import *

# init log
if not os.path.exists(LOG_PATH):
    os.mkdir(LOG_PATH)

log_file_name = "socket_client_" + datetime.datetime.now().strftime("%Y-%m-%d") + ".log"
log_file_path = os.path.join(LOG_PATH, log_file_name)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = TimedRotatingFileHandler(log_file_path, when='midnight', interval=1, backupCount=60)
handler.setLevel(logging.DEBUG)
handler.suffix = '%Y%m%d'
handler.namer = lambda name: name.replace(".log", "")  # replace .log
# file size 20MB
handler.maxBytes = 20 * 1024 * 1024
handler.setFormatter(formatter)
logger.addHandler(handler)

# MQTT Client 
client = mqtt.Client()
client.username_pw_set(BROKER_CLIENT_USER, BROKER_CLIENT_PASSWD)
client.connect(BROKER_HOST, BROKER_PORT)

# connect socket 
sokcet_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sokcet_client.connect((SOCKET_HOST, SOCKET_PORT))

# Send the received socket message to the MQTT broker server and write it to a log file.
async def handle_socket_data():
    while True:
        try:
            # Received socket message (1024 bytes)
            data = sokcet_client.recv(1024)
            if not data:
                break
            print(data)
            logger.debug(f"Received data from Socket: {data}")
            client.publish(BROKER_TOPIC, data, qos=0, retain=False, properties={"messageExpiryInterval":BROKER_EXPIRY_TIME})
            logger.debug(f"Published data to MQTT Server: {data}")
        except Exception as e:
            logger.error(f"Error in handling Socket data: {e}")
            break

async def main():
    # listen Socket
    await handle_socket_data()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

config/config.py

LOG_PATH = "./log"
SOCKET_HOST = 'localhost'
SOCKET_PORT = 8000
BROKER_HOST = 'localhost'
BROKER_PORT = 1883
BROKER_CLIENT_USER = 'test_user'
BROKER_CLIENT_PASSWD = '1234'
BROKER_EXPIRY_TIME = 60
BROKER_TOPIC = 'test/log'

IV. Setting up the Mosquitto MQTT Broker

Installing and configuring Mosquitto on one of the Ubuntu servers Understanding MQTT topics, messages, and QoS levels

install Mosquitto in Ubuntu 20.04

sudo apt update -y && sudo apt install mosquitto mosquitto-clients -y

check Mosquitto is installed

mosquitto -h

You can see the help like bellow:

mosquitto version 1.6.10

mosquitto is an MQTT v3.1.1 broker.

Usage: mosquitto [-c config_file] [-d] [-h] [-p port]

 -c : specify the broker config file.
 -d : put the broker into the background after starting.
 -h : display this help.
 -p : start the broker listening on the specified port.
      Not recommended in conjunction with the -c option.
 -v : verbose mode - enable all logging types. This overrides
      any logging options given in the config file.

Configure mosquitto settings

cd /etc/mosquitto
vim mosquitto.conf

Modify the following parameters

per_listener_settings true
# bind_address ip-address/host name
bind_address 0.0.0.0

# Port to use for the default listener.
port 1883

log_type all
log_dest file /var/log/mosquitto.log

allow_anonymous false
password_file /etc/mosquitto/pwfile

Add borker user
path: /etc/mosquitto

cp pwfile.example pwfile
mosquitto_passwd -c /etc/mosquitto/tmppw test_user
cat tmppw >> pwfile

Start up Mosquitto broker

sudo systemctl enable mosquitto
sudo systemctl start mosquitto

See http://mosquitto.org/ for more information.

V. Creating an MQTT Subscriber with paho-mqtt

Installing and importing the paho-mqtt module on another Ubuntu server Subscribing to a specific MQTT topic and handling incoming messages Connecting the MQTT subscriber to the socket server

sudo pip3 install paho-mqtt

subscriber.py

import datetime, logging, os
import paho.mqtt.client as mqtt
from logging.handlers import TimedRotatingFileHandler
from config.config import *

# init log
if not os.path.exists(LOG_PATH):
    os.mkdir(LOG_PATH)

log_file_name = "subscriber_" + datetime.datetime.now().strftime("%Y-%m-%d") + ".log"
log_file_path = os.path.join(LOG_PATH, log_file_name)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = TimedRotatingFileHandler(log_file_path, when='midnight', interval=1, backupCount=60)
handler.setLevel(logging.DEBUG)
handler.suffix = '%Y%m%d'
handler.namer = lambda name: name.replace(".log", "")  # replace .log
# file size 20MB
handler.maxBytes = 20 * 1024 * 1024
handler.setFormatter(formatter)
logger.addHandler(handler)

# MQTT Client 
client = mqtt.Client()

def on_message(client, userdata, msg):
    message = msg.payload
    print(message)
    # do somthing ...
    logger.debug(f"Received data from Broker: {message}")


def on_connect(client, userdata, flags, rc):
    #  MQTT topic
    client.subscribe(BROKER_TOPIC)


if __name__ == '__main__':
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(BROKER_CLIENT_USER, BROKER_CLIENT_PASSWD)
    client.connect(BROKER_HOST, BROKER_PORT)
    client.loop_forever()

VI. Scaling the MQTT System

Adding additional subscribers to the MQTT system Testing the system with multiple subscribers

VII. Conclusion

Recap of the project and its benefits Next steps for expanding and customizing the MQTT system

In this project, we have built a basic MQTT system using Python and Mosquitto on Ubuntu 20.04 servers using a Macbook Air M1 and UTM. We have learned about the MQTT protocol and its importance in IoT applications, as well as the basic components of an MQTT system.

We implemented a socket server and client using Python, and configured the server to send a test message every 5 seconds. We then set up the Mosquitto MQTT broker, configured topics and QoS levels, and created an MQTT subscriber using the paho-mqtt module.

Through testing the basic MQTT system, we were able to confirm that messages were delivered successfully through the MQTT broker.

Overall, this project provides a solid foundation for building an MQTT system and offers the benefits of real-time communication, efficient data transfer, and scalability.

Next steps for expanding and customizing the MQTT system may include adding more subscribers, creating additional topics, and configuring security measures. The possibilities are endless, and with this foundation, you can explore and build upon the MQTT system to meet your specific needs.

Please refer to my GITHUB for the complete code.

Here is a complete programming tutorial video (notalking):

asmr programming exercise project notalking to Implementing an MQTT System using Python and Mosquitto

Your support is my motivation!

Buy me a coffee ☕ https://ko-fi.com/codedaddypro

Leave a Reply

Your email address will not be published. Required fields are marked *