Commit f755979

pageserver: payload compression for gRPC base backups (#12346)
## Problem

gRPC base backups use gRPC compression. However, this has two problems:

* Base backup caching will cache compressed base backups (making gRPC compression pointless).
* Tonic does not support varying the compression level, and zstd default level is 10% slower than gzip fastest level.

Touches #11728. Touches neondatabase/cloud#29353.

## Summary of changes

This patch adds a gRPC parameter `BaseBackupRequest::compression` specifying the compression algorithm. It also moves compression into `send_basebackup_tarball` to reduce code duplication.

A follow-up PR will integrate the base backup cache with gRPC.
1 parent 1d49eef

File tree: 7 files changed (+149 −69 lines)
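Taken together, the client-facing change is a single new request field. A minimal sketch of its use, assuming a connected `page_api::Client` bound to `client` and an `lsn` already in scope (the real wiring is in the `pagebench` diff below):

```rust
// Request a gzip-compressed payload; tonic's transport-level compression stays disabled,
// since the compressed payload is what the server wants to cache.
let req = page_api::GetBaseBackupRequest {
    lsn: Some(lsn),
    replica: false,
    full: false,
    compression: page_api::BaseBackupCompression::Gzip, // new field in this commit
};
let stream = client.get_base_backup(req).await?;
```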

pageserver/page_api/proto/page_service.proto

Lines changed: 13 additions & 0 deletions
@@ -110,6 +110,19 @@ message GetBaseBackupRequest {
   bool replica = 2;
   // If true, include relation files in the base backup. Mainly for debugging and tests.
   bool full = 3;
+  // Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
+  // compression, so that we can cache compressed backups on the server.
+  BaseBackupCompression compression = 4;
+}
+
+// Base backup compression algorithms.
+enum BaseBackupCompression {
+  // Unknown algorithm. Used when clients send an unsupported algorithm.
+  BASE_BACKUP_COMPRESSION_UNKNOWN = 0;
+  // No compression.
+  BASE_BACKUP_COMPRESSION_NONE = 1;
+  // GZIP compression.
+  BASE_BACKUP_COMPRESSION_GZIP = 2;
 }
 
 // Base backup response chunk, returned as an ordered stream.
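Because proto3 decodes an unset enum field as 0, `BASE_BACKUP_COMPRESSION_UNKNOWN` occupies that value so the server can reject requests that never set the field instead of silently choosing an algorithm. On the prost-generated types the field is a plain `i32`; a hedged sketch (the generated `proto` module path is an assumption):

```rust
// Raw generated request with the enum encoded as an i32 tag. lsn = 0 means "unspecified",
// matching the existing conversion in model.rs.
let pb = proto::GetBaseBackupRequest {
    lsn: 0,
    replica: false,
    full: false,
    compression: proto::BaseBackupCompression::Gzip as i32,
};
```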

pageserver/page_api/src/client.rs

Lines changed: 0 additions & 1 deletion
@@ -95,7 +95,6 @@ impl Client {
 
         if let Some(compression) = compression {
             // TODO: benchmark this (including network latency).
-            // TODO: consider enabling compression by default.
             client = client
                 .accept_compressed(compression)
                 .send_compressed(compression);

pageserver/page_api/src/model.rs

Lines changed: 55 additions & 4 deletions
@@ -191,15 +191,21 @@ pub struct GetBaseBackupRequest {
     pub replica: bool,
     /// If true, include relation files in the base backup. Mainly for debugging and tests.
     pub full: bool,
+    /// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
+    /// compression, so that we can cache compressed backups on the server.
+    pub compression: BaseBackupCompression,
 }
 
-impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
-    fn from(pb: proto::GetBaseBackupRequest) -> Self {
-        Self {
+impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
+        Ok(Self {
             lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
             replica: pb.replica,
             full: pb.full,
-        }
+            compression: pb.compression.try_into()?,
+        })
     }
 }
 
@@ -209,10 +215,55 @@ impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
             lsn: request.lsn.unwrap_or_default().0,
             replica: request.replica,
             full: request.full,
+            compression: request.compression.into(),
+        }
+    }
+}
+
+/// Base backup compression algorithm.
+#[derive(Clone, Copy, Debug)]
+pub enum BaseBackupCompression {
+    None,
+    Gzip,
+}
+
+impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
+        match pb {
+            proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
+            proto::BaseBackupCompression::None => Ok(Self::None),
+            proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
+        }
+    }
+}
+
+impl TryFrom<i32> for BaseBackupCompression {
+    type Error = ProtocolError;
+
+    fn try_from(compression: i32) -> Result<Self, Self::Error> {
+        proto::BaseBackupCompression::try_from(compression)
+            .map_err(|_| ProtocolError::invalid("compression", compression))
+            .and_then(Self::try_from)
+    }
+}
+
+impl From<BaseBackupCompression> for proto::BaseBackupCompression {
+    fn from(compression: BaseBackupCompression) -> Self {
+        match compression {
+            BaseBackupCompression::None => Self::None,
+            BaseBackupCompression::Gzip => Self::Gzip,
         }
     }
 }
 
+impl From<BaseBackupCompression> for i32 {
+    fn from(compression: BaseBackupCompression) -> Self {
+        proto::BaseBackupCompression::from(compression).into()
+    }
+}
+
 pub type GetBaseBackupResponseChunk = Bytes;
 
 impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
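A hedged sketch of how these conversions behave at the API boundary (the assertions are illustrative, not tests from this commit):

```rust
use page_api::BaseBackupCompression;

// Wire value 0 (UNKNOWN / unset) is rejected with a ProtocolError...
assert!(BaseBackupCompression::try_from(0).is_err());
// ...while known values convert and round-trip back to their i32 tag.
let gzip = BaseBackupCompression::try_from(2).expect("GZIP is a known algorithm");
assert!(matches!(gzip, BaseBackupCompression::Gzip));
assert_eq!(i32::from(gzip), 2);
```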

pageserver/pagebench/src/cmd/basebackup.rs

Lines changed: 8 additions & 2 deletions
@@ -317,6 +317,7 @@ impl Client for LibpqClient {
 /// A gRPC Pageserver client.
 struct GrpcClient {
     inner: page_api::Client,
+    compression: page_api::BaseBackupCompression,
 }
 
 impl GrpcClient {
@@ -331,10 +332,14 @@ impl GrpcClient {
             ttid.timeline_id,
             ShardIndex::unsharded(),
             None,
-            compression.then_some(tonic::codec::CompressionEncoding::Zstd),
+            None, // NB: uses payload compression
         )
         .await?;
-        Ok(Self { inner })
+        let compression = match compression {
+            true => page_api::BaseBackupCompression::Gzip,
+            false => page_api::BaseBackupCompression::None,
+        };
+        Ok(Self { inner, compression })
     }
 }
 
@@ -348,6 +353,7 @@ impl Client for GrpcClient {
             lsn,
             replica: false,
             full: false,
+            compression: self.compression,
         };
         let stream = self.inner.get_base_backup(req).await?;
         Ok(Box::pin(StreamReader::new(
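The benchmark requests gzip but treats the stream as opaque bytes. A consumer that needs the uncompressed tarball could gunzip the payload itself, for example with `async-compression` (a sketch, not necessarily what pagebench does; mapping `tonic::Status` errors into `std::io::Error` is assumed):

```rust
use async_compression::tokio::bufread::GzipDecoder;
use tokio_util::io::StreamReader;

// `stream` is the chunk stream from get_base_backup with items mapped to
// Result<Bytes, std::io::Error>. StreamReader exposes it as AsyncBufRead,
// and GzipDecoder then yields the uncompressed tar stream.
let reader = StreamReader::new(stream);
let mut tarball = GzipDecoder::new(reader);
```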

pageserver/src/basebackup.rs

Lines changed: 52 additions & 25 deletions
@@ -14,6 +14,7 @@ use std::fmt::Write as FmtWrite;
 use std::time::{Instant, SystemTime};
 
 use anyhow::{Context, anyhow};
+use async_compression::tokio::write::GzipEncoder;
 use bytes::{BufMut, Bytes, BytesMut};
 use fail::fail_point;
 use pageserver_api::key::{Key, rel_block_to_key};
@@ -25,8 +26,7 @@ use postgres_ffi::{
 };
 use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
 use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
-use tokio::io;
-use tokio::io::AsyncWrite;
+use tokio::io::{self, AsyncWrite, AsyncWriteExt as _};
 use tokio_tar::{Builder, EntryType, Header};
 use tracing::*;
 use utils::lsn::Lsn;
@@ -97,13 +97,15 @@ impl From<BasebackupError> for tonic::Status {
 /// * When working without safekeepers. In this situation it is important to match the lsn
 ///   we are taking basebackup on with the lsn that is used in pageserver's walreceiver
 ///   to start the replication.
+#[allow(clippy::too_many_arguments)]
 pub async fn send_basebackup_tarball<'a, W>(
     write: &'a mut W,
     timeline: &'a Timeline,
     req_lsn: Option<Lsn>,
     prev_lsn: Option<Lsn>,
     full_backup: bool,
     replica: bool,
+    gzip_level: Option<async_compression::Level>,
     ctx: &'a RequestContext,
 ) -> Result<(), BasebackupError>
 where
@@ -122,7 +124,7 @@
     // prev_lsn value; that happens if the timeline was just branched from
     // an old LSN and it doesn't have any WAL of its own yet. We will set
     // prev_lsn to Lsn(0) if we cannot provide the correct value.
-    let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
+    let (backup_prev, lsn) = if let Some(req_lsn) = req_lsn {
         // Backup was requested at a particular LSN. The caller should've
         // already checked that it's a valid LSN.
 
@@ -143,7 +145,7 @@
     };
 
     // Consolidate the derived and the provided prev_lsn values
-    let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
+    let prev_record_lsn = if let Some(provided_prev_lsn) = prev_lsn {
         if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
             return Err(BasebackupError::Server(anyhow!(
                 "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
@@ -155,30 +157,55 @@
     };
 
     info!(
-        "taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
-        backup_lsn, prev_lsn, full_backup, replica
+        "taking basebackup lsn={lsn}, prev_lsn={prev_record_lsn} \
+        (full_backup={full_backup}, replica={replica}, gzip={gzip_level:?})",
+    );
+    let span = info_span!("send_tarball", backup_lsn=%lsn);
+
+    let io_concurrency = IoConcurrency::spawn_from_conf(
+        timeline.conf.get_vectored_concurrent_io,
+        timeline
+            .gate
+            .enter()
+            .map_err(|_| BasebackupError::Shutdown)?,
     );
 
-    let basebackup = Basebackup {
-        ar: Builder::new_non_terminated(write),
-        timeline,
-        lsn: backup_lsn,
-        prev_record_lsn: prev_lsn,
-        full_backup,
-        replica,
-        ctx,
-        io_concurrency: IoConcurrency::spawn_from_conf(
-            timeline.conf.get_vectored_concurrent_io,
-            timeline
-                .gate
-                .enter()
-                .map_err(|_| BasebackupError::Shutdown)?,
-        ),
-    };
-    basebackup
+    if let Some(gzip_level) = gzip_level {
+        let mut encoder = GzipEncoder::with_quality(write, gzip_level);
+        Basebackup {
+            ar: Builder::new_non_terminated(&mut encoder),
+            timeline,
+            lsn,
+            prev_record_lsn,
+            full_backup,
+            replica,
+            ctx,
+            io_concurrency,
+        }
         .send_tarball()
-        .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
-        .await
+        .instrument(span)
+        .await?;
+        encoder
+            .shutdown()
+            .await
+            .map_err(|err| BasebackupError::Client(err, "gzip"))?;
+    } else {
+        Basebackup {
+            ar: Builder::new_non_terminated(write),
+            timeline,
+            lsn,
+            prev_record_lsn,
+            full_backup,
+            replica,
+            ctx,
+            io_concurrency,
+        }
+        .send_tarball()
+        .instrument(span)
+        .await?;
+    }
+
+    Ok(())
 }
 
 /// This is short-living object only for the time of tarball creation,
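Callers now pick compression per call site via the new `gzip_level` argument. A hedged sketch of a call, with `writer`, `timeline`, `req_lsn`, and `ctx` assumed to be in scope and `Level::Fastest` chosen only as a plausible option for a latency-sensitive path:

```rust
send_basebackup_tarball(
    &mut writer,
    timeline,
    Some(req_lsn),
    None,  // prev_lsn: let the pageserver derive it
    false, // full_backup
    false, // replica
    Some(async_compression::Level::Fastest), // None skips payload compression entirely
    &ctx,
)
.await?;
```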

pageserver/src/basebackup_cache.rs

Lines changed: 4 additions & 10 deletions
@@ -1,7 +1,6 @@
 use std::{collections::HashMap, sync::Arc};
 
 use anyhow::Context;
-use async_compression::tokio::write::GzipEncoder;
 use camino::{Utf8Path, Utf8PathBuf};
 use metrics::core::{AtomicU64, GenericCounter};
 use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
@@ -594,13 +593,6 @@ impl BackgroundTask {
         let file = tokio::fs::File::create(entry_tmp_path).await?;
         let mut writer = BufWriter::new(file);
 
-        let mut encoder = GzipEncoder::with_quality(
-            &mut writer,
-            // Level::Best because compression is not on the hot path of basebackup requests.
-            // The decompression is almost not affected by the compression level.
-            async_compression::Level::Best,
-        );
-
         // We may receive a request before the WAL record is applied to the timeline.
         // Wait for the requested LSN to be applied.
         timeline
@@ -613,17 +605,19 @@
             .await?;
 
         send_basebackup_tarball(
-            &mut encoder,
+            &mut writer,
             timeline,
             Some(req_lsn),
             None,
            false,
            false,
+            // Level::Best because compression is not on the hot path of basebackup requests.
+            // The decompression is almost not affected by the compression level.
+            Some(async_compression::Level::Best),
             &ctx,
         )
         .await?;
 
-        encoder.shutdown().await?;
         writer.flush().await?;
         writer.into_inner().sync_all().await?;
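The cache now hands the plain writer to `send_basebackup_tarball` and lets it gzip the payload, so a cached entry is already in the format a gzip-requesting client expects. Serving it is left to the follow-up PR mentioned above; a purely illustrative sketch of the idea, where `entry_path` and the chunk sender `tx` are hypothetical:

```rust
use bytes::Bytes;
use tokio::io::AsyncReadExt as _;

// Cache hit: the file already holds a gzip-compressed base backup, so its bytes can be
// forwarded as response chunks without recompressing anything.
let mut file = tokio::fs::File::open(&entry_path).await?;
let mut buf = vec![0u8; 128 * 1024];
loop {
    let n = file.read(&mut buf).await?;
    if n == 0 {
        break;
    }
    tx.send(Bytes::copy_from_slice(&buf[..n])).await?;
}
```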
