From 61eabf185e71b7670e5d750048714636f85c5e58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thibaut=20Barr=C3=A8re?= Date: Mon, 18 Mar 2024 14:47:27 +0100 Subject: [PATCH] Travaux de qualification des flux IRVE dynamiques (#3816) * Adapt setup to rely on now-incorporated app resources * Leverage now existing HTTPClient * Remove legacy * Adapt remaining code to fix the script * Add io_ansi_table via hybrid mix setup * Improve reporting * Improve reporting * Add TODOs * Avoid parsing CSV multiple times * Show validity information * Add local frictionless validator * Help me understand * Fix cache code * Disable cache * Add notes on frictionless See: - https://github.com/frictionlessdata/frictionless-py/issues/1646 * Start parsing report * Move to some non-git-enabled folder * Shorten name * Dump frictionless output (not always reliable) --- scripts/irve/dynamic-irve.exs | 215 ++++++++++++++++++++++++++++------ 1 file changed, 177 insertions(+), 38 deletions(-) diff --git a/scripts/irve/dynamic-irve.exs b/scripts/irve/dynamic-irve.exs index 85f75f6ba5..2d5558de55 100644 --- a/scripts/irve/dynamic-irve.exs +++ b/scripts/irve/dynamic-irve.exs @@ -1,10 +1,14 @@ -Mix.install([ - {:req, "~> 0.3.9"}, - {:jason, "~> 1.4"}, - {:csv, "~> 3.0"} -]) - -Code.require_file(__DIR__ <> "/req_custom_cache.exs") +my_app_root = Path.join(__DIR__, "../..") + +# hybrid setup to rely on the whole app setup but increment with a specificy dependency +Mix.install( + [ + {:my_app, path: my_app_root, env: :dev}, + {:io_ansi_table, "~> 1.0"} + ], + config_path: Path.join(my_app_root, "config/config.exs"), + lockfile: Path.join(my_app_root, "mix.lock") +) params = %{ page: 1, @@ -17,13 +21,25 @@ url = "https://www.data.gouv.fr/api/1/datasets/?#{URI.encode_query(params)}" defmodule Query do def cache_dir, do: Path.join(__ENV__.file, "../cache-dir") |> Path.expand() - def cached_get!(url) do - req = Req.new() |> CustomCache.attach() - Req.get!(req, url: url, custom_cache_dir: cache_dir()) + def cached_get!(url, options \\ []) do + options = [ + decode_body: options |> Keyword.get(:decode_body, true), + enable_cache: options |> Keyword.get(:enable_cache, false) + ] + + options = + if options[:enable_cache] do + Keyword.merge(options, custom_cache_dir: cache_dir()) + else + options + end + + Transport.HTTPClient.get!(url, options) end end -%{status: 200, body: datasets} = Query.cached_get!(url) +# disabling cache because one dataset is refreshed very frequently, caching leads to 404 +%{status: 200, body: datasets} = Query.cached_get!(url, enable_cache: false) # ensure there is only one page + grab the data unless is_nil(datasets["next_page"]), do: raise("should not have next page") @@ -36,7 +52,13 @@ resources = dataset["resources"] |> Enum.filter(fn r -> r["schema"]["name"] == "etalab/schema-irve-dynamique" end) |> Enum.map(fn r -> - Map.put(r, "dataset_url", dataset["page"]) + r + |> Map.put("dataset_url", dataset["page"]) + |> Map.put("organization", dataset["organization"]["name"]) + |> Map.put("valid", get_in(r, ["extras", "validation-report:valid_resource"])) + |> Map.put("validation_date", get_in(r, ["extras", "validation-report:validation_date"])) + |> Map.put("schema_name", get_in(r, ["schema", "name"])) + |> Map.put("schema_version", get_in(r, ["schema", "version"])) end) end) |> Enum.reject(fn r -> @@ -45,28 +67,34 @@ resources = r["id"] == "5ef6ddff-2f98-4300-9e6e-1b47ea4ab779" end) -# IO.inspect resources |> Enum.map &(&1["url"]) - defmodule IRVECheck do - def is_dynamic_irve?(url) do - %{status: 200, body: body} = Query.cached_get!(url) + def get_body(url) do + # control the decoding ourselves ; by default Req would decode via CSV itself + %{status: 200, body: body} = Query.cached_get!(url, decode_body: false) + body + end - # quick first decode to get the headers, even if the file has no rows - data = - [body] - |> CSV.decode!(headers: false) - |> Enum.take(1) - |> List.first() + def parse_csv(body) do + [body] + |> CSV.decode!(headers: true) + |> Enum.to_list() + end - "id_pdc_itinerance" in data && "etat_pdc" in data + def get_headers(body) do + [body] + # quick first decode to get the headers, even if the file has no rows + |> CSV.decode!(headers: false) + |> Enum.take(1) + |> List.first() end - def time_window(url) do - %{status: 200, body: body} = Query.cached_get!(url) + def is_dynamic_irve?(headers) do + "id_pdc_itinerance" in headers && "etat_pdc" in headers + end + def time_window(rows) do data = - [body] - |> CSV.decode!(headers: true) + rows |> Enum.map(fn x -> (x["horodatage"] || "???") |> String.slice(0, 10) end) |> Enum.to_list() |> Enum.sort() @@ -75,14 +103,125 @@ defmodule IRVECheck do end end -resources -|> Enum.each(fn r -> - IO.puts("\n" <> r["dataset_url"]) - - IO.puts( - r["url"] <> - " --- " <> - if(IRVECheck.is_dynamic_irve?(r["url"]), do: "OK", else: "KO") <> - " " <> (IRVECheck.time_window(r["url"]) |> inspect) - ) -end) +# very brittle (false positives & false negatives) at the moment, but helped me a bit already +# Waiting for feedback on https://github.com/frictionlessdata/frictionless-py/issues/1646 +defmodule FrictionlessValidator do + @latest_dynamic_irve_schema "https://schema.data.gouv.fr/schemas/etalab/schema-irve-dynamique/latest/schema-dynamique.json" + + def validate(file_url, schema \\ @latest_dynamic_irve_schema) do + cmd = "frictionless" + # NOTE: I tried using `--schema-sync` as an attempt to avoid failure + # when an optional field column's header is missing. + args = ["validate", file_url, "--schema", schema, "--json"] + _debug_cmd = [cmd, args] |> List.flatten() |> Enum.join(" ") + + {output, result} = System.cmd(cmd, args) + + case result do + 0 -> + {:ok, Jason.decode!(output)} + + 1 -> + {:error, Jason.decode!(output)} + end + end + + # quick and dirty parsing + def errors_summary(output) do + output["tasks"] + |> Enum.map(& &1["errors"]) + |> List.flatten() + |> Enum.map(& &1["message"]) + |> Enum.take(5) + end +end + +IO.puts("========== #{resources |> length()} candidates ==========\n\n") + +rows = + resources + |> Enum.map(fn r -> + body = IRVECheck.get_body(r["url"]) + rows = IRVECheck.parse_csv(body) + headers = IRVECheck.get_headers(body) + + {local_valid, validation_result} = FrictionlessValidator.validate(r["url"]) + + File.write!( + "cache-dir/dyn-irve-" <> r["id"], + validation_result |> Jason.encode!() |> Jason.Formatter.pretty_print() + ) + + %{ + dataset_url: r["dataset_url"], + r_id: r["id"], + organization: r["organization"], + resource_url: r["url"], + dyn_irve_likely: IRVECheck.is_dynamic_irve?(headers), + time_window: IRVECheck.time_window(rows), + rows: rows |> length(), + valid: r["valid"], + local_valid: local_valid, + v_date: r["validation_date"], + schema_name: r["schema_name"], + schema_version: r["schema_version"], + local_validation_errors: FrictionlessValidator.errors_summary(validation_result) + } + end) + +IO.inspect(rows, IEx.inspect_opts()) + +IO.ANSI.Table.start( + [ + :organization, + :r_id, + :dyn_irve_likely, + :rows, + # :dataset_url, + :valid, + :local_valid, + :v_date, + :schema_name, + :schema_version + ], + sort_specs: [desc: :rows], + max_width: :infinity +) + +IO.ANSI.Table.format(rows) +IO.ANSI.Table.stop() + +IO.ANSI.Table.start( + [ + :organization, + :dyn_irve_likely, + :rows, + :dataset_url, + :valid + ], + sort_specs: [desc: :rows], + max_width: :infinity +) + +IO.ANSI.Table.format(rows) +IO.ANSI.Table.stop() + +IO.ANSI.Table.start( + [ + :organization, + :rows, + :local_valid, + :local_validation_errors + ], + sort_specs: [desc: :rows], + max_width: :infinity +) + +exploded_rows = + rows + |> Enum.flat_map(fn r -> + r[:local_validation_errors] + |> Enum.map(fn x -> r |> Map.put(:local_validation_errors, x) end) + end) + +IO.ANSI.Table.format(exploded_rows)