Skip to content

Commit

Permalink
Use schemas in Postgres (#1866)
Browse files Browse the repository at this point in the history
Instead of having a flat organization under the default `public` schema, we classify tables in schemas. There is roughly one schema per database type.

The new hierarchy is:
- `local`
  - `channels`
  - `htlc_infos`
  - `pending_settlement_commands`
  - `peers`
- `network`
  - `nodes`
  - `public_channels`
  - `pruned_channels`
- `payments`
  - `received`
  - `sent`
- `audit`
  - (all the audit db tables)
- `public`
  - `lease`
  - `versions`

Note in particular, the change in naming for local channels vs external channels:
- `local_channels` -> `local.channels`
- `channels` -> `network.public_channels`

The two internal tables `lease` and `versions` stay in the `public`
schema, because we have no meta way of migrating them.
  • Loading branch information
pm47 authored Jul 8, 2021
1 parent 08faf3b commit f8feb19
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 152 deletions.
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ object Databases extends Logging {
readOnlyUser_opt.foreach { readOnlyUser =>
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement =>
logger.info(s"granting read-only access to user=$readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil
schemas.foreach { schema =>
logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema")
statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser")
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA $schema TO $readOnlyUser")
}
}
}
}
Expand Down
76 changes: 47 additions & 29 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 6
val CURRENT_VERSION = 7

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

Expand All @@ -62,32 +62,50 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("ALTER TABLE channel_errors ALTER COLUMN timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + timestamp * interval '1 millisecond'")
}

def migration67(statement: Statement): Unit = {
statement.executeUpdate("CREATE SCHEMA audit")
statement.executeUpdate("ALTER TABLE sent SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE received SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE relayed SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE relayed_trampoline SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE network_fees SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE channel_events SET SCHEMA audit")
statement.executeUpdate("ALTER TABLE channel_errors SET SCHEMA audit")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE SCHEMA audit")

statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON channel_errors(timestamp)")
statement.executeUpdate("CREATE TABLE audit.sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")

statement.executeUpdate("CREATE TABLE audit.channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON audit.sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON audit.received(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON audit.relayed(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON audit.relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON audit.relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON audit.relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON audit.network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON audit.channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON audit.channel_errors(timestamp)")
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
migration67(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
migration67(statement)
case Some(v@6) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration67(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand All @@ -97,7 +115,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
statement.setString(2, e.remoteNodeId.value.toHex)
statement.setLong(3, e.capacity.toLong)
Expand All @@ -112,7 +130,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
statement.setLong(1, p.amount.toLong)
statement.setLong(2, p.feesPaid.toLong)
Expand All @@ -133,7 +151,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.received VALUES (?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
statement.setLong(1, p.amount.toLong)
statement.setString(2, e.paymentHash.toHex)
Expand All @@ -153,7 +171,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
using(pg.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setString(3, nextTrampolineNodeId.value.toHex)
Expand All @@ -164,7 +182,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
}
for (p <- payments) {
using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, p.amount.toLong)
statement.setString(3, p.channelId.toHex)
Expand All @@ -179,7 +197,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
statement.setString(2, e.remoteNodeId.value.toHex)
statement.setString(3, e.tx.txid.toHex)
Expand All @@ -193,7 +211,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
val (errorName, errorMessage) = e.error match {
case LocalError(t) => (t.getClass.getSimpleName, t.getMessage)
case RemoteError(error) => ("remote", error.toAscii)
Expand All @@ -211,7 +229,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listSent(from: Long, to: Long): Seq[PaymentSent] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand Down Expand Up @@ -241,7 +259,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listReceived(from: Long, to: Long): Seq[PaymentReceived] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM received WHERE timestamp BETWEEN ? AND ?")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.received WHERE timestamp BETWEEN ? AND ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand All @@ -262,7 +280,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
inTransaction { pg =>
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand All @@ -273,7 +291,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
trampolineByHash + (paymentHash -> (amount, nodeId))
}
}
val relayedByHash = using(pg.prepareStatement("SELECT * FROM relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
val relayedByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery()
Expand Down Expand Up @@ -308,7 +326,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
using(pg.prepareStatement("SELECT * FROM audit.network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map { rs =>
Expand Down
Loading

0 comments on commit f8feb19

Please sign in to comment.