Support offloaded timelines during shard split #9489

Merged 5 commits on Oct 25, 2024
Changes from 4 commits
136 changes: 96 additions & 40 deletions pageserver/src/tenant.rs
@@ -39,6 +39,7 @@ use remote_timeline_client::UploadQueueNotReadyError;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::future::Future;
+use std::sync::OnceLock;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
@@ -518,7 +519,7 @@ pub struct OffloadedTimeline {
     /// If we offload a timeline, we keep around the remote client
     /// for the duration of the process. If we find it through the
     /// manifest, we don't construct it until it's needed (deletion).
-    pub remote_client: Option<Arc<RemoteTimelineClient>>,
+    pub remote_client: OnceLock<Arc<RemoteTimelineClient>>,
 
     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     /// timeline is being deleted. If 'true', the timeline has already been deleted.
@@ -546,7 +547,7 @@ impl OffloadedTimeline {
             ancestor_retain_lsn,
             archived_at,
 
-            remote_client: Some(timeline.remote_client.clone()),
+            remote_client: OnceLock::from(timeline.remote_client.clone()),
             delete_progress: timeline.delete_progress.clone(),
         })
     }
@@ -563,7 +564,7 @@
             ancestor_timeline_id,
             ancestor_retain_lsn,
             archived_at,
-            remote_client: None,
+            remote_client: OnceLock::new(),
             delete_progress: TimelineDeleteProgress::default(),
         }
     }
@@ -582,6 +583,13 @@
             archived_at: *archived_at,
         }
     }
+    fn remote_client_maybe_construct(&self, tenant: &Tenant) -> &Arc<RemoteTimelineClient> {
+        self.remote_client.get_or_init(|| {
+            let remote_client =
+                tenant.build_timeline_client(self.timeline_id, tenant.remote_storage.clone());
+            Arc::new(remote_client)
+        })
+    }
 }

#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
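
Note: `OnceLock` replaces the `Option` that previously held the client, so construction can happen lazily through a shared reference. A minimal, runnable sketch of the same get-or-init pattern, using stand-in types (the `u64` ids and field set are illustrative, not the pageserver's real definitions):

```rust
use std::sync::{Arc, OnceLock};

// Stand-in for the pageserver's RemoteTimelineClient.
struct RemoteTimelineClient {
    timeline_id: u64,
}

struct OffloadedTimeline {
    timeline_id: u64,
    // Empty when the timeline is loaded from the manifest.
    remote_client: OnceLock<Arc<RemoteTimelineClient>>,
}

impl OffloadedTimeline {
    // The first caller pays the construction cost; every later caller
    // gets the same Arc back through a shared reference.
    fn remote_client_maybe_construct(&self) -> &Arc<RemoteTimelineClient> {
        self.remote_client.get_or_init(|| {
            Arc::new(RemoteTimelineClient {
                timeline_id: self.timeline_id,
            })
        })
    }
}

fn main() {
    let offloaded = OffloadedTimeline {
        timeline_id: 7,
        remote_client: OnceLock::new(),
    };
    let a = Arc::clone(offloaded.remote_client_maybe_construct());
    let b = Arc::clone(offloaded.remote_client_maybe_construct());
    assert!(Arc::ptr_eq(&a, &b)); // the init closure ran exactly once
}
```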
@@ -597,17 +605,21 @@ pub enum TimelineOrOffloaded {
 }
 
 impl TimelineOrOffloaded {
-    pub fn tenant_shard_id(&self) -> TenantShardId {
+    pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
         match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
+            TimelineOrOffloaded::Timeline(timeline) => {
+                TimelineOrOffloadedArcRef::Timeline(timeline)
+            }
+            TimelineOrOffloaded::Offloaded(offloaded) => {
+                TimelineOrOffloadedArcRef::Offloaded(offloaded)
+            }
         }
     }
+    pub fn tenant_shard_id(&self) -> TenantShardId {
+        self.arc_ref().tenant_shard_id()
+    }
     pub fn timeline_id(&self) -> TimelineId {
-        match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
-            TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
-        }
+        self.arc_ref().timeline_id()
     }
     pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
         match self {
@@ -616,20 +628,48 @@ impl TimelineOrOffloaded {
         }
     }
     pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
+        self.arc_ref().remote_client_maybe_construct(tenant)
+    }
+}
+
+pub enum TimelineOrOffloadedArcRef<'a> {
+    Timeline(&'a Arc<Timeline>),
+    Offloaded(&'a Arc<OffloadedTimeline>),
+}
+
+impl TimelineOrOffloadedArcRef<'_> {
+    pub fn tenant_shard_id(&self) -> TenantShardId {
         match self {
-            TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
-            TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
-                Some(remote_client) => remote_client,
-                None => {
-                    let remote_client = tenant.build_timeline_client(
-                        offloaded.timeline_id,
-                        tenant.remote_storage.clone(),
-                    );
-                    Arc::new(remote_client)
-                }
-            },
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
         }
     }
+    pub fn timeline_id(&self) -> TimelineId {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
+        }
+    }
+    pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
+        match self {
+            TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
+            TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
+                offloaded.remote_client_maybe_construct(tenant).clone()
+            }
+        }
+    }
+}
+
+impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<Timeline>) -> Self {
+        Self::Timeline(timeline)
+    }
+}
+
+impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
+    fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
+        Self::Offloaded(timeline)
+    }
+}

#[derive(Debug, thiserror::Error, PartialEq, Eq)]
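
For readers skimming the diff: the new `TimelineOrOffloadedArcRef` is a borrowed view that lets a single loop walk live and offloaded timelines uniformly, without cloning either map's `Arc`s up front. A self-contained sketch of that idiom, with hypothetical stand-in types in place of `Timeline` and `OffloadedTimeline`:

```rust
use std::sync::Arc;

struct Timeline { id: u32 }
struct OffloadedTimeline { id: u32 }

// Borrowed view over either kind of timeline, mirroring TimelineOrOffloadedArcRef.
enum EitherRef<'a> {
    Timeline(&'a Arc<Timeline>),
    Offloaded(&'a Arc<OffloadedTimeline>),
}

impl EitherRef<'_> {
    fn id(&self) -> u32 {
        match self {
            EitherRef::Timeline(t) => t.id,
            EitherRef::Offloaded(o) => o.id,
        }
    }
}

impl<'a> From<&'a Arc<Timeline>> for EitherRef<'a> {
    fn from(t: &'a Arc<Timeline>) -> Self {
        Self::Timeline(t)
    }
}

impl<'a> From<&'a Arc<OffloadedTimeline>> for EitherRef<'a> {
    fn from(o: &'a Arc<OffloadedTimeline>) -> Self {
        Self::Offloaded(o)
    }
}

fn main() {
    let live = vec![Arc::new(Timeline { id: 1 })];
    let parked = vec![Arc::new(OffloadedTimeline { id: 2 })];
    // One loop over both collections, the way the split path below chains them.
    for t in live.iter().map(EitherRef::from).chain(parked.iter().map(EitherRef::from)) {
        println!("timeline {}", t.id());
    }
}
```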
@@ -2917,33 +2957,51 @@ impl Tenant {
         &self,
         child_shards: &Vec<TenantShardId>,
     ) -> anyhow::Result<()> {
-        let timelines = self.timelines.lock().unwrap().clone();
-        for timeline in timelines.values() {
+        let (timelines, offloaded) = {
+            let timelines = self.timelines.lock().unwrap();
+            let offloaded = self.timelines_offloaded.lock().unwrap();
+            (timelines.clone(), offloaded.clone())
+        };
+        let timelines_iter = timelines
+            .values()
+            .map(TimelineOrOffloadedArcRef::<'_>::from)
+            .chain(
+                offloaded
+                    .values()
+                    .map(TimelineOrOffloadedArcRef::<'_>::from),
+            );
+        for timeline in timelines_iter {
             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
             // to ensure that they do not start a split if currently in the process of doing these.
 
-            // Upload an index from the parent: this is partly to provide freshness for the
-            // child tenants that will copy it, and partly for general ease-of-debugging: there will
-            // always be a parent shard index in the same generation as we wrote the child shard index.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
-            timeline
-                .remote_client
-                .schedule_index_upload_for_file_changes()?;
-            timeline.remote_client.wait_completion().await?;
+            let timeline_id = timeline.timeline_id();
+
+            if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
+                // Upload an index from the parent: this is partly to provide freshness for the
+                // child tenants that will copy it, and partly for general ease-of-debugging: there will
+                // always be a parent shard index in the same generation as we wrote the child shard index.
+                tracing::info!(%timeline_id, "Uploading index");
+                timeline
+                    .remote_client
+                    .schedule_index_upload_for_file_changes()?;
+                timeline.remote_client.wait_completion().await?;
+            }
+
+            let remote_client = timeline.remote_client_maybe_construct(self);
 
             // Shut down the timeline's remote client: this means that the indices we write
             // for child shards will not be invalidated by the parent shard deleting layers.
-            tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
-            timeline.remote_client.shutdown().await;
+            tracing::info!(%timeline_id, "Shutting down remote storage client");
+            remote_client.shutdown().await;
 
             // Download methods can still be used after shutdown, as they don't flow through the remote client's
             // queue. In principle the RemoteTimelineClient could provide this without downloading it, but this
             // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
             // we use here really is the remotely persistent one).
-            tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
-            let result = timeline.remote_client
+            tracing::info!(%timeline_id, "Downloading index_part from parent");
+            let result = remote_client
                .download_index_file(&self.cancel)
-                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
                 .await?;
             let index_part = match result {
                 MaybeDeletedIndexPart::Deleted(_) => {
@@ -2953,11 +3011,11 @@
             };
 
             for child_shard in child_shards {
-                tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
+                tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
                 upload_index_part(
                     &self.remote_storage,
                     child_shard,
-                    &timeline.timeline_id,
+                    &timeline_id,
                     self.generation,
                     &index_part,
                     &self.cancel,
@@ -2966,8 +3024,6 @@
             }
         }
 
-        // TODO: also copy index files of offloaded timelines
-
         let tenant_manifest = self.tenant_manifest();
         // TODO: generation support
         let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
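
Taken together, the loop above does: flush and shut down the parent's remote client, one download of the parent's `index_part` per timeline, then an upload of that same index under each child shard's location. A schematic, runnable model of the fan-out, where `Storage`, `IndexPart`, and the shard names are invented stand-ins rather than pageserver APIs:

```rust
#[derive(Clone)]
struct IndexPart(Vec<u8>);

struct Storage;

impl Storage {
    fn download_index(&self, shard: &str, timeline: u64) -> IndexPart {
        println!("downloaded index for timeline {timeline} from {shard}");
        IndexPart(Vec::new())
    }
    fn upload_index(&self, shard: &str, timeline: u64, _index: &IndexPart) {
        println!("uploaded index for timeline {timeline} to {shard}");
    }
}

// One download from the parent, fanned out to every child shard, so each
// child starts from an index written in the same generation as the parent's.
fn copy_indices_to_children(
    storage: &Storage,
    parent: &str,
    children: &[&str],
    timelines: &[u64],
) {
    for &timeline in timelines {
        let index = storage.download_index(parent, timeline);
        for child in children {
            storage.upload_index(child, timeline, &index);
        }
    }
}

fn main() {
    let storage = Storage;
    copy_indices_to_children(&storage, "parent-shard", &["child-0", "child-1"], &[1, 2]);
}
```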
6 changes: 5 additions & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
@@ -1278,10 +1278,14 @@ impl RemoteTimelineClient {
         let fut = {
             let mut guard = self.upload_queue.lock().unwrap();
             let upload_queue = match &mut *guard {
-                UploadQueue::Stopped(_) => return,
+                UploadQueue::Stopped(_) => {
+                    scopeguard::ScopeGuard::into_inner(sg);
+                    return;
+                }
                 UploadQueue::Uninitialized => {
                     // transition into Stopped state
                     self.stop_impl(&mut guard);
+                    scopeguard::ScopeGuard::into_inner(sg);
                     return;
                 }
                 UploadQueue::Initialized(ref mut init) => init,
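
The two `ScopeGuard::into_inner` calls above disarm a guard (`sg`) created earlier in this function (not shown in the hunk), so its closure only fires if shutdown exits through a path that was not accounted for. A minimal sketch of the disarm pattern, assuming the `scopeguard` crate as a dependency and an invented function body:

```rust
fn shutdown(queue_already_stopped: bool) {
    // Armed guard: its closure runs if we leave the function without
    // disarming, which would indicate an unexpected exit path.
    let sg = scopeguard::guard((), |_| {
        eprintln!("shutdown exited without reaching a known completion point");
    });

    if queue_already_stopped {
        // Expected early return: disarm before leaving so the closure
        // does not fire.
        scopeguard::ScopeGuard::into_inner(sg);
        return;
    }

    // ... actual shutdown work would happen here ...

    // Normal completion: disarm the guard.
    scopeguard::ScopeGuard::into_inner(sg);
}

fn main() {
    shutdown(true);
    shutdown(false);
}
```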
27 changes: 26 additions & 1 deletion test_runner/fixtures/neon_fixtures.py
@@ -44,7 +44,14 @@
 
 from fixtures import overlayfs
 from fixtures.auth_tokens import AuthKeys, TokenScope
-from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
+from fixtures.common_types import (
+    Lsn,
+    NodeId,
+    TenantId,
+    TenantShardId,
+    TimelineArchivalState,
+    TimelineId,
+)
 from fixtures.endpoint.http import EndpointHttpClient
 from fixtures.log_helper import log
 from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
@@ -2132,6 +2139,24 @@ def step_down(self):
         response.raise_for_status()
         return response.json()
 
+    def timeline_archival_config(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        state: TimelineArchivalState,
+    ):
+        config = {"state": state.value}
+        log.info(
+            f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
+        )
+        res = self.request(
+            "PUT",
+            f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
+            json=config,
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        return res.json()
+
     def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         if isinstance(config_strings, tuple):
             pairs = [config_strings]
25 changes: 25 additions & 0 deletions test_runner/fixtures/pageserver/http.py
@@ -142,6 +142,19 @@ def from_json(cls, d: dict[str, Any]) -> TenantConfig:
     )
 
 
+@dataclass
+class TimelinesInfoAndOffloaded:
+    timelines: list[dict[str, Any]]
+    offloaded: list[dict[str, Any]]
+
+    @classmethod
+    def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
+        return TimelinesInfoAndOffloaded(
+            timelines=d["timelines"],
+            offloaded=d["offloaded"],
+        )
+
+
 class PageserverHttpClient(requests.Session, MetricsGetter):
     def __init__(
         self,
Expand Down Expand Up @@ -464,6 +477,18 @@ def timeline_list(
assert isinstance(res_json, list)
return res_json

def timeline_and_offloaded_list(
self,
tenant_id: Union[TenantId, TenantShardId],
) -> TimelinesInfoAndOffloaded:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return TimelinesInfoAndOffloaded.from_json(res_json)

def timeline_create(
self,
pg_version: PgVersion,