diff --git a/lib/sanbase/external_services/coinmarketcap/coinmarketcap.ex b/lib/sanbase/external_services/coinmarketcap/coinmarketcap.ex index 4407402d40..2bc70d8009 100644 --- a/lib/sanbase/external_services/coinmarketcap/coinmarketcap.ex +++ b/lib/sanbase/external_services/coinmarketcap/coinmarketcap.ex @@ -270,22 +270,11 @@ defmodule Sanbase.ExternalServices.Coinmarketcap do Registry.register(Sanbase.Registry, key, :running) Logger.info("Fetch and process prices for #{project.slug}") - case last_price_datetime(project) do - {:ok, datetime} -> - Logger.info( - "[CMC] Latest price datetime for #{Project.describe(project)} - #{datetime}" - ) - - WebApi.fetch_and_store_prices(project, datetime) - - Registry.unregister(Sanbase.Registry, key) - :ok - - _ -> - err_msg = "[CMC] Cannot fetch the last price datetime for project #{project.slug}" - - Logger.warning(err_msg) - {:error, err_msg} + with {:ok, datetime} <- last_price_datetime(project), + :ok <- WebApi.fetch_and_store_prices(project, datetime) do + :ok = Registry.unregister(Sanbase.Registry, key) + else + error -> error end else Logger.info( @@ -327,15 +316,11 @@ defmodule Sanbase.ExternalServices.Coinmarketcap do end defp fetch_total_market_data() do - case last_price_datetime("TOTAL_MARKET") do - {:ok, %DateTime{} = datetime} -> - WebApi.fetch_and_store_prices("TOTAL_MARKET", datetime) - :ok - - _ -> - err_msg = "[CMC] Cannot fetch the last price datetime for TOTAL_MARKET" - Logger.warning(err_msg) - {:error, err_msg} + with {:ok, %DateTime{} = datetime} <- last_price_datetime("TOTAL_MARKET"), + :ok <- WebApi.fetch_and_store_prices("TOTAL_MARKET", datetime) do + :ok + else + error -> error end end diff --git a/lib/sanbase/external_services/coinmarketcap/web_api.ex b/lib/sanbase/external_services/coinmarketcap/web_api.ex index ed1ba5b9a6..fac24fbdbb 100644 --- a/lib/sanbase/external_services/coinmarketcap/web_api.ex +++ b/lib/sanbase/external_services/coinmarketcap/web_api.ex @@ -89,13 +89,13 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do # in case the next fetch succeeds and we store a later progress datetime price_stream(coinmarketcap_integer_id, last_fetched_datetime, DateTime.utc_now()) |> Stream.take(10) - |> Enum.reduce_while(%{}, fn + |> Enum.reduce_while(:ok, fn {:ok, result, interval}, acc -> store_price_points(project, result, interval) {:cont, acc} - _, acc -> - {:halt, acc} + error, _acc -> + {:halt, {:error, "Error in fetch_and_store_prices/2 for project: #{inspect(error)}"}} end) end end @@ -110,13 +110,13 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do # in case the next fetch succeeds and we store a later progress datetime price_stream("TOTAL_MARKET", last_fetched_datetime, DateTime.utc_now()) |> Stream.take(10) - |> Enum.reduce_while(%{}, fn + |> Enum.reduce_while(:ok, fn {:ok, result, interval}, acc -> store_price_points("TOTAL_MARKET", result, interval) {:cont, acc} - _, acc -> - {:halt, acc} + error, _acc -> + {:halt, {:error, "Error in fetch_and_store_prices/2 for TOTAL_MARKET: #{inspect(error)}"}} end) end @@ -170,11 +170,19 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do defp json_to_price_points(json, "TOTAL_MARKET", interval) do with {:ok, decoded} <- Jason.decode(json), %{ - "data" => data, - "status" => %{"error_code" => 0, "error_message" => nil} + "data" => %{"quotes" => quotes}, + "status" => %{"error_code" => "0"} } <- decoded do result = - Enum.map(data, fn {datetime_iso8601, [marketcap_usd, volume_usd]} -> + Enum.map(quotes, fn %{ + "quote" => [ + %{ + "timestamp" => datetime_iso8601, + "totalMarketCap" => marketcap_usd, + "totalVolume24H" => volume_usd + } + ] + } -> %PricePoint{ marketcap_usd: marketcap_usd |> Sanbase.Math.to_integer(), volume_usd: volume_usd |> Sanbase.Math.to_integer(), @@ -217,12 +225,38 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do end end + defp extract_price_points_for_interval( + "TOTAL_MARKET" = total_market, + {from_unix, to_unix} = interval + ) do + "https://api.coinmarketcap.com/data-api/v3/global-metrics/quotes/historical?format=chart&interval=5m&timeEnd=#{to_unix}&timeStart=#{from_unix}" + |> get() + |> case do + {:ok, %Tesla.Env{status: 429} = resp} -> + wait_rate_limit(resp) + extract_price_points_for_interval(total_market, interval) + + {:ok, %Tesla.Env{status: 200, body: body}} -> + json_to_price_points(body, total_market, interval) + + {:ok, %Tesla.Env{status: status}} -> + error_msg = "[CMC] Error fetching data for #{total_market}. Status code: #{status}" + Logger.error(error_msg) + {:error, error_msg} + + {:error, error} -> + error_msg = "[CMC] Error fetching data for #{total_market}. Reason: #{inspect(error)}" + Logger.error(error_msg) + {:error, error_msg} + end + end + defp extract_price_points_for_interval( "TOTAL_MARKET" = total_market, {from_unix, to_unix} = interval ) do Logger.info(""" - [CMC] Extracting price points for TOTAL_MARKET and interval [#{DateTime.from_unix!(from_unix)} - #{DateTime.from_unix!(to_unix)}] + [CMC] Extracting price points for #{total_market} and interval [#{DateTime.from_unix!(from_unix)} - #{DateTime.from_unix!(to_unix)}] """) "/v1.1/global-metrics/quotes/historical?format=chart&interval=5m&time_start=#{from_unix}&time_end=#{to_unix}" @@ -236,12 +270,12 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do json_to_price_points(body, total_market, interval) {:ok, %Tesla.Env{status: status}} -> - error_msg = "[CMC] Error fetching data for TOTAL_MARKET. Status code: #{status}" + error_msg = "[CMC] Error fetching data for #{total_market}. Status code: #{status}" Logger.error(error_msg) {:error, error_msg} {:error, error} -> - error_msg = "[CMC] Error fetching data for TOTAL_MARKET. Reason: #{inspect(error)}" + error_msg = "[CMC] Error fetching data for #{total_market}. Reason: #{inspect(error)}" Logger.error(error_msg) {:error, error_msg} end diff --git a/lib/sanbase/external_services/rate_limiting/middleware.ex b/lib/sanbase/external_services/rate_limiting/middleware.ex index d110e0ef67..c7a52ebfd1 100644 --- a/lib/sanbase/external_services/rate_limiting/middleware.ex +++ b/lib/sanbase/external_services/rate_limiting/middleware.ex @@ -4,7 +4,7 @@ defmodule Sanbase.ExternalServices.RateLimiting.Middleware do alias Sanbase.ExternalServices.RateLimiting.Server def call(env, next, options) do - Server.wait(Keyword.get(options, :name)) + # Server.wait(Keyword.get(options, :name)) Tesla.run(env, next) end diff --git a/test/sanbase/external_services/coinmarketcap/data/total_market_web_api_success.json b/test/sanbase/external_services/coinmarketcap/data/total_market_web_api_success.json index 8e7768f60c..73098ffd52 100644 --- a/test/sanbase/external_services/coinmarketcap/data/total_market_web_api_success.json +++ b/test/sanbase/external_services/coinmarketcap/data/total_market_web_api_success.json @@ -1,28 +1,63 @@ { - "status": { - "timestamp": "2019-12-20T09:29:01.436Z", - "error_code": 0, - "error_message": null, - "elapsed": 9, - "credit_count": 1, - "notice": null - }, "data": { - "2018-01-02T00:00:00.000Z": [ - 615633059840, - 27340439552 - ], - "2018-01-03T00:00:00.000Z": [ - 673426702336, - 43249201152 - ], - "2018-01-04T00:00:00.000Z": [ - 739147579392, - 52711313408 - ], - "2018-01-05T00:00:00.000Z": [ - 755427835904, - 67713847296 + "quotes": [ + { + "timestamp": "2023-08-21T12:35:00.000Z", + "searchInterval": "5", + "btcDominance": 48.0836, + "ethDominance": 19.0837, + "activeCryptocurrencies": 9553, + "activeExchanges": 663, + "activeMarketPairs": 63424, + "quote": [ + { + "name": "USD", + "timestamp": "2023-08-21T12:35:00.000Z", + "totalMarketCap": 1053345319615.14, + "totalVolume24H": 24975771712.74, + "totalVolume24HReported": 88842480163.87, + "altcoinVolume24H": 14141463173.83, + "altcoinVolume24HReported": 53743544558.18, + "altcoinMarketCap": 546859232684.86, + "originId": "1253416" + } + ], + "totalCryptocurrencies": 26575, + "totalExchanges": 6585, + "score": 1692621300000 + }, + { + "timestamp": "2023-08-21T12:40:00.000Z", + "searchInterval": "5", + "btcDominance": 48.0949, + "ethDominance": 19.088, + "activeCryptocurrencies": 9553, + "activeExchanges": 663, + "activeMarketPairs": 63424, + "quote": [ + { + "name": "USD", + "timestamp": "2023-08-21T12:40:00.000Z", + "totalMarketCap": 1053950205502.36, + "totalVolume24H": 24998401100.74, + "totalVolume24HReported": 89033858863.01, + "altcoinVolume24H": 14156727577.48, + "altcoinVolume24HReported": 53805498718.86, + "altcoinMarketCap": 547053824537.45, + "originId": "1253421" + } + ], + "totalCryptocurrencies": 26575, + "totalExchanges": 6587, + "score": 1692621600000 + } ] + }, + "status": { + "timestamp": "2023-08-21T12:54:32.540Z", + "error_code": "0", + "error_message": "SUCCESS", + "elapsed": "30", + "credit_count": 0 } -} \ No newline at end of file +} diff --git a/test/sanbase/external_services/coinmarketcap/web_api_test.exs b/test/sanbase/external_services/coinmarketcap/web_api_test.exs index 5d04c9c1b9..99bf253565 100644 --- a/test/sanbase/external_services/coinmarketcap/web_api_test.exs +++ b/test/sanbase/external_services/coinmarketcap/web_api_test.exs @@ -31,7 +31,7 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do } end) - WebApi.fetch_and_store_prices(context.project, ~U[2023-07-19 00:00:00Z]) + :ok = WebApi.fetch_and_store_prices(context.project, ~U[2023-07-19 00:00:00Z]) prices = Sanbase.InMemoryKafka.Producer.get_state() |> Map.get("asset_prices") filtered_record = @@ -56,7 +56,7 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do from_datetime = ~U[2023-07-19 00:00:00Z] - WebApi.fetch_and_store_prices(context.project, from_datetime) + :ok = WebApi.fetch_and_store_prices(context.project, from_datetime) state = Sanbase.InMemoryKafka.Producer.get_state() prices = state["asset_prices"] assert length(prices) > 0 @@ -100,16 +100,16 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do } end) - WebApi.fetch_and_store_prices("TOTAL_MARKET", ~U[2018-01-01 00:00:00Z]) + :ok = WebApi.fetch_and_store_prices("TOTAL_MARKET", ~U[2018-01-01 00:00:00Z]) state = Sanbase.InMemoryKafka.Producer.get_state() prices = state["asset_prices"] assert length(prices) > 0 - record = - {"coinmarketcap_TOTAL_MARKET_2018-01-03T00:00:00.000Z", - "{\"marketcap_usd\":673426702336,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1514937600,\"volume_usd\":43249201152}"} + assert {"coinmarketcap_TOTAL_MARKET_2023-08-21T12:35:00.000Z", + "{\"marketcap_usd\":1053345319615,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1692621300,\"volume_usd\":24975771713}"} in prices - assert record in prices + assert {"coinmarketcap_TOTAL_MARKET_2023-08-21T12:35:00.000Z", + "{\"marketcap_usd\":1053345319615,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1692621300,\"volume_usd\":24975771713}"} in prices end end