
druid-sink.properties inside config package is not working #161

Closed · mhshimul opened this issue Apr 14, 2017 · 2 comments


mhshimul commented Apr 14, 2017

I believe something has changed in the code for druid-sink.properties, as I can see there is a new property `connect.druid.sink.kcql` inside the `DruidSinkConfig` file. I need a working properties file sample.

andrewstevenson (Contributor) commented

@mhshimul Try this. Note that this sink hasn't been properly tested, so let us know how you get on; contributions are also welcome!

```properties
connector.class=com.datamountaineer.streamreactor.connect.druid.DruidSinkConnector
connect.druid.sink.config.file=my_druid_config_file
connect.druid.sink.kcql=INSERT INTO wikipedia SELECT page, robot AS bot, country FROM test
```

mhshimul (Author) commented

After setting this property, I'm getting the following exception:

```
[2017-04-15 01:36:25,661] ERROR Task druid-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.ExceptionInInitializerError
	at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:107)
	at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala)
	at com.datamountaineer.streamreactor.connect.druid.writer.DruidDbWriter$$anonfun$1.apply(DruidDbWriter.scala:44)
	at com.datamountaineer.streamreactor.connect.druid.writer.DruidDbWriter$$anonfun$1.apply(DruidDbWriter.scala:42)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.datamountaineer.streamreactor.connect.druid.writer.DruidDbWriter.<init>(DruidDbWriter.scala:41)
	at com.datamountaineer.streamreactor.connect.druid.DruidSinkTask.start(DruidSinkTask.scala:65)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.8.5
	at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
	at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:745)
	at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:70)
	at com.metamx.common.scala.Jackson$$anonfun$newObjectMapper$1.apply(Jackson.scala:68)
	at com.metamx.common.scala.Predef$EffectOps.withEffect(Predef.scala:44)
	at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:67)
	at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)
	at com.metamx.common.scala.Jackson$class.newObjectMapper(Jackson.scala:64)
	at com.metamx.common.scala.Jackson$.newObjectMapper(Jackson.scala:10)
	at com.metamx.common.scala.Jackson$class.$init$(Jackson.scala:14)
	at com.metamx.common.scala.Jackson$.<init>(Jackson.scala:10)
	at com.metamx.common.scala.Jackson$.<clinit>(Jackson.scala)
	... 20 more
```
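The `Caused by` section shows `jackson-module-scala` (pulled in via Tranquility) refusing to register against the `jackson-databind` 2.8.5 found on the Connect worker classpath. A minimal diagnostic sketch for checking which `jackson-databind` jar the worker actually loads, assuming plain JVM classpath access (the object name here is illustrative):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.cfg.PackageVersion

// Illustrative diagnostic: report the jackson-databind version and jar location
// visible to the current classloader (run it with the same classpath as the worker).
object JacksonVersionCheck {
  def main(args: Array[String]): Unit = {
    println(s"jackson-databind version: ${PackageVersion.VERSION}")
    val location = classOf[ObjectMapper].getProtectionDomain.getCodeSource.getLocation
    println(s"loaded from: $location")
  }
}
```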

lanbotdeployer pushed a commit that referenced this issue Nov 29, 2024
This adds the option to delete or move files after the S3/GCP Storage source has processed and committed them.

# New KCQL Configuration Options for Datalake Cloud Connectors

The following configuration options introduce post-processing capabilities for the AWS S3, GCP Storage, and (coming soon) Azure Datalake Gen 2 **source connectors**. These options allow the connector to manage source files after they are successfully processed, either by deleting the file or moving it to a new location in cloud storage.

In Kafka Connect, post-processing is triggered when the framework calls the `commitRecord` method after a source record is successfully processed. The configured action then determines how the source file is handled.

If no `post.process.action` is configured, **no post-processing will occur**, and the file will remain in its original location.
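For context, a rough sketch of how a source task can hook such an action into the `commitRecord` callback. The `PostProcessAction` trait and the header names used here are hypothetical, not the connector's actual API:

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.connect.source.{SourceRecord, SourceTask}

// Hypothetical action abstraction; the connector's real types are likely different.
trait PostProcessAction {
  def run(bucket: String, path: String): Unit
}

abstract class CloudStorageSourceTask(action: Option[PostProcessAction]) extends SourceTask {

  // Kafka Connect calls this once the record has been acknowledged by the producer,
  // which is the point at which the source file can safely be deleted or moved.
  override def commitRecord(record: SourceRecord, metadata: RecordMetadata): Unit = {
    // Hypothetical headers carrying the originating object's location.
    val bucket = Option(record.headers().lastWithName("source.bucket")).map(_.value().toString)
    val path   = Option(record.headers().lastWithName("source.path")).map(_.value().toString)

    for {
      act <- action // no post.process.action configured: nothing happens, file stays put
      b   <- bucket
      p   <- path
    } act.run(b, p)
  }
}
```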

---

## KCQL Configuration Options

### 1. `post.process.action`
- **Description**: Defines the action to perform on a file after it has been processed.
- **Options**:
  - `DELETE` – Removes the file after processing.
  - `MOVE` – Relocates the file to a new location after processing.

### 2. `post.process.action.bucket`
- **Description**: Specifies the target bucket for files when using the `MOVE` action.
- **Applicability**: Only applies to the `MOVE` action.
- **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`.

### 3. `post.process.action.prefix`
- **Description**: Specifies a new prefix to replace the existing one for the file’s location when using the `MOVE` action. The file's path will remain unchanged except for the prefix.
- **Applicability**: Only applies to the `MOVE` action.
- **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`.

---

## Key Use Cases
- **DELETE**: Automatically removes source files to free up storage space and prevent redundant data from remaining in the bucket.
- **MOVE**: Organizes processed source files by relocating them to a different bucket or prefix, which is useful for archiving, categorizing, or preparing files for other workflows.

---

## Examples

### Example 1: Deleting Files After Processing
To configure the source connector to delete files after processing, use the following KCQL:

```kcql
INSERT INTO `my-bucket`
SELECT * FROM `my-topic`
PROPERTIES (
    'post.process.action'=`DELETE`
)
```


### Example 2: Moving Files After Processing
To configure the source connector to move files to a different bucket and prefix, use the following KCQL:

```kcql
INSERT INTO `my-bucket:archive/`
SELECT * FROM `my-topic`
PROPERTIES (
    'post.process.action'=`MOVE`,
    'post.process.action.bucket'=`archive-bucket`,
    'post.process.action.prefix'=`archive/`
)
```

In this example:
* The file is moved to `archive-bucket`.
* The prefix `archive/` is applied to the file’s path while keeping the rest of the path unchanged.
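A small sketch of the prefix-replacement idea (a hypothetical helper, not the connector's implementation), assuming the original objects live under a `landing/` prefix:

```scala
// Hypothetical illustration of MOVE's prefix handling: swap the source prefix
// for post.process.action.prefix and keep the rest of the object key intact.
object MovePrefixExample {
  def newKey(originalKey: String, sourcePrefix: String, targetPrefix: String): String =
    targetPrefix + originalKey.stripPrefix(sourcePrefix)

  def main(args: Array[String]): Unit = {
    // e.g. an object originally stored under landing/ in my-bucket
    println(newKey("landing/2024/11/29/data-000001.json", "landing/", "archive/"))
    // prints: archive/2024/11/29/data-000001.json
  }
}
```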

## Important Considerations

* Both `post.process.action.bucket` and `post.process.action.prefix` are mandatory when using the `MOVE` action.
* For the `DELETE` action, no additional configuration is required.
* If no `post.process.action` is configured, no post-processing will be applied, and the file will remain in its original location.
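
The mandatory-field rules above can be summarised with a small parsing sketch (hypothetical types and property map, not the connector's actual configuration classes):

```scala
// Hypothetical model of the post-processing settings described above.
sealed trait PostProcessActionConfig
case object Delete extends PostProcessActionConfig
final case class Move(bucket: String, prefix: String) extends PostProcessActionConfig

object PostProcessActionConfig {
  // props: the KCQL PROPERTIES(...) entries as a plain key/value map.
  def from(props: Map[String, String]): Either[String, Option[PostProcessActionConfig]] =
    props.get("post.process.action").map(_.toUpperCase) match {
      case None           => Right(None) // no action configured: the file stays where it is
      case Some("DELETE") => Right(Some(Delete)) // DELETE needs no extra settings
      case Some("MOVE") =>
        (props.get("post.process.action.bucket"), props.get("post.process.action.prefix")) match {
          case (Some(b), Some(p)) => Right(Some(Move(b, p)))
          case _ => Left("MOVE requires both post.process.action.bucket and post.process.action.prefix")
        }
      case Some(other) => Left(s"Unknown post.process.action: $other")
    }
}
```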




Squashed commit messages:

* Configuration for burn-after-reading
* Implement actions and storage interfaces; tests still need to be added. The file move logic needs testing where it resolves the path; is this even the best configuration?
* Storage interface tests
* Address review comment referencing the GCP documentation on moving objects: https://cloud.google.com/storage/docs/samples/storage-move-file
* Add temporary logging; fix a bug where Map equality prevented prefixes from mapping to each other
* Fix Move action
* Fix prefix replace behaviour
* Changes to ensure the error handling approach is correct
* Review fixes: remove S3 references
* Avoid variable shadowing
* Add documentation
* CopyObjectResponse