MQTT on AWS IoT Core: Connect Devices to the Cloud
The most surprising thing about MQTT on AWS IoT Core is that it’s not just a messaging protocol; it’s an entire state management system for your devices and the cloud.
Let’s see it in action. Imagine a simple temperature sensor.
import json
import time

import paho.mqtt.client as mqtt

# AWS IoT Core MQTT endpoint
# Replace with your actual endpoint
endpoint = "YOUR_AWS_IOT_ENDPOINT.iot.YOUR_REGION.amazonaws.com"
port = 8883  # TLS port

# Certificates and keys
# Replace with your actual file paths
cert_path = "certs/YOUR_CERTIFICATE.pem.crt"
key_path = "certs/YOUR_PRIVATE_KEY.pem.key"
ca_path = "certs/AmazonRootCA1.pem"

# Client ID - must be unique per device
client_id = "my_temperature_sensor_001"

# Topics
publish_topic = f"devices/{client_id}/data"
subscribe_topic = f"devices/{client_id}/commands"


def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to AWS IoT Core!")
        # Subscribe to command topic on connect
        client.subscribe(subscribe_topic)
    else:
        print(f"Connection failed with code {rc}")


def on_message(client, userdata, msg):
    try:
        text = msg.payload.decode()
        command = json.loads(text)
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON
        print(f"Ignoring malformed message on topic {msg.topic}")
        return
    print(f"Received message on topic {msg.topic}: {text}")
    if not isinstance(command, dict):
        return
    if command.get("action") == "set_reporting_interval":
        new_interval = command.get("interval_seconds")
        if new_interval:
            print(f"Setting reporting interval to {new_interval} seconds.")
            # In a real device, you'd update a global variable or device state
            # For this example, we'll just print.


# paho-mqtt 1.x constructor. paho-mqtt 2.x also expects a callback API version,
# e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=client_id)
client = mqtt.Client(client_id=client_id)
client.on_connect = on_connect
client.on_message = on_message

# Configure TLS
client.tls_set(
    ca_certs=ca_path,
    certfile=cert_path,
    keyfile=key_path
)

try:
    client.connect(endpoint, port, 60)
    client.loop_start()  # Start a background thread for network traffic

    # Simulate sensor readings
    while True:
        temperature = 25.5 + (time.time() % 5)  # Simulate some variation
        message = {
            "timestamp": int(time.time()),
            "temperature": round(temperature, 2),
            "device_id": client_id
        }
        payload = json.dumps(message)
        result = client.publish(publish_topic, payload, qos=1)
        print(f"Published: {payload} to {publish_topic}")
        time.sleep(10)  # Publish every 10 seconds
except KeyboardInterrupt:
    print("Disconnecting...")
    client.loop_stop()
    client.disconnect()
This code snippet connects a Python client to AWS IoT Core, publishing simulated temperature data and subscribing to receive commands. The client_id is crucial: it identifies the connection, and AWS IoT Core allows only one active connection per client ID, so a second client that connects with the same ID disconnects the first. Messages themselves are routed by topic. The publish_topic and subscribe_topic follow a convention: devices/{client_id}/data for outgoing sensor readings, and devices/{client_id}/commands for instructions from the cloud. When a message arrives on the command topic, the on_message function decodes it and simulates changing the device’s reporting interval (here it only prints the new value; the publish loop still sleeps for a fixed 10 seconds).
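To make the interval command take effect rather than just print, the device needs a value that the MQTT callback can write and the publish loop can read. The following is a minimal sketch of one way to do that; `ReportingInterval` and its method names are hypothetical helpers, not part of paho-mqtt or any AWS SDK.

```python
import json
import threading


class ReportingInterval:
    """Thread-safe holder for the publish interval (hypothetical helper)."""

    def __init__(self, seconds=10):
        self._seconds = seconds
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._seconds

    def apply_command(self, payload):
        """Apply a set_reporting_interval command; return True if it was accepted."""
        try:
            command = json.loads(payload)
        except ValueError:
            # Covers invalid JSON and invalid UTF-8
            return False
        if not isinstance(command, dict):
            return False
        if command.get("action") != "set_reporting_interval":
            return False
        new_interval = command.get("interval_seconds")
        # Accept only positive numbers; bool is excluded because it subclasses int.
        if isinstance(new_interval, bool) or not isinstance(new_interval, (int, float)):
            return False
        if new_interval <= 0:
            return False
        with self._lock:
            self._seconds = new_interval
        return True
```

In the example above, on_message would call `interval.apply_command(msg.payload)` and the loop would call `time.sleep(interval.get())`. The lock is there because `loop_start()` runs callbacks on a background thread while the publish loop runs on the main thread.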
AWS IoT Core acts as the MQTT broker, but with significant enhancements. It provides secure authentication using X.509 certificates, authorization via IoT policies, and message routing to other AWS services like Lambda, S3, or DynamoDB. The core concept is the Device Shadow.
The Device Shadow is a persistent JSON document that represents the state of your device. It has three key components:
- reported: The state that the device says it is currently in. This is updated by the device itself.
- desired: The state that the cloud wants the device to be in. This is typically set by an application or user.
- delta: The difference between desired and reported. AWS IoT Core automatically calculates this and publishes it to a specific topic ($aws/things/THING_NAME/shadow/update/delta) that your device can subscribe to. Your device then reads this delta to know what changes it needs to make to reach the desired state.
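To make the relationship between the three sections concrete, here is a small local illustration of how a delta can be derived from desired and reported. It is a sketch of the idea only, not the service's implementation, and the field names (`target_temp`, `mode`, `fan`) are made up for the example.

```python
def compute_delta(desired, reported):
    """Return the entries of `desired` that differ from or are missing in `reported`."""
    delta = {}
    for key, want in desired.items():
        have = reported.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            # Compare nested objects field by field
            nested = compute_delta(want, have)
            if nested:
                delta[key] = nested
        elif key not in reported or want != have:
            delta[key] = want
    return delta


shadow_state = {
    "desired": {"target_temp": 22, "mode": "heat", "fan": {"speed": 2}},
    "reported": {"target_temp": 19, "mode": "heat", "fan": {"speed": 2, "on": True}},
}
print(compute_delta(shadow_state["desired"], shadow_state["reported"]))
# prints {'target_temp': 22}
```

Only `target_temp` appears in the result, because it is the only desired value the device has not yet reported. Fields that exist only in reported (such as `fan.on`) never show up in the delta.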
When your device connects, it establishes a persistent connection to the AWS IoT Core MQTT broker. It then subscribes to its shadow topics:
- $aws/things/THING_NAME/shadow/update/delta: To receive desired state changes.
- $aws/things/THING_NAME/shadow/get/accepted: To get the current shadow state after requesting it.
- $aws/things/THING_NAME/shadow/update/accepted: To confirm that its reported state updates were accepted.
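Since every shadow topic shares the same prefix, it can help to build the names in one place. The sketch below assumes a classic (unnamed) shadow; `shadow_topics` and `subscribe_to_shadow` are hypothetical helper names, and `client` is assumed to be a connected paho-mqtt client like the one in the earlier example.

```python
def shadow_topics(thing_name):
    """Build the shadow topic names for one Thing (classic, unnamed shadow)."""
    base = f"$aws/things/{thing_name}/shadow"
    return {
        "update": f"{base}/update",                    # device publishes state here
        "update_delta": f"{base}/update/delta",        # desired state changes arrive here
        "update_accepted": f"{base}/update/accepted",  # confirmation of accepted updates
        "get": f"{base}/get",                          # device publishes here to request the shadow
        "get_accepted": f"{base}/get/accepted",        # current shadow document arrives here
    }


def subscribe_to_shadow(client, thing_name, qos=1):
    """Subscribe to the three response topics listed above."""
    topics = shadow_topics(thing_name)
    for name in ("update_delta", "update_accepted", "get_accepted"):
        client.subscribe(topics[name], qos=qos)
```

Calling `subscribe_to_shadow(client, "MY_THING_NAME")` from on_connect means the subscriptions are re-established after every reconnect.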
The device publishes its current state to $aws/things/THING_NAME/shadow/update, which updates the reported property in the shadow document. If a cloud application changes the desired state (e.g., setting a thermostat to 22°C), AWS IoT Core compares it to the reported state, calculates the delta, and publishes it to the /delta topic. The device then receives this delta, acts on it (e.g., turns on the heater), and reports its new reported state back to the shadow. This creates a robust loop for managing device state from the cloud.
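The device side of that loop can be sketched as a single function: parse the delta message, apply each change, and build the update that reports the result. This assumes the delta message carries the changed fields under a top-level "state" key; `build_reported_update` and the `apply_setting` callback are hypothetical names for this illustration.

```python
import json


def build_reported_update(delta_payload, apply_setting):
    """Apply each setting in a delta message and build the matching shadow update."""
    message = json.loads(delta_payload)
    changes = message.get("state", {})
    reported = {}
    for name, value in changes.items():
        # apply_setting is a hypothetical callback that drives the hardware
        # and returns the value the device actually ended up with.
        reported[name] = apply_setting(name, value)
    return json.dumps({"state": {"reported": reported}})
```

A delta handler would then publish the returned string to $aws/things/THING_NAME/shadow/update. Reporting the value the hardware actually reached, rather than echoing the requested one, keeps the shadow honest if a setting could not be applied exactly.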
When you configure your device in AWS IoT Core, you create a "Thing." This Thing is associated with certificates for authentication and policies for authorization. The MQTT connection uses TLS encryption, and the client IDs a device may connect with are determined by the iot:Connect statement in its IoT policy; a common pattern restricts the client ID to the Thing’s name. The topics you use are often structured around the Thing’s name, like devices/MY_DEVICE_ID/messages or, for shadow operations, $aws/things/MY_THING_NAME/shadow/....
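As an illustration of how a policy can tie these pieces together, the sketch below builds a policy document as a Python dict. It assumes the device's client ID equals its Thing name and that the certificate is attached to that Thing, so the policy variable `${iot:Connection.Thing.ThingName}` can resolve. The function name, region, and account ID are placeholders, and the exact statements should be checked against the current AWS IoT policy documentation before use.

```python
import json


def build_device_policy(region, account_id):
    """Build an IoT policy limiting a device to its own client ID and topics."""
    arn = f"arn:aws:iot:{region}:{account_id}"
    thing = "${iot:Connection.Thing.ThingName}"  # resolved by AWS IoT at connect time
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Connect",
                "Resource": f"{arn}:client/{thing}",
            },
            {
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": [
                    f"{arn}:topic/devices/{thing}/data",
                    f"{arn}:topic/$aws/things/{thing}/shadow/update",
                    f"{arn}:topic/$aws/things/{thing}/shadow/get",
                ],
            },
            {
                "Effect": "Allow",
                "Action": "iot:Subscribe",
                "Resource": [
                    f"{arn}:topicfilter/devices/{thing}/commands",
                    f"{arn}:topicfilter/$aws/things/{thing}/shadow/*",
                ],
            },
            {
                "Effect": "Allow",
                "Action": "iot:Receive",
                "Resource": [
                    f"{arn}:topic/devices/{thing}/commands",
                    f"{arn}:topic/$aws/things/{thing}/shadow/*",
                ],
            },
        ],
    }


print(json.dumps(build_device_policy("YOUR_REGION", "YOUR_ACCOUNT_ID"), indent=2))
```

Note that subscribing uses `topicfilter/` resources while publishing and receiving use `topic/` resources.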
The most counterintuitive aspect of the Device Shadow is how it decouples device and cloud logic. You don’t need your device to be online 100% of the time to command it. You can set a desired state while the device is offline, and the shadow keeps it. When the device reconnects, it can pick up the pending change, either from a delta message queued for a persistent session or by requesting its shadow and reading the delta from the response, then act on it and report its new state. This means your cloud application can be built assuming the device will eventually reach the desired state, without worrying about transient network issues.
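One way for a device to catch up after a reconnect is to request its shadow (by publishing an empty message to $aws/things/THING_NAME/shadow/get) and read the delta from the document that arrives on get/accepted. The helper below is a minimal sketch of that last step; `pending_changes` is a hypothetical name, and it assumes the returned document nests the delta under "state".

```python
import json


def pending_changes(get_accepted_payload):
    """Return the delta section of a shadow document, or {} if there is none."""
    document = json.loads(get_accepted_payload)
    return document.get("state", {}).get("delta", {})


# Example document, shaped like a get/accepted response with one pending change
example = json.dumps({
    "state": {
        "desired": {"target_temp": 22},
        "reported": {"target_temp": 19},
        "delta": {"target_temp": 22},
    },
    "version": 7,
})
print(pending_changes(example))
# prints {'target_temp': 22}
```

An empty result means the device is already in sync, so nothing needs to be applied.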
The next concept to explore is AWS IoT Rules, which allow you to process and act upon the messages your devices send, routing them to various AWS services based on sophisticated filtering and transformation logic.