Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix previously published db migration #1454

Merged
merged 1 commit into from
Mar 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
drop table if exists valid_file_ids;
create table valid_file_ids (
id bigserial primary key,
file_id varchar(254) not null unique
);

-- Source files
insert into valid_file_ids (file_id)
select rs.file_id
from attachment_source rs
;

-- Archive files
insert into valid_file_ids (file_id)
select distinct rs.file_id
from attachment_archive rs
;

-- Preview image
insert into valid_file_ids (file_id)
select ap.file_id
from attachment_preview ap
;

-- classifier
insert into valid_file_ids (file_id)
select file_id
from classifier_model
;

-- save obsolete files
drop table if exists obsolete_files;
create table obsolete_files(
file_id varchar(254) not null
);

with
missing_ids as (
select file_id from filemeta
except
select file_id as file_id from valid_file_ids)
insert into obsolete_files (file_id)
select file_id from filemeta
where file_id in (select file_id from missing_ids)
;


-- remove orphaned chunks
delete from "filechunk"
where "file_id" in (
select distinct file_id from filechunk
where file_id not in (select file_id from valid_file_ids)
and file_id not in (select file_id from obsolete_files)
);

-- drop temp table
drop table valid_file_ids;
drop table obsolete_files;
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ final class StoreImpl[F[_]: Async](
FunctionK.lift(transact)

def migrate: F[Int] =
FlywayMigrate.run[F](jdbc).map(_.migrationsExecuted)
FlywayMigrate[F](jdbc, xa).run.map(_.migrationsExecuted)

def transact[A](prg: ConnectionIO[A]): F[A] =
prg.transact(xa)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,90 @@
package docspell.store.migrate

import cats.effect.Sync
import cats.implicits._

import docspell.store.JdbcConfig
import docspell.store.migrate.FlywayMigrate.MigrationKind

import doobie.implicits._
import doobie.util.transactor.Transactor
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.output.MigrateResult
import org.log4s._

object FlywayMigrate {
private[this] val logger = getLogger

def run[F[_]: Sync](jdbc: JdbcConfig): F[MigrateResult] =
Sync[F].delay {
logger.info("Running db migrations...")
val locations = jdbc.dbmsName match {
case Some(dbtype) =>
List(s"classpath:db/migration/$dbtype", "classpath:db/migration/common")
case None =>
logger.warn(
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
)
List("classpath:db/migration/h2", "classpath:db/migration/common")
}

logger.info(s"Using migration locations: $locations")
val fw = Flyway
class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
private[this] val logger = docspell.logging.getLogger[F]

private def createLocations(folder: String) =
jdbc.dbmsName match {
case Some(dbtype) =>
List(s"classpath:db/$folder/$dbtype", s"classpath:db/$folder/common")
case None =>
logger.warn(
s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
)
List(s"classpath:db/$folder/h2", s"classpath:db/$folder/common")
}

def createFlyway(kind: MigrationKind): F[Flyway] =
for {
locations <- Sync[F].pure(createLocations(kind.folder))
_ <- logger.info(s"Creating Flyway for: $locations")
fw = Flyway
.configure()
.table(kind.table)
.cleanDisabled(true)
.dataSource(jdbc.url.asString, jdbc.user, jdbc.password)
.locations(locations: _*)
.baselineOnMigrate(kind == MigrationKind.Fixups)
.load()
} yield fw

def run: F[MigrateResult] =
for {
_ <- runFixups
fw <- createFlyway(MigrationKind.Main)
_ <- logger.info(s"!!! Running main migrations")
result <- Sync[F].blocking(fw.migrate())
} yield result

// A hack to fix already published migrations
def runFixups: F[Unit] =
isSchemaEmpty.flatMap {
case true =>
().pure[F]
case false =>
for {
fw <- createFlyway(MigrationKind.Fixups)
_ <- logger.info(s"!!! Running fixup migrations")
_ <- Sync[F].blocking(fw.migrate())
} yield ()
}

private def isSchemaEmpty: F[Boolean] =
sql"select count(1) from flyway_schema_history"
.query[Int]
.unique
.attemptSql
.transact(xa)
.map(_.isLeft)
}

fw.migrate()
object FlywayMigrate {
def apply[F[_]: Sync](jdbcConfig: JdbcConfig, xa: Transactor[F]): FlywayMigrate[F] =
new FlywayMigrate[F](jdbcConfig, xa)

sealed trait MigrationKind {
def table: String
def folder: String
}
object MigrationKind {
case object Main extends MigrationKind {
val table = "flyway_schema_history"
val folder = "migration"
}
case object Fixups extends MigrationKind {
val table = "flyway_fixup_history"
val folder = "fixups"
}
}
}
31 changes: 28 additions & 3 deletions modules/store/src/test/scala/docspell/store/StoreFixture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import docspell.store.migrate.FlywayMigrate

import doobie._
import munit._
import org.h2.jdbcx.JdbcConnectionPool
import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource}
import org.mariadb.jdbc.MariaDbDataSource
import org.postgresql.ds.PGConnectionPoolDataSource

trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>

Expand All @@ -26,7 +28,7 @@ trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
for {
ds <- StoreFixture.dataSource(cfg)
xa <- StoreFixture.makeXA(ds)
_ <- Resource.eval(FlywayMigrate.run[IO](cfg))
_ <- Resource.eval(FlywayMigrate[IO](cfg, xa).run)
} yield xa
}

