-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWorker.cs
68 lines (56 loc) · 2.7 KB
/
Worker.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
using System.Text;
using System.Text.Json;
using Hackernews_Fetcher.Services;
using AutoMapper;
using Hackernews_Fetcher.Models;
using RabbitMQ.Client;
namespace Hackernews_Fetcher;
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IApiConnector _apiConnector;
private readonly IConnectionFactory _connectionFactory;
private readonly IMapper _mapper;
public Worker(ILogger<Worker> logger,
IApiConnector apiConnector,
IConnectionFactory connectionFactory,
IMapper mapper)
{
_logger = logger;
_apiConnector = apiConnector;
_connectionFactory = connectionFactory;
_mapper = mapper;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting fetching...");
var connection = await _connectionFactory.CreateConnectionAsync(stoppingToken);
var channel = await connection.CreateChannelAsync(cancellationToken: stoppingToken);
const string exchangeName = "feed";
const string queueName = "stories";
const string routingKey = "feed.stories";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, cancellationToken: stoppingToken);
await channel.QueueDeclareAsync(queueName, exclusive: false, durable: true, cancellationToken: stoppingToken);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, cancellationToken: stoppingToken);
var jsonSerializerOptions = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
while(!stoppingToken.IsCancellationRequested)
{
await foreach (var storyDto in _apiConnector.GetNewStoriesAsync().WithCancellation(stoppingToken))
{
if (storyDto is null) continue;
var serializedMessageBody = JsonSerializer.Serialize(_mapper.Map<StoryPublishDto>(storyDto), jsonSerializerOptions);
var messageBody = Encoding.UTF8.GetBytes(serializedMessageBody);
var publishProperties = new BasicProperties
{
Persistent = true,
DeliveryMode = DeliveryModes.Persistent
};
await channel.BasicPublishAsync(exchangeName, routingKey, mandatory: true, body: messageBody, basicProperties: publishProperties, cancellationToken: stoppingToken);
}
await Task.Delay(TimeSpan.FromHours(3), stoppingToken);
}
}
}