From 55cc77a7ca1b181d1874a478638833da048fb76e Mon Sep 17 00:00:00 2001 From: Jonathan Siegel <248302+usiegj00@users.noreply.github.com> Date: Wed, 7 Jan 2026 23:15:44 +0900 Subject: [PATCH 1/2] Add MQTT 5.0 protocol support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement core MQTT 5.0 features including property encoding/decoding, reason codes, and updated packet formats. Default protocol version changed from 3.1.1 to 5.0. Core features: - Variable-length integer encoding for properties - Property support for all packet types (Connect, Connack, Publish, Subscribe, Unsubscribe, Disconnect, and ACK packets) - Reason codes replacing simple return codes - Session expiry interval, message expiry interval, user properties Backward compatible: existing 3.1.x code works by setting version. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lib/mqtt/client.rb | 9 +- lib/mqtt/packet.rb | 431 ++++++++++++++++++++++++++- spec/mqtt_client_spec.rb | 19 +- spec/mqtt_packet_spec.rb | 450 ++++++++++++++++++++++++++--- spec/zz_client_integration_spec.rb | 2 +- 5 files changed, 845 insertions(+), 66 deletions(-) diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index 9fddce9..f236c5c 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -10,7 +10,7 @@ class Client # Port number of the remote server attr_accessor :port - # The version number of the MQTT protocol to use (default 3.1.1) + # The version number of the MQTT protocol to use (default 5.0) attr_accessor :version # Set to true to enable SSL/TLS encrypted communication @@ -69,7 +69,7 @@ class Client ATTR_DEFAULTS = { :host => nil, :port => nil, - :version => '3.1.1', + :version => '5.0', :keep_alive => 15, :clean_session => true, :client_id => nil, @@ -309,7 +309,7 @@ def disconnect(send_msg = true) # Close the socket if it is open if send_msg - packet = MQTT::Packet::Disconnect.new + packet = MQTT::Packet::Disconnect.new(:version => @version) send_packet(packet) end @socket.close unless @socket.nil? @@ -328,6 +328,7 @@ def publish(topic, payload = '', retain = false, qos = 0) raise ArgumentError, 'Topic name cannot be empty' if topic.empty? packet = MQTT::Packet::Publish.new( + :version => @version, :id => next_packet_id, :qos => qos, :retain => retain, @@ -375,6 +376,7 @@ def publish(topic, payload = '', retain = false, qos = 0) # def subscribe(*topics) packet = MQTT::Packet::Subscribe.new( + :version => @version, :id => next_packet_id, :topics => topics ) @@ -458,6 +460,7 @@ def unsubscribe(*topics) topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1 packet = MQTT::Packet::Unsubscribe.new( + :version => @version, :topics => topics, :id => next_packet_id ) diff --git a/lib/mqtt/packet.rb b/lib/mqtt/packet.rb index 1431975..291d9e8 100644 --- a/lib/mqtt/packet.rb +++ b/lib/mqtt/packet.rb @@ -54,8 +54,11 @@ def self.read(socket) end # Parse buffer into new packet object - def self.parse(buffer) + # @param buffer [String] Raw packet data + # @param version [String] Optional MQTT version ('3.1.0', '3.1.1', '5.0') for context + def self.parse(buffer, version: nil) packet = parse_header(buffer) + packet.version = version if version packet.parse_body(buffer) packet end @@ -282,6 +285,220 @@ def shift_string(buffer) str.force_encoding('UTF-8') end + # Encode a 32-bit unsigned integer (MQTT 5.0) + def encode_int(val) + raise 'Value too big for int' if val > 0xffffffff + [val.to_i].pack('N') + end + + # Remove a 32-bit unsigned integer from the front of buffer (MQTT 5.0) + def shift_int(buffer) + buffer.slice!(0..3).unpack1('N') + end + + # Encode a variable byte integer (MQTT 5.0) + def encode_variable_byte_integer(value) + bytes = [] + loop do + digit = value % 128 + value = value.div(128) + digit |= 0x80 if value > 0 + bytes << digit + break if value.zero? + end + bytes.pack('C*') + end + + # Remove a variable byte integer from the front of buffer (MQTT 5.0) + def shift_variable_byte_integer(buffer) + multiplier = 1 + value = 0 + loop do + byte = shift_byte(buffer) + value += (byte & 0x7F) * multiplier + break if (byte & 0x80).zero? + multiplier *= 128 + end + value + end + + # Encode a string pair (MQTT 5.0 user properties) + def encode_string_pair(key, value) + encode_string(key) + encode_string(value) + end + + # Remove a string pair from the front of buffer (MQTT 5.0) + def shift_string_pair(buffer) + key = shift_string(buffer) + value = shift_string(buffer) + [key, value] + end + + # MQTT 5.0 Property identifiers + PROPERTY_IDS = { + payload_format_indicator: 0x01, + message_expiry_interval: 0x02, + content_type: 0x03, + response_topic: 0x08, + correlation_data: 0x09, + subscription_identifier: 0x0B, + session_expiry_interval: 0x11, + assigned_client_identifier: 0x12, + server_keep_alive: 0x13, + authentication_method: 0x15, + authentication_data: 0x16, + request_problem_information: 0x17, + will_delay_interval: 0x18, + request_response_information: 0x19, + response_information: 0x1A, + server_reference: 0x1C, + reason_string: 0x1F, + receive_maximum: 0x21, + topic_alias_maximum: 0x22, + topic_alias: 0x23, + maximum_qos: 0x24, + retain_available: 0x25, + user_property: 0x26, + maximum_packet_size: 0x27, + wildcard_subscription_available: 0x28, + subscription_identifier_available: 0x29, + shared_subscription_available: 0x2A + }.freeze + + # Property types for encoding/decoding + PROPERTY_TYPES = { + 0x01 => :byte, + 0x02 => :int, + 0x03 => :string, + 0x08 => :string, + 0x09 => :binary, + 0x0B => :variable_byte_integer, + 0x11 => :int, + 0x12 => :string, + 0x13 => :short, + 0x15 => :string, + 0x16 => :binary, + 0x17 => :byte, + 0x18 => :int, + 0x19 => :byte, + 0x1A => :string, + 0x1C => :string, + 0x1F => :string, + 0x21 => :short, + 0x22 => :short, + 0x23 => :short, + 0x24 => :byte, + 0x25 => :byte, + 0x26 => :string_pair, + 0x27 => :int, + 0x28 => :byte, + 0x29 => :byte, + 0x2A => :byte + }.freeze + + # Reverse lookup: property ID to symbol + PROPERTY_NAMES = PROPERTY_IDS.invert.freeze + + # Encode MQTT 5.0 properties hash to binary + def encode_properties(properties) + return encode_variable_byte_integer(0) if properties.nil? || properties.empty? + + data = ''.dup.force_encoding('ASCII-8BIT') + properties.each do |key, value| + prop_id = key.is_a?(Symbol) ? PROPERTY_IDS[key] : key + raise "Unknown property: #{key}" if prop_id.nil? + + prop_type = PROPERTY_TYPES[prop_id] + if key == :user_property || prop_id == 0x26 + # User properties can be repeated + Array(value).each do |pair| + data += encode_bytes(prop_id) + data += encode_string_pair(pair[0], pair[1]) + end + else + data += encode_bytes(prop_id) + data += encode_property_value(prop_type, value) + end + end + + encode_variable_byte_integer(data.bytesize) + data + end + + # Parse MQTT 5.0 properties from buffer + def parse_properties(buffer) + return {} if buffer.empty? + + length = shift_variable_byte_integer(buffer) + return {} if length.zero? + + prop_data = shift_data(buffer, length) + properties = {} + + until prop_data.empty? + prop_id = shift_byte(prop_data) + prop_name = PROPERTY_NAMES[prop_id] || prop_id + prop_type = PROPERTY_TYPES[prop_id] + + raise ProtocolException, "Unknown property identifier: #{prop_id}" if prop_type.nil? + + value = parse_property_value(prop_type, prop_data) + + if prop_id == 0x26 # user_property - can repeat + properties[prop_name] ||= [] + properties[prop_name] << value + else + properties[prop_name] = value + end + end + + properties + end + + # Encode a single property value based on type + def encode_property_value(type, value) + case type + when :byte + encode_bytes(value.to_i) + when :short + encode_short(value.to_i) + when :int + encode_int(value.to_i) + when :string + encode_string(value.to_s) + when :binary + encode_short(value.bytesize) + value.dup.force_encoding('ASCII-8BIT') + when :variable_byte_integer + encode_variable_byte_integer(value.to_i) + when :string_pair + encode_string_pair(value[0], value[1]) + else + raise "Unknown property type: #{type}" + end + end + + # Parse a single property value based on type + def parse_property_value(type, buffer) + case type + when :byte + shift_byte(buffer) + when :short + shift_short(buffer) + when :int + shift_int(buffer) + when :string + shift_string(buffer) + when :binary + len = shift_short(buffer) + shift_data(buffer, len) + when :variable_byte_integer + shift_variable_byte_integer(buffer) + when :string_pair + shift_string_pair(buffer) + else + raise "Unknown property type: #{type}" + end + end + ## PACKET SUBCLASSES ## # Class representing an MQTT Publish message @@ -301,10 +518,14 @@ class Publish < MQTT::Packet # The data to be published attr_accessor :payload + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { :topic => nil, - :payload => '' + :payload => '', + :properties => {} } # Create a new Publish packet @@ -351,6 +572,7 @@ def encode_body end body += encode_string(@topic) body += encode_short(@id) unless qos.zero? + body += encode_properties(@properties) if @version == '5.0' body += payload.to_s.dup.force_encoding('ASCII-8BIT') body end @@ -360,6 +582,10 @@ def parse_body(buffer) super(buffer) @topic = shift_string(buffer) @id = shift_short(buffer) unless qos.zero? + + # MQTT 5.0: parse properties if version is set + @properties = parse_properties(buffer) if @version == '5.0' + @payload = buffer end @@ -428,6 +654,9 @@ class Connect < MQTT::Packet # The password for authenticating with the server attr_accessor :password + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { :client_id => nil, @@ -438,7 +667,8 @@ class Connect < MQTT::Packet :will_retain => false, :will_payload => '', :username => nil, - :password => nil + :password => nil, + :properties => {} } # Create a new Client Connect packet @@ -451,6 +681,9 @@ def initialize(args = {}) elsif version == '3.1.1' self.protocol_name ||= 'MQTT' self.protocol_level ||= 0x04 + elsif version == '5.0' + self.protocol_name ||= 'MQTT' + self.protocol_level ||= 0x05 else raise ArgumentError, "Unsupported protocol version: #{version}" end @@ -483,6 +716,10 @@ def encode_body body += encode_bytes(@connect_flags) body += encode_short(@keep_alive) + + # MQTT 5.0: encode properties after keep_alive + body += encode_properties(@properties) if @version == '5.0' + body += encode_string(@client_id) unless will_topic.nil? body += encode_string(@will_topic) @@ -503,6 +740,8 @@ def parse_body(buffer) @version = '3.1.0' elsif @protocol_name == 'MQTT' && @protocol_level == 4 @version = '3.1.1' + elsif @protocol_name == 'MQTT' && @protocol_level == 5 + @version = '5.0' else raise ProtocolException, "Unsupported protocol: #{@protocol_name}/#{@protocol_level}" end @@ -510,6 +749,10 @@ def parse_body(buffer) @connect_flags = shift_byte(buffer) @clean_session = ((@connect_flags & 0x02) >> 1) == 0x01 @keep_alive = shift_short(buffer) + + # MQTT 5.0: parse properties after keep_alive + @properties = parse_properties(buffer) if @version == '5.0' + @client_id = shift_string(buffer) if ((@connect_flags & 0x04) >> 2) == 0x01 # Last Will and Testament @@ -559,8 +802,11 @@ class Connack < MQTT::Packet # The return code (defaults to 0 for connection accepted) attr_accessor :return_code + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values - ATTR_DEFAULTS = { :return_code => 0x00 } + ATTR_DEFAULTS = { :return_code => 0x00, :properties => {} } # Create a new Client Connect packet def initialize(args = {}) @@ -604,6 +850,7 @@ def encode_body body = '' body += encode_bits(@connack_flags) body += encode_bytes(@return_code.to_i) + body += encode_properties(@properties) if @version == '5.0' body end @@ -616,6 +863,14 @@ def parse_body(buffer) end @return_code = shift_byte(buffer) + # MQTT 5.0: body_length >= 3 indicates properties are present + # (3.1.1 Connack is always exactly 2 bytes: flags + return_code) + if @body_length && @body_length >= 3 + @properties = parse_properties(buffer) + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Connect Acknowledgment packet' end @@ -628,9 +883,20 @@ def inspect # Class representing an MQTT Publish Acknowledgment packet class Puback < MQTT::Packet + # MQTT 5.0 reason code (default 0x00 = success) + attr_accessor :reason_code + + # MQTT 5.0 properties hash + attr_accessor :properties + # Get serialisation of packet's body def encode_body - encode_short(@id) + body = encode_short(@id) + if @version == '5.0' + body += encode_bytes(@reason_code || 0x00) + body += encode_properties(@properties) + end + body end # Parse the body (variable header and payload) of a packet @@ -638,6 +904,14 @@ def parse_body(buffer) super(buffer) @id = shift_short(buffer) + # MQTT 5.0: body_length >= 3 means reason code and possibly properties + if @body_length && @body_length >= 3 + @reason_code = shift_byte(buffer) + @properties = parse_properties(buffer) unless buffer.empty? + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Publish Acknowledgment packet' end @@ -650,9 +924,20 @@ def inspect # Class representing an MQTT Publish Received packet class Pubrec < MQTT::Packet + # MQTT 5.0 reason code (default 0x00 = success) + attr_accessor :reason_code + + # MQTT 5.0 properties hash + attr_accessor :properties + # Get serialisation of packet's body def encode_body - encode_short(@id) + body = encode_short(@id) + if @version == '5.0' + body += encode_bytes(@reason_code || 0x00) + body += encode_properties(@properties) + end + body end # Parse the body (variable header and payload) of a packet @@ -660,6 +945,14 @@ def parse_body(buffer) super(buffer) @id = shift_short(buffer) + # MQTT 5.0: body_length >= 3 means reason code and possibly properties + if @body_length && @body_length >= 3 + @reason_code = shift_byte(buffer) + @properties = parse_properties(buffer) unless buffer.empty? + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Publish Received packet' end @@ -672,6 +965,12 @@ def inspect # Class representing an MQTT Publish Release packet class Pubrel < MQTT::Packet + # MQTT 5.0 reason code (default 0x00 = success) + attr_accessor :reason_code + + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { :flags => [false, true, false, false] @@ -684,7 +983,12 @@ def initialize(args = {}) # Get serialisation of packet's body def encode_body - encode_short(@id) + body = encode_short(@id) + if @version == '5.0' + body += encode_bytes(@reason_code || 0x00) + body += encode_properties(@properties) + end + body end # Parse the body (variable header and payload) of a packet @@ -692,6 +996,14 @@ def parse_body(buffer) super(buffer) @id = shift_short(buffer) + # MQTT 5.0: body_length >= 3 means reason code and possibly properties + if @body_length && @body_length >= 3 + @reason_code = shift_byte(buffer) + @properties = parse_properties(buffer) unless buffer.empty? + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Publish Release packet' end @@ -711,9 +1023,20 @@ def inspect # Class representing an MQTT Publish Complete packet class Pubcomp < MQTT::Packet + # MQTT 5.0 reason code (default 0x00 = success) + attr_accessor :reason_code + + # MQTT 5.0 properties hash + attr_accessor :properties + # Get serialisation of packet's body def encode_body - encode_short(@id) + body = encode_short(@id) + if @version == '5.0' + body += encode_bytes(@reason_code || 0x00) + body += encode_properties(@properties) + end + body end # Parse the body (variable header and payload) of a packet @@ -721,6 +1044,14 @@ def parse_body(buffer) super(buffer) @id = shift_short(buffer) + # MQTT 5.0: body_length >= 3 means reason code and possibly properties + if @body_length && @body_length >= 3 + @reason_code = shift_byte(buffer) + @properties = parse_properties(buffer) unless buffer.empty? + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Publish Complete packet' end @@ -736,10 +1067,14 @@ class Subscribe < MQTT::Packet # One or more topic filters to subscribe to attr_accessor :topics + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { :topics => [], - :flags => [false, true, false, false] + :flags => [false, true, false, false], + :properties => {} } # Create a new Subscribe packet @@ -789,6 +1124,7 @@ def topics=(value) def encode_body raise 'no topics given when serialising packet' if @topics.empty? body = encode_short(@id) + body += encode_properties(@properties) if @version == '5.0' topics.each do |item| body += encode_string(item[0]) body += encode_bytes(item[1]) @@ -800,6 +1136,7 @@ def encode_body def parse_body(buffer) super(buffer) @id = shift_short(buffer) + @properties = parse_properties(buffer) if @version == '5.0' @topics = [] while buffer.bytesize > 0 topic_name = shift_string(buffer) @@ -829,9 +1166,13 @@ class Suback < MQTT::Packet # An array of return codes, ordered by the topics that were subscribed to attr_accessor :return_codes + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { - :return_codes => [] + :return_codes => [], + :properties => {} } # Create a new Subscribe Acknowledgment packet @@ -857,6 +1198,7 @@ def encode_body raise 'no granted QoS given when serialising packet' end body = encode_short(@id) + body += encode_properties(@properties) if @version == '5.0' return_codes.each { |qos| body += encode_bytes(qos) } body end @@ -865,6 +1207,7 @@ def encode_body def parse_body(buffer) super(buffer) @id = shift_short(buffer) + @properties = parse_properties(buffer) if @version == '5.0' @return_codes << shift_byte(buffer) while buffer.bytesize > 0 end @@ -891,10 +1234,14 @@ class Unsubscribe < MQTT::Packet # One or more topic paths to unsubscribe from attr_accessor :topics + # MQTT 5.0 properties hash + attr_accessor :properties + # Default attribute values ATTR_DEFAULTS = { :topics => [], - :flags => [false, true, false, false] + :flags => [false, true, false, false], + :properties => {} } # Create a new Unsubscribe packet @@ -911,6 +1258,7 @@ def topics=(value) def encode_body raise 'no topics given when serialising packet' if @topics.empty? body = encode_short(@id) + body += encode_properties(@properties) if @version == '5.0' topics.each { |topic| body += encode_string(topic) } body end @@ -919,6 +1267,7 @@ def encode_body def parse_body(buffer) super(buffer) @id = shift_short(buffer) + @properties = parse_properties(buffer) if @version == '5.0' @topics << shift_string(buffer) while buffer.bytesize > 0 end @@ -940,14 +1289,31 @@ def inspect # Class representing an MQTT Unsubscribe Acknowledgment packet class Unsuback < MQTT::Packet + # MQTT 5.0 reason codes (one per topic) + attr_accessor :reason_codes + + # MQTT 5.0 properties hash + attr_accessor :properties + + # Default attribute values + ATTR_DEFAULTS = { + :reason_codes => [], + :properties => {} + } + # Create a new Unsubscribe Acknowledgment packet def initialize(args = {}) - super(args) + super(ATTR_DEFAULTS.merge(args)) end # Get serialisation of packet's body def encode_body - encode_short(@id) + body = encode_short(@id) + if @version == '5.0' + body += encode_properties(@properties) + @reason_codes.each { |rc| body += encode_bytes(rc) } + end + body end # Parse the body (variable header and payload) of a packet @@ -955,6 +1321,15 @@ def parse_body(buffer) super(buffer) @id = shift_short(buffer) + # MQTT 5.0: body_length >= 3 means properties and reason codes + if @body_length && @body_length >= 3 + @properties = parse_properties(buffer) + @reason_codes = [] + @reason_codes << shift_byte(buffer) while buffer.bytesize > 0 + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Unsubscribe Acknowledgment packet' end @@ -999,15 +1374,43 @@ def parse_body(buffer) # Class representing an MQTT Client Disconnect packet class Disconnect < MQTT::Packet + # MQTT 5.0 reason code (default 0x00 = normal disconnection) + attr_accessor :reason_code + + # MQTT 5.0 properties hash + attr_accessor :properties + + # Default attribute values + ATTR_DEFAULTS = { + :reason_code => 0x00, + :properties => {} + } + # Create a new Client Disconnect packet def initialize(args = {}) - super(args) + super(ATTR_DEFAULTS.merge(args)) + end + + # Get serialisation of packet's body + def encode_body + return '' unless @version == '5.0' + body = encode_bytes(@reason_code || 0x00) + body += encode_properties(@properties) + body end # Check the body def parse_body(buffer) super(buffer) + # MQTT 5.0: body_length >= 1 means reason code present + if @body_length && @body_length >= 1 + @reason_code = shift_byte(buffer) + @properties = parse_properties(buffer) unless buffer.empty? + @version = '5.0' + return + end + return if buffer.empty? raise ProtocolException, 'Extra bytes at end of Disconnect packet' end diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index f8b671a..be4dc68 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -39,7 +39,7 @@ def now client = MQTT::Client.new expect(client.host).to eq(nil) expect(client.port).to eq(1883) - expect(client.version).to eq('3.1.1') + expect(client.version).to eq('5.0') expect(client.keep_alive).to eq(15) end @@ -382,6 +382,7 @@ def now end it "should include the username and password for an authenticated connection" do + client.version = '3.1.1' client.username = 'username' client.password = 'password' client.connect('myclient') @@ -512,6 +513,7 @@ def now end it "should include the will in the CONNECT message" do + client.version = '3.1.1' client.connect('myclient') expect(socket.string).to eq( "\x10\x22"+ @@ -613,6 +615,7 @@ def now end it "should write a valid DISCONNECT packet to the socket if connected and the send_msg=true an" do + client.version = '3.1.1' allow(client).to receive(:connected?).and_return(true) client.disconnect(true) expect(socket.string).to eq("\xE0\x00") @@ -687,22 +690,26 @@ def wait_for_puback(id) end it "should write a valid PUBLISH packet to the socket without the retain flag" do + client.version = '3.1.1' client.publish('topic','payload', false, 0) expect(socket.string).to eq("\x30\x0e\x00\x05topicpayload") end it "should write a valid PUBLISH packet to the socket with the retain flag set" do + client.version = '3.1.1' client.publish('topic','payload', true, 0) expect(socket.string).to eq("\x31\x0e\x00\x05topicpayload") end it "should write a valid PUBLISH packet to the socket with the QoS set to 1" do + client.version = '3.1.1' inject_puback(1) client.publish('topic','payload', false, 1) expect(socket.string).to eq("\x32\x10\x00\x05topic\x00\x01payload") end it "should wrap the packet id after 65535" do + client.version = '3.1.1' 0xffff.times do |n| inject_puback(n + 1) client.publish('topic','payload', false, 1) @@ -716,17 +723,20 @@ def wait_for_puback(id) end it "should write a valid PUBLISH packet to the socket with the QoS set to 2" do + client.version = '3.1.1' inject_puback(1) client.publish('topic','payload', false, 2) expect(socket.string).to eq("\x34\x10\x00\x05topic\x00\x01payload") end it "should write a valid PUBLISH packet with no payload" do + client.version = '3.1.1' client.publish('test') expect(socket.string).to eq("\x30\x06\x00\x04test") end it "should write a valid PUBLISH packet with frozen payload" do + client.version = '3.1.1' client.publish('topic', 'payload'.freeze, false, 0) expect(socket.string).to eq("\x30\x0e\x00\x05topicpayload") end @@ -780,21 +790,25 @@ def wait_for_puback(id) end it "should write a valid SUBSCRIBE packet to the socket if given a single topic String" do + client.version = '3.1.1' client.subscribe('a/b') expect(socket.string).to eq("\x82\x08\x00\x01\x00\x03a/b\x00") end it "should write a valid SUBSCRIBE packet to the socket if given a two topic Strings in an Array" do + client.version = '3.1.1' client.subscribe('a/b','c/d') expect(socket.string).to eq("\x82\x0e\x00\x01\x00\x03a/b\x00\x00\x03c/d\x00") end it "should write a valid SUBSCRIBE packet to the socket if given a two topic Strings with QoS in an Array" do + client.version = '3.1.1' client.subscribe(['a/b',0],['c/d',1]) expect(socket.string).to eq("\x82\x0e\x00\x01\x00\x03a/b\x00\x00\x03c/d\x01") end it "should write a valid SUBSCRIBE packet to the socket if given a two topic Strings with QoS in a Hash" do + client.version = '3.1.1' client.subscribe('a/b' => 0,'c/d' => 1) expect(socket.string).to eq("\x82\x0e\x00\x01\x00\x03a/b\x00\x00\x03c/d\x01") end @@ -961,16 +975,19 @@ def wait_for_puback(id) end it "should write a valid UNSUBSCRIBE packet to the socket if given a single topic String" do + client.version = '3.1.1' client.unsubscribe('a/b') expect(socket.string).to eq("\xa2\x07\x00\x01\x00\x03a/b") end it "should write a valid UNSUBSCRIBE packet to the socket if given a two topic Strings" do + client.version = '3.1.1' client.unsubscribe('a/b','c/d') expect(socket.string).to eq("\xa2\x0c\x00\x01\x00\x03a/b\x00\x03c/d") end it "should write a valid UNSUBSCRIBE packet to the socket if given an array of Strings" do + client.version = '3.1.1' client.unsubscribe(['a/b','c/d']) expect(socket.string).to eq("\xa2\x0c\x00\x01\x00\x03a/b\x00\x03c/d") end diff --git a/spec/mqtt_packet_spec.rb b/spec/mqtt_packet_spec.rb index 928a9ed..bee1b15 100644 --- a/spec/mqtt_packet_spec.rb +++ b/spec/mqtt_packet_spec.rb @@ -99,6 +99,180 @@ expect(packet.id).to eq(4321) end end + + describe "MQTT 5.0 encoding methods" do + let(:packet) { MQTT::Packet.new } + + describe "encode_int" do + it "should encode a 32-bit unsigned integer in big-endian" do + data = packet.send(:encode_int, 0x12345678) + expect(data).to eq("\x12\x34\x56\x78") + expect(data.encoding.to_s).to eq("ASCII-8BIT") + end + + it "should encode zero correctly" do + expect(packet.send(:encode_int, 0)).to eq("\x00\x00\x00\x00") + end + + it "should encode max value correctly" do + expect(packet.send(:encode_int, 0xFFFFFFFF)).to eq("\xFF\xFF\xFF\xFF") + end + + it "should raise an error if value is too big" do + expect { + packet.send(:encode_int, 0x100000000) + }.to raise_error('Value too big for int') + end + end + + describe "shift_int" do + it "should extract a 32-bit unsigned integer from a buffer" do + buffer = "\x12\x34\x56\x78remaining" + expect(packet.send(:shift_int, buffer)).to eq(0x12345678) + expect(buffer).to eq('remaining') + end + end + + describe "encode_variable_byte_integer" do + it "should encode single byte values (0-127)" do + expect(packet.send(:encode_variable_byte_integer, 0)).to eq("\x00") + expect(packet.send(:encode_variable_byte_integer, 127)).to eq("\x7F") + end + + it "should encode two byte values (128-16383)" do + expect(packet.send(:encode_variable_byte_integer, 128)).to eq("\x80\x01") + expect(packet.send(:encode_variable_byte_integer, 16383)).to eq("\xFF\x7F") + end + + it "should encode three byte values (16384-2097151)" do + expect(packet.send(:encode_variable_byte_integer, 16384)).to eq("\x80\x80\x01") + expect(packet.send(:encode_variable_byte_integer, 2097151)).to eq("\xFF\xFF\x7F") + end + + it "should encode four byte values (2097152-268435455)" do + expect(packet.send(:encode_variable_byte_integer, 2097152)).to eq("\x80\x80\x80\x01") + expect(packet.send(:encode_variable_byte_integer, 268435455)).to eq("\xFF\xFF\xFF\x7F") + end + end + + describe "shift_variable_byte_integer" do + it "should decode single byte values" do + buffer = "\x00remaining" + expect(packet.send(:shift_variable_byte_integer, buffer)).to eq(0) + expect(buffer).to eq('remaining') + + buffer = "\x7Fremaining" + expect(packet.send(:shift_variable_byte_integer, buffer)).to eq(127) + expect(buffer).to eq('remaining') + end + + it "should decode two byte values" do + buffer = "\x80\x01remaining" + expect(packet.send(:shift_variable_byte_integer, buffer)).to eq(128) + expect(buffer).to eq('remaining') + end + + it "should decode three byte values" do + buffer = "\x80\x80\x01remaining" + expect(packet.send(:shift_variable_byte_integer, buffer)).to eq(16384) + expect(buffer).to eq('remaining') + end + + it "should decode four byte values" do + buffer = "\xFF\xFF\xFF\x7Fremaining" + expect(packet.send(:shift_variable_byte_integer, buffer)).to eq(268435455) + expect(buffer).to eq('remaining') + end + end + + describe "encode_string_pair" do + it "should encode a key-value string pair" do + data = packet.send(:encode_string_pair, 'key', 'value') + expect(data).to eq("\x00\x03key\x00\x05value") + end + end + + describe "shift_string_pair" do + it "should decode a key-value string pair" do + buffer = "\x00\x03key\x00\x05valueremaining" + key, value = packet.send(:shift_string_pair, buffer) + expect(key).to eq('key') + expect(value).to eq('value') + expect(buffer).to eq('remaining') + end + end + + describe "encode_properties" do + it "should encode empty properties as a single zero byte" do + expect(packet.send(:encode_properties, {})).to eq("\x00") + expect(packet.send(:encode_properties, nil)).to eq("\x00") + end + + it "should encode session_expiry_interval (4-byte int)" do + props = { session_expiry_interval: 3600 } + # Length byte (5) + prop_id (0x11) + 4-byte int (3600 = 0x00000E10) + expect(packet.send(:encode_properties, props)).to eq("\x05\x11\x00\x00\x0E\x10") + end + + it "should encode reason_string (string)" do + props = { reason_string: 'test' } + # Length (7) + prop_id (0x1F) + string length (2) + 'test' (4) + expect(packet.send(:encode_properties, props)).to eq("\x07\x1F\x00\x04test") + end + + it "should encode user_property as repeatable" do + props = { user_property: [['key1', 'val1'], ['key2', 'val2']] } + encoded = packet.send(:encode_properties, props) + # Should have two user properties + expect(encoded.scan(/\x26/).length).to eq(2) + end + + it "should encode receive_maximum (2-byte int)" do + props = { receive_maximum: 100 } + expect(packet.send(:encode_properties, props)).to eq("\x03\x21\x00\x64") + end + end + + describe "parse_properties" do + it "should parse empty properties" do + buffer = "\x00" + expect(packet.send(:parse_properties, buffer)).to eq({}) + end + + it "should parse session_expiry_interval" do + buffer = "\x05\x11\x00\x00\x0E\x10" + props = packet.send(:parse_properties, buffer) + expect(props[:session_expiry_interval]).to eq(3600) + end + + it "should parse reason_string" do + buffer = "\x07\x1F\x00\x04test" + props = packet.send(:parse_properties, buffer) + expect(props[:reason_string]).to eq('test') + end + + it "should parse multiple user_property values" do + # Two user properties: each is 1 (id) + 2+4 (key) + 2+4 (val) = 13 bytes + # Total = 26 bytes = 0x1A + buffer = "\x1A\x26\x00\x04key1\x00\x04val1\x26\x00\x04key2\x00\x04val2" + props = packet.send(:parse_properties, buffer) + expect(props[:user_property]).to eq([['key1', 'val1'], ['key2', 'val2']]) + end + + it "should roundtrip encode/parse properties" do + original = { + session_expiry_interval: 7200, + reason_string: 'hello', + receive_maximum: 50 + } + encoded = packet.send(:encode_properties, original) + parsed = packet.send(:parse_properties, encoded) + expect(parsed[:session_expiry_interval]).to eq(7200) + expect(parsed[:reason_string]).to eq('hello') + expect(parsed[:receive_maximum]).to eq(50) + end + end + end end describe MQTT::Packet::Publish do @@ -456,6 +630,64 @@ expect(packet.inspect).to eq("#") end end + + describe "MQTT 5.0" do + describe "when serialising a packet" do + it "should output correct bytes for 5.0 publish with empty properties" do + packet = MQTT::Packet::Publish.new( + :version => '5.0', + :topic => 'test', + :payload => 'hi' + ) + # Topic (2+4) + props_len (1) + payload (2) = 9 bytes + expect(packet.to_s).to eq("\x30\x09\x00\x04test\x00hi") + end + + it "should output correct bytes with message_expiry_interval" do + packet = MQTT::Packet::Publish.new( + :version => '5.0', + :topic => 'a', + :payload => 'x', + :properties => { message_expiry_interval: 60 } + ) + encoded = packet.to_s + expect(encoded).to include("\x02") # message_expiry_interval property id + end + + it "should include properties between header and payload" do + packet = MQTT::Packet::Publish.new( + :version => '5.0', + :qos => 1, + :id => 1, + :topic => 't', + :payload => 'p', + :properties => { user_property: [['k', 'v']] } + ) + encoded = packet.to_s + expect(encoded).to include("\x26") # user_property id + end + end + + describe "when parsing a 5.0 Publish with properties" do + let(:packet) do + # Build packet: topic 'a' + props (message_expiry=60) + payload 'x' + # Topic: \x00\x01a, Props: \x05\x02\x00\x00\x00\x3C, Payload: x + MQTT::Packet.parse("\x30\x0A\x00\x01a\x05\x02\x00\x00\x00\x3Cx", version: '5.0') + end + + it "should parse message_expiry_interval property" do + expect(packet.properties[:message_expiry_interval]).to eq(60) + end + + it "should set the topic correctly" do + expect(packet.topic).to eq('a') + end + + it "should set the payload correctly" do + expect(packet.payload).to eq('x') + end + end + end end describe MQTT::Packet::Connect do @@ -587,6 +819,34 @@ end end + context 'protocol version 5.0' do + it "should output the correct bytes for a basic 5.0 connect packet" do + packet = MQTT::Packet::Connect.new(:version => '5.0', :client_id => 'myclient') + # 0x10 = CONNECT, length, "MQTT", 0x05 (level), 0x02 (clean), keep_alive, props (0x00), client_id + expect(packet.to_s).to eq("\x10\x15\x00\x04MQTT\x05\x02\x00\x0f\x00\x00\x08myclient") + end + + it "should output correct bytes with session_expiry_interval property" do + packet = MQTT::Packet::Connect.new( + :version => '5.0', + :client_id => 'test', + :properties => { session_expiry_interval: 3600 } + ) + # Properties: 0x05 (length) + 0x11 (session_expiry) + 4 bytes (3600) + expect(packet.to_s).to eq("\x10\x16\x00\x04MQTT\x05\x02\x00\x0f\x05\x11\x00\x00\x0E\x10\x00\x04test") + end + + it "should output correct bytes with user_property" do + packet = MQTT::Packet::Connect.new( + :version => '5.0', + :client_id => 'c', + :properties => { user_property: [['k', 'v']] } + ) + encoded = packet.to_s + expect(encoded).to include("\x26") # user property id + end + end + context 'an invalid protocol version number' do it "should raise a protocol exception" do expect { @@ -700,6 +960,50 @@ end end + describe "when parsing a simple 5.0 Connect packet" do + let(:packet) do + MQTT::Packet.parse( + "\x10\x15\x00\x04MQTT\x05\x02\x00\x0a\x00\x00\x08myclient" + ) + end + + it "should correctly create the right type of packet object" do + expect(packet.class).to eq(MQTT::Packet::Connect) + end + + it "should set the Protocol Level of the packet correctly" do + expect(packet.protocol_level).to eq(5) + end + + it "should set the Protocol version of the packet correctly" do + expect(packet.version).to eq('5.0') + end + + it "should set the Client Identifier correctly" do + expect(packet.client_id).to eq('myclient') + end + + it "should set properties to empty hash" do + expect(packet.properties).to eq({}) + end + end + + describe "when parsing a 5.0 Connect packet with properties" do + let(:packet) do + MQTT::Packet.parse( + "\x10\x1A\x00\x04MQTT\x05\x02\x00\x0a\x05\x11\x00\x00\x0E\x10\x00\x08myclient" + ) + end + + it "should parse session_expiry_interval property" do + expect(packet.properties[:session_expiry_interval]).to eq(3600) + end + + it "should set the Protocol version correctly" do + expect(packet.version).to eq('5.0') + end + end + describe "when parsing a Connect packet with the clean session flag set" do let(:packet) do MQTT::Packet.parse( @@ -1215,13 +1519,16 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do + it "should raise an exception for 3.1.1 (body length 2 + extra)" do + # Body length says 3 but we only have 2 bytes of valid 3.1.1 data + # This is actually valid as 5.0 with empty properties, so test a different case expect { - packet = MQTT::Packet.parse( "\x20\x03\x00\x00\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Connect Acknowledgment packet" - ) + # Force a truly invalid packet: body_length=2 but 3 bytes provided + buffer = "\x20\x02\x00\x00".dup + buffer << "\x00" # extra byte after valid 3.1.1 body + # The parse will fail because buffer length doesn't match body_length + MQTT::Packet.parse(buffer + "extra") + }.to raise_error(MQTT::ProtocolException) end end @@ -1246,6 +1553,53 @@ expect(packet.inspect).to eq("#") end end + + describe "MQTT 5.0" do + describe "when serialising a packet" do + it "should output correct bytes for basic 5.0 connack" do + packet = MQTT::Packet::Connack.new(:version => '5.0', :return_code => 0x00) + # Flags (1) + reason code (1) + properties length (1) + expect(packet.to_s).to eq("\x20\x03\x00\x00\x00") + end + + it "should output correct bytes with properties" do + packet = MQTT::Packet::Connack.new( + :version => '5.0', + :return_code => 0x00, + :properties => { session_expiry_interval: 3600 } + ) + encoded = packet.to_s + expect(encoded).to include("\x11") # session_expiry_interval property id + end + end + + describe "when parsing a basic 5.0 Connack packet" do + let(:packet) { MQTT::Packet.parse("\x20\x03\x00\x00\x00") } + + it "should correctly create the right type" do + expect(packet.class).to eq(MQTT::Packet::Connack) + end + + it "should set the return code correctly" do + expect(packet.return_code).to eq(0x00) + end + + it "should set properties to empty hash" do + expect(packet.properties).to eq({}) + end + end + + describe "when parsing a 5.0 Connack with properties" do + let(:packet) do + # Flags (0x00) + reason code (0x00) + props length (5) + session_expiry (0x11) + value + MQTT::Packet.parse("\x20\x08\x00\x00\x05\x11\x00\x00\x0E\x10") + end + + it "should parse session_expiry_interval property" do + expect(packet.properties[:session_expiry_interval]).to eq(3600) + end + end + end end describe MQTT::Packet::Puback do @@ -1269,13 +1623,12 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do - expect { - packet = MQTT::Packet.parse( "\x40\x03\x12\x34\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Publish Acknowledgment packet" - ) + it "should interpret as MQTT 5.0 with reason code" do + # body_length=3 is now valid as 5.0: id (2) + reason_code (1) + packet = MQTT::Packet.parse("\x40\x03\x12\x34\x00") + expect(packet.id).to eq(0x1234) + expect(packet.reason_code).to eq(0x00) + expect(packet.version).to eq('5.0') end end @@ -1290,6 +1643,18 @@ end end + describe "MQTT 5.0" do + it "should encode with reason code and empty properties" do + packet = MQTT::Packet::Puback.new(:version => '5.0', :id => 0x0001) + expect(packet.to_s).to eq("\x40\x04\x00\x01\x00\x00") + end + + it "should encode with non-zero reason code" do + packet = MQTT::Packet::Puback.new(:version => '5.0', :id => 0x0001, :reason_code => 0x10) + expect(packet.to_s).to eq("\x40\x04\x00\x01\x10\x00") + end + end + it "should output the right string when calling inspect" do packet = MQTT::Packet::Puback.new( :id => 0x1234 ) expect(packet.inspect).to eq("#") @@ -1317,13 +1682,11 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do - expect { - packet = MQTT::Packet.parse( "\x50\x03\x12\x34\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Publish Received packet" - ) + it "should interpret as MQTT 5.0 with reason code" do + packet = MQTT::Packet.parse("\x50\x03\x12\x34\x00") + expect(packet.id).to eq(0x1234) + expect(packet.reason_code).to eq(0x00) + expect(packet.version).to eq('5.0') end end @@ -1365,13 +1728,11 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do - expect { - packet = MQTT::Packet.parse( "\x62\x03\x12\x34\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Publish Release packet" - ) + it "should interpret as MQTT 5.0 with reason code" do + packet = MQTT::Packet.parse("\x62\x03\x12\x34\x00") + expect(packet.id).to eq(0x1234) + expect(packet.reason_code).to eq(0x00) + expect(packet.version).to eq('5.0') end end @@ -1413,13 +1774,11 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do - expect { - MQTT::Packet.parse( "\x70\x03\x12\x34\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Publish Complete packet" - ) + it "should interpret as MQTT 5.0 with reason code" do + packet = MQTT::Packet.parse("\x70\x03\x12\x34\x00") + expect(packet.id).to eq(0x1234) + expect(packet.reason_code).to eq(0x00) + expect(packet.version).to eq('5.0') end end @@ -1743,13 +2102,11 @@ end describe "when parsing packet with extra bytes on the end" do - it "should raise an exception" do - expect { - packet = MQTT::Packet.parse( "\xB0\x03\x12\x34\x00" ) - }.to raise_error( - MQTT::ProtocolException, - "Extra bytes at end of Unsubscribe Acknowledgment packet" - ) + it "should interpret as MQTT 5.0 with properties" do + # body_length=3 is valid as 5.0: id (2) + props_len (1) + packet = MQTT::Packet.parse("\xB0\x03\x12\x34\x00") + expect(packet.id).to eq(0x1234) + expect(packet.version).to eq('5.0') end end @@ -1865,12 +2222,11 @@ expect(packet.class).to eq(MQTT::Packet::Disconnect) end - it "should raise an exception if the packet has a payload" do - expect { - MQTT::Packet.parse( "\xE0\x05hello" ) - }.to raise_error( - 'Extra bytes at end of Disconnect packet' - ) + it "should interpret as MQTT 5.0 with reason code when body present" do + # body_length >= 1 means MQTT 5.0 with reason code + packet = MQTT::Packet.parse("\xE0\x02\x00\x00") # reason_code=0, empty props + expect(packet.reason_code).to eq(0x00) + expect(packet.version).to eq('5.0') end end diff --git a/spec/zz_client_integration_spec.rb b/spec/zz_client_integration_spec.rb index 0a2bc08..519b651 100644 --- a/spec/zz_client_integration_spec.rb +++ b/spec/zz_client_integration_spec.rb @@ -15,7 +15,7 @@ @server.logger.level = Logger::WARN @server.start - @client = MQTT::Client.new(@server.address, @server.port) + @client = MQTT::Client.new(@server.address, @server.port, :version => '3.1.1') end after(:each) do From 5b0b834b1235253348f63dd2784f13e82738bb7e Mon Sep 17 00:00:00 2001 From: Jonathan Siegel <248302+usiegj00@users.noreply.github.com> Date: Wed, 7 Jan 2026 23:35:27 +0900 Subject: [PATCH 2/2] Fix MQTT 5.0 message parsing - pass version to Packet.read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When receiving packets from a broker, the version was not being passed to Packet.read(), causing MQTT 5.0 properties to not be parsed. This resulted in the properties length byte being left in the payload. Changes: - Add version parameter to MQTT::Packet.read() - Pass @version from client when reading packets - Update receive_packet tests to use explicit version 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lib/mqtt/client.rb | 2 +- lib/mqtt/packet.rb | 9 +++++++-- spec/mqtt_client_spec.rb | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index f236c5c..54bb035 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -477,7 +477,7 @@ def receive_packet handle_timeouts unless result.nil? # Yes - read in the packet - packet = MQTT::Packet.read(@socket) + packet = MQTT::Packet.read(@socket, version: @version) handle_packet packet end keep_alive! diff --git a/lib/mqtt/packet.rb b/lib/mqtt/packet.rb index 291d9e8..9ec2e45 100644 --- a/lib/mqtt/packet.rb +++ b/lib/mqtt/packet.rb @@ -23,8 +23,10 @@ class Packet :body_length => nil } - # Read in a packet from a socket - def self.read(socket) + # Read a packet from a socket + # @param socket [IO] Socket to read from + # @param version [String] Optional MQTT version ('3.1.0', '3.1.1', '5.0') for context + def self.read(socket, version: nil) # Read in the packet header and create a new packet object packet = create_from_header( read_byte(socket) @@ -47,6 +49,9 @@ def self.read(socket) # Store the expected body length in the packet packet.instance_variable_set('@body_length', body_length) + # Set version before parsing body so properties are handled correctly + packet.version = version if version + # Read in the packet body packet.parse_body(socket.read(body_length)) diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index be4dc68..bd95924 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -995,6 +995,7 @@ def wait_for_puback(id) describe "when calling the 'receive_packet' method" do before(:each) do + client.version = '3.1.1' client.instance_variable_set('@socket', socket) allow(IO).to receive(:select).and_return([[socket], [], []]) @read_queue = client.instance_variable_get('@read_queue')