From 2086f99d6b152c318e5377528535f1755b3bca33 Mon Sep 17 00:00:00 2001 From: Juliana Fajardini Date: Tue, 29 Apr 2025 10:33:38 -0300 Subject: [PATCH] pgsql: add initial support to CopyIn mode/subproto This sub-protocol inspects messages sent mainly from the frontend to the backend after a 'COPY FROM STDIN' has been processed by the backend. Parses new messages: - CopyInResponse -- initiates copy-in mode/sub-protocol - CopyData (In) -- data transfer message, from frontend to backend - CopyDone -- signals that no more CopyData messages will be seen from the frontend, for the current transaction - CopyFail -- used by the frontend to signal some failure to proceed with sending CopyData messages Task #7645 --- doc/userguide/output/eve/eve-json-format.rst | 6 ++ etc/schema.json | 24 +++++++ rust/src/pgsql/logger.rs | 22 ++++++- rust/src/pgsql/parser.rs | 69 +++++++++++++++++++- rust/src/pgsql/pgsql.rs | 63 ++++++++++++++++-- 5 files changed, 175 insertions(+), 9 deletions(-) diff --git a/doc/userguide/output/eve/eve-json-format.rst b/doc/userguide/output/eve/eve-json-format.rst index ff2f3643d9..0ad3a38468 100644 --- a/doc/userguide/output/eve/eve-json-format.rst +++ b/doc/userguide/output/eve/eve-json-format.rst @@ -2545,6 +2545,10 @@ flow. Some of the possible request messages are: transaction where the query was sent. * "message": requests which do not have meaningful payloads are logged like this, where the field value is the message type +* "copy_data_in": object. Part of the CopyIn subprotocol, consolidated data + resulting from a ``Copy From Stdin`` query +* "copy_done": string. Similar to ``command_completed`` but sent after the + frontend finishes sending a batch of ``CopyData`` messages There are several different authentication messages possible, based on selected authentication method. (e.g. the SASL authentication will have a set of @@ -2571,6 +2575,8 @@ pgsql flow. Some of the possible request messages are: * "data_size": in bytes. 
When one or many ``DataRow`` messages are parsed, the total size in bytes of the data returned * "command_completed": string. Informs the command just completed by the backend +* "copy_in_response": object. Indicates the beginning of a CopyIn mode, shows + how many columns will be copied from STDIN (``copy_column_count`` field) * "copy_out_response": object. Indicates the beginning of a CopyTo mode, shows how many columns will be copied to STDOUT (``copy_column_cnt`` field) * "copy_data_out": object. Consolidated data on the CopyData sent by the backend diff --git a/etc/schema.json b/etc/schema.json index 41a7911e9d..78c2f685dd 100644 --- a/etc/schema.json +++ b/etc/schema.json @@ -3693,6 +3693,20 @@ "type": "object", "additionalProperties": false, "properties": { + "copy_data_in": { + "type": "object", + "description": "CopyData message from CopyIn mode", + "properties": { + "data_size": { + "type": "integer", + "description": "Accumulated data size of all CopyData messages sent" + }, + "msg_count": { + "type": "integer", + "description": "How many CopyData messages were sent (does not necessarily match number of rows from the query)" + } + } + }, "message": { "type": "string" }, @@ -3795,6 +3809,16 @@ } } }, + "copy_in_response": { + "type": "object", + "description": "Backend/server response accepting CopyIn mode", + "properties": { + "copy_column_count": { + "type": "integer", + "description": "Number of columns that will be copied in the CopyData message" + } + } + }, "copy_out_response": { "type": "object", "description": "Backend/server response accepting CopyOut mode", diff --git a/rust/src/pgsql/logger.rs b/rust/src/pgsql/logger.rs index a714603a85..b98874d5b1 100644 --- a/rust/src/pgsql/logger.rs +++ b/rust/src/pgsql/logger.rs @@ -102,6 +102,11 @@ fn log_request(req: &PgsqlFEMessage, flags: u32, js: &mut JsonBuilder) -> Result identifier: _, length: _, payload, + }) + | PgsqlFEMessage::CopyFail(RegularPacket { + identifier: _, + length: _, + payload, }) => 
{ js.set_string_from_bytes(req.to_str(), payload)?; } @@ -110,10 +115,18 @@ fn log_request(req: &PgsqlFEMessage, flags: u32, js: &mut JsonBuilder) -> Result js.set_uint("process_id", *pid)?; js.set_uint("secret_key", *backend_key)?; } - PgsqlFEMessage::Terminate(NoPayloadMessage { + PgsqlFEMessage::ConsolidatedCopyDataIn(ConsolidatedDataRowPacket { identifier: _, - length: _, + row_cnt, + data_size, }) => { + js.open_object(req.to_str())?; + js.set_uint("msg_count", *row_cnt)?; + js.set_uint("data_size", *data_size)?; + js.close()?; + } + PgsqlFEMessage::CopyDone(_) + | PgsqlFEMessage::Terminate(_) => { js.set_string("message", req.to_str())?; } PgsqlFEMessage::UnknownMessageType(RegularPacket { @@ -220,6 +233,11 @@ fn log_response(res: &PgsqlBEMessage, jb: &mut JsonBuilder) -> Result<(), JsonEr identifier: _, length: _, column_cnt, + }) + | PgsqlBEMessage::CopyInResponse(CopyResponse { + identifier: _, + length: _, + column_cnt, }) => { jb.open_object(res.to_str())?; jb.set_uint("copy_column_count", *column_cnt)?; diff --git a/rust/src/pgsql/parser.rs b/rust/src/pgsql/parser.rs index c5745ee5b4..6201f6ad7f 100644 --- a/rust/src/pgsql/parser.rs +++ b/rust/src/pgsql/parser.rs @@ -299,6 +299,7 @@ pub enum PgsqlBEMessage { BackendKeyData(BackendKeyDataMessage), CommandComplete(RegularPacket), CopyOutResponse(CopyResponse), + CopyInResponse(CopyResponse), ConsolidatedCopyDataOut(ConsolidatedDataRowPacket), CopyDone(NoPayloadMessage), ReadyForQuery(ReadyForQueryMessage), @@ -328,6 +329,7 @@ impl PgsqlBEMessage { PgsqlBEMessage::BackendKeyData(_) => "backend_key_data", PgsqlBEMessage::CommandComplete(_) => "command_completed", PgsqlBEMessage::CopyOutResponse(_) => "copy_out_response", + PgsqlBEMessage::CopyInResponse(_) => "copy_in_response", PgsqlBEMessage::ConsolidatedCopyDataOut(_) => "copy_data_out", PgsqlBEMessage::CopyDone(_) => "copy_done", PgsqlBEMessage::ReadyForQuery(_) => "ready_for_query", @@ -383,6 +385,9 @@ pub enum PgsqlFEMessage { 
SASLInitialResponse(SASLInitialResponsePacket), SASLResponse(RegularPacket), SimpleQuery(RegularPacket), + ConsolidatedCopyDataIn(ConsolidatedDataRowPacket), + CopyDone(NoPayloadMessage), + CopyFail(RegularPacket), CancelRequest(CancelRequestMessage), Terminate(NoPayloadMessage), UnknownMessageType(RegularPacket), @@ -397,6 +402,9 @@ impl PgsqlFEMessage { PgsqlFEMessage::SASLInitialResponse(_) => "sasl_initial_response", PgsqlFEMessage::SASLResponse(_) => "sasl_response", PgsqlFEMessage::SimpleQuery(_) => "simple_query", + PgsqlFEMessage::ConsolidatedCopyDataIn(_) => "copy_data_in", + PgsqlFEMessage::CopyDone(_) => "copy_done", + PgsqlFEMessage::CopyFail(_) => "copy_fail", PgsqlFEMessage::CancelRequest(_) => "cancel_request", PgsqlFEMessage::Terminate(_) => "termination_message", PgsqlFEMessage::UnknownMessageType(_) => "unknown_message_type", @@ -787,6 +795,9 @@ pub fn parse_request(i: &[u8]) -> IResult<&[u8], PgsqlFEMessage, PgsqlParseError b'\0' => pgsql_parse_startup_packet(i)?, b'Q' => parse_simple_query(i)?, b'X' => parse_terminate_message(i)?, + b'd' => parse_consolidated_copy_data_in(i)?, + b'c' => parse_copy_in_done(i)?, + b'f' => parse_copy_fail(i)?, _ => { let (i, identifier) = be_u8(i)?; let (i, length) = parse_gte_length(i, PGSQL_LENGTH_FIELD)?; @@ -1049,6 +1060,22 @@ pub fn parse_copy_out_response(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, Pgsql )) } +pub fn parse_copy_in_response(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> { + let (i, identifier) = verify(be_u8, |&x| x == b'G')(i)?; + let (i, length) = parse_gte_length(i, 8)?; + let (i, _format) = be_u8(i)?; + let (i, columns) = be_u16(i)?; + let (i, _formats) = many_m_n(0, columns.to_usize(), be_u16)(i)?; + Ok(( + i, + PgsqlBEMessage::CopyInResponse(CopyResponse { + identifier, + length, + column_cnt: columns, + }) + )) +} + pub fn parse_consolidated_copy_data_out(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> { let (i, identifier) = verify(be_u8, |&x| x 
== b'd')(i)?; let (i, length) = parse_gte_length(i, 5)?; @@ -1062,7 +1089,31 @@ pub fn parse_consolidated_copy_data_out(i: &[u8]) -> IResult<&[u8], PgsqlBEMessa )) } -fn parse_copy_done(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> { +pub fn parse_consolidated_copy_data_in(i: &[u8]) -> IResult<&[u8], PgsqlFEMessage, PgsqlParseError<&[u8]>> { + let (i, identifier) = verify(be_u8, |&x| x == b'd')(i)?; + let (i, length) = parse_gte_length(i, 5)?; + let (i, _data) = take(length - PGSQL_LENGTH_FIELD)(i)?; + SCLogDebug!("data size is {:?}", _data); + Ok(( + i, PgsqlFEMessage::ConsolidatedCopyDataIn(ConsolidatedDataRowPacket { + identifier, + row_cnt: 1, + data_size: (length - PGSQL_LENGTH_FIELD) as u64 }) + )) +} + +fn parse_copy_in_done(i: &[u8]) -> IResult<&[u8], PgsqlFEMessage, PgsqlParseError<&[u8]>> { + let (i, identifier) = verify(be_u8, |&x| x == b'c')(i)?; + let (i, length) = parse_exact_length(i, PGSQL_LENGTH_FIELD)?; + Ok(( + i, PgsqlFEMessage::CopyDone(NoPayloadMessage { + identifier, + length + }) + )) +} + +fn parse_copy_out_done(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<&[u8]>> { let (i, identifier) = verify(be_u8, |&x| x == b'c')(i)?; let (i, length) = parse_exact_length(i, PGSQL_LENGTH_FIELD)?; Ok(( @@ -1073,6 +1124,19 @@ fn parse_copy_done(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlParseError<& )) } +fn parse_copy_fail(i: &[u8]) -> IResult<&[u8], PgsqlFEMessage, PgsqlParseError<&[u8]>> { + let (i, identifier) = verify(be_u8, |&x| x == b'f')(i)?; + let (i, length) = parse_gte_length(i, 5)?; + let (i, data) = take(length - PGSQL_LENGTH_FIELD)(i)?; + Ok(( + i, PgsqlFEMessage::CopyFail(RegularPacket { + identifier, + length, + payload: data.to_vec(), + }) + )) +} + // Currently, we don't store the actual DataRow messages, as those could easily become a burden, memory-wise // We use ConsolidatedDataRow to store info we still want to log: message size. 
// Later on, we calculate the number of lines the command actually returned by counting ConsolidatedDataRow messages @@ -1267,13 +1331,14 @@ pub fn pgsql_parse_response(i: &[u8]) -> IResult<&[u8], PgsqlBEMessage, PgsqlPar b'R' => pgsql_parse_authentication_message(i)?, b'S' => parse_parameter_status_message(i)?, b'C' => parse_command_complete(i)?, - b'c' => parse_copy_done(i)?, + b'c' => parse_copy_out_done(i)?, b'Z' => parse_ready_for_query(i)?, b'T' => parse_row_description(i)?, b'A' => parse_notification_response(i)?, b'D' => parse_consolidated_data_row(i)?, b'd' => parse_consolidated_copy_data_out(i)?, b'H' => parse_copy_out_response(i)?, + b'G' => parse_copy_in_response(i)?, _ => { let (i, identifier) = be_u8(i)?; let (i, length) = parse_gte_length(i, PGSQL_LENGTH_FIELD)?; diff --git a/rust/src/pgsql/pgsql.rs b/rust/src/pgsql/pgsql.rs index d569069350..52300dc80a 100644 --- a/rust/src/pgsql/pgsql.rs +++ b/rust/src/pgsql/pgsql.rs @@ -123,7 +123,11 @@ pub enum PgsqlStateProgress { // Related to Backend-received messages // CopyOutResponseReceived, CopyDataOutReceived, + CopyInResponseReceived, + FirstCopyDataInReceived, + ConsolidatingCopyDataIn, CopyDoneReceived, + CopyFailReceived, SSLRejectedReceived, // SSPIAuthenticationReceived, // TODO implement SASLAuthenticationReceived, @@ -257,6 +261,7 @@ impl PgsqlState { || self.state_progress == PgsqlStateProgress::SSLRequestReceived || self.state_progress == PgsqlStateProgress::ConnectionTerminated || self.state_progress == PgsqlStateProgress::CancelRequestReceived + || self.state_progress == PgsqlStateProgress::FirstCopyDataInReceived { let tx = self.new_tx(); self.transactions.push_back(tx); @@ -266,13 +271,17 @@ impl PgsqlState { return self.transactions.back_mut(); } + fn get_curr_state(&mut self) -> PgsqlStateProgress { + self.state_progress + } + /// Define PgsqlState progression, based on the request received /// /// As PostgreSQL transactions can have multiple messages, State progression /// is what helps 
us keep track of the PgsqlTransactions - when one finished /// when the other starts. /// State isn't directly updated to avoid reference borrowing conflicts. - fn request_next_state(request: &PgsqlFEMessage) -> Option { + fn request_next_state(&mut self, request: &PgsqlFEMessage) -> Option { match request { PgsqlFEMessage::SSLRequest(_) => Some(PgsqlStateProgress::SSLRequestReceived), PgsqlFEMessage::StartupMessage(_) => Some(PgsqlStateProgress::StartupMessageReceived), @@ -288,6 +297,25 @@ impl PgsqlState { // Important to keep in mind that: "In simple Query mode, the format of retrieved values is always text, except when the given command is a FETCH from a cursor declared with the BINARY option. In that case, the retrieved values are in binary format. The format codes given in the RowDescription message tell which format is being used." (from pgsql official documentation) } + PgsqlFEMessage::ConsolidatedCopyDataIn(_) => { + match self.get_curr_state() { + PgsqlStateProgress::CopyInResponseReceived => { + return Some(PgsqlStateProgress::FirstCopyDataInReceived); + } + PgsqlStateProgress::FirstCopyDataInReceived + | PgsqlStateProgress::ConsolidatingCopyDataIn => { + // CopyIn mode is active and at least one CopyDataIn message was already seen, + // so we are either right after the first CopyDataIn message or in the middle + // of consolidating CopyDataIn messages + return Some(PgsqlStateProgress::ConsolidatingCopyDataIn); + } + _ => { + return None; + } + } + } + PgsqlFEMessage::CopyDone(_) => Some(PgsqlStateProgress::CopyDoneReceived), + PgsqlFEMessage::CopyFail(_) => Some(PgsqlStateProgress::CopyFailReceived), PgsqlFEMessage::CancelRequest(_) => Some(PgsqlStateProgress::CancelRequestReceived), PgsqlFEMessage::Terminate(_) => { SCLogDebug!("Match: Terminate message"); @@ -330,6 +358,8 @@ impl PgsqlState { | PgsqlStateProgress::SASLInitialResponseReceived | PgsqlStateProgress::SASLResponseReceived | PgsqlStateProgress::CancelRequestReceived + | 
PgsqlStateProgress::CopyDoneReceived + | PgsqlStateProgress::CopyFailReceived | PgsqlStateProgress::ConnectionTerminated => true, _ => false, } @@ -364,7 +394,7 @@ impl PgsqlState { match PgsqlState::state_based_req_parsing(self.state_progress, start) { Ok((rem, request)) => { start = rem; - let new_state = PgsqlState::request_next_state(&request); + let new_state = self.request_next_state(&request); if let Some(state) = new_state { self.state_progress = state; @@ -380,10 +410,31 @@ impl PgsqlState { // https://samadhiweb.com/blog/2013.04.28.graphviz.postgresv3.html if let Some(tx) = self.find_or_create_tx() { tx.tx_data.updated_ts = true; - tx.requests.push(request); if let Some(state) = new_state { - if Self::request_is_complete(state) { - // The request is always complete at this point + if state == PgsqlStateProgress::FirstCopyDataInReceived + || state == PgsqlStateProgress::ConsolidatingCopyDataIn { + // here we're actually only counting how many messages were received. + // frontends are not forced to send one row per message + if let PgsqlFEMessage::ConsolidatedCopyDataIn(msg) = request { + tx.sum_data_size(msg.data_size); + tx.incr_row_cnt(); + } + } else if (state == PgsqlStateProgress::CopyDoneReceived || state == PgsqlStateProgress::CopyFailReceived) && tx.get_row_cnt() > 0 { + let consolidated_copy_data = PgsqlFEMessage::ConsolidatedCopyDataIn( + ConsolidatedDataRowPacket { + identifier: b'd', + row_cnt: tx.get_row_cnt(), + data_size: tx.data_size, // total byte count of all copy_data messages combined + }, + ); + tx.requests.push(consolidated_copy_data); + tx.requests.push(request); + // reset values + tx.data_row_cnt = 0; + tx.data_size = 0; + } else if Self::request_is_complete(state) { + tx.requests.push(request); + // The request is complete at this point tx.tx_req_state = PgsqlTxProgress::TxDone; if state == PgsqlStateProgress::ConnectionTerminated || state == PgsqlStateProgress::CancelRequestReceived @@ -491,6 +542,7 @@ impl PgsqlState { } 
PgsqlBEMessage::RowDescription(_) => Some(PgsqlStateProgress::RowDescriptionReceived), PgsqlBEMessage::CopyOutResponse(_) => Some(PgsqlStateProgress::CopyOutResponseReceived), + PgsqlBEMessage::CopyInResponse(_) => Some(PgsqlStateProgress::CopyInResponseReceived), PgsqlBEMessage::ConsolidatedDataRow(msg) => { // Increment tx.data_size here, since we know msg type, so that we can later on log that info self.transactions.back_mut()?.sum_data_size(msg.data_size); @@ -541,6 +593,7 @@ impl PgsqlState { | PgsqlStateProgress::SASLAuthenticationReceived | PgsqlStateProgress::SASLAuthenticationContinueReceived | PgsqlStateProgress::SASLAuthenticationFinalReceived + | PgsqlStateProgress::CopyInResponseReceived | PgsqlStateProgress::Finished => true, _ => false, }