Skip to content

Commit

Permalink
feat: added a prometheus metrics exporter (#51)
Browse files Browse the repository at this point in the history
- Exposes metrics on `:9568/metrics`
- Defines basic VM metrics and basic Postgres/Vaxine replication stream events.
  • Loading branch information
icehaunter authored Nov 28, 2022
1 parent e1c0e2e commit 67d0248
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 7 deletions.
33 changes: 33 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/.git/

# The directory Mix will write compiled artifacts to.
/_build/
/.elixir_ls/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
electric-*.tar

# Temporary files, for example, from tests.
/tmp/

*offset_storage*.dat

/integration_tests/
1 change: 1 addition & 0 deletions lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Electric.Application do
auth_provider = Electric.Satellite.Auth.provider()

children = [
Electric.Telemetry,
Electric.Postgres.SchemaRegistry,
Electric.Replication.OffsetStorage,
{Plug.Cowboy, scheme: :http, plug: Electric.Plug.Router, options: [port: status_port()]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
use GenStage
require Logger

alias Electric.Telemetry.Metrics

alias Electric.Postgres.LogicalReplication
alias Electric.Postgres.LogicalReplication.Messages
alias Electric.Replication.Postgres.Client
Expand Down Expand Up @@ -134,6 +136,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end

defp process_message(%Insert{} = msg, %State{} = state) do
Metrics.pg_producer_received(state.origin, :insert)

relation = Map.get(state.relations, msg.relation_id)

data = data_tuple_to_map(relation.columns, msg.tuple_data)
Expand All @@ -152,6 +156,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end

defp process_message(%Update{} = msg, %State{} = state) do
Metrics.pg_producer_received(state.origin, :update)

relation = Map.get(state.relations, msg.relation_id)

old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data)
Expand All @@ -175,6 +181,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end

defp process_message(%Delete{} = msg, %State{} = state) do
Metrics.pg_producer_received(state.origin, :delete)

relation = Map.get(state.relations, msg.relation_id)

data =
Expand Down
38 changes: 31 additions & 7 deletions lib/electric/replication/postgres/slot_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
use GenStage

require Logger
alias Electric.Telemetry.Metrics
alias Electric.Postgres.Lsn
alias Electric.Postgres.LogicalReplication.Messages, as: ReplicationMessages
alias Electric.Postgres.Messaging
Expand All @@ -24,6 +25,7 @@ defmodule Electric.Replication.Postgres.SlotServer do

defmodule State do
defstruct current_lsn: %Lsn{segment: 0, offset: 1},
origin: nil,
send_fn: nil,
slot_name: nil,
publication: nil,
Expand Down Expand Up @@ -131,6 +133,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
{:consumer,
%State{
slot_name: slot,
origin: origin,
producer_name: producer,
producer: args.downstream.producer,
opts: Map.get(args.replication, :opts, [])
Expand Down Expand Up @@ -165,7 +168,11 @@ defmodule Electric.Replication.Postgres.SlotServer do
end

@impl true
def handle_call({:start_replication, send_fn, publication, start_lsn}, {from, _}, state) do
def handle_call(
{:start_replication, send_fn, publication, start_lsn},
{from, _},
%State{} = state
) do
ref = Process.monitor(from)

timer =
Expand All @@ -174,6 +181,8 @@ defmodule Electric.Replication.Postgres.SlotServer do

Logger.info("Starting replication to #{state.slot_name}")

Metrics.pg_slot_replication_event(state.origin, %{start: 1})

# FIXME: handle_continue should be supported on gen_stage
send(self(), {:start_from_lsn, start_lsn})

Expand All @@ -183,6 +192,8 @@ defmodule Electric.Replication.Postgres.SlotServer do

@impl true
def handle_call({:stop_replication}, _, state) do
Metrics.pg_slot_replication_event(state.origin, %{stop: 1})

{:reply, :ok, [], clear_replication(state)}
end

Expand Down Expand Up @@ -236,15 +247,17 @@ defmodule Electric.Replication.Postgres.SlotServer do
end

@impl true
def handle_events(events, _from, state) when replication_started?(state) do
def handle_events(events, _from, %State{origin: origin} = state)
when replication_started?(state) do
state =
Enum.reduce(events, state, fn {transaction, vx_offset}, state ->
Logger.debug(
"Will send #{length(transaction.changes)} to subscriber: #{inspect(transaction.changes, pretty: true)}"
)

{wal_messages, relations, new_lsn} = convert_to_wal(transaction, state)
send_all(Enum.reverse(wal_messages), state.send_fn)
send_all(wal_messages, state.send_fn, origin)

%{state | current_lsn: new_lsn, sent_relations: relations, current_vx_offset: vx_offset}
end)

Expand Down Expand Up @@ -285,13 +298,14 @@ defmodule Electric.Replication.Postgres.SlotServer do
}
end

defp send_all(messages, send_fn) when is_function(send_fn, 1) do
reversed_messages = Enum.reverse(messages)
{first_lsn, _} = List.first(messages)
defp send_all(reversed_messages, send_fn, origin) when is_function(send_fn, 1) do
{{first_lsn, _}, len} = list_last_and_length(reversed_messages)
{last_lsn, _} = List.first(reversed_messages)

Metrics.pg_slot_replication_event(origin, %{sent_total: len})

Logger.debug(
"Sending #{length(messages)} messages to the subscriber: from #{inspect(first_lsn)} to #{inspect(last_lsn)}"
"Sending #{len} messages to the subscriber: from #{inspect(first_lsn)} to #{inspect(last_lsn)}"
)

reversed_messages
Expand Down Expand Up @@ -438,4 +452,14 @@ defmodule Electric.Replication.Postgres.SlotServer do
|> Enum.map(&Map.fetch!(record, &1.name))
|> List.to_tuple()
end

# Get last element from the list and the list's length in one pass
# If list is empty, default is returned
@spec list_last_and_length(list(), any(), non_neg_integer()) :: {any(), non_neg_integer()}
defp list_last_and_length(list, default \\ nil, length_acc \\ 0)
defp list_last_and_length([], default, 0), do: {default, 0}
defp list_last_and_length([elem | []], _, length), do: {elem, length + 1}

defp list_last_and_length([_ | list], default, length),
do: list_last_and_length(list, default, length + 1)
end
1 change: 1 addition & 0 deletions lib/electric/replication/postgres/tcp_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ defmodule Electric.Replication.Postgres.TcpServer do
def handle_continue(:establish_connection, state) do
with {:ok, <<length::32>>} <- state.transport.recv(state.socket, 4, 100),
{:ok, data} <- state.transport.recv(state.socket, length - 4, 100) do
:telemetry.execute([:electric, :postgres_tcp_server, :connection], %{total: 1})
establish_connection(data, state)
else
{:error, :timeout} ->
Expand Down
3 changes: 3 additions & 0 deletions lib/electric/replication/vaxine/log_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Electric.Replication.Vaxine.LogConsumer do

alias Electric.Replication.Changes.Transaction
alias Electric.Replication.Vaxine
alias Electric.Telemetry.Metrics

require Logger
require Electric.Retry
Expand Down Expand Up @@ -121,9 +122,11 @@ defmodule Electric.Replication.Vaxine.LogConsumer do
:ok ->
# FIXME: Persist LSN from PG to Vaxine
:ok = ack_fn.()
Metrics.vaxine_consumer_replication_event(tx.origin, %{saved: 1})
{:halt, :ok}

{_change, error} ->
Metrics.vaxine_consumer_replication_event(tx.origin, %{failed_to_write: 1})
Logger.warning("Failure to write change into vaxine #{error}")
{:cont, tx}
end
Expand Down
5 changes: 5 additions & 0 deletions lib/electric/satellite/satellite_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Satellite.Protocol do
alias Electric.Replication.OffsetStorage
alias Electric.Satellite.Replication
alias Electric.Satellite.ClientManager
alias Electric.Telemetry.Metrics

@type lsn() :: non_neg_integer
@producer_timeout 5_000
Expand Down Expand Up @@ -149,6 +150,7 @@ defmodule Electric.Satellite.Protocol do
:ok <- ClientManager.register_client(client_id, reg_name) do
Logger.metadata(client_id: client_id, user_id: auth.user_id)
Logger.info("authenticated client #{client_id} as user #{auth.user_id}")
Metrics.satellite_connection_event(%{authorized_connection: 1})

{%SatAuthResp{id: Electric.regional_id()},
%State{state | auth: auth, auth_passed: true, client_id: client_id}}
Expand Down Expand Up @@ -213,6 +215,8 @@ defmodule Electric.Satellite.Protocol do
"Recieved start replication request lsn: #{inspect(client_lsn)} with options: #{inspect(opts)}"
)

Metrics.satellite_replication_event(%{started: 1})

out_rep = initiate_subscription(state.client_id, lsn, out_rep)
{[%SatInStartReplicationResp{}], %State{state | out_rep: out_rep}}
else
Expand Down Expand Up @@ -240,6 +244,7 @@ defmodule Electric.Satellite.Protocol do
def process_message(%SatInStopReplicationReq{} = _msg, %State{out_rep: out_rep} = state)
when out_rep?(state) do
Logger.debug("Received stop replication request")
Metrics.satellite_replication_event(%{stopped: 1})
# FIXME: We do not know whether the client intend to start from last LSN, or
# optional lsn, so we should just restart producer if the client would
# request different LSN than we are about to send.
Expand Down
47 changes: 47 additions & 0 deletions lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Electric.Telemetry do
use Supervisor
alias Telemetry.Metrics

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(_) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 2_000},
{TelemetryMetricsPrometheus, [metrics: metrics()]}
]

