Concentrateur IRVE dynamique national (national dynamic IRVE feed aggregator) (#3839)
* Attempt to detect when frictionless did not really validate

See frictionlessdata/frictionless-py#1646

* Improve debugging

* Improve caching (much needed)

* Help me when errors arise

* Provide better reports

* Run mix format

* Add extra aggregate config type

* Deserialize aggregate config item

* Wire aggregate type to proxy (with a TODO)

* Implement rendering for backoffice

* Display nothing rather than a buggy link

* Add a first test

* Implement core part of fetching

* Run mix format

* Implement parsing & recomposing of ids only

* Fix warning

* Add note

* Implement test on aggregation

* Add TODOs for remaining cases

* Add moduledoc

* Move Telemetry around before refactoring the rest

* Extract aggregate processor to its own module

* Add module_doc

* Extract method

* Rename variable

* Fix typo

* Start passing external options around

* Handle limit_per_source and include_origin

* Create schema-irve-dynamique.json

* Improve query parameters handling

* Add dynamic schema introspection wrapper

* Add note

* Throw to interrupt for now

* Ensure we have exactly the expected headers - drop the sub-stream if not

* Remove test code

* Add WIP quick and dirty checker

* Refactor for testing

* Add placeholder

* Add test fix

* WIP test

* Fix comment

* Allow customizing the line separator

* Fix test

* Remove live checking from current PR scope

* Finalize test

* Run mix format

* Fix credo warning

* Fix credo warning

* Simplify code

* Revise my position on TODO

* Scope has been verified

* Expose TTL for main IRVE feed

* Seems to be clean at this point

* Do not enforce optional key (we provide a default value)

* Refactor before adding more scenarios

* Extract method more

* Rework method

* Move todos

* Make sure non-200 responses are dropped from the resulting consolidation

* Add todos

* Add TODO for timeouts (would take down everything)

* Catch & handle sub-feeds errors safely (with logging)

* Handle timeouts (> 5sec) in sub-feeds to avoid global 500

* Remove apparently dead code

* Extract code before reuse

* Fix too loose typing in sub-items

* Add missing line break

* Adapt code for reworked items

* DRY configuration & fix broken tests

* Add support for sub-key

* Allow function override

* Implement cached sub-source queries

* Allow override of sub-item type

* Add useful notes

* Make sure EnforceTTL does not crash on sub-items

* Remove TODO and make backoffice work

* Use hard-coded alt config for staging

* Use variable

* Add missing documentation

* Make sure anyone can understand this test in 6 months

* Add note

* Add missing assertions

* Extract method "setup_remote_responses"

Refactoring before adding more test-cases, to DRY things out.

* Remove unused code

* Simplify code

* Remove satisfied TODO

* Improve tests

* Add note

* Use with_log & remove already tested stuff

* Prepare remaining work items

* Allow formatting numbers with a nil variant

* Fix missing test & adapt the others

* Add TODO on bogus code

* Complete TODO

* Fix broken "exception in remote" test case

* Add test-case for regular non-200 case

* Add 302 test-case (important & used)

* Remove implemented TODO

* Finalize main test

* Add a test (timeout case), find a bug, fix the bug

* Remove left-over

* Implement 2 remaining tests

* Remove TODO (now properly tested & fixed)

* Add comments for maintenance

* Reduce code & make rows unique

This could save a lot of debugging later.

* Comply with Credo

* Do not use a nil key (== :nil, which is confusing)

* Review change: stop leaking :resource down the line

This will only lead to confusing uses in the future.

* Add documentation

* Add note for later

* Fix doc

* Remove redundant doc

* Add missing end of sentence

* Add doc

* Simplify code & fix test

* Gimme some typing

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>

* Fix typo

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>

* Run mix format

* Add much-needed parameter type

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>

* Add missing namespace to module

* Add useful typing

* Move aggregate event construction to telemetry

* Simplify the construct

* Rename for consistency

* Refactor: simplify 302-redirect support code

Code review showed that passing a function around was not a clear way to support the different scenarios here. I remove the need for function passing by supporting 302 redirects (and only those) directly in the Finch wrapper; a minimal, hedged sketch of such a wrapper follows at the end of this commit list.

* Fix warning

* Add missing redirect tests for Finch wrapper

* Remove unclear comment

* Add note

* Add TODO

* Add failing test for telemetry bug

* Ensure no other telemetry event is sent

* Fix incorrect count of internal aggregated queries

* Add note

* Clarify un-registering of handlers

Also link to #3975 which can lead the maintainer to a problematic database situation.

* Remove note (this has been completed)

* DRY proxy request types

After verification, `@proxy_requests` is only referenced inside the same file, and only for unsorted guard checks.

* DRY cache separator

* Rollback partially incorrect DRYing

The same character is used, but 97800f3 incorrectly conflates telemetry events (metrics) with cache keys.

* Make sure to cover config parsing with tests
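
As a companion to the "Refactor: simplify 302-redirect support code" entry above, here is a minimal, hedged sketch of what following 302 redirects directly inside a Finch-based GET helper can look like. It is illustrative only, not the project's actual Unlock.HTTP wrapper; DemoFinch is a hypothetical pool name assumed to be started elsewhere (for example with {Finch, name: DemoFinch} in a supervision tree).

defmodule RedirectingGet do
  # Follow at most `max_redirects` 302 responses, then return whatever
  # response we end up with. Any other status is returned untouched.
  def get!(url, headers \\ [], max_redirects \\ 2) do
    {:ok, response} = Finch.build(:get, url, headers) |> Finch.request(DemoFinch)

    case response do
      %Finch.Response{status: 302, headers: resp_headers} when max_redirects > 0 ->
        # Header names are lowercased by the underlying HTTP client
        {_, location} = List.keyfind(resp_headers, "location", 0)
        get!(location, headers, max_redirects - 1)

      other ->
        other
    end
  end
end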

---------

Co-authored-by: Antoine Augusti <antoine.augusti@transport.data.gouv.fr>
thbar and AntoineAugusti authored Jun 6, 2024
1 parent 8f48a47 commit 93e3672
Showing 23 changed files with 1,250 additions and 89 deletions.
14 changes: 14 additions & 0 deletions apps/shared/lib/helpers.ex
@@ -48,6 +48,20 @@ defmodule Helpers do
res
end

@doc """
Formats numbers, allowing for nil to be passed and formatted specifically
## Examples
iex> Helpers.format_number_maybe_nil(12_345, nil_result: "N/C")
"12 345"
iex> Helpers.format_number_maybe_nil(nil, nil_result: "N/C")
"N/C"
"""
def format_number_maybe_nil(nil, options), do: options |> Keyword.fetch!(:nil_result)
def format_number_maybe_nil(n, options), do: format_number(n, options |> Keyword.delete(:nil_result))

@spec last_updated([DB.Resource.t()]) :: binary()
def last_updated(resources) do
resources
@@ -45,7 +45,11 @@ defmodule TransportWeb.Backoffice.ProxyConfigLive do

defp config_module, do: Application.fetch_env!(:unlock, :config_fetcher)

defp get_proxy_configuration(proxy_base_url, stats_days) do
@doc """
Builds a list of maps containing what we need to display on screen, based on configuration
plus a bit of cache state and statistics.
"""
def get_proxy_configuration(proxy_base_url, stats_days) do
# NOTE: if the stats query becomes too costly, we will be able to throttle it every N seconds instead,
# using a simple cache. At the moment, `get_proxy_configuration` is called once per frame, and not
# once per item.
Expand Down Expand Up @@ -80,6 +84,18 @@ defmodule TransportWeb.Backoffice.ProxyConfigLive do
}
end

defp extract_config(proxy_base_url, %Unlock.Config.Item.Aggregate{} = resource) do
%{
unique_slug: resource.identifier,
proxy_url: Transport.Proxy.resource_url(proxy_base_url, resource.identifier),
original_url: nil,
# At time of writing, the global feed is not cached
ttl: "N/A",
# We do not display the internal count for aggregate items at the moment
internal_count_default_value: nil
}
end

defp event_names do
Telemetry.proxy_request_event_names() |> Enum.map(&Telemetry.database_event_name/1)
end
@@ -92,12 +108,13 @@ defmodule TransportWeb.Backoffice.ProxyConfigLive do
end

defp add_stats(item, stats) do
metrics_target = Unlock.Controller.Telemetry.target_for_identifier(item.unique_slug)
metrics_target = Unlock.Telemetry.target_for_identifier(item.unique_slug)
counts = stats[metrics_target] || %{}

Map.merge(item, %{
stats_external_requests: Map.get(counts, db_filter_for_event(:external), 0),
stats_internal_requests: Map.get(counts, db_filter_for_event(:internal), 0)
stats_internal_requests:
Map.get(counts, db_filter_for_event(:internal), Map.get(item, :internal_count_default_value, 0))
})
end

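
To make the internal_count_default_value change easier to follow, here is a small worked example of how an aggregate item with no recorded internal requests ends up displayed as "N/C". The values and the metrics key are illustrative (the real key comes from db_filter_for_event/1, which is not shown here):

# Mirrors add_stats/2 above, with a hypothetical metrics key.
item = %{unique_slug: "aggregate-slug", internal_count_default_value: nil}
counts = %{}  # no telemetry rows for this target over the period

internal_requests =
  Map.get(counts, "proxy:internal", Map.get(item, :internal_count_default_value, 0))
# => nil for the aggregate item (regular items set no default, so they fall back to 0)

Helpers.format_number_maybe_nil(internal_requests, nil_result: "N/C")
# => "N/C", which is what the "Req int 7j" column shows in the test below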
@@ -33,15 +33,15 @@
<td><%= resource.unique_slug %></td>
<% # TODO: use regular link helper %>
<td><a href={resource.proxy_url}>lien</a></td>
<td><a href={resource.original_url}>lien</a></td>
<td><a :if={resource.original_url} href={resource.original_url}>lien</a></td>
<td><%= resource.ttl %></td>
<!-- optional stuff, only available when cache is loaded -->
<td><%= resource[:cache_size] %></td>
<td><%= resource[:cache_status] %></td>
<td><%= resource[:cache_ttl] %></td>
<!-- computed stuff -->
<td><%= Helpers.format_number(resource[:stats_external_requests]) %></td>
<td><%= Helpers.format_number(resource[:stats_internal_requests]) %></td>
<td><%= Helpers.format_number_maybe_nil(resource[:stats_internal_requests], nil_result: "N/C") %></td>
</tr>
<% end %>
</tbody>
@@ -16,7 +16,7 @@ defmodule TransportWeb.Backoffice.ProxyConfigLiveTest do

setup :verify_on_exit!

def setup_proxy_config(slug, siri_slug) do
def setup_proxy_config(slug, siri_slug, aggregate_slug) do
config = %{
slug => %Unlock.Config.Item.Generic.HTTP{
identifier: slug,
@@ -27,6 +27,10 @@ defmodule TransportWeb.Backoffice.ProxyConfigLiveTest do
identifier: siri_slug,
target_url: "http://localhost/some-siri-resource",
requestor_ref: "secret"
},
aggregate_slug => %Unlock.Config.Item.Aggregate{
identifier: aggregate_slug,
feeds: []
}
}

@@ -53,7 +57,8 @@ defmodule TransportWeb.Backoffice.ProxyConfigLiveTest do
test "disconnected and connected mount refresh stats", %{conn: conn} do
item_id = "gtfs-rt-slug"
siri_item_id = "siri-slug"
setup_proxy_config(item_id, siri_item_id)
aggregate_item_id = "aggregate-slug"
setup_proxy_config(item_id, siri_item_id, aggregate_item_id)

add_events(item_id)

@@ -63,7 +68,13 @@ defmodule TransportWeb.Backoffice.ProxyConfigLiveTest do
response = html_response(conn, 200)
assert response =~ "Configuration du Proxy"

# NOTE: alphabetical slug order
assert [
%{
"Identifiant" => "aggregate-slug",
"Req ext 7j" => "0",
"Req int 7j" => "N/C"
},
%{
"Identifiant" => "gtfs-rt-slug",
"Req ext 7j" => "2",
@@ -83,12 +94,13 @@ defmodule TransportWeb.Backoffice.ProxyConfigLiveTest do
send(view.pid, :update_data)

assert [
_aggregate_item,
%{
"Identifiant" => "gtfs-rt-slug",
"Req ext 7j" => "4",
"Req int 7j" => "2"
},
_second_item
_siri_item
] = extract_data_from_html(render(view))
end
end
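
The test above registers an aggregate item with an empty feeds list. Judging from the structs that Unlock.AggregateProcessor pattern-matches on (see the new module below), a populated item presumably looks like the following sketch; identifiers, URLs and TTLs are illustrative:

%Unlock.Config.Item.Aggregate{
  identifier: "irve-dynamique",
  feeds: [
    %Unlock.Config.Item.Generic.HTTP{
      identifier: "operator-a",
      target_url: "http://localhost/operator-a-dynamic-irve.csv",
      ttl: 60
    },
    %Unlock.Config.Item.Generic.HTTP{
      identifier: "operator-b",
      target_url: "http://localhost/operator-b-dynamic-irve.csv",
      ttl: 60
    }
  ]
}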
183 changes: 183 additions & 0 deletions apps/unlock/lib/aggregate_processor.ex
@@ -0,0 +1,183 @@
defmodule Unlock.AggregateProcessor do
@moduledoc """
The aggregate processor consolidates multiple feeds into one. It currently handles dynamic IRVE CSV
only, but the name has been kept generic because the module could quite easily be made generic.
"""

require Logger

# We actually look into the schema to build this. This is preliminary work
# towards adding live validation here later.
@schema_fields Unlock.DynamicIRVESchema.build_schema_fields_list()

@doc """
Given an aggregate item, concurrently query all sub-items and consolidate the outputs.
This implementation safely handles technical errors, non-200 responses and timeouts by returning empty
lists so that the consolidated feed is still made available.
Each sub-item result is cached in its own `Cachex` key.
The consolidated feed isn't cached, on purpose at the moment, because it allows a bit of dynamic behaviour
which is helpful (we will still be able to cache the global feed later, but if we do so we will want to
make sure the overall TTL does not increase too much).
NOTE: special care will be needed as we add more feeds, if the risk of timeout increases: the total
computation delay, in case of timeouts, will increase accordingly, as the global consolidation has to
wait for each timed-out item to reach its timeout (5 seconds currently).
"""
def process_resource(%Unlock.Config.Item.Aggregate{} = item, options \\ []) do
options =
Keyword.validate!(options, [
:limit_per_source,
:include_origin
])

headers = @schema_fields
headers = if options[:include_origin], do: headers ++ ["origin"], else: headers

rows_stream =
item.feeds
|> Task.async_stream(
&process_sub_item(item, &1, options),
max_concurrency: 10,
# this is the default, but highlighted for maintenance clarity
ordered: true,
# allow override from tests, default to 5 seconds which is `async_stream` default
timeout: Process.get(:override_aggregate_processor_async_timeout, 5_000),
# only kill the relevant sub-task, not the whole processing
on_timeout: :kill_task,
# make sure to pass the sub-item to the exit (used for logging)
zip_input_on_exit: true
)
|> Stream.map(fn
{:ok, stream} ->
stream

{:exit, {sub_item, :timeout}} ->
Logger.warning("Timeout for origin #{sub_item.identifier}, response has been dropped")
[]
end)
|> Stream.concat()

[headers]
|> Stream.concat(rows_stream)
|> Enum.into([])
|> NimbleCSV.RFC4180.dump_to_iodata()
end

@doc """
Probably one of the most complicated parts of the proxy.
A computation function is built to query the data via HTTP, but only if `Cachex`
asks for it (based on expiry dates & TTL registered with the caching key).
`Cachex` ensures uniqueness of concurrent calls & RAM storage, returning tuples to
hint us what happened.
This code needs to be DRYed ultimately (see `Controller.fetch_remote`), and simplified
via an extraction of the caching logic in a specific place.
"""
def cached_fetch(
%Unlock.Config.Item.Aggregate{} = item,
%Unlock.Config.Item.Generic.HTTP{identifier: origin} = sub_item
) do
comp_fn = fn _key ->
Unlock.Telemetry.trace_request([item.identifier, origin], :internal)
Unlock.CachedFetch.fetch_data(sub_item, max_redirects: 2)
end

cache_name = Unlock.Shared.cache_name()
cache_key = Unlock.Shared.cache_key(item.identifier, origin)
outcome = Cachex.fetch(cache_name, cache_key, comp_fn)

case outcome do
{:ok, result} ->
Logger.info("Proxy response for #{item.identifier}:#{origin} served from cache")
result

{:commit, result, _options} ->
result

{:ignore, result} ->
# Too large - not expected to occur in nominal circumstances
Logger.info("Cache has been skipped for proxy response")
result

{:error, _error} ->
# NOTE: we'll want to have some monitoring here, but not using Sentry
# because in case of troubles, we will blow up our quota.
Logger.error("Error while fetching key #{cache_key}")
# Bad gateway - which will be processed upstream
%Unlock.HTTP.Response{status: 502, body: "", headers: []}
end
end

@doc """
Process a sub-item (sub-feed of the aggregate item), "safely" returning an empty list
should any error occur.
"""
def process_sub_item(
%Unlock.Config.Item.Aggregate{} = item,
%Unlock.Config.Item.Generic.HTTP{identifier: origin} = sub_item,
options
) do
Logger.debug("Fetching aggregated sub-item #{origin} at #{sub_item.target_url}")

%{status: status, body: body} =
cached_fetch(item, %Unlock.Config.Item.Generic.HTTP{identifier: origin} = sub_item)

Logger.debug("#{origin} responded with HTTP code #{status} (#{body |> byte_size} bytes)")

if status == 200 do
# NOTE: at this point of deployment, having a log in case of error will be good enough.
# We can later expose errors to the public via an alternate sub-url for observability.
try do
process_csv_payload(body, origin, options)
catch
{:non_matching_headers, headers} ->
Logger.info("Broken stream for origin #{origin} (headers are #{headers |> inspect})")
[]
end
else
Logger.info("Non-200 response for origin #{origin} (status=#{status}), response has been dropped")
[]
end
rescue
# Since the code is run via `Task.async_stream`, we wrap it with a rescue block, otherwise
# the whole consolidated response will stop and a 500 will be generated
e ->
Logger.warning("Error occurred during processing origin #{origin} (#{e |> inspect}), response has been dropped")
[]
end

# NOTE: we could avoid "decoding" the payload, but doing so will allow us
# to integrate live validation (e.g. of id_pdc_itinerance against static database)
# more easily, with less refactoring.
def process_csv_payload(body, origin, options \\ []) do
# NOTE: currently fully in RAM - an improvement point for later
[headers | rows] = NimbleCSV.RFC4180.parse_string(body, skip_headers: false)

# SEE: https://specs.frictionlessdata.io/table-schema/#descriptor
# The order of elements in fields array SHOULD be the order of fields in the CSV file.
# The number of elements in fields array SHOULD be the same as the number of fields in the CSV file.

# once we assert that, the rest of the processing is easy
unless headers == @schema_fields do
throw({:non_matching_headers, headers})
end

# Only keeping the id for now, on purpose
rows = if options[:limit_per_source], do: Stream.take(rows, options[:limit_per_source]), else: rows

mapper =
if options[:include_origin] do
fn columns -> columns ++ [origin] end
else
fn columns -> columns end
end

rows
|> Stream.map(mapper)
end
end
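
A hedged usage sketch of the processor follows; the item is illustrative (in production it comes from the proxy configuration and the call is made by the proxy controller):

# Illustrative only: the feed URL is hypothetical and must serve dynamic IRVE CSV.
item = %Unlock.Config.Item.Aggregate{
  identifier: "irve-dynamique",
  feeds: [
    %Unlock.Config.Item.Generic.HTTP{
      identifier: "operator-a",
      target_url: "http://localhost/operator-a-dynamic-irve.csv",
      ttl: 60
    }
  ]
}

iodata = Unlock.AggregateProcessor.process_resource(item, limit_per_source: 100, include_origin: true)

# The result is CSV iodata: the schema header row (with "origin" appended because
# of include_origin: true), then at most 100 rows per sub-feed, each row ending
# with its origin identifier ("operator-a" here).
csv = IO.iodata_to_binary(iodata)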
25 changes: 25 additions & 0 deletions apps/unlock/lib/cached_fetch.ex
@@ -0,0 +1,25 @@
defmodule Unlock.CachedFetch do
@moduledoc """
A place centralizing Cachex-compatible HTTP calls.
"""

require Logger

# We put a hard limit on what can be cached; anything larger is just
# sent back without caching. This means the remote server is temporarily
# less protected, but also that we do not blow up our whole architecture
# due to RAM consumption
@max_allowed_cached_byte_size 20 * 1024 * 1024

def fetch_data(%Unlock.Config.Item.Generic.HTTP{} = item, http_client_options \\ []) do
response = Unlock.HTTP.Client.impl().get!(item.target_url, item.request_headers, http_client_options)
size = byte_size(response.body)

if size > @max_allowed_cached_byte_size do
Logger.warning("Payload is too large (#{size} bytes > #{@max_allowed_cached_byte_size}). Skipping cache.")
{:ignore, response}
else
{:commit, response, ttl: :timer.seconds(item.ttl)}
end
end
end
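
fetch_data/2 is shaped as a Cachex.fetch/3 fallback: the tuple it returns tells Cachex whether to store the computed value, and with which TTL. A hedged, standalone sketch of that contract is shown below; the cache and key names are illustrative, and in the proxy the fallback calls Unlock.CachedFetch.fetch_data/2 as shown in AggregateProcessor.cached_fetch/2 above:

# Start a throwaway cache for the sake of the example.
{:ok, _pid} = Cachex.start_link(:demo_cache)

outcome =
  Cachex.fetch(:demo_cache, "aggregate:operator-a", fn _key ->
    # Here the proxy would perform the HTTP call; we fake a small response.
    {:commit, %{status: 200, body: "fake,csv"}, ttl: :timer.seconds(60)}
  end)

case outcome do
  # The value was already in the cache (no computation performed)
  {:ok, value} -> value
  # The fallback ran; the value was stored with the given TTL
  {:commit, value, _options} -> value
  # The fallback ran but asked Cachex not to store (e.g. payload too large)
  {:ignore, value} -> value
end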