
SaaS Crawler Module #5095

Open · wants to merge 29 commits into main
Conversation

san81 (Contributor) commented Oct 21, 2024

Description

Introducing SaaS Source Plugins module and a base Jira Source plugin class

Issues Resolved

Resolves #4754

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…odule for all of the gradle sources

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
san81 and others added 8 commits October 21, 2024 12:32
Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
full test coverage for base folder, spotless fixes
@san81 san81 changed the title Saas sources module Saas Crawler Module Oct 22, 2024
@san81 san81 changed the title Saas Crawler Module SaaS Crawler Module Oct 22, 2024
.gitignore Outdated
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.mashape.unirest:unirest-java:1.4.9'
implementation 'com.google.code.gson:gson:2.8.9'
Member:
Do we need this dependency? Please remove if possible.

Contributor Author:
Removed them

implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation platform('org.junit:junit-bom:5.10.0')
Member:
You don't need either of these two lines.

Contributor Author:
removed them

enabled = false
}

repositories {
Member:
Please remove this block. You don't need it.

Contributor Author:
Removed.

@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.plugins.source.saas.crawler;
Member:
Suggested change
package org.opensearch.dataprepper.plugins.source.saas.crawler;
package org.opensearch.dataprepper.plugins.source.saas_crawler;

Let's use this package name in the other files as well.

Contributor Author:
Ok. Modified the code to use this new package name.

Contributor Author:
Based on the discussion, changed this name to source_crawler


@Named
public class SaasPluginExecutorServiceProvider {
Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
Member:
This should be private static final

Contributor Author:
Addressed it

executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
}

//Constructor for testing
Member:
Use Javadoc comments here, not //.

/**
 * Constructor for testing
 */

Contributor Author:
converted

*/
public interface SaasSourceConfig {

int DEFAULT_NUMBER_OF_WORKERS = 1;
Member:
What is the motivation for having this here? Why would we have a default worker count of 1 for all SaaS connectors?

Contributor Author:
The initial thinking was that we would take this input from the pipeline YAML configuration itself and let users define it based on the enterprise package they have with their service provider, because this setting decides the concurrency (pressure) we create on their service. A default value of 1 means we start with the least pressure possible, but it also means the total data extraction takes a lot longer.

Contributor Author:
For the Jira case, we are not planning to take this input from the customer. We will start with a reasonable default value suitable for Jira.
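The default-versus-override scheme described above could be sketched as follows. This is a minimal illustration, assuming a default method on the shared config interface; the Jira-specific value of 4 is purely hypothetical and not the actual plugin's choice.

```java
// Sketch only: a shared SaaS source config with the least-pressure default of 1,
// which a concrete source config may override with a service-appropriate value.
interface SaasSourceConfig {
    int DEFAULT_NUMBER_OF_WORKERS = 1;

    // Default: minimum concurrency (pressure) against the remote service.
    default int getNumberOfWorkers() {
        return DEFAULT_NUMBER_OF_WORKERS;
    }
}

class JiraSourceConfig implements SaasSourceConfig {
    // Hypothetical Jira default; not taken from user input.
    @Override
    public int getNumberOfWorkers() {
        return 4;
    }
}
```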

settings.gradle Outdated
@@ -186,3 +186,7 @@ include 'data-prepper-plugins:aws-lambda'
include 'data-prepper-plugin-schema'
include 'data-prepper-plugins:kinesis-source'
include 'data-prepper-plugins:opensearch-api-source'
include 'data-prepper-plugins:saas-source-plugins'
include 'data-prepper-plugins:saas-source-plugins:saas-crawler'
Member:
I wonder if saas-crawler is the right name here. What guides the use of SaaS here? It seems more like this is a crawler capability and not necessarily related to a SaaS product per se.

Contributor Author:
It is under saas-source-plugins, and I'm also not sure whether any other source would make use of something like this.

Member:
I think my point is that this is not directly tied to SaaS, which can vary and is not even an entirely clear term. I was suggesting that we rename this to something like crawler-source-plugins.

Or are these API crawlers? Or REST crawlers?

  • crawler-source-plugins
  • rest-crawler-source-plugins
  • api-crawler-source-plugins

Contributor Author:
based on the discussion, renamed it to source_crawler

… the review input

Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source");
long updatedPollTime = 0;
log.info("Creating Partitions");
Member:
There are a lot of logs throughout the PR that are info level but look like they should be debug.

Contributor Author:
Adjusted the log level for some and removed a few log statements.

updatedPollTime = Math.max(updatedPollTime, niUpdated);
log.info("updated poll time {}", updatedPollTime);
}
createPartition(itemInfoList, coordinator);
Member:
Why are we passing a full list to this method? Can we just create each partition inline or do they all need to be stored first?

Contributor Author:
We are passing maxItemsPerPage items to this method, i.e. the page size in our paginated crawling. All the items in a page go into one partition (a work item): effectively one partition per page.
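The partition-per-page batching described above can be sketched as follows. The method and parameter names (batchIntoPages, maxItemsPerPage) are illustrative, not the PR's actual API; the real code hands each page to createPartition rather than collecting them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: drain up to maxItemsPerPage items from the source iterator at a time;
// each resulting page becomes one partition (work item).
class PageBatcher {
    static <T> List<List<T>> batchIntoPages(Iterator<T> items, int maxItemsPerPage) {
        List<List<T>> pages = new ArrayList<>();
        while (items.hasNext()) {
            List<T> page = new ArrayList<>();
            while (items.hasNext() && page.size() < maxItemsPerPage) {
                page.add(items.next());
            }
            pages.add(page); // one partition per page
        }
        return pages;
    }
}
```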

} else {
// Unable to acquire other partitions.
// Probably we will introduce Global state in the future but for now, we don't expect to reach here.
throw new RuntimeException("Unable to acquire other partitions. " +
Member:
Maybe print out the partitionType here if we get to this point.

Contributor Author:
Added to the exception message.

if(leaderPartition != null) {
// Extend the timeout
// will always be a leader until shutdown
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
Member:
Should catch exceptions that can come from this call so your thread doesn't shut down.

Contributor Author:
Wrapped this statement in a try/catch now. I don't see that this method throws any exception, though!

Member:
It won't most of the time, but it can. This was a bug in Dynamo at one time (#4850).

Member:
The Dynamo store hit a 5xx.

Contributor Author:
Thank you for clarifying 👍
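The guard discussed in this thread might look like the following sketch. The Coordinator interface here is reduced to just the method quoted above, and the boolean return value is an assumption added for testability; the real leader loop simply logs and continues.

```java
import java.time.Duration;

// Sketch: catch runtime failures from the coordination store (e.g. a transient
// 5xx) so the leader thread keeps running instead of dying mid-loop.
class LeaseExtender {
    interface Coordinator {
        void saveProgressStateForPartition(Object partition, Duration extendBy);
    }

    static boolean tryExtendLease(Coordinator coordinator, Object leaderPartition, Duration extendBy) {
        try {
            coordinator.saveProgressStateForPartition(leaderPartition, extendBy);
            return true;
        } catch (RuntimeException e) {
            // Log and carry on; the next loop iteration will retry.
            return false;
        }
    }
}
```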

@san81 san81 requested a review from sb2k16 as a code owner October 22, 2024 22:40
@san81 san81 requested a review from dlvenable October 23, 2024 00:18
graytaylor0 previously approved these changes Oct 23, 2024
this.buffer = buffer;

boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.info("Leader partition creation status: {}", isPartitionCreated);
Member:
This will result in one of these logs whenever a new Data Prepper instance starts:

Leader partition creation status: false

It isn't really helpful and can be debug level.

processPartition(partition.get(), buffer, sourceConfig);

} else {
log.info("No partition available. Going to Sleep for a while ");
Member:
This may also be a little noisy. Maybe a metric tracking this would be better?

Iterator<ItemInfo> listItems();


void setLastPollTime(long lastPollTime);
Collaborator:
Please add javadoc comments for all API

Contributor Author:
added 👍

@@ -0,0 +1,81 @@
package org.opensearch.dataprepper.plugins.source.saas_crawler.base;
Collaborator:
Do you not see a need for a common interface for all crawlers? I was thinking there would be an interface with some default implementation and so on.

Contributor Author:
The Crawler relies on a source-plugin-specific iterator implementation and dispatches the work to a source-plugin-specific client implementation. The Crawler itself holds the generic pagination logic.
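A minimal sketch of that split follows. The interface shapes are assumptions distilled from the snippets quoted in this PR; the real Crawler also tracks poll times and creates partitions.

```java
import java.util.Iterator;

// Sketch: the client (and its iterator) is source-plugin specific, while the
// crawl loop below stands in for the Crawler's generic pagination logic.
class CrawlerSketch {
    interface ItemInfo {
        String getItemId();
    }

    interface CrawlerClient {
        Iterator<ItemInfo> listItems(); // plugin-specific item listing
    }

    static int crawl(CrawlerClient client) {
        Iterator<ItemInfo> it = client.listItems();
        int count = 0;
        while (it.hasNext()) { // generic draining/pagination loop
            it.next();
            count++;
        }
        return count;
    }
}
```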

}
itemInfoList.add(nextItem);
Map<String, String> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0");
Collaborator:
Should the fallback value be 0 or the current time?

createPartition(itemInfoList, coordinator);
}while (itemInfoIterator.hasNext());
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime);
return updatedPollTime != 0 ? updatedPollTime : startTime;
Collaborator:
Oh, looks like it is falling back to current time here.

Contributor Author:
yes

private final Crawler crawler;


@DataPrepperPluginConstructor
Collaborator:
I do not think you should use @DataPrepperPluginConstructor on abstract source classes. See ./data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java

Contributor Author:
Agree. No use of this annotation here. Removed it.


@Override
public boolean areAcknowledgementsEnabled() {
return Source.super.areAcknowledgementsEnabled();
Collaborator:
Not a good idea. This should be left to the derived classes.

Contributor Author:
Agree. Removed this method. Each source plugin will implement their own version.

* contents itself which can be used to apply regex filtering, change data capture etc. general
* assumption here is that fetching metadata should be faster than fetching entire Item
*/
Map<String, String> metadata;
Collaborator:
Probably better to make it Map<String, Object>; it is unrealistic to expect all of the metadata values to be Strings.

Contributor Author:
Considering that each source may have different needs, I agree it should be more generic. Converted the map as suggested.

private boolean initialized = false;

@JsonProperty("last_poll_time")
private Long lastPollTime;
Member:
Make this a Java Instant.

Contributor Author:
I had to introduce a custom deserializer and additional Jackson dependencies, but yes, I changed the type to Instant now.
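Assuming the previous Long held epoch milliseconds (consistent with the System.currentTimeMillis() usage elsewhere in the PR), the conversion that such a deserializer performs boils down to the following; the class and method names are illustrative, not the actual Jackson deserializer.

```java
import java.time.Instant;

// Sketch: the core Long <-> Instant mapping behind the custom deserializer.
class PollTimeConversion {
    static Instant fromEpochMillis(long lastPollTimeMillis) {
        return Instant.ofEpochMilli(lastPollTimeMillis);
    }

    static long toEpochMillis(Instant lastPollTime) {
        return lastPollTime.toEpochMilli();
    }
}
```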

}
createPartition(itemInfoList, coordinator);
}while (itemInfoIterator.hasNext());
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime);
Member:
Should we be reporting a metric for these times? You can use Micrometer's Timer for this.

Contributor Author (san81) commented Oct 25, 2024:

We are already capturing this at the Jira source level, but I added a Timer here as well now.

* log events to keep and which ones to discard.
*/
@NonNull
Long eventTime;
Member:
Use an Instant here.

Contributor Author:
Converted



@Getter
public abstract class ItemInfo {
Member:
This should be a Java interface. It is more flexible and allows for adapting rather than having to convert.

public interface ItemInfo {
  String getItemId();
  Map<String, Object> getMetadata();
  ...
}

Contributor Author:
Converted

@Named
public class SaasPluginExecutorServiceProvider {
private static final Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
public static final int DEFAULT_THREAD_COUNT = 50;
Member:
Suggested change
public static final int DEFAULT_THREAD_COUNT = 50;
private static final int DEFAULT_THREAD_COUNT = 50;

Contributor Author:
changed to private
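Putting the pieces from this thread together, the provider might look like the following sketch; the getter name and the test-only constructor are assumptions, and the @Named annotation from the quoted code is omitted to keep the sketch dependency-free.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: shared fixed-size executor for SaaS plugin work, with the thread
// count kept private per the review comment.
class SaasPluginExecutorServiceProvider {
    private static final int DEFAULT_THREAD_COUNT = 50;
    private final ExecutorService executorService;

    SaasPluginExecutorServiceProvider() {
        this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
    }

    // Constructor for testing: inject a caller-controlled executor.
    SaasPluginExecutorServiceProvider(ExecutorService executorService) {
        this.executorService = executorService;
    }

    ExecutorService get() {
        return executorService;
    }
}
```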

annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

test {
Member:
You don't need these lines.

Contributor Author:
Removed

@@ -0,0 +1,30 @@
plugins {
Member:
You don't need these lines.

Contributor Author:
I thought we needed these lines when it is a library. I don't see any issue after removing them, so removed 👍

pluginType = Source.class,
packagesToScan = {SaasCrawlerApplicationContextMarker.class, JiraSource.class}
)
public class JiraSource implements Source<Record<Event>> {
Member:
Is this supposed to inherit from SaasSourcePlugin?

Contributor Author:
As we discussed, yes, it will. But I didn't do that here, to limit the size of this PR.

implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation 'org.springframework:spring-web:5.3.39'
implementation 'org.springframework.retry:spring-retry:1.3.4'
Member:
Do we need this dependency? I don't see it in use.

Contributor Author:
As we discussed, I am relying on this library in the code. I removed it from this PR and will add it back when I add the Jira source code 👍

Development

Successfully merging this pull request may close these issues.

Jira Connector - to seamlessly sync all the ticket details to OpenSearch
5 participants