Handshake issue when getting replayid from async call to database #36
Comments
@kdcllc please have a look. I have been trying to resolve this issue for the last 2 days. Any help will be much appreciated.
Thank you so much for your response, @kdcllc. I have updated the code in the issue and am sharing it here as well for your reference.
var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
if (authResponse.AccessToken is not null)
{
    var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();
    //DB call
    var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);
    var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
    var readTimeOut = 120 * 1000;
    var transportOptions = new Dictionary<string,object>(StringComparer.OrdinalIgnoreCase)
    {
        {ClientTransport.TIMEOUT_OPTION, readTimeOut},
        {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
    };
    var headers = new NameValueCollection
    {
        {HttpRequestHeader.Authorization.ToString(), $"Bearer {authResponse.AccessToken}"}
    };
    var transport = new LongPollingTransport(transportOptions, headers);
    var transports = new ClientTransport[]
    {
        transport
    };
    // Create a CometD client instance
    var bayeuxClient = new BayeuxClient(endpointUrl, transports);
    bayeuxClient.Handshake();
    bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
    log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);
    bayeuxClient.GetChannel(channel,lastUsedReplayId).Subscribe(new SalesForceMessageListener());
    log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
}
// CosmosHelperRepository Code Called from above function
public async Task<long> GetSaleForceLastUsedReplayId(string channel)
{
    try
    {
        var dbExistingRecordList = await _cosmosHelper.GetQuery(x=> x.Pk == nameof(SalesForceReplayIdRunDetail) && x.ChannelName == channel);
        var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
        return lastUsedReplayId;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }
}
// Implementation of Cosmos helper GetQuery method called in above repository function
public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
{
    var container = GetContainer();
    var query = container.GetItemLinqQueryable<T>()
        .Where(predicate);
    var list = new List<T>();
    using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
    while (feedIterator.HasMoreResults) list.AddRange(await feedIterator.ReadNextAsync());
    return list;
}
Just an FYI, I am able to get the replayId from cosmos db using the
Hi @kdcllc, I debugged this and found that the issue is not with the async call to the Cosmos repository that gets the replay ID.
Code used in startup to resolve the dependency:
builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
    new CosmosHelper<SalesForceReplayIdRunDetail>(s.GetService<CosmosClient>(),
        Settings.Cosmos.DatabaseName, Settings.Cosmos.OperatingParametersContainer,
        s.GetService<IFeedIteratorProvider>(), s.GetService<ISerializer>(),
        s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));
Cosmos Repository Constructor:
public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
{
    _cosmosHelper = cosmosHelper;
}
Cosmos Helper Constructor:
public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
    IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
    : base(cosmosClient, containerName, serializer, logger)
{
    _cosmosClient = cosmosClient;
    _databaseName = databaseName;
    _containerName = containerName;
    _feedIteratorProvider = feedIteratorProvider;
}
@kdcllc @martin-podlubny @r-matias @jesbacon,
@kathariyasunny16 What operating system are you using?
@kdcllc
Hello @kdcllc @martin-podlubny @r-matias,
Hi @kdcllc @martin-podlubny @r-matias @jesbacon,
@kathariyasunny16 When I use a lastReplayId or -2, I get none of the missed responses. Is it working for you? What does your SalesForceMessageListener class look like?
I am trying to get the replay ID from the Cosmos database before creating a Bayeux client object. The handshake does not complete after I call bayeuxClient.Handshake(). The same code works fine when I do not make the Cosmos DB call. Please find the code snippet below.
Just an FYI, I am able to get the replay ID from Cosmos DB using the above code, but the handshake is not happening.