Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More database nits #1773

Merged
merged 6 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,19 @@ eclair {
port = 5432
username = ""
password = ""
readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database
pool {
max-size = 10 // recommended value = number_of_cpu_cores * 2
connection-timeout = 30 seconds
idle-timeout = 10 minutes
max-life-time = 30 minutes
}
lock-type = "lease" // lease or none (do not use none in production)
lease {
interval = 5 minutes // lease-interval must be greater than lease-renew-interval
renew-interval = 1 minute
lock-timeout = 5 seconds // timeout for the lock statement on the lease table
}
lock-type = "lease" // lease or none
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,24 @@ object Databases extends Logging {
def apply(hikariConfig: HikariConfig,
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {
jdbcUrlFile_opt: Option[File],
readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig)
implicit val implicitLock: PgLock = lock

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
l.obtainExclusiveLock(ds)
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => l.obtainExclusiveLock(ds))
}

val databases = PostgresDatabases(
network = new PgNetworkDb,
audit = new PgAuditDb,
Expand All @@ -112,14 +123,13 @@ object Databases extends Logging {
dataSource = ds,
lock = lock)

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
databases.obtainExclusiveLock()
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
logger.info(s"granting read-only access to user=$readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
}
}
}

databases
Expand Down Expand Up @@ -183,6 +193,7 @@ object Databases extends Logging {
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
Expand All @@ -200,6 +211,10 @@ object Databases extends Logging {
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
// We use a timeout for locks, because we might not be able to get the lock right away due to concurrent access
// by other threads. That timeout gives time for other transactions to complete, then ours can take the lock
val lockTimeout = dbConfig.getDuration("postgres.lease.lock-timeout").toSeconds.seconds
hikariConfig.setConnectionInitSql(s"SET lock_timeout TO '${lockTimeout.toSeconds}s'")
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
}
Expand All @@ -210,7 +225,8 @@ object Databases extends Logging {
hikariConfig = hikariConfig,
instanceId = instanceId,
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile)
jdbcUrlFile_opt = Some(jdbcUrlFile),
readOnlyUser_opt = readOnlyUser_opt
)
}

Expand Down
19 changes: 12 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
val batchSize = 100
inTransaction { pg =>
using(pg.createStatement) { statement =>
using(pg.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -165,8 +169,9 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(pg.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package fr.acinq.eclair.db.pg

import fr.acinq.eclair.db.Monitoring.Metrics._
import fr.acinq.eclair.db.Monitoring.Tags
import fr.acinq.eclair.db.jdbc.JdbcUtils
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler.LockException
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -177,14 +179,16 @@ object PgUtils extends JdbcUtils {
using(connection.createStatement()) {
statement =>
// allow only one row in the ownership lease table
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
statement.executeUpdate(s"CREATE TABLE IF NOT EXISTS $LeaseTable (id INTEGER PRIMARY KEY default(1), expires_at TIMESTAMP WITH TIME ZONE NOT NULL, instance VARCHAR NOT NULL, CONSTRAINT one_row CHECK (id = 1))")
}
}

private def acquireExclusiveTableLock()(implicit connection: Connection): Unit = {
using(connection.createStatement()) {
statement =>
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE NOWAIT")
withMetrics("utils/lock", Tags.DbBackends.Postgres) {
statement.executeUpdate(s"LOCK TABLE $LeaseTable IN ACCESS EXCLUSIVE MODE")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.db.sqlite

import java.sql.Connection
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand All @@ -27,6 +26,7 @@ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncement
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
import grizzled.slf4j.Logging

import java.sql.Connection
import scala.collection.immutable.SortedMap

class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
Expand Down Expand Up @@ -132,12 +132,16 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
val batchSize = 100
using(sqlite.prepareStatement(s"DELETE FROM channels WHERE short_channel_id IN (${List.fill(batchSize)("?").mkString(",")})")) { statement =>
shortChannelIds
.grouped(1000) // remove channels by batch of 1000
.foreach { _ =>
val ids = shortChannelIds.map(_.toLong).mkString(",")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
.grouped(batchSize)
.foreach { group =>
val padded = group.toArray.padTo(batchSize, ShortChannelId(0L))
for (i <- 0 until batchSize) {
statement.setLong(1 + i, padded(i).toLong) // index for jdbc parameters starts at 1
}
statement.executeUpdate()
}
}
}
Expand All @@ -153,8 +157,9 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb with Logging {
}

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
using(sqlite.prepareStatement(s"DELETE FROM pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
import fr.acinq.eclair.{ShortChannelId, TxCoordinates}

import scala.collection.mutable
import scala.compat.Platform
import scala.concurrent.duration._

object StaleChannels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object TestDatabases {

// @formatter:off
override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection]
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile))
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile), readOnlyUser_opt = None)
override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion)
override def close(): Unit = pg.close()
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, Tor2}
import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, ShortChannelId, TestDatabases, randomBytes32, randomKey}
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.{SortedMap, mutable}
import scala.collection.{SortedMap, SortedSet, mutable}
import scala.util.Random

class NetworkDbSpec extends AnyFunSuite {

Expand Down Expand Up @@ -248,7 +249,7 @@ class NetworkDbSpec extends AnyFunSuite {
updates.foreach(u => db.updateChannel(u))
assert(db.listChannels().keySet === channels.map(_.shortChannelId).toSet)

val toDelete = channels.map(_.shortChannelId).drop(500).take(2500)
val toDelete = channels.map(_.shortChannelId).take(1 + Random.nextInt(2500))
db.removeChannels(toDelete)
assert(db.listChannels().keySet === (channels.map(_.shortChannelId).toSet -- toDelete))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,19 @@ object PgUtilsSpec extends Logging {
| port = $port
| username = "postgres"
| password = ""
| readonly-user = ""
| pool {
| max-size = 10 // recommended value = number_of_cpu_cores * 2
| connection-timeout = 30 seconds
| idle-timeout = 10 minutes
| max-life-time = 30 minutes
| }
| lock-type = "lease" // lease or none (do not use none in production)
| lease {
| interval = 5 seconds // lease-interval must be greater than lease-renew-interval
| renew-interval = 2 seconds
| lock-timeout = 5 seconds // timeout for the lock statement on the lease table
| }
| lock-type = "lease" // lease or none
|}
|""".stripMargin
)
Expand Down