Skip to content

Commit

Permalink
docs: add sample for stale reads (#539)
Browse files Browse the repository at this point in the history
* docs: add sample for stale reads

Adds a sample and tests for executing stale reads on Spanner. Using
stale reads can improve performance when the application does not require
the guarantees that are given by strong reads.

Fixes #495

* chore: remove GetSession requests
  • Loading branch information
olavloite authored Dec 9, 2024
1 parent 108d965 commit e9df810
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 0 deletions.
5 changes: 5 additions & 0 deletions samples/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def transaction(session):
_sample(session)


@nox.session()
def stale_read(session):
    """Nox session for the stale read sample; delegates to ``_sample``."""
    _sample(session)


@nox.session()
def read_only_transaction(session):
_sample(session)
Expand Down
96 changes: 96 additions & 0 deletions samples/stale_read_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from sqlalchemy import create_engine, Engine, select, text
from sqlalchemy.orm import Session
from sample_helper import run_sample
from model import Singer


# Shows how to execute stale reads on Spanner using SQLAlchemy.
def stale_read_sample():
    """Run strong and stale reads against the sample database.

    The sample performs these steps in order:

    1. Reads the current database timestamp in auto-commit mode.
    2. Inserts test rows through ``insert_test_data``.
    3. Reads all singers in a read-only transaction (strong read).
    4. Reads all singers in a read-only transaction pinned to the timestamp
       from step 1.
    5. Reads all singers in auto-commit mode with a max staleness of
       15 seconds.
    """
    engine = create_engine(
        "spanner:///projects/sample-project/"
        "instances/sample-instance/"
        "databases/sample-database",
        echo=True,
    )

    # Remember the database time *before* any test data exists, so that it
    # can be used later as a read timestamp at which the table was empty.
    autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
    with Session(autocommit_engine) as session:
        current_timestamp_row = session.execute(
            select(text("current_timestamp"))
        ).one()
        timestamp = current_timestamp_row[0]
        print(timestamp)

    # Insert a few test rows.
    insert_test_data(engine)

    # Strong read: a read-only transaction without a staleness option sees
    # all data that was committed when the transaction started. Read-only
    # transactions take no locks, which makes them the better choice over
    # read/write transactions for workloads that only read data.
    read_only_engine = engine.execution_options(read_only=True)
    with Session(read_only_engine) as session:
        print("Found singers with strong timestamp bound:")
        for singer in session.query(Singer).order_by(Singer.last_name).all():
            print("Singer: ", singer.full_name)

    # Stale read at an exact timestamp: reuse the timestamp that was fetched
    # before the test data was inserted.
    past_engine = engine.execution_options(
        read_only=True, staleness={"read_timestamp": timestamp}
    )
    with Session(past_engine) as session:
        print("Searching for singers using a read timestamp in the past:")
        found = session.query(Singer).order_by(Singer.last_name).all()
        if not found:
            print("No singers found.")
        else:
            for singer in found:
                print("Singer: ", singer.full_name)

    # min_read_timestamp and max_staleness are also supported as staleness
    # options, but only in auto-commit mode. Spanner then picks a read
    # timestamp that satisfies the bound and can be served efficiently.
    bounded_engine = engine.execution_options(
        isolation_level="AUTOCOMMIT", staleness={"max_staleness": {"seconds": 15}}
    )
    with Session(bounded_engine) as session:
        print("Searching for singers using a max staleness of 15 seconds:")
        found = session.query(Singer).order_by(Singer.last_name).all()
        if not found:
            print("No singers found.")
        else:
            for singer in found:
                print("Singer: ", singer.full_name)


def insert_test_data(engine: Engine):
    """Insert two singers with random UUID ids and commit them.

    Args:
        engine: The engine used to open the session that writes the rows.
    """
    with Session(engine) as session:
        john = Singer(id=str(uuid.uuid4()), first_name="John", last_name="Doe")
        jane = Singer(id=str(uuid.uuid4()), first_name="Jane", last_name="Doe")
        session.add_all([john, jane])
        session.commit()


# Script entry point: hand the sample function to the shared sample runner.
if __name__ == "__main__":
    run_sample(stale_read_sample)
28 changes: 28 additions & 0 deletions test/mockserver_tests/stale_read_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import String, BigInteger
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    """Declarative base class for the models in this module."""


class Singer(Base):
    """ORM model mapped to the ``singers`` table."""

    __tablename__ = "singers"

    # Primary key, stored as a BigInteger column.
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Singer name, stored as a String column.
    name: Mapped[str] = mapped_column(String)
169 changes: 169 additions & 0 deletions test/mockserver_tests/test_stale_reads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.testing import eq_, is_instance_of
from google.cloud.spanner_v1 import (
FixedSizePool,
BatchCreateSessionsRequest,
ExecuteSqlRequest,
BeginTransactionRequest,
TransactionOptions,
)
from test.mockserver_tests.mock_server_test_base import MockServerTestBase
from test.mockserver_tests.mock_server_test_base import add_result
import google.cloud.spanner_v1.types.type as spanner_type
import google.cloud.spanner_v1.types.result_set as result_set


class TestStaleReads(MockServerTestBase):
    """Mock-server tests for the ``staleness`` execution option."""

    # SQL text that SQLAlchemy emits for ``select(Singer)``. The compiler
    # renders a space before the newline that precedes FROM, and the mock
    # result is registered under this exact text, so both tests share one
    # constant instead of repeating (and mistyping) the string.
    _SELECT_SINGERS_SQL = "SELECT singers.id, singers.name \nFROM singers"

    def test_stale_read_multi_use(self):
        """A read-only transaction with a read timestamp sends that timestamp
        in the options of every BeginTransaction request."""
        from test.mockserver_tests.stale_read_model import Singer

        add_singer_query_result(self._SELECT_SINGERS_SQL)
        engine = create_engine(
            "spanner:///projects/p/instances/i/databases/d",
            echo=True,
            connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
        )

        timestamp = datetime.datetime.fromtimestamp(1733328910)
        # Run two separate read-only transactions with the same read timestamp.
        for _ in range(2):
            with Session(
                engine.execution_options(
                    read_only=True,
                    staleness={"read_timestamp": timestamp},
                )
            ) as session:
                # Execute two queries in a read-only transaction.
                session.scalars(select(Singer)).all()
                session.scalars(select(Singer)).all()

        # Verify the requests that we got: one session-creation request,
        # then (BeginTransaction, ExecuteSql, ExecuteSql) per transaction.
        requests = self.spanner_service.requests
        eq_(7, len(requests))
        is_instance_of(requests[0], BatchCreateSessionsRequest)
        is_instance_of(requests[1], BeginTransactionRequest)
        is_instance_of(requests[2], ExecuteSqlRequest)
        is_instance_of(requests[3], ExecuteSqlRequest)
        is_instance_of(requests[4], BeginTransactionRequest)
        is_instance_of(requests[5], ExecuteSqlRequest)
        is_instance_of(requests[6], ExecuteSqlRequest)
        # Verify that the transaction is a read-only transaction that uses
        # the requested read timestamp.
        expected_options = TransactionOptions(
            dict(
                read_only=TransactionOptions.ReadOnly(
                    dict(
                        read_timestamp={"seconds": 1733328910},
                        return_read_timestamp=True,
                    )
                )
            )
        )
        for index in (1, 4):
            begin_request: BeginTransactionRequest = requests[index]
            eq_(expected_options, begin_request.options)

    def test_stale_read_single_use(self):
        """Auto-commit queries with ``max_staleness`` send the staleness bound
        as a single-use read-only transaction on every ExecuteSql request."""
        from test.mockserver_tests.stale_read_model import Singer

        # Fix: the SQL previously lacked the space before the newline, so it
        # did not match the statement that SQLAlchemy actually generates.
        add_singer_query_result(self._SELECT_SINGERS_SQL)
        engine = create_engine(
            "spanner:///projects/p/instances/i/databases/d",
            echo=True,
            connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
        )

        with Session(
            engine.execution_options(
                isolation_level="AUTOCOMMIT",
                staleness={"max_staleness": {"seconds": 15}},
            )
        ) as session:
            # Execute two queries in autocommit.
            session.scalars(select(Singer)).all()
            session.scalars(select(Singer)).all()

        # Verify the requests that we got: no BeginTransaction is expected
        # in auto-commit mode, only the two ExecuteSql requests.
        requests = self.spanner_service.requests
        eq_(3, len(requests))
        is_instance_of(requests[0], BatchCreateSessionsRequest)
        is_instance_of(requests[1], ExecuteSqlRequest)
        is_instance_of(requests[2], ExecuteSqlRequest)
        # Verify that the requests use a stale read.
        expected_options = TransactionOptions(
            dict(
                read_only=TransactionOptions.ReadOnly(
                    dict(
                        max_staleness={"seconds": 15},
                        return_read_timestamp=True,
                    )
                )
            )
        )
        for index in (1, 2):
            execute_request: ExecuteSqlRequest = requests[index]
            eq_(expected_options, execute_request.transaction.single_use)


def add_singer_query_result(sql: str):
    """Register a two-row singers result set for ``sql`` via ``add_result``.

    The result set has an INT64 column ``singers_id`` and a STRING column
    ``singers_name``, with the rows ("1", "Jane Doe") and ("2", "John Doe").

    Args:
        sql: The SQL text that the result is registered under.
    """
    id_field = spanner_type.StructType.Field(
        {
            "name": "singers_id",
            "type": spanner_type.Type({"code": spanner_type.TypeCode.INT64}),
        }
    )
    name_field = spanner_type.StructType.Field(
        {
            "name": "singers_name",
            "type": spanner_type.Type({"code": spanner_type.TypeCode.STRING}),
        }
    )
    row_type = spanner_type.StructType({"fields": [id_field, name_field]})
    metadata = result_set.ResultSetMetadata({"row_type": row_type})

    singers = result_set.ResultSet({"metadata": metadata})
    singers.rows.extend([("1", "Jane Doe"), ("2", "John Doe")])
    add_result(sql, singers)

0 comments on commit e9df810

Please sign in to comment.