diff --git a/modules/store/src/main/resources/db/fixups/postgresql/V1.33.0__fix_reorganize_files.sql b/modules/store/src/main/resources/db/fixups/postgresql/V1.33.0__fix_reorganize_files.sql
new file mode 100644
index 0000000000..6ac74c2b14
--- /dev/null
+++ b/modules/store/src/main/resources/db/fixups/postgresql/V1.33.0__fix_reorganize_files.sql
@@ -0,0 +1,58 @@
+drop table if exists valid_file_ids; -- start from a clean helper table
+create table valid_file_ids ( -- collects every file id that is still referenced
+  id bigserial primary key,
+  file_id varchar(254) not null unique -- NOTE(review): the inserts below fail on a duplicate id; confirm none can occur
+);
+
+-- Source files: file ids referenced by attachment_source
+insert into valid_file_ids (file_id)
+  select rs.file_id
+  from attachment_source rs
+;
+
+-- Archive files: distinct file ids referenced by attachment_archive
+insert into valid_file_ids (file_id)
+  select distinct rs.file_id
+  from attachment_archive rs
+;
+
+-- Preview images: file ids referenced by attachment_preview
+insert into valid_file_ids (file_id)
+  select ap.file_id
+  from attachment_preview ap
+;
+
+-- Classifier models: file ids referenced by classifier_model
+insert into valid_file_ids (file_id)
+  select file_id
+  from classifier_model
+;
+
+-- Save obsolete files: ids present in filemeta but missing from valid_file_ids
+drop table if exists obsolete_files;
+create table obsolete_files(
+  file_id varchar(254) not null
+);
+
+with
+  missing_ids as (
+    select file_id from filemeta
+    except
+    select file_id as file_id from valid_file_ids)
+insert into obsolete_files (file_id)
+  select file_id from filemeta
+  where file_id in (select file_id from missing_ids)
+;
+
+
+-- Remove orphaned chunks: filechunk rows whose file_id is in neither valid_file_ids nor obsolete_files
+delete from "filechunk"
+where "file_id" in (
+  select distinct file_id from filechunk
+  where file_id not in (select file_id from valid_file_ids)
+  and file_id not in (select file_id from obsolete_files)
+);
+
+-- Drop the two helper tables created above
+drop table valid_file_ids;
+drop table obsolete_files;
diff --git a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala
index 1c7749565b..87703a8bd9 100644
--- a/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala
+++ b/modules/store/src/main/scala/docspell/store/impl/StoreImpl.scala
@@ -37,7 +37,7 @@ final class
StoreImpl[F[_]: Async](
     FunctionK.lift(transact)
 
   def migrate: F[Int] =
-    FlywayMigrate.run[F](jdbc).map(_.migrationsExecuted)
+    FlywayMigrate[F](jdbc, xa).run.map(_.migrationsExecuted)
 
   def transact[A](prg: ConnectionIO[A]): F[A] =
     prg.transact(xa)
diff --git a/modules/store/src/main/scala/docspell/store/migrate/FlywayMigrate.scala b/modules/store/src/main/scala/docspell/store/migrate/FlywayMigrate.scala
index c5c749d029..3e3a3e7f34 100644
--- a/modules/store/src/main/scala/docspell/store/migrate/FlywayMigrate.scala
+++ b/modules/store/src/main/scala/docspell/store/migrate/FlywayMigrate.scala
@@ -7,37 +7,112 @@
 package docspell.store.migrate
 
 import cats.effect.Sync
+import cats.implicits._
 
 import docspell.store.JdbcConfig
+import docspell.store.migrate.FlywayMigrate.MigrationKind
 
+import doobie.implicits._
+import doobie.util.transactor.Transactor
 import org.flywaydb.core.Flyway
 import org.flywaydb.core.api.output.MigrateResult
-import org.log4s._
 
-object FlywayMigrate {
-  private[this] val logger = getLogger
-
-  def run[F[_]: Sync](jdbc: JdbcConfig): F[MigrateResult] =
-    Sync[F].delay {
-      logger.info("Running db migrations...")
-      val locations = jdbc.dbmsName match {
-        case Some(dbtype) =>
-          List(s"classpath:db/migration/$dbtype", "classpath:db/migration/common")
-        case None =>
-          logger.warn(
-            s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2"
-          )
-          List("classpath:db/migration/h2", "classpath:db/migration/common")
-      }
-
-      logger.info(s"Using migration locations: $locations")
-      val fw = Flyway
+/** Runs the Flyway database migrations for the database given by `jdbc`.
+  *
+  * Two sets of scripts exist (see [[FlywayMigrate.MigrationKind]]): the main migrations
+  * and the "fixups"; each set is tracked in its own history table.
+  */
+class FlywayMigrate[F[_]: Sync](jdbc: JdbcConfig, xa: Transactor[F]) {
+  private[this] val logger = docspell.logging.getLogger[F]
+
+  /** Resolves the classpath locations below `db/<folder>` for the configured dbms. If the
+    * dbms name cannot be read from the jdbc url, a warning is logged and the h2 scripts
+    * are used. The result is an effect, so that the warning is actually emitted.
+    */
+  private def createLocations(folder: String): F[List[String]] =
+    jdbc.dbmsName match {
+      case Some(dbtype) =>
+        List(s"classpath:db/$folder/$dbtype", s"classpath:db/$folder/common").pure[F]
+      case None =>
+        logger
+          .warn(s"Cannot read database name from jdbc url: ${jdbc.url}. Go with H2")
+          .as(List(s"classpath:db/$folder/h2", s"classpath:db/$folder/common"))
+    }
+
+  /** Creates a Flyway instance for the given kind of migration, using its history table
+    * and script folder. Only the fixups are configured with `baselineOnMigrate`.
+    */
+  def createFlyway(kind: MigrationKind): F[Flyway] =
+    for {
+      locations <- createLocations(kind.folder)
+      _ <- logger.info(s"Creating Flyway for: $locations")
+      fw = Flyway
         .configure()
+        .table(kind.table)
         .cleanDisabled(true)
         .dataSource(jdbc.url.asString, jdbc.user, jdbc.password)
         .locations(locations: _*)
+        .baselineOnMigrate(kind == MigrationKind.Fixups)
         .load()
+    } yield fw
+
+  /** Runs the fixup migrations (see [[runFixups]]) and afterwards the main migrations.
+    * The returned result is the one of the main migrations only.
+    */
+  def run: F[MigrateResult] =
+    for {
+      _ <- runFixups
+      fw <- createFlyway(MigrationKind.Main)
+      _ <- logger.info(s"!!! Running main migrations")
+      result <- Sync[F].blocking(fw.migrate())
+    } yield result
+
+  /** A hack to fix already published migrations: runs the fixup migrations, unless
+    * [[isSchemaEmpty]] reports an empty schema (presumably because a fresh database has
+    * nothing to fix).
+    */
+  def runFixups: F[Unit] =
+    isSchemaEmpty.flatMap {
+      case true =>
+        ().pure[F]
+      case false =>
+        for {
+          fw <- createFlyway(MigrationKind.Fixups)
+          _ <- logger.info(s"!!! Running fixup migrations")
+          _ <- Sync[F].blocking(fw.migrate())
+        } yield ()
+    }
+
+  /** Probes the main Flyway history table: the schema counts as empty when the query fails
+    * with an SQL error; the returned count is not inspected. NOTE(review): any SQL error
+    * is treated this way, not only a missing table - confirm this is intended.
+    */
+  private def isSchemaEmpty: F[Boolean] =
+    sql"select count(1) from flyway_schema_history"
+      .query[Int]
+      .unique
+      .attemptSql
+      .transact(xa)
+      .map(_.isLeft)
+}
 
-      fw.migrate()
+object FlywayMigrate {
+  def apply[F[_]: Sync](jdbcConfig: JdbcConfig, xa: Transactor[F]): FlywayMigrate[F] =
+    new FlywayMigrate[F](jdbcConfig, xa)
+
+  /** Selects the Flyway history table and the script folder below `db/`. */
+  sealed trait MigrationKind {
+    def table: String
+    def folder: String
+  }
+  object MigrationKind {
+    case object Main extends MigrationKind {
+      val table = "flyway_schema_history"
+      val folder = "migration"
+    }
+    case object Fixups extends MigrationKind {
+      val table = "flyway_fixup_history"
+      val folder = "fixups"
     }
+  }
 }
diff --git a/modules/store/src/test/scala/docspell/store/StoreFixture.scala b/modules/store/src/test/scala/docspell/store/StoreFixture.scala
index 2933b51fac..1213f31cef 100644
--- a/modules/store/src/test/scala/docspell/store/StoreFixture.scala
+++ b/modules/store/src/test/scala/docspell/store/StoreFixture.scala
@@ -17,7 +17,9 @@
import docspell.store.migrate.FlywayMigrate
 
 import doobie._
 import munit._
-import org.h2.jdbcx.JdbcConnectionPool
+import org.h2.jdbcx.{JdbcConnectionPool, JdbcDataSource}
+import org.mariadb.jdbc.MariaDbDataSource
+import org.postgresql.ds.PGConnectionPoolDataSource
 
 trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
 
@@ -26,7 +28,7 @@ trait StoreFixture extends CatsEffectFunFixtures { self: CatsEffectSuite =>
     for {
       ds <- StoreFixture.dataSource(cfg)
       xa <- StoreFixture.makeXA(ds)
-      _ <- Resource.eval(FlywayMigrate.run[IO](cfg))
+      _ <- Resource.eval(FlywayMigrate[IO](cfg, xa).run) // migrate through the fixture's own transactor
     } yield xa
   }
 
@@ -52,7 +54,30 @@ object StoreFixture {
 
   def dataSource(jdbc: JdbcConfig): Resource[IO, JdbcConnectionPool] = {
     def jdbcConnPool =
-      JdbcConnectionPool.create(jdbc.url.asString, jdbc.user, jdbc.password)
+      jdbc.dbmsName match { // choose the vendor data source by the dbms name of the config
+        case Some("mariadb") =>
+          val ds = new MariaDbDataSource()
+          ds.setUrl(jdbc.url.asString)
+          ds.setUser(jdbc.user)
+          ds.setPassword(jdbc.password)
+          JdbcConnectionPool.create(ds) // every branch is wrapped in H2's JdbcConnectionPool
+
+        case Some("postgresql") =>
+          val ds = new PGConnectionPoolDataSource()
+          ds.setURL(jdbc.url.asString)
+          ds.setUser(jdbc.user)
+          ds.setPassword(jdbc.password)
+          JdbcConnectionPool.create(ds)
+
+        case Some("h2") =>
+          val ds = new JdbcDataSource()
+          ds.setURL(jdbc.url.asString)
+          ds.setUser(jdbc.user)
+          ds.setPassword(jdbc.password)
+          JdbcConnectionPool.create(ds)
+
+        case n => sys.error(s"Unknown db name: $n") // anything else, including None, is rejected
+      }
 
     Resource.make(IO(jdbcConnPool))(cp => IO(cp.dispose()))
   }
diff --git a/modules/store/src/test/scala/docspell/store/migrate/H2MigrateTest.scala b/modules/store/src/test/scala/docspell/store/migrate/H2MigrateTest.scala
index df03453ff9..1223237685 100644
--- a/modules/store/src/test/scala/docspell/store/migrate/H2MigrateTest.scala
+++ b/modules/store/src/test/scala/docspell/store/migrate/H2MigrateTest.scala
@@ -18,8 +18,15 @@ class H2MigrateTest extends FunSuite with TestLoggingConfig {
 
   test("h2 empty schema migration") {
     val jdbc = StoreFixture.memoryDB("h2test")
-    val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
-    assert(result.migrationsExecuted > 0)
-  }
+    val ds = StoreFixture.dataSource(jdbc)
+    val result = // an IO value; it is executed by each unsafeRunSync() below
+      ds.flatMap(StoreFixture.makeXA).use { xa =>
+        FlywayMigrate[IO](jdbc, xa).run
+      }
+
+    assert(result.unsafeRunSync().migrationsExecuted > 0)
+    // second run: the history table exists now, so the fixup migrations are attempted as well; no main migration must be left
+    assert(result.unsafeRunSync().migrationsExecuted == 0)
+  }
 
 }
diff --git a/modules/store/src/test/scala/docspell/store/migrate/MariaDbMigrateTest.scala b/modules/store/src/test/scala/docspell/store/migrate/MariaDbMigrateTest.scala
index 321a1b4db1..a77382dcc2 100644
--- a/modules/store/src/test/scala/docspell/store/migrate/MariaDbMigrateTest.scala
+++ b/modules/store/src/test/scala/docspell/store/migrate/MariaDbMigrateTest.scala
@@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._
 
 import docspell.common.LenientUri
 import docspell.logging.TestLoggingConfig
-import docspell.store.JdbcConfig
+import docspell.store.{JdbcConfig, StoreFixture}
 
 import com.dimafeng.testcontainers.MariaDBContainer
 import com.dimafeng.testcontainers.munit.TestContainerForAll
@@ -30,8 +30,13 @@ class MariaDbMigrateTest
     withContainers { cnt =>
       val jdbc =
         JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.dbUsername, cnt.dbPassword)
-      val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
-      assert(result.migrationsExecuted > 0)
+      val ds = StoreFixture.dataSource(jdbc)
+      val result = ds.flatMap(StoreFixture.makeXA).use { xa => // an IO value; executed by each unsafeRunSync() below
+        FlywayMigrate[IO](jdbc, xa).run
+      }
+      assert(result.unsafeRunSync().migrationsExecuted > 0)
+      // second run: the history table exists now, so the fixup migrations are attempted as well; no main migration must be left
+      assert(result.unsafeRunSync().migrationsExecuted == 0)
     }
   }
 }
