SaaS Crawler Module #5095
base: main
Conversation
…odule for all of the gradle sources Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
full test coverage for base folder, spotless fixes
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.mashape.unirest:unirest-java:1.4.9'
implementation 'com.google.code.gson:gson:2.8.9'
Do we need this dependency? Please remove if possible.
Removed them
data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Outdated
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation platform('org.junit:junit-bom:5.10.0')
You don't need either of these two lines.
removed them
enabled = false
}

repositories {
Please remove this block. You don't need it.
Removed.
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.plugins.source.saas.crawler;
Suggested change:
- package org.opensearch.dataprepper.plugins.source.saas.crawler;
+ package org.opensearch.dataprepper.plugins.source.saas_crawler;
Let's use this package name in the other files as well.
Ok. Modified the code to use this new package name.
Based on the discussion, changed this name to source_crawler
@Named
public class SaasPluginExecutorServiceProvider {
Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
This should be private static final.
Addressed it
executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
}

//Constructor for testing
Use Javadoc comments here, not //.
/**
 * Constructor for testing
 */
converted
 */
public interface SaasSourceConfig {

    int DEFAULT_NUMBER_OF_WORKERS = 1;
What is the motivation for having this here? Why would we have a default worker count of 1 for all SaaS connectors?
Initial thinking was that we would take this input from the pipeline yaml configuration and let the user define it based on the enterprise package they have with their service provider, because it decides the concurrency (pressure) we create on their service. A default value of 1 means we start with the least pressure possible, but it also means the total data extraction takes a lot longer.
For the Jira case, we are not planning to take this input from the customer. We will start with a reasonable default value suitable for Jira.
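The behaviour described in this thread (user-configurable concurrency with a conservative default of 1) reduces to a simple fallback. A minimal illustrative sketch, not the actual config class; the name resolveWorkerCount is invented for this example:

```java
public class WorkerCountSketch {
    static final int DEFAULT_NUMBER_OF_WORKERS = 1;

    // Prefer the worker count from the pipeline yaml when the user set one;
    // otherwise fall back to 1, the least pressure on the service provider.
    static int resolveWorkerCount(Integer configuredWorkers) {
        return configuredWorkers != null ? configuredWorkers : DEFAULT_NUMBER_OF_WORKERS;
    }
}
```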
settings.gradle
Outdated
@@ -186,3 +186,7 @@ include 'data-prepper-plugins:aws-lambda'
include 'data-prepper-plugin-schema'
include 'data-prepper-plugins:kinesis-source'
include 'data-prepper-plugins:opensearch-api-source'
include 'data-prepper-plugins:saas-source-plugins'
include 'data-prepper-plugins:saas-source-plugins:saas-crawler'
I wonder if saas-crawler is the right name here. What guides the use of SaaS here? It seems more like this is a crawler capability and not necessarily related to a SaaS product per se.
It is under saas-source-plugins, and I'm also not sure any other source will make use of something like this.
I think my point is that this is not directly tied to SaaS, which can vary and is not even an entirely clear term. I was suggesting that we rename this to something like crawler-source-plugins. Or are these API crawlers? Or REST crawlers?
crawler-source-plugins
rest-crawler-source-plugins
api-crawler-source-plugins
Based on the discussion, renamed it to source_crawler.
… the review input Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source");
long updatedPollTime = 0;
log.info("Creating Partitions");
There are a lot of logs throughout the PR that are info level but look like they should be debug.
Adjusted the log level for some and removed a few log statements.
updatedPollTime = Math.max(updatedPollTime, niUpdated);
log.info("updated poll time {}", updatedPollTime);
}
createPartition(itemInfoList, coordinator);
Why are we passing a full list to this method? Can we just create each partition inline or do they all need to be stored first?
We are passing maxItemsPerPage number of items to this method, i.e. the page size in our paginated crawling. All the items in this page go into one partition (a work item), like a partition per page.
} else {
// Unable to acquire other partitions.
// Probably we will introduce Global state in the future but for now, we don't expect to reach here.
throw new RuntimeException("Unable to acquire other partitions. " +
Maybe print out the partitionType here if we get to this point.
Added to the exception message.
if (leaderPartition != null) {
// Extend the timeout
// will always be a leader until shutdown
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
Should catch exceptions that can come from this call so your thread doesn't shut down.
Wrapped this statement in a try/catch now. I don't see this method throwing any exception, though!
It won't most of the time but it can. This was a bug in Dynamo at one time (#4850)
The Dynamo store hit a 5xx.
Thank you for clarifying 👍
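The fix discussed in this thread, keeping the leader thread alive when the coordination store throws (for example a transient 5xx), amounts to wrapping the lease extension in a try/catch. A hedged sketch with the coordinator reduced to a single-method interface; tryExtendLease is an invented name for illustration:

```java
import java.time.Duration;

public class LeaseExtendSketch {
    interface Coordinator {
        void saveProgressStateForPartition(Object partition, Duration timeout);
    }

    // Returns true when the lease was extended; swallows runtime exceptions
    // so a transient store failure does not kill the leader thread.
    static boolean tryExtendLease(Coordinator coordinator, Object leaderPartition,
                                  Duration extendBy) {
        if (leaderPartition == null) {
            return false;
        }
        try {
            coordinator.saveProgressStateForPartition(leaderPartition, extendBy);
            return true;
        } catch (RuntimeException e) {
            // Log and retry on the next loop iteration instead of crashing.
            return false;
        }
    }
}
```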
this.buffer = buffer;

boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.info("Leader partition creation status: {}", isPartitionCreated);
This will log "Leader partition creation status: false" whenever a new Data Prepper instance starts, and that isn't really helpful. Can be debug.
processPartition(partition.get(), buffer, sourceConfig);

} else {
log.info("No partition available. Going to Sleep for a while ");
This may also be a little noisy. Maybe a metric tracking this would be better?
Iterator<ItemInfo> listItems();

void setLastPollTime(long lastPollTime);
Please add Javadoc comments for all APIs.
added 👍
@@ -0,0 +1,81 @@
package org.opensearch.dataprepper.plugins.source.saas_crawler.base;
Do you not see a need for common interface for all crawlers? I was thinking there would be an interface with some default implementation and so on.
The Crawler relies on a source-plugin-specific iterator implementation and dispatches the work to a source-plugin-specific client implementation. The Crawler itself has the generic logic for pagination.
}
itemInfoList.add(nextItem);
Map<String, String> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0");
Should the fallback value be 0 or the current time?
createPartition(itemInfoList, coordinator);
} while (itemInfoIterator.hasNext());
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime);
return updatedPollTime != 0 ? updatedPollTime : startTime;
Oh, looks like it is falling back to current time here.
yes
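The fallback confirmed in this thread is a pure function of the two timestamps. Sketched here for clarity; the real code computes this inline rather than in a named helper:

```java
public class PollTimeSketch {
    // The crawl's next poll time: the newest item timestamp seen during the
    // crawl, or the crawl start time when no item updated it (== 0).
    static long resolvePollTime(long updatedPollTime, long startTime) {
        return updatedPollTime != 0 ? updatedPollTime : startTime;
    }
}
```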
private final Crawler crawler;

@DataPrepperPluginConstructor
I do not think you use @DataPrepperPluginConstructor for abstract source classes. See ./data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java
Agree. No use of this annotation here. Removed it.
@Override
public boolean areAcknowledgementsEnabled() {
return Source.super.areAcknowledgementsEnabled();
Not a good idea. This should be left to the derived classes.
Agree. Removed this method. Each source plugin will implement their own version.
* contents itself which can be used to apply regex filtering, change data capture etc. general
* assumption here is that fetching metadata should be faster than fetching entire Item
*/
Map<String, String> metadata;
Probably better to make it Map<String, Object>; it is probably unrealistic to expect all of the metadata values to be Strings.
Considering each source may have different needs, agree to make it more generic. Converted the map as suggested.
private boolean initialized = false;

@JsonProperty("last_poll_time")
private Long lastPollTime;
Make this a Java Instant.
I had to introduce a custom deserializer and additional Jackson dependencies, but yes, I changed the type to Instant now.
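Epoch milliseconds map cleanly onto Instant, so state stored as a Long stays recoverable after the type change. A minimal conversion sketch; the custom Jackson deserializer mentioned above is not shown, and fromEpochMillis is an invented helper name:

```java
import java.time.Instant;

public class PollTimeInstantSketch {
    // Converts a previously stored epoch-millis value into an Instant,
    // treating a missing value as the epoch (i.e. never polled).
    static Instant fromEpochMillis(Long lastPollTime) {
        return lastPollTime == null ? Instant.EPOCH : Instant.ofEpochMilli(lastPollTime);
    }
}
```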
}
createPartition(itemInfoList, coordinator);
} while (itemInfoIterator.hasNext());
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime);
Should we be reporting a metric for these times? You can use Micrometer's Timer for this.
We are already capturing this at the Jira source level, but anyway, added a Timer here as well now.
* log events to keep and which ones to discard.
*/
@NonNull
Long eventTime;
Use an Instant here.
Converted
@Getter
public abstract class ItemInfo {
This should be a Java interface. It is more flexible and allows for adapting rather than having to convert.
public interface ItemInfo {
    String getItemId();
    Map<String, Object> getMetadata();
    ...
}
Converted
@Named
public class SaasPluginExecutorServiceProvider {
private static final Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class);
public static final int DEFAULT_THREAD_COUNT = 50;
Suggested change:
- public static final int DEFAULT_THREAD_COUNT = 50;
+ private static final int DEFAULT_THREAD_COUNT = 50;
changed to private
annotationProcessor 'org.projectlombok:lombok:1.18.30'
}

test {
You don't need these lines.
Removed
@@ -0,0 +1,30 @@
plugins {
You don't need these lines.
I thought we needed these lines when it is a library. I don't see any issue after removing them, so removed 👍
pluginType = Source.class,
packagesToScan = {SaasCrawlerApplicationContextMarker.class, JiraSource.class}
)
public class JiraSource implements Source<Record<Event>> {
Is this supposed to inherit from SaasSourcePlugin?
As we discussed, yes it will. But I didn't do that to limit the size of this PR.
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'javax.inject:javax.inject:1'
implementation 'org.springframework:spring-web:5.3.39'
implementation 'org.springframework.retry:spring-retry:1.3.4'
Do we need this dependency? I don't see it in use.
Like we discussed, I am relying on this library in the code. I removed it in this PR and will add it back when I add the Jira source code 👍
Description
Introducing the SaaS Source Plugins module and a base Jira Source plugin class
Issues Resolved
Resolves #4754
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.