Expand All @@ -52,7 +54,30 @@ object StoreFixture {

def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = {
def jdbcConnPool =
JdbcConnectionPool.create(jdbc.url.asString, jdbc.user, jdbc.password)
jdbc.dbmsName match {
case Some("mariadb") =>
val ds = new MariaDbDataSource()
ds.setUrl(jdbc.url.asString)
ds.setUser(jdbc.user)
ds.setPassword(jdbc.password)
JdbcConnectionPool.create(ds)

case Some("postgresql") =>
val ds = new PGConnectionPoolDataSource()
ds.setURL(jdbc.url.asString)
ds.setUser(jdbc.user)
ds.setPassword(jdbc.password)
JdbcConnectionPool.create(ds)

case Some("h2") =>
val ds = new JdbcDataSource()
ds.setURL(jdbc.url.asString)
ds.setUser(jdbc.user)
ds.setPassword(jdbc.password)
JdbcConnectionPool.create(ds)

case n => sys.error(s"Unknown db name: $n")
}

Resource.make(IO(jdbcConnPool))(cp => IO(cp.dispose()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ class H2MigrateTest extends FunSuite with TestLoggingConfig {

test("h2 empty schema migration") {
val jdbc = StoreFixture.memoryDB("h2test")
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
assert(result.migrationsExecuted > 0)
}
val ds = StoreFixture.dataSource(jdbc)
val result =
ds.flatMap(StoreFixture.makeXA).use { xa =>
FlywayMigrate[IO](jdbc, xa).run
}

assert(result.unsafeRunSync().migrationsExecuted > 0)

// a second time to apply fixup migrations
assert(result.unsafeRunSync().migrationsExecuted == 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._

import docspell.common.LenientUri
import docspell.logging.TestLoggingConfig
import docspell.store.JdbcConfig
import docspell.store.{JdbcConfig, StoreFixture}

import com.dimafeng.testcontainers.MariaDBContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
Expand All @@ -30,8 +30,13 @@ class MariaDbMigrateTest
withContainers { cnt =>
val jdbc =
JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.dbUsername, cnt.dbPassword)
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
assert(result.migrationsExecuted > 0)
val ds = StoreFixture.dataSource(jdbc)
val result = ds.flatMap(StoreFixture.makeXA).use { xa =>
FlywayMigrate[IO](jdbc, xa).run
}
assert(result.unsafeRunSync().migrationsExecuted > 0)
// a second time to apply fixup migrations
assert(result.unsafeRunSync().migrationsExecuted == 0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._

import docspell.common.LenientUri
import docspell.logging.TestLoggingConfig
import docspell.store.JdbcConfig
import docspell.store.{JdbcConfig, StoreFixture}

import com.dimafeng.testcontainers.PostgreSQLContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
Expand All @@ -30,8 +30,16 @@ class PostgresqlMigrateTest
withContainers { cnt =>
val jdbc =
JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.username, cnt.password)
val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
assert(result.migrationsExecuted > 0)

val ds = StoreFixture.dataSource(jdbc)
val result =
ds.flatMap(StoreFixture.makeXA).use { xa =>
FlywayMigrate[IO](jdbc, xa).run
}
assert(result.unsafeRunSync().migrationsExecuted > 0)

// a second time to apply fixup migrations
assert(result.unsafeRunSync().migrationsExecuted == 0)
}
}
}