diff --git a/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala b/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala
index 9decab2f29..1ba69f55c5 100644
--- a/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala
+++ b/modules/store/src/test/scala/docspell/store/migrate/PostgresqlMigrateTest.scala
@@ -11,7 +11,7 @@ import cats.effect.unsafe.implicits._
 
 import docspell.common.LenientUri
 import docspell.logging.TestLoggingConfig
-import docspell.store.JdbcConfig
+import docspell.store.{JdbcConfig, StoreFixture}
 
 import com.dimafeng.testcontainers.PostgreSQLContainer
 import com.dimafeng.testcontainers.munit.TestContainerForAll
@@ -30,8 +30,16 @@ class PostgresqlMigrateTest
     withContainers { cnt =>
       val jdbc =
         JdbcConfig(LenientUri.unsafe(cnt.jdbcUrl), cnt.username, cnt.password)
-      val result = FlywayMigrate.run[IO](jdbc).unsafeRunSync()
-      assert(result.migrationsExecuted > 0)
+
+      val ds = StoreFixture.dataSource(jdbc)
+      val result = // an IO value; it is executed by each unsafeRunSync() below
+        ds.flatMap(StoreFixture.makeXA).use { xa =>
+          FlywayMigrate[IO](jdbc, xa).run
+        }
+      assert(result.unsafeRunSync().migrationsExecuted > 0)
+
+      // second run: the history table exists now, so the fixup migrations are attempted as well; no main migration must be left
+      assert(result.unsafeRunSync().migrationsExecuted == 0)
     }
   }
 }