mqtt: rustfmt

pull/7223/head
Sascha Steinbiss 4 years ago committed by Victor Julien
parent 1ba62993d5
commit 2a3ed9a6ae

@ -17,7 +17,7 @@
// written by Sascha Steinbiss <sascha@steinbiss.name> // written by Sascha Steinbiss <sascha@steinbiss.name>
use crate::mqtt::mqtt::{MQTTTransaction, MQTTState}; use crate::mqtt::mqtt::{MQTTState, MQTTTransaction};
use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode}; use crate::mqtt::mqtt_message::{MQTTOperation, MQTTTypeCode};
use std::ffi::CStr; use std::ffi::CStr;
use std::ptr; use std::ptr;
@ -33,31 +33,24 @@ pub enum MQTTFlagState {
} }
#[inline] #[inline]
fn check_flag_state( fn check_flag_state(flag_state: MQTTFlagState, flag_value: bool, ok: &mut bool) {
flag_state: MQTTFlagState,
flag_value: bool,
ok: &mut bool,
) {
match flag_state { match flag_state {
MQTTFlagState::MQTT_MUST_BE_SET => { MQTTFlagState::MQTT_MUST_BE_SET => {
if !flag_value { if !flag_value {
*ok = false; *ok = false;
} }
}, }
MQTTFlagState::MQTT_CANT_BE_SET => { MQTTFlagState::MQTT_CANT_BE_SET => {
if flag_value { if flag_value {
*ok = false; *ok = false;
} }
}, }
_ => {} _ => {}
} }
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_has_type( pub extern "C" fn rs_mqtt_tx_has_type(tx: &MQTTTransaction, mtype: u8) -> u8 {
tx: &MQTTTransaction,
mtype: u8,
) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if mtype == msg.header.message_type as u8 { if mtype == msg.header.message_type as u8 {
return 1; return 1;
@ -81,9 +74,7 @@ pub unsafe extern "C" fn rs_mqtt_cstr_message_code(
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_has_flags( pub extern "C" fn rs_mqtt_tx_has_flags(
tx: &MQTTTransaction, tx: &MQTTTransaction, qretain: MQTTFlagState, qdup: MQTTFlagState,
qretain: MQTTFlagState,
qdup: MQTTFlagState,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
let mut ok = true; let mut ok = true;
@ -98,10 +89,7 @@ pub extern "C" fn rs_mqtt_tx_has_flags(
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_has_qos( pub extern "C" fn rs_mqtt_tx_has_qos(tx: &MQTTTransaction, qos: u8) -> u8 {
tx: &MQTTTransaction,
qos: u8,
) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if qos == msg.header.qos_level { if qos == msg.header.qos_level {
return 1; return 1;
@ -111,20 +99,14 @@ pub extern "C" fn rs_mqtt_tx_has_qos(
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_get_protocol_version( pub extern "C" fn rs_mqtt_tx_get_protocol_version(state: &MQTTState) -> u8 {
state: &MQTTState,
) -> u8 {
return state.protocol_version; return state.protocol_version;
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_has_connect_flags( pub extern "C" fn rs_mqtt_tx_has_connect_flags(
tx: &MQTTTransaction, tx: &MQTTTransaction, username: MQTTFlagState, password: MQTTFlagState, will: MQTTFlagState,
username: MQTTFlagState, will_retain: MQTTFlagState, clean_session: MQTTFlagState,
password: MQTTFlagState,
will: MQTTFlagState,
will_retain: MQTTFlagState,
clean_session: MQTTFlagState,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -144,9 +126,7 @@ pub extern "C" fn rs_mqtt_tx_has_connect_flags(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid( pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -166,9 +146,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_clientid(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username( pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -190,9 +168,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_username(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password( pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -213,9 +189,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_password(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic( pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -237,9 +211,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willtopic(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage( pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNECT(ref cv) = msg.op { if let MQTTOperation::CONNECT(ref cv) = msg.op {
@ -261,8 +233,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connect_willmessage(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent( pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent(
tx: &MQTTTransaction, tx: &MQTTTransaction, session_present: *mut bool,
session_present: *mut bool,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::CONNACK(ref ca) = msg.op { if let MQTTOperation::CONNACK(ref ca) = msg.op {
@ -275,9 +246,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_connack_sessionpresent(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic( pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::PUBLISH(ref pubv) = msg.op { if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
@ -298,9 +267,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_topic(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message( pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message(
tx: &MQTTTransaction, tx: &MQTTTransaction, buffer: *mut *const u8, buffer_len: *mut u32,
buffer: *mut *const u8,
buffer_len: *mut u32,
) -> u8 { ) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::PUBLISH(ref pubv) = msg.op { if let MQTTOperation::PUBLISH(ref pubv) = msg.op {
@ -320,12 +287,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_publish_message(
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction, pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(
i: u32, tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
buf: *mut *const u8, ) -> u8 {
len: *mut u32)
-> u8
{
let mut offset = 0; let mut offset = 0;
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op { if let MQTTOperation::SUBSCRIBE(ref subv) = msg.op {
@ -349,12 +313,9 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_subscribe_topic(tx: &MQTTTransaction,
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction, pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(
i: u32, tx: &MQTTTransaction, i: u32, buf: *mut *const u8, len: *mut u32,
buf: *mut *const u8, ) -> u8 {
len: *mut u32)
-> u8
{
let mut offset = 0; let mut offset = 0;
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op { if let MQTTOperation::UNSUBSCRIBE(ref unsubv) = msg.op {
@ -378,10 +339,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_unsubscribe_topic(tx: &MQTTTransaction,
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code( pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(tx: &MQTTTransaction, result: *mut u8) -> u8 {
tx: &MQTTTransaction,
result: *mut u8,
) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
match msg.op { match msg.op {
MQTTOperation::PUBACK(ref v) MQTTOperation::PUBACK(ref v)
@ -407,17 +365,14 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_reason_code(
return 1; return 1;
} }
} }
_ => return 0 _ => return 0,
} }
} }
return 0; return 0;
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code( pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(tx: &MQTTTransaction, code: u8) -> u8 {
tx: &MQTTTransaction,
code: u8,
) -> u8 {
for msg in tx.msg.iter() { for msg in tx.msg.iter() {
if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op { if let MQTTOperation::UNSUBACK(ref unsuback) = msg.op {
if let Some(ref reason_codes) = unsuback.reason_codes { if let Some(ref reason_codes) = unsuback.reason_codes {
@ -435,10 +390,10 @@ pub extern "C" fn rs_mqtt_tx_unsuback_has_reason_code(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use std;
use crate::mqtt::mqtt::MQTTTransaction; use crate::mqtt::mqtt::MQTTTransaction;
use crate::mqtt::mqtt_message::*; use crate::mqtt::mqtt_message::*;
use crate::mqtt::parser::FixedHeader; use crate::mqtt::parser::FixedHeader;
use std;
#[test] #[test]
fn test_multi_unsubscribe() { fn test_multi_unsubscribe() {
@ -472,23 +427,23 @@ mod test {
}); });
let mut s: *const u8 = std::ptr::null_mut(); let mut s: *const u8 = std::ptr::null_mut();
let mut slen: u32 = 0; let mut slen: u32 = 0;
let mut r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen)}; let mut r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 0, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "foo"); assert_eq!(topic, "foo");
r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 1, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "baar"); assert_eq!(topic, "baar");
r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 2, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "fieee"); assert_eq!(topic, "fieee");
r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 3, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "baaaaz"); assert_eq!(topic, "baaaaz");
r = unsafe{rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_unsubscribe_topic(&t, 4, &mut s, &mut slen) };
assert_eq!(r, 0); assert_eq!(r, 0);
} }
@ -512,7 +467,8 @@ mod test {
MQTTSubscribeTopicData { MQTTSubscribeTopicData {
topic_name: "baar".to_string(), topic_name: "baar".to_string(),
qos: 1, qos: 1,
}], },
],
properties: None, properties: None,
}), }),
}); });
@ -534,29 +490,30 @@ mod test {
MQTTSubscribeTopicData { MQTTSubscribeTopicData {
topic_name: "baaaaz".to_string(), topic_name: "baaaaz".to_string(),
qos: 1, qos: 1,
}], },
],
properties: None, properties: None,
}), }),
}); });
let mut s: *const u8 = std::ptr::null_mut(); let mut s: *const u8 = std::ptr::null_mut();
let mut slen: u32 = 0; let mut slen: u32 = 0;
let mut r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen)}; let mut r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 0, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
let mut topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); let mut topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "foo"); assert_eq!(topic, "foo");
r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 1, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "baar"); assert_eq!(topic, "baar");
r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 2, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "fieee"); assert_eq!(topic, "fieee");
r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 3, &mut s, &mut slen) };
assert_eq!(r, 1); assert_eq!(r, 1);
topic = String::from_utf8_lossy(unsafe{build_slice!(s, slen as usize)}); topic = String::from_utf8_lossy(unsafe { build_slice!(s, slen as usize) });
assert_eq!(topic, "baaaaz"); assert_eq!(topic, "baaaaz");
r = unsafe{rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen)}; r = unsafe { rs_mqtt_tx_get_subscribe_topic(&t, 4, &mut s, &mut slen) };
assert_eq!(r, 0); assert_eq!(r, 0);
} }
} }

@ -17,17 +17,16 @@
// written by Sascha Steinbiss <sascha@steinbiss.name> // written by Sascha Steinbiss <sascha@steinbiss.name>
use std; use super::mqtt::{MQTTState, MQTTTransaction};
use super::mqtt::{MQTTTransaction, MQTTState};
use crate::jsonbuilder::{JsonBuilder, JsonError}; use crate::jsonbuilder::{JsonBuilder, JsonError};
use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData}; use crate::mqtt::mqtt_message::{MQTTOperation, MQTTSubscribeTopicData};
use crate::mqtt::parser::{FixedHeader}; use crate::mqtt::parser::FixedHeader;
use std;
pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0); pub const MQTT_LOG_PASSWORDS: u32 = BIT_U32!(0);
#[inline] #[inline]
fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<(), JsonError> {
{
js.start_object()?; js.start_object()?;
js.set_string("topic", &t.topic_name)?; js.set_string("topic", &t.topic_name)?;
js.set_uint("qos", t.qos as u64)?; js.set_uint("qos", t.qos as u64)?;
@ -36,8 +35,7 @@ fn log_mqtt_topic(js: &mut JsonBuilder, t: &MQTTSubscribeTopicData) -> Result<()
} }
#[inline] #[inline]
fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> fn log_mqtt_header(js: &mut JsonBuilder, hdr: &FixedHeader) -> Result<(), JsonError> {
{
js.set_uint("qos", hdr.qos_level as u64)?; js.set_uint("qos", hdr.qos_level as u64)?;
js.set_bool("retain", hdr.retain)?; js.set_bool("retain", hdr.retain)?;
js.set_bool("dup", hdr.dup_flag)?; js.set_bool("dup", hdr.dup_flag)?;
@ -247,12 +245,12 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
js.open_object("pingreq")?; js.open_object("pingreq")?;
log_mqtt_header(js, &msg.header)?; log_mqtt_header(js, &msg.header)?;
js.close()?; // pingreq js.close()?; // pingreq
}, }
MQTTOperation::PINGRESP => { MQTTOperation::PINGRESP => {
js.open_object("pingresp")?; js.open_object("pingresp")?;
log_mqtt_header(js, &msg.header)?; log_mqtt_header(js, &msg.header)?;
js.close()?; // pingresp js.close()?; // pingresp
}, }
MQTTOperation::AUTH(ref auth) => { MQTTOperation::AUTH(ref auth) => {
js.open_object("auth")?; js.open_object("auth")?;
log_mqtt_header(js, &msg.header)?; log_mqtt_header(js, &msg.header)?;
@ -265,7 +263,7 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
js.close()?; // properties js.close()?; // properties
} }
js.close()?; // auth js.close()?; // auth
}, }
MQTTOperation::DISCONNECT(ref disco) => { MQTTOperation::DISCONNECT(ref disco) => {
js.open_object("disconnect")?; js.open_object("disconnect")?;
log_mqtt_header(js, &msg.header)?; log_mqtt_header(js, &msg.header)?;
@ -280,15 +278,15 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
js.close()?; // properties js.close()?; // properties
} }
js.close()?; // disconnect js.close()?; // disconnect
}, }
MQTTOperation::TRUNCATED(ref trunc) => { MQTTOperation::TRUNCATED(ref trunc) => {
js.open_object(&trunc.original_message_type.to_lower_str())?; js.open_object(&trunc.original_message_type.to_lower_str())?;
log_mqtt_header(js, &msg.header)?; log_mqtt_header(js, &msg.header)?;
js.set_bool("truncated", true)?; js.set_bool("truncated", true)?;
js.set_uint("skipped_length", trunc.skipped_length as u64)?; js.set_uint("skipped_length", trunc.skipped_length as u64)?;
js.close()?; // truncated js.close()?; // truncated
}, }
MQTTOperation::UNASSIGNED => {}, MQTTOperation::UNASSIGNED => {}
} }
} }
js.close()?; // mqtt js.close()?; // mqtt
@ -297,7 +295,9 @@ fn log_mqtt(tx: &MQTTTransaction, flags: u32, js: &mut JsonBuilder) -> Result<()
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_logger_log(_state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder) -> bool { pub unsafe extern "C" fn rs_mqtt_logger_log(
_state: &mut MQTTState, tx: *mut std::os::raw::c_void, flags: u32, js: &mut JsonBuilder,
) -> bool {
let tx = cast_pointer!(tx, MQTTTransaction); let tx = cast_pointer!(tx, MQTTTransaction);
log_mqtt(tx, flags, js).is_ok() log_mqtt(tx, flags, js).is_ok()
} }

@ -19,8 +19,8 @@
use super::mqtt_message::*; use super::mqtt_message::*;
use super::parser::*; use super::parser::*;
use crate::applayer::{self, LoggerFlags};
use crate::applayer::*; use crate::applayer::*;
use crate::applayer::{self, LoggerFlags};
use crate::conf::conf_get; use crate::conf::conf_get;
use crate::core::*; use crate::core::*;
use nom7::Err; use nom7::Err;
@ -204,7 +204,7 @@ impl MQTTState {
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::PUBLISH(ref publish) => { MQTTOperation::PUBLISH(ref publish) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -219,7 +219,7 @@ impl MQTTState {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
self.transactions.push(tx); self.transactions.push(tx);
}, }
1..=2 => { 1..=2 => {
if let Some(pkt_id) = publish.message_id { if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -230,14 +230,14 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
_ => { _ => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx); self.transactions.push(tx);
} }
} }
}, }
MQTTOperation::SUBSCRIBE(ref subscribe) => { MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -253,19 +253,19 @@ impl MQTTState {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
self.transactions.push(tx); self.transactions.push(tx);
}, }
1..=2 => { 1..=2 => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id); tx.pkt_id = Some(pkt_id);
self.transactions.push(tx); self.transactions.push(tx);
}, }
_ => { _ => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx); self.transactions.push(tx);
} }
} }
}, }
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -281,19 +281,19 @@ impl MQTTState {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
self.transactions.push(tx); self.transactions.push(tx);
}, }
1..=2 => { 1..=2 => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id); tx.pkt_id = Some(pkt_id);
self.transactions.push(tx); self.transactions.push(tx);
}, }
_ => { _ => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx); self.transactions.push(tx);
} }
} }
}, }
MQTTOperation::CONNACK(ref _connack) => { MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
(*tx).msg.push(msg); (*tx).msg.push(msg);
@ -305,9 +305,8 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::PUBREC(ref v) MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
| MQTTOperation::PUBREL(ref v) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@ -321,9 +320,8 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::PUBACK(ref v) MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
| MQTTOperation::PUBCOMP(ref v) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@ -339,7 +337,7 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::SUBACK(ref suback) => { MQTTOperation::SUBACK(ref suback) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -356,7 +354,7 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::UNSUBACK(ref unsuback) => { MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
@ -373,20 +371,19 @@ impl MQTTState {
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
self.transactions.push(tx); self.transactions.push(tx);
} }
}, }
MQTTOperation::UNASSIGNED => { MQTTOperation::UNASSIGNED => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType); MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgType);
self.transactions.push(tx); self.transactions.push(tx);
}, }
MQTTOperation::TRUNCATED(_) => { MQTTOperation::TRUNCATED(_) => {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
self.transactions.push(tx); self.transactions.push(tx);
}, }
MQTTOperation::AUTH(_) MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
| MQTTOperation::DISCONNECT(_) => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@ -396,9 +393,8 @@ impl MQTTState {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
tx.complete = true; tx.complete = true;
self.transactions.push(tx); self.transactions.push(tx);
}, }
MQTTOperation::PINGREQ MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
| MQTTOperation::PINGRESP => {
if !self.connected { if !self.connected {
let mut tx = self.new_tx(msg, toclient); let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
@ -419,7 +415,11 @@ impl MQTTState {
} }
let mut consumed = 0; let mut consumed = 0;
SCLogDebug!("skip_request {} input len {}", self.skip_request, input.len()); SCLogDebug!(
"skip_request {} input len {}",
self.skip_request,
input.len()
);
if self.skip_request > 0 { if self.skip_request > 0 {
if input.len() <= self.skip_request { if input.len() <= self.skip_request {
SCLogDebug!("reducing skip_request by {}", input.len()); SCLogDebug!("reducing skip_request by {}", input.len());
@ -427,13 +427,16 @@ impl MQTTState {
return AppLayerResult::ok(); return AppLayerResult::ok();
} else { } else {
current = &input[self.skip_request..]; current = &input[self.skip_request..];
SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); SCLogDebug!(
"skip end reached, skipping {} :{:?}",
self.skip_request,
current
);
consumed = self.skip_request; consumed = self.skip_request;
self.skip_request = 0; self.skip_request = 0;
} }
} }
while current.len() > 0 { while current.len() > 0 {
let mut skipped = false; let mut skipped = false;
SCLogDebug!("request: handling {}", current.len()); SCLogDebug!("request: handling {}", current.len());
@ -441,7 +444,11 @@ impl MQTTState {
Ok((rem, msg)) => { Ok((rem, msg)) => {
SCLogDebug!("request msg {:?}", msg); SCLogDebug!("request msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); SCLogDebug!(
"found truncated with skipped {} current len {}",
trunc.skipped_length,
current.len()
);
if trunc.skipped_length >= current.len() { if trunc.skipped_length >= current.len() {
skipped = true; skipped = true;
self.skip_request = trunc.skipped_length - current.len(); self.skip_request = trunc.skipped_length - current.len();
@ -458,8 +465,13 @@ impl MQTTState {
current = rem; current = rem;
} }
Err(Err::Incomplete(_)) => { Err(Err::Incomplete(_)) => {
SCLogDebug!("incomplete request: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); SCLogDebug!(
return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); "incomplete request: consumed {} needed {} (input len {})",
consumed,
(current.len() + 1),
input.len()
);
return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
} }
Err(_) => { Err(_) => {
self.set_event_notx(MQTTEvent::MalformedTraffic, false); self.set_event_notx(MQTTEvent::MalformedTraffic, false);
@ -478,14 +490,22 @@ impl MQTTState {
} }
let mut consumed = 0; let mut consumed = 0;
SCLogDebug!("skip_response {} input len {}", self.skip_response, current.len()); SCLogDebug!(
"skip_response {} input len {}",
self.skip_response,
current.len()
);
if self.skip_response > 0 { if self.skip_response > 0 {
if input.len() <= self.skip_response { if input.len() <= self.skip_response {
self.skip_response -= current.len(); self.skip_response -= current.len();
return AppLayerResult::ok(); return AppLayerResult::ok();
} else { } else {
current = &input[self.skip_response..]; current = &input[self.skip_response..];
SCLogDebug!("skip end reached, skipping {} :{:?}", self.skip_request, current); SCLogDebug!(
"skip end reached, skipping {} :{:?}",
self.skip_request,
current
);
consumed = self.skip_response; consumed = self.skip_response;
self.skip_response = 0; self.skip_response = 0;
} }
@ -498,7 +518,11 @@ impl MQTTState {
Ok((rem, msg)) => { Ok((rem, msg)) => {
SCLogDebug!("response msg {:?}", msg); SCLogDebug!("response msg {:?}", msg);
if let MQTTOperation::TRUNCATED(ref trunc) = msg.op { if let MQTTOperation::TRUNCATED(ref trunc) = msg.op {
SCLogDebug!("found truncated with skipped {} current len {}", trunc.skipped_length, current.len()); SCLogDebug!(
"found truncated with skipped {} current len {}",
trunc.skipped_length,
current.len()
);
if trunc.skipped_length >= current.len() { if trunc.skipped_length >= current.len() {
skipped = true; skipped = true;
self.skip_response = trunc.skipped_length - current.len(); self.skip_response = trunc.skipped_length - current.len();
@ -516,7 +540,12 @@ impl MQTTState {
current = rem; current = rem;
} }
Err(Err::Incomplete(_)) => { Err(Err::Incomplete(_)) => {
SCLogDebug!("incomplete response: consumed {} needed {} (input len {})", consumed, (current.len() + 1), input.len()); SCLogDebug!(
"incomplete response: consumed {} needed {} (input len {})",
consumed,
(current.len() + 1),
input.len()
);
return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32); return AppLayerResult::incomplete(consumed as u32, (current.len() + 1) as u32);
} }
Err(_) => { Err(_) => {
@ -552,11 +581,7 @@ impl MQTTState {
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_probing_parser( pub unsafe extern "C" fn rs_mqtt_probing_parser(
_flow: *const Flow, _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
_direction: u8,
input: *const u8,
input_len: u32,
_rdir: *mut u8,
) -> AppProto { ) -> AppProto {
let buf = build_slice!(input, input_len as usize); let buf = build_slice!(input, input_len as usize);
match parse_fixed_header(buf) { match parse_fixed_header(buf) {
@ -570,14 +595,16 @@ pub unsafe extern "C" fn rs_mqtt_probing_parser(
return ALPROTO_FAILED; return ALPROTO_FAILED;
} }
return ALPROTO_MQTT; return ALPROTO_MQTT;
}, }
Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN, Err(Err::Incomplete(_)) => ALPROTO_UNKNOWN,
Err(_) => ALPROTO_FAILED Err(_) => ALPROTO_FAILED,
} }
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn rs_mqtt_state_new(_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto) -> *mut std::os::raw::c_void { pub extern "C" fn rs_mqtt_state_new(
_orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto,
) -> *mut std::os::raw::c_void {
let state = MQTTState::new(); let state = MQTTState::new();
let boxed = Box::new(state); let boxed = Box::new(state);
return Box::into_raw(boxed) as *mut _; return Box::into_raw(boxed) as *mut _;
@ -596,11 +623,8 @@ pub unsafe extern "C" fn rs_mqtt_state_tx_free(state: *mut std::os::raw::c_void,
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_request( pub unsafe extern "C" fn rs_mqtt_parse_request(
_flow: *const Flow, _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
state: *mut std::os::raw::c_void, stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
_pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice,
_data: *const std::os::raw::c_void,
) -> AppLayerResult { ) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState); let state = cast_pointer!(state, MQTTState);
return state.parse_request(stream_slice.as_slice()); return state.parse_request(stream_slice.as_slice());
@ -608,11 +632,8 @@ pub unsafe extern "C" fn rs_mqtt_parse_request(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_parse_response( pub unsafe extern "C" fn rs_mqtt_parse_response(
_flow: *const Flow, _flow: *const Flow, state: *mut std::os::raw::c_void, _pstate: *mut std::os::raw::c_void,
state: *mut std::os::raw::c_void, stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
_pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice,
_data: *const std::os::raw::c_void,
) -> AppLayerResult { ) -> AppLayerResult {
let state = cast_pointer!(state, MQTTState); let state = cast_pointer!(state, MQTTState);
return state.parse_response(stream_slice.as_slice()); return state.parse_response(stream_slice.as_slice());
@ -620,8 +641,7 @@ pub unsafe extern "C" fn rs_mqtt_parse_response(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_state_get_tx( pub unsafe extern "C" fn rs_mqtt_state_get_tx(
state: *mut std::os::raw::c_void, state: *mut std::os::raw::c_void, tx_id: u64,
tx_id: u64,
) -> *mut std::os::raw::c_void { ) -> *mut std::os::raw::c_void {
let state = cast_pointer!(state, MQTTState); let state = cast_pointer!(state, MQTTState);
match state.get_tx(tx_id) { match state.get_tx(tx_id) {
@ -641,7 +661,9 @@ pub unsafe extern "C" fn rs_mqtt_state_get_tx_count(state: *mut std::os::raw::c_
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void) -> std::os::raw::c_int { pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(
tx: *const std::os::raw::c_void,
) -> std::os::raw::c_int {
let tx = cast_pointer!(tx, MQTTTransaction); let tx = cast_pointer!(tx, MQTTTransaction);
if tx.toclient { if tx.toclient {
return 1; return 1;
@ -651,8 +673,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_is_toclient(tx: *const std::os::raw::c_void)
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress( pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
tx: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, direction: u8,
direction: u8,
) -> std::os::raw::c_int { ) -> std::os::raw::c_int {
let tx = cast_pointer!(tx, MQTTTransaction); let tx = cast_pointer!(tx, MQTTTransaction);
match direction.into() { match direction.into() {
@ -672,8 +693,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_alstate_progress(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_get_logged( pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
_state: *mut std::os::raw::c_void, _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void,
tx: *mut std::os::raw::c_void,
) -> u32 { ) -> u32 {
let tx = cast_pointer!(tx, MQTTTransaction); let tx = cast_pointer!(tx, MQTTTransaction);
return tx.logged.get(); return tx.logged.get();
@ -681,9 +701,7 @@ pub unsafe extern "C" fn rs_mqtt_tx_get_logged(
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn rs_mqtt_tx_set_logged( pub unsafe extern "C" fn rs_mqtt_tx_set_logged(
_state: *mut std::os::raw::c_void, _state: *mut std::os::raw::c_void, tx: *mut std::os::raw::c_void, logged: u32,
tx: *mut std::os::raw::c_void,
logged: u32,
) { ) {
let tx = cast_pointer!(tx, MQTTTransaction); let tx = cast_pointer!(tx, MQTTTransaction);
tx.logged.set(logged); tx.logged.set(logged);

@ -1,4 +1,3 @@
/* Copyright (C) 2020 Open Information Security Foundation /* Copyright (C) 2020 Open Information Security Foundation
* *
* You can copy, redistribute or modify this Program under the terms of * You can copy, redistribute or modify this Program under the terms of
@ -111,7 +110,6 @@ impl std::str::FromStr for MQTTTypeCode {
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct MQTTConnectData { pub struct MQTTConnectData {
pub protocol_string: String, pub protocol_string: String,
@ -204,4 +202,4 @@ pub struct MQTTDisconnectData {
pub struct MQTTTruncatedData { pub struct MQTTTruncatedData {
pub original_message_type: MQTTTypeCode, pub original_message_type: MQTTTypeCode,
pub skipped_length: usize, pub skipped_length: usize,
} }

@ -21,8 +21,8 @@ use crate::common::nom7::bits;
use crate::mqtt::mqtt_message::*; use crate::mqtt::mqtt_message::*;
use crate::mqtt::mqtt_property::*; use crate::mqtt::mqtt_property::*;
use nom7::bits::streaming::take as take_bits; use nom7::bits::streaming::take as take_bits;
use nom7::bytes::streaming::take_while_m_n;
use nom7::bytes::complete::take; use nom7::bytes::complete::take;
use nom7::bytes::streaming::take_while_m_n;
use nom7::combinator::{complete, cond, verify}; use nom7::combinator::{complete, cond, verify};
use nom7::multi::{length_data, many0, many1}; use nom7::multi::{length_data, many0, many1};
use nom7::number::streaming::*; use nom7::number::streaming::*;

Loading…
Cancel
Save