Supervisor.init(children, strategy: :one_for_one)
end

defp metrics(),
do: [
Metrics.counter("electric.postgres_tcp_server.connection.total"),
Metrics.counter("electric.postgres_slot.replication.start"),
Metrics.counter("electric.postgres_slot.replication.stop"),
Metrics.sum("electric.postgres_slot.replication.sent_count"),
Metrics.counter("electric.postgres_logical.received.total"),
Metrics.counter("electric.vaxine_consumer.replication.saved"),
Metrics.counter("electric.vaxine_consumer.replication.failed_to_write"),
Metrics.counter("electric.satellite.connection.authorized_connection"),
Metrics.counter("electric.satellite.connection.authorized_connection"),
Metrics.counter("electric.satellite.replication.started"),
Metrics.counter("electric.satellite.replication.stopped"),
Metrics.last_value("vm.memory.total", unit: :byte),
Metrics.last_value("vm.total_run_queue_lengths.total"),
Metrics.last_value("vm.total_run_queue_lengths.cpu"),
Metrics.last_value("vm.total_run_queue_lengths.io"),
Metrics.last_value("vm.system_counts.process_count"),
Metrics.last_value("vm.system_counts.atom_count"),
Metrics.last_value("vm.system_counts.port_count")
]

defp periodic_measurements do
[
# A module, function and arguments to be invoked periodically.
# This function must call :telemetry.execute/3 and a metric must be added above.
# {TestAuthAppWeb, :count_users, []}
]
end
end
24 changes: 24 additions & 0 deletions lib/electric/telemetry/metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Electric.Telemetry.Metrics do
def pg_producer_received(origin, type) when type in [:insert, :update, :delete] do
:telemetry.execute([:electric, :postgres_logical, :received], %{total: 1}, %{
type: type,
origin: origin
})
end

