Use jsonb to store oplog and ps_data #111

Draft: wants to merge 1 commit into main
23 changes: 22 additions & 1 deletion crates/core/src/migrations.rs
@@ -12,7 +12,7 @@
 use crate::error::{PSResult, PowerSyncError};
 use crate::fix_data::apply_v035_fix;
 use crate::sync::BucketPriority;
 
-pub const LATEST_VERSION: i32 = 10;
+pub const LATEST_VERSION: i32 = 11;
 
 pub fn powersync_migrate(
     ctx: *mut sqlite::context,
@@ -384,5 +384,26 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
         .into_db_result(local_db)?;
     }
 
+    if current_version < 11 && target_version >= 11 {
+        local_db.exec_safe("PRAGMA writable_schema = ON;")?;
+        local_db
+            .exec_safe("UPDATE sqlite_schema SET sql = replace(sql, 'data TEXT', 'data ANY') WHERE name = 'ps_oplog'")?;
+        local_db.exec_safe("PRAGMA writable_schema = RESET;")?;
+
+        local_db
+            .exec_safe(
+                "\
+INSERT INTO ps_migration(id, down_migrations) VALUES (11, json_array(
+json_object('sql', 'PRAGMA writable_schema = ON;'),
+json_object('sql', 'UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';'),
+json_object('sql', 'UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';'),
+json_object('sql', 'PRAGMA writable_schema = OFF;'),
+json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
+));
+",
+            )
+            .into_db_result(local_db)?;
+    }
+
     Ok(())
 }
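
Note on the migration above: jsonb() produces SQLite's binary JSON encoding, which is a BLOB, and ps_oplog is a STRICT table, so the declared column type has to widen from TEXT to ANY before blobs can be stored; rewriting the declaration through writable_schema avoids copying the whole oplog. The registered down-migration rewrites blob rows back to text with json(data) before narrowing the column type again. A minimal sketch of the round-trip, assuming SQLite 3.45+ (where jsonb() and json() are available):

    -- jsonb() yields the binary encoding; json() converts it back to text.
    SELECT typeof(jsonb('{"a": 1}'));  -- 'blob'
    SELECT json(jsonb('{"a": 1}'));    -- '{"a":1}'
    -- In a STRICT table a blob is rejected by a TEXT column but accepted by ANY,
    -- which is why the schema rewrite replaces 'data TEXT' with 'data ANY'.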
2 changes: 1 addition & 1 deletion crates/core/src/sync/operations.rs
@@ -45,7 +45,7 @@ RETURNING op_id, hash",
 
     // language=SQLite
     let insert_statement = db.prepare_v2("\
-INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
+INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, jsonb(?), ?)")?;
     insert_statement.bind_int64(1, bucket_id)?;
 
     let updated_row_statement = db.prepare_v2(
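
Note on the changed INSERT: wrapping the parameter as jsonb(?) converts the bound JSON text to the binary encoding inside SQLite at insert time, and malformed JSON makes the statement fail rather than being stored verbatim. A quick illustration against a scratch table (hypothetical table name, assuming SQLite 3.45+):

    CREATE TABLE scratch(data ANY) STRICT;
    INSERT INTO scratch(data) VALUES (jsonb('{"k": "v"}'));  -- stored as a blob
    SELECT typeof(data), json(data) FROM scratch;            -- blob | {"k":"v"}
    INSERT INTO scratch(data) VALUES (jsonb('not json'));    -- error: malformed JSON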
50 changes: 30 additions & 20 deletions crates/core/src/sync_local.rs
@@ -145,33 +145,43 @@ impl<'a> SyncOperation<'a> {
         let mut untyped_delete_statement: Option<ManagedStmt> = None;
         let mut untyped_insert_statement: Option<ManagedStmt> = None;
 
+        let mut jsonb2json: Option<ManagedStmt> = None;
+
         while statement.step().into_db_result(self.db)? == ResultCode::ROW {
             let type_name = statement.column_text(0)?;
             let id = statement.column_text(1)?;
-            let data = statement.column_text(2);
+            // data can either be null (delete), or a JSON object as text or blob (put)
+            let data_value = statement.column_value(2)?;
+            let has_data = data_value.value_type() != ColumnType::Null;
 
             if let Some(known) = self.schema.tables.get_mut(type_name) {
                 if let Some(raw) = &mut known.raw {
-                    match data {
-                        Ok(data) => {
-                            let stmt = raw.put_statement(self.db)?;
-                            let parsed: serde_json::Value = serde_json::from_str(data)
-                                .map_err(PowerSyncError::json_local_error)?;
-                            stmt.bind_for_put(id, &parsed)?;
-                            stmt.stmt.exec()?;
-                        }
-                        Err(_) => {
-                            let stmt = raw.delete_statement(self.db)?;
-                            stmt.bind_for_delete(id)?;
-                            stmt.stmt.exec()?;
-                        }
+                    if has_data {
+                        // data_value could be jsonb, but we need json to parse it for raw statements.
+                        let convert_stmt = match &jsonb2json {
+                            Some(stmt) => stmt,
+                            None => jsonb2json.insert(self.db.prepare_v2("SELECT json(?)")?),
+                        };
+
+                        convert_stmt.reset()?;
+                        convert_stmt.bind_value(1, data_value)?;
+                        convert_stmt.step()?;
+                        let data = convert_stmt.column_text(0)?;
+
+                        let stmt = raw.put_statement(self.db)?;
+                        let parsed: serde_json::Value =
+                            serde_json::from_str(data).map_err(PowerSyncError::json_local_error)?;
+                        stmt.bind_for_put(id, &parsed)?;
+                        stmt.stmt.exec()?;
+                    } else {
+                        let stmt = raw.delete_statement(self.db)?;
+                        stmt.bind_for_delete(id)?;
+                        stmt.stmt.exec()?;
                     }
                 } else {
                     let quoted = quote_internal_name(type_name, false);
 
-                    // is_err() is essentially a NULL check here.
-                    // NULL data means no PUT operations found, so we delete the row.
-                    if data.is_err() {
+                    if !has_data {
                         // DELETE
                         if last_delete_table.as_deref() != Some(&quoted) {
                             // Prepare statement when the table changed
@@ -204,12 +214,12 @@ impl<'a> SyncOperation<'a> {
                         let insert_statement = last_insert_statement.as_mut().unwrap();
                         insert_statement.reset()?;
                         insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
-                        insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
+                        insert_statement.bind_value(2, data_value)?;
                         insert_statement.exec()?;
                     }
                 }
             } else {
-                if data.is_err() {
+                if !has_data {
                     // DELETE
                     if untyped_delete_statement.is_none() {
                         // Prepare statement on first use
@@ -240,7 +250,7 @@ impl<'a> SyncOperation<'a> {
                     insert_statement.reset()?;
                     insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
                     insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
-                    insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_value(3, data_value)?;
                     insert_statement.exec()?;
                 }
             }
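
The cached jsonb2json statement works because json(?) normalizes both representations: given a JSONB blob it decodes it to text, and given JSON text it validates and minifies it, so the raw-table path always hands serde_json a text payload whether or not the row predates migration 11. For example (assuming SQLite 3.45+):

    SELECT json(jsonb('{"a": 1}'));  -- '{"a":1}' from a blob argument
    SELECT json('{"a": 1}');         -- '{"a":1}' from a text argument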
62 changes: 61 additions & 1 deletion dart/test/utils/migration_fixtures.dart
@@ -1,5 +1,5 @@
 /// The current database version
-const databaseVersion = 10;
+const databaseVersion = 11;
 
 /// This is the base database state that we expect at various schema versions.
 /// Generated by loading the specific library version, and exporting the schema.
@@ -354,6 +354,54 @@ const expectedState = <int, String>{
 ;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
 ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
 ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
 ''',
+11: r'''
+;CREATE TABLE ps_buckets(
+id INTEGER PRIMARY KEY,
+name TEXT NOT NULL,
+last_applied_op INTEGER NOT NULL DEFAULT 0,
+last_op INTEGER NOT NULL DEFAULT 0,
+target_op INTEGER NOT NULL DEFAULT 0,
+add_checksum INTEGER NOT NULL DEFAULT 0,
+op_checksum INTEGER NOT NULL DEFAULT 0,
+pending_delete INTEGER NOT NULL DEFAULT 0
+, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
+;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
+;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
+;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
+;CREATE TABLE ps_oplog(
+bucket INTEGER NOT NULL,
+op_id INTEGER NOT NULL,
+row_type TEXT,
+row_id TEXT,
+key TEXT,
+data ANY,
+hash INTEGER NOT NULL) STRICT
+;CREATE TABLE ps_sync_state (
+priority INTEGER NOT NULL PRIMARY KEY,
+last_synced_at TEXT NOT NULL
+) STRICT
+;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
+;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
+;CREATE TABLE ps_updated_rows(
+row_type TEXT,
+row_id TEXT,
+PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
+;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
+;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
+;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
+;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
+;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
+;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n  FROM sqlite_master view\n  WHERE view.type = ''view''\n    AND view.sql GLOB  ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n  name TEXT PRIMARY KEY,\n  last_applied_op INTEGER NOT NULL DEFAULT 0,\n  last_op INTEGER NOT NULL DEFAULT 0,\n  target_op INTEGER NOT NULL DEFAULT 0,\n  add_checksum INTEGER NOT NULL DEFAULT 0,\n  pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n    SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n  bucket TEXT NOT NULL,\n  op_id INTEGER NOT NULL,\n  op INTEGER NOT NULL,\n  row_type TEXT,\n  row_id TEXT,\n  key TEXT,\n  data TEXT,\n  hash INTEGER NOT NULL,\n  superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n    SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n    FROM ps_oplog_5 oplog\n    JOIN ps_buckets_5\n        ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n    SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n    FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
+;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"PRAGMA writable_schema = ON;"},{"sql":"UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';"},{"sql":"UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';"},{"sql":"PRAGMA writable_schema = OFF;"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]')
+''',
 };

@@ -456,6 +504,17 @@ const data1 = <int, String>{
 (2, 3, 'lists', 'l1', '', '{}', 3)
 ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
 ('lists', 'l2')
 ''',
+11: r'''
+;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
+(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
+(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
+;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
+(1, 1, 'todos', 't1', '', '{}', 100),
+(1, 2, 'todos', 't2', '', '{}', 20),
+(2, 3, 'lists', 'l1', '', '{}', 3)
+;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
+('lists', 'l2')
+'''
 };

@@ -501,6 +560,7 @@ final dataDown1 = <int, String>{
 7: data1[5]!,
 8: data1[5]!,
 9: data1[9]!,
+10: data1[9]!,
 };
 
 final finalData1 = data1[databaseVersion]!;