def pg_slot_replication_event(origin, data) when is_map(data) do
:telemetry.execute([:electric, :postgres_slot, :replication], data, %{origin: origin})
end

def vaxine_consumer_replication_event(origin, data) when is_map(data) do
:telemetry.execute([:electric, :vaxine_consumer, :replication], data, %{origin: origin})
end

def satellite_connection_event(data) when is_map(data) do
:telemetry.execute([:electric, :satellite, :connection], data)
end

def satellite_replication_event(data) when is_map(data) do
:telemetry.execute([:electric, :satellite, :replication], data)
end
end
4 changes: 4 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ defmodule Electric.MixProject do
{:gun, "~> 2.0.0-rc.2"},
{:cowboy, "~> 2.9.0"},
{:gen_stage, "~> 1.1.2"},
{:telemetry, "~> 1.1", override: true},
{:telemetry_poller, "~> 1.0"},
{:telemetry_metrics, "~> 0.6"},
{:telemetry_metrics_prometheus, "~> 1.1.0"},
{:satellite_proto,
git: "https://github.com/electric-sql/typescript-client.git",
runtime: false,
Expand Down
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
"satellite_proto": {:git, "https://github.com/electric-sql/typescript-client.git", "8eb20c640142533babc7972ca20d8d6e0bb3ee5d", [ref: "8eb20c640142533babc7972ca20d8d6e0bb3ee5d", sparse: "proto"]},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"},
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"},
"telemetry_metrics_prometheus": {:hex, :telemetry_metrics_prometheus, "1.1.0", "1cc23e932c1ef9aa3b91db257ead31ea58d53229d407e059b29bb962c1505a13", [:mix], [{:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}], "hexpm", "d43b3659b3244da44fe0275b717701542365d4519b79d9ce895b9719c1ce4d26"},
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"},
"telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"vax": {:git, "https://github.com/electric-sql/vaxine.git", "0c9233fe9189c0c430b382a7f3dec6379569d8bc", [sparse: "apps/vax"]},
"vx_client": {:git, "https://github.com/electric-sql/vaxine.git", "0c9233fe9189c0c430b382a7f3dec6379569d8bc", [sparse: "apps/vx_client"]},
Expand Down

0 comments on commit 67d0248

Please sign in to comment.