From ed0cdf8da1754056bbf24b0b4e54752579b65ce8 Mon Sep 17 00:00:00 2001 From: Chris Parker Date: Tue, 22 Sep 2020 01:10:38 -0500 Subject: [PATCH] Updated to 1.12.0 --- .idea/jarRepositories.xml | 35 +++ README.md | 68 +++-- docs/.gitignore | 62 ----- docs/.gitkeep | 0 docs/Gemfile | 28 -- docs/Gemfile.lock | 253 ------------------ docs/_config.yml | 36 --- docs/index.md | 126 --------- nifi-sqllookup-services-api-nar/pom.xml | 2 +- nifi-sqllookup-services-api/pom.xml | 4 +- nifi-sqllookup-services-nar/pom.xml | 2 +- nifi-sqllookup-services/pom.xml | 9 +- .../sqllookup/AbstractSQLLookupService.java | 102 +++---- .../nifi/sqllookup/SQLLookupService.java | 22 +- .../SQLNamedParameterJdbcTemplate.java | 1 + .../sqllookup/SQLRecordLookupService.java | 50 ++-- .../nifi/sqllookup/SQLResultSetRecordSet.java | 79 +++++- .../nifi/sqllookup/cache/Cache2kAdapter.java | 5 +- .../nifi/sqllookup/cache/CacheAdapter.java | 15 +- .../nifi/sqllookup/cache/CaffeineAdapter.java | 2 +- .../nifi/sqllookup/cache/GuavaAdapter.java | 46 ++++ .../AbstractSQLLookupServiceTest.java | 18 +- .../nifi/sqllookup/TestProcessor.java | 11 +- .../nifi/sqllookup/TestSQLLookupService.java | 7 +- .../TestSQLLookupServiceInQuery.java | 4 +- .../TestSQLLookupServiceWithCache.java | 6 +- .../TestSQLLookupServiceWithCache2k.java | 9 +- .../TestSQLLookupServiceWithGuava.java | 135 ++++++++++ .../sqllookup/TestSQLRecordLookupService.java | 36 +-- .../TestSQLRecordLookupServiceWithCache.java | 8 +- ...TestSQLRecordLookupServiceWithCache2k.java | 41 +-- .../TestSQLRecordLookupServiceWithGuava.java | 127 +++++++++ .../TestSQLRecordLookupServiceWithSchema.java | 17 +- pom.xml | 21 +- 34 files changed, 683 insertions(+), 704 deletions(-) create mode 100644 .idea/jarRepositories.xml delete mode 100644 docs/.gitignore delete mode 100644 docs/.gitkeep delete mode 100644 docs/Gemfile delete mode 100644 docs/Gemfile.lock delete mode 100644 docs/_config.yml delete mode 100644 docs/index.md create mode 100644 nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/GuavaAdapter.java create mode 100644 nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithGuava.java create mode 100644 nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithGuava.java diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml new file mode 100644 index 0000000..86d686a --- /dev/null +++ b/.idea/jarRepositories.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index f736eb6..4569c97 100644 --- a/README.md +++ b/README.md @@ -2,37 +2,73 @@ [![Build Status](https://travis-ci.org/mrcsparker/nifi-sqllookup-services-bundle.svg?branch=master)](https://travis-ci.org/mrcsparker/nifi-sqllookup-services-bundle) -[![Download](https://api.bintray.com/packages/mrcsparker/maven/nifi-sqllookup-services-bundle/images/download.svg)](https://bintray.com/mrcsparker/maven/nifi-sqllookup-services-bundle/_latestVersion) + -**NiFI SQL Lookup Service** is a SQL-based lookup service. It allows you to enrich your flowfiles with any jdbc-compliant data store. +- [Apache NiFi SQL Lookup Service](#apache-nifi-sql-lookup-service) + - [About](#about) + - [Simple Setup](#simple-setup) + - [SQL Query Support](#sql-query-support) + - [Caching](#caching) + - [Supported caches](#supported-caches) + - [Latest release](#latest-release) + - [Articles on using NiFi lookup services](#articles-on-using-nifi-lookup-services) -It includes LookupRecord and LookupAttribute controllers. + -These controllers were designed to be flexible and fast. They support: +## About -* _Named Parameters_: `SELECT name FROM foo WHERE value = :value` -* _SQL IN queries_: `SELECT name FROM foo WHERE value IN(:values)` -* _Multiple lookup values_: `SELECT name FROM foo WHERE value IN(:values) AND sequence = :sequence AND catalog = :catalog` -* _Caching_: LookupRecord and LookupAttribute both keep a cache of the most accessed objects. This is configurable in the controller settings. +**NiFI SQL Lookup Service** is a SQL-based lookup service for [Apache NiFi](https://nifi.apache.org). It allows you to enrich your flowfiles with any jdbc-compliant data store, including: -## Simple Setup +- PostgreSQL +- Oracle +- MySQL +- MS SQL Server +- SQLite +- ... and many more + +It includes both [LookupRecord](http://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.LookupRecord/) and [LookupAttribute](https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.LookupAttribute/) controllers. -* [Download Apache NiFi](https://nifi.apache.org/download.html) +These controllers were designed to be _flexible_ and _fast_. + +## Simple Setup -* Compile and install `nifi-sqllookup-bundle` +- [Download Apache NiFi](https://nifi.apache.org/download.html) +- Compile and install `nifi-sqllookup-bundle` ```bash > cd nifi-sqllookup-bundle > mvn package -> cp ./nifi-sqllookup-services-nar/target/nifi-sqllookup-services-nar-1.10.0.nar /NIFI_INSTALL/lib/ -> cp ./nifi-sqllookup-services-api-nar/target/nifi-sqllookup-services-api-nar-1.10.0.nar /NIFI_INSTALL/lib/ +> cp ./nifi-sqllookup-services-nar/target/nifi-sqllookup-services-nar-1.12.0.nar /NIFI_INSTALL/lib/ +> cp ./nifi-sqllookup-services-api-nar/target/nifi-sqllookup-services-api-nar-1.12.0.nar /NIFI_INSTALL/lib/ ``` -* Start NiFi +- Start NiFi + +## SQL Query Support + +This service supports multiple query types: + +- _Named Parameters_: `SELECT name FROM foo WHERE value = :value` +- _SQL IN queries_: `SELECT name FROM foo WHERE value IN(:values)` +- _Multiple lookup values_: `SELECT name FROM foo WHERE value IN(:values) AND sequence = :sequence AND catalog = :catalog` + +## Caching + +The goal of this service is to return values quickly. It has a built-in cache so that your database doesn't get overwhelmed. + +This is configurable in the controller settings. + +### Supported caches + +These caches are all built-in to this service. Select your preferable cache in the controller settings. + +- [Caffeine](https://github.com/ben-manes/caffeine) - default cache. +- [Cache2k](https://cache2k.org) +- [Guava](https://github.com/google/guava/wiki/CachesExplained) -## Full Documentation +You can also select the number of items that you want to cache. The caches all keep the most accessed items available by default. -[https://mrcsparker.github.io/nifi-sqllookup-services-bundle](https://mrcsparker.github.io/nifi-sqllookup-services-bundle) +If you don't know which to choose, just go with the default. ## Latest release diff --git a/docs/.gitignore b/docs/.gitignore deleted file mode 100644 index 9a93e11..0000000 --- a/docs/.gitignore +++ /dev/null @@ -1,62 +0,0 @@ - -# Created by https://www.gitignore.io/api/ruby,jekyll - -### Jekyll ### -_site/ -.sass-cache/ -.jekyll-metadata - -### Ruby ### -*.gem -*.rbc -/.config -/coverage/ -/InstalledFiles -/pkg/ -/spec/reports/ -/spec/examples.txt -/test/tmp/ -/test/version_tmp/ -/tmp/ - -# Used by dotenv library to load environment variables. -# .env - -## Specific to RubyMotion: -.dat* -.repl_history -build/ -*.bridgesupport -build-iPhoneOS/ -build-iPhoneSimulator/ - -## Specific to RubyMotion (use of CocoaPods): -# -# We recommend against adding the Pods directory to your .gitignore. However -# you should judge for yourself, the pros and cons are mentioned at: -# https://guides.cocoapods.org/using/using-cocoapods.html#should-i-check-the-pods-directory-into-source-control -# -# vendor/Pods/ - -## Documentation cache and generated files: -/.yardoc/ -/_yardoc/ -/doc/ -/rdoc/ - -## Environment normalization: -/.bundle/ -/vendor/bundle -/lib/bundler/man/ - -# for a library or gem, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# Gemfile.lock -# .ruby-version -# .ruby-gemset - -# unless supporting rvm < 1.11.0 or doing something fancy, ignore this: -.rvmrc - - -# End of https://www.gitignore.io/api/ruby,jekyll diff --git a/docs/.gitkeep b/docs/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/docs/Gemfile b/docs/Gemfile deleted file mode 100644 index 0fc53fa..0000000 --- a/docs/Gemfile +++ /dev/null @@ -1,28 +0,0 @@ -source "https://rubygems.org" - -# Hello! This is where you manage which Jekyll version is used to run. -# When you want to use a different version, change it below, save the -# file and run `bundle install`. Run Jekyll with `bundle exec`, like so: -# -# bundle exec jekyll serve -# -# This will help ensure the proper Jekyll version is running. -# Happy Jekylling! -gem "jekyll", "~> 3.8.5" - -# This is the default theme for new Jekyll sites. You may change this to anything you like. -gem "minima", "~> 2.5.1" - -gem "faraday", "< 1.0" - -# If you have any plugins, put them here! -group :jekyll_plugins do - gem "jekyll-feed", "~> 0.13.0" - gem "github-pages", "203" -end - -# Windows does not include zoneinfo files, so bundle the tzinfo-data gem -gem "tzinfo-data", platforms: [:mingw, :mswin, :x64_mingw, :jruby] - -# Performance-booster for watching directories on Windows -gem "wdm", "~> 0.1.1" if Gem.win_platform? \ No newline at end of file diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock deleted file mode 100644 index 8cb1037..0000000 --- a/docs/Gemfile.lock +++ /dev/null @@ -1,253 +0,0 @@ -GEM - remote: https://rubygems.org/ - specs: - activesupport (6.0.2.1) - concurrent-ruby (~> 1.0, >= 1.0.2) - i18n (>= 0.7, < 2) - minitest (~> 5.1) - tzinfo (~> 1.1) - zeitwerk (~> 2.2) - addressable (2.7.0) - public_suffix (>= 2.0.2, < 5.0) - coffee-script (2.4.1) - coffee-script-source - execjs - coffee-script-source (1.11.1) - colorator (1.1.0) - commonmarker (0.17.13) - ruby-enum (~> 0.5) - concurrent-ruby (1.1.5) - dnsruby (1.61.3) - addressable (~> 2.5) - em-websocket (0.5.1) - eventmachine (>= 0.12.9) - http_parser.rb (~> 0.6.0) - ethon (0.12.0) - ffi (>= 1.3.0) - eventmachine (1.2.7) - execjs (2.7.0) - faraday (0.17.3) - multipart-post (>= 1.2, < 3) - ffi (1.12.1) - forwardable-extended (2.6.0) - gemoji (3.0.1) - github-pages (203) - github-pages-health-check (= 1.16.1) - jekyll (= 3.8.5) - jekyll-avatar (= 0.7.0) - jekyll-coffeescript (= 1.1.1) - jekyll-commonmark-ghpages (= 0.1.6) - jekyll-default-layout (= 0.1.4) - jekyll-feed (= 0.13.0) - jekyll-gist (= 1.5.0) - jekyll-github-metadata (= 2.12.1) - jekyll-mentions (= 1.5.1) - jekyll-optional-front-matter (= 0.3.2) - jekyll-paginate (= 1.1.0) - jekyll-readme-index (= 0.3.0) - jekyll-redirect-from (= 0.15.0) - jekyll-relative-links (= 0.6.1) - jekyll-remote-theme (= 0.4.1) - jekyll-sass-converter (= 1.5.2) - jekyll-seo-tag (= 2.6.1) - jekyll-sitemap (= 1.4.0) - jekyll-swiss (= 1.0.0) - jekyll-theme-architect (= 0.1.1) - jekyll-theme-cayman (= 0.1.1) - jekyll-theme-dinky (= 0.1.1) - jekyll-theme-hacker (= 0.1.1) - jekyll-theme-leap-day (= 0.1.1) - jekyll-theme-merlot (= 0.1.1) - jekyll-theme-midnight (= 0.1.1) - jekyll-theme-minimal (= 0.1.1) - jekyll-theme-modernist (= 0.1.1) - jekyll-theme-primer (= 0.5.4) - jekyll-theme-slate (= 0.1.1) - jekyll-theme-tactile (= 0.1.1) - jekyll-theme-time-machine (= 0.1.1) - jekyll-titles-from-headings (= 0.5.3) - jemoji (= 0.11.1) - kramdown (= 1.17.0) - liquid (= 4.0.3) - mercenary (~> 0.3) - minima (= 2.5.1) - nokogiri (>= 1.10.4, < 2.0) - rouge (= 3.13.0) - terminal-table (~> 1.4) - github-pages-health-check (1.16.1) - addressable (~> 2.3) - dnsruby (~> 1.60) - octokit (~> 4.0) - public_suffix (~> 3.0) - typhoeus (~> 1.3) - html-pipeline (2.12.3) - activesupport (>= 2) - nokogiri (>= 1.4) - http_parser.rb (0.6.0) - i18n (0.9.5) - concurrent-ruby (~> 1.0) - jekyll (3.8.5) - addressable (~> 2.4) - colorator (~> 1.0) - em-websocket (~> 0.5) - i18n (~> 0.7) - jekyll-sass-converter (~> 1.0) - jekyll-watch (~> 2.0) - kramdown (~> 1.14) - liquid (~> 4.0) - mercenary (~> 0.3.3) - pathutil (~> 0.9) - rouge (>= 1.7, < 4) - safe_yaml (~> 1.0) - jekyll-avatar (0.7.0) - jekyll (>= 3.0, < 5.0) - jekyll-coffeescript (1.1.1) - coffee-script (~> 2.2) - coffee-script-source (~> 1.11.1) - jekyll-commonmark (1.3.1) - commonmarker (~> 0.14) - jekyll (>= 3.7, < 5.0) - jekyll-commonmark-ghpages (0.1.6) - commonmarker (~> 0.17.6) - jekyll-commonmark (~> 1.2) - rouge (>= 2.0, < 4.0) - jekyll-default-layout (0.1.4) - jekyll (~> 3.0) - jekyll-feed (0.13.0) - jekyll (>= 3.7, < 5.0) - jekyll-gist (1.5.0) - octokit (~> 4.2) - jekyll-github-metadata (2.12.1) - jekyll (~> 3.4) - octokit (~> 4.0, != 4.4.0) - jekyll-mentions (1.5.1) - html-pipeline (~> 2.3) - jekyll (>= 3.7, < 5.0) - jekyll-optional-front-matter (0.3.2) - jekyll (>= 3.0, < 5.0) - jekyll-paginate (1.1.0) - jekyll-readme-index (0.3.0) - jekyll (>= 3.0, < 5.0) - jekyll-redirect-from (0.15.0) - jekyll (>= 3.3, < 5.0) - jekyll-relative-links (0.6.1) - jekyll (>= 3.3, < 5.0) - jekyll-remote-theme (0.4.1) - addressable (~> 2.0) - jekyll (>= 3.5, < 5.0) - rubyzip (>= 1.3.0) - jekyll-sass-converter (1.5.2) - sass (~> 3.4) - jekyll-seo-tag (2.6.1) - jekyll (>= 3.3, < 5.0) - jekyll-sitemap (1.4.0) - jekyll (>= 3.7, < 5.0) - jekyll-swiss (1.0.0) - jekyll-theme-architect (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-cayman (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-dinky (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-hacker (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-leap-day (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-merlot (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-midnight (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-minimal (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-modernist (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-primer (0.5.4) - jekyll (> 3.5, < 5.0) - jekyll-github-metadata (~> 2.9) - jekyll-seo-tag (~> 2.0) - jekyll-theme-slate (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-tactile (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-theme-time-machine (0.1.1) - jekyll (~> 3.5) - jekyll-seo-tag (~> 2.0) - jekyll-titles-from-headings (0.5.3) - jekyll (>= 3.3, < 5.0) - jekyll-watch (2.2.1) - listen (~> 3.0) - jemoji (0.11.1) - gemoji (~> 3.0) - html-pipeline (~> 2.2) - jekyll (>= 3.0, < 5.0) - kramdown (1.17.0) - liquid (4.0.3) - listen (3.2.1) - rb-fsevent (~> 0.10, >= 0.10.3) - rb-inotify (~> 0.9, >= 0.9.10) - mercenary (0.3.6) - mini_portile2 (2.4.0) - minima (2.5.1) - jekyll (>= 3.5, < 5.0) - jekyll-feed (~> 0.9) - jekyll-seo-tag (~> 2.1) - minitest (5.14.0) - multipart-post (2.1.1) - nokogiri (1.10.7) - mini_portile2 (~> 2.4.0) - octokit (4.15.0) - faraday (>= 0.9) - sawyer (~> 0.8.0, >= 0.5.3) - pathutil (0.16.2) - forwardable-extended (~> 2.6) - public_suffix (3.1.1) - rb-fsevent (0.10.3) - rb-inotify (0.10.1) - ffi (~> 1.0) - rouge (3.13.0) - ruby-enum (0.7.2) - i18n - rubyzip (2.0.0) - safe_yaml (1.0.5) - sass (3.7.4) - sass-listen (~> 4.0.0) - sass-listen (4.0.0) - rb-fsevent (~> 0.9, >= 0.9.4) - rb-inotify (~> 0.9, >= 0.9.7) - sawyer (0.8.2) - addressable (>= 2.3.5) - faraday (> 0.8, < 2.0) - terminal-table (1.8.0) - unicode-display_width (~> 1.1, >= 1.1.1) - thread_safe (0.3.6) - typhoeus (1.3.1) - ethon (>= 0.9.0) - tzinfo (1.2.6) - thread_safe (~> 0.1) - unicode-display_width (1.6.1) - zeitwerk (2.2.2) - -PLATFORMS - ruby - -DEPENDENCIES - faraday (< 1.0) - github-pages (= 203) - jekyll (~> 3.8.5) - jekyll-feed (~> 0.13.0) - minima (~> 2.5.1) - tzinfo-data - -BUNDLED WITH - 1.17.2 diff --git a/docs/_config.yml b/docs/_config.yml deleted file mode 100644 index 4b21f07..0000000 --- a/docs/_config.yml +++ /dev/null @@ -1,36 +0,0 @@ -# Welcome to Jekyll! -# -# This config file is meant for settings that affect your whole blog, values -# which you are expected to set up once and rarely edit after that. If you find -# yourself editing this file very often, consider using Jekyll's data files -# feature for the data you need to update frequently. -# -# For technical reasons, this file is *NOT* reloaded automatically when you use -# 'bundle exec jekyll serve'. If you change this file, please restart the server process. - -# Site settings -# These are used to personalize your new site. If you look in the HTML files, -# you will see them accessed via {{ site.title }}, {{ site.email }}, and so on. -# You can create any custom variable you would like, and they will be accessible -# in the templates via {{ site.myvariable }}. -title: NIFI SQL Lookup Services Bundle -email: mrcsparker@gmail.com -description: >- # this means to ignore newlines until "baseurl:" - Apache NIFI SQL Lookup Service -show_downloads: true - -# Build settings -markdown: kramdown -theme: jekyll-theme-midnight - -# Exclude from processing. -# The following items will not be processed, by default. Create a custom list -# to override the default setting. -# exclude: -# - Gemfile -# - Gemfile.lock -# - node_modules -# - vendor/bundle/ -# - vendor/cache/ -# - vendor/gems/ -# - vendor/ruby/ diff --git a/docs/index.md b/docs/index.md deleted file mode 100644 index c3d9b39..0000000 --- a/docs/index.md +++ /dev/null @@ -1,126 +0,0 @@ ---- -layout: default -title: Documentation ---- - -NiFi SQL Lookup Services Bundle allows you to so SQL for your NiFi LookupRecord and LookupAttribute needs. - -For background, read: - -* [Data flow enrichment with NiFi part 1 : LookupRecord processor](https://community.hortonworks.com/articles/138632/data-flow-enrichment-with-nifi-lookuprecord-proces.html) -* [Data flow enrichment with NiFi part 2 : LookupAttribute processor](https://community.hortonworks.com/articles/140231/data-flow-enrichment-with-nifi-part-2-lookupattrib.html) -* [Data flow enrichment with NiFi part 3: LookupRecord with MongoDB](https://community.hortonworks.com/articles/146198/data-flow-enrichment-with-nifi-part-3-lookuprecord.html) - -Compiling -========== - -Before you use the service you are going to need to compile it. - -__First__ grab the source code from github: - -```sh ->> git clone https://github.com/mrcsparker/nifi-sqllookup-services-bundle.git -``` - -__Second__ compile the code: - -```sh ->> cd nifi-sqllookup-services-bundle ->> mvn package -``` - -__Third__ add the compiled NiFi nars to your local NiFi install: - -```sh ->> cp ./nifi-sqllookup-services-nar/target/nifi-sqllookup-services-nar-1.10.0.nar /PATH/TO/NIFI/lib ->> cp ./nifi-sqllookup-services-api-nar/target/nifi-sqllookup-services-api-nar-1.10.0.nar /PATH/TO/NIFI/lib -``` -__Finally__ startup NiFi: - -```sh ->> cd /PATH/TO/NIFI ->> ./bin/nifi.sh run -``` - -You are ready to go! - -Using the SQL Lookup Services Bundle -==================================== - -There are two NiFi controllers in the SQL Lookup Services bundle: - -1. _LookupAttribute_: look up a single column from a SQL query and assign it as an attribute to a FlowFile -2. _LookupRecord_: look up an entire row from a SQL query and add it to the contents of a FlowFile - -In this case, we are going to go over the _LookupRecord_ controller (_SQLRecordLookupService_). - -We are going to use PostgreSQL for the backend data store and the [MovieLens](https://grouplens.org/datasets/movielens/) data. - -### 1. Download the MovieLens `ml-latest.zip` file - -### 2. Create a database named `movielens` - -### 3. Create the following schemas in `movielens`: - -```sql -create table movies ( - movie_id int, - title varchar(200), - genres varchar(1000), - primary key(movie_id) -); - -create table ratings ( - user_id int, - movie_id int, - rating float, - timestamp bigint, - primary key(user_id, movie_id) -); - -create table tags ( - user_id int, - movie_id int, - tag varchar(255), - timestamp bigint, - primary key(user_id, movie_id, tag) -); - -create table links ( - movie_id int, - imdb_id int, - tmdb_id int, - primary key(movie_id) -); - -create table genome_tags ( - tag_id int, - tag varchar(100), - primary key(tag_id) -); - -create table genome_scores ( - movie_id int, - tag_id int, - relevance float, - primary key(movie_id, tag_id) -); -``` - -### 4. Unzip and, using psql, copy in the MovieLens data: - -```sh -movielens=# \copy movies from './ml-latest/movies.csv' delimiter ',' CSV header; -movielens=# \copy ratings from './ml-latest/ratings.csv' delimiter ',' CSV header; -movielens=# \copy tags from './ml-latest/tags.csv' delimiter ',' CSV header; -movielens=# \copy links from './ml-latest/links.csv' delimiter ',' CSV header; -movielens=# \copy genome_tags from './ml-latest/genome-tags.csv' delimiter ',' CSV header; -movielens=# \copy genome_scores from './ml-latest/genome-scores.csv' delimiter ',' CSV header; -``` - - - - - - - diff --git a/nifi-sqllookup-services-api-nar/pom.xml b/nifi-sqllookup-services-api-nar/pom.xml index d25cc36..6ba30c8 100644 --- a/nifi-sqllookup-services-api-nar/pom.xml +++ b/nifi-sqllookup-services-api-nar/pom.xml @@ -23,7 +23,7 @@ com.mrcsparker nifi-sqllookup-services-bundle - 1.10.0 + 1.12.0 nifi-sqllookup-services-api-nar diff --git a/nifi-sqllookup-services-api/pom.xml b/nifi-sqllookup-services-api/pom.xml index dac7022..4f74e85 100644 --- a/nifi-sqllookup-services-api/pom.xml +++ b/nifi-sqllookup-services-api/pom.xml @@ -23,7 +23,7 @@ com.mrcsparker nifi-sqllookup-services-bundle - 1.10.0 + 1.12.0 nifi-sqllookup-services-api @@ -38,7 +38,7 @@ org.apache.nifi nifi-lookup-service-api - 1.10.0 + 1.12.0 provided diff --git a/nifi-sqllookup-services-nar/pom.xml b/nifi-sqllookup-services-nar/pom.xml index dc02c2b..30e2a25 100644 --- a/nifi-sqllookup-services-nar/pom.xml +++ b/nifi-sqllookup-services-nar/pom.xml @@ -18,7 +18,7 @@ com.mrcsparker nifi-sqllookup-services-bundle - 1.10.0 + 1.12.0 nifi-sqllookup-services-nar diff --git a/nifi-sqllookup-services/pom.xml b/nifi-sqllookup-services/pom.xml index bf80c23..bef37ab 100644 --- a/nifi-sqllookup-services/pom.xml +++ b/nifi-sqllookup-services/pom.xml @@ -13,14 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. --> - 4.0.0 com.mrcsparker nifi-sqllookup-services-bundle - 1.10.0 + 1.12.0 nifi-sqllookup-services @@ -73,6 +73,11 @@ ${cache2k.version} pom + + com.google.guava + guava + ${guava.version} + org.springframework spring-jdbc diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupService.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupService.java index 7ad9845..9bef4e0 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupService.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupService.java @@ -21,55 +21,62 @@ abstract class AbstractSQLLookupService extends AbstractControllerService implements LookupService { static final PropertyDescriptor CONNECTION_POOL = - new PropertyDescriptor.Builder() - .name("connection-pool") - .displayName("Connection Pool") - .description("Specifies the JDBC connection pool used to connect to the database.") - .identifiesControllerService(DBCPService.class) - .required(true) - .build(); + new PropertyDescriptor.Builder() + .name("connection-pool") + .displayName("Connection Pool") + .description("Specifies the JDBC connection pool used to connect to the database.") + .identifiesControllerService(DBCPService.class) + .required(true) + .build(); static final PropertyDescriptor SQL_QUERY = - new PropertyDescriptor.Builder() - .name("sql-query") - .displayName("SQL Query") - .description("SQL Query") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() - .name("max-wait-time") - .description("The maximum amount of time allowed for a running SQL select query " - + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") - .defaultValue("0 seconds") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .sensitive(false) - .build(); - - static final AllowableValue CACHING_LIBRARY_CAFFEINE = new AllowableValue("Caffeine", "Caffeine", "Use Caffeine"); - - static final AllowableValue CACHING_LIBRARY_CACHE2k = new AllowableValue("Cache2k", "Cache2k", "Use Cache2k"); - - static final PropertyDescriptor CACHING_LIBRARY = new PropertyDescriptor.Builder() - .name("caching-library") - .displayName("Caching library") - .description("Library to use for caching.") - .allowableValues(CACHING_LIBRARY_CAFFEINE, CACHING_LIBRARY_CACHE2k) - .defaultValue(CACHING_LIBRARY_CAFFEINE.getValue()) - .required(true) - .build(); - - static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() - .name("cache-size") - .displayName("Cache size") - .description("Size of the lookup cache.") - .defaultValue("0") - .required(true) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .build(); + new PropertyDescriptor.Builder() + .name("sql-query") + .displayName("SQL Query") + .description("SQL Query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor QUERY_TIMEOUT = + new PropertyDescriptor.Builder() + .name("max-wait-time") + .description("The maximum amount of time allowed for a running SQL select query " + + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + static final AllowableValue CACHING_LIBRARY_CAFFEINE = + new AllowableValue("Caffeine", "Caffeine", "Use Caffeine"); + + static final AllowableValue CACHING_LIBRARY_CACHE2k = + new AllowableValue("Cache2k", "Cache2k", "Use Cache2k"); + + static final AllowableValue CACHING_LIBRARY_GUAVA = + new AllowableValue("Guava", "Guava", "Use Guava"); + + static final PropertyDescriptor CACHING_LIBRARY = + new PropertyDescriptor.Builder() + .name("caching-library") + .displayName("Caching library").description("Library to use for caching.") + .allowableValues(CACHING_LIBRARY_CAFFEINE, CACHING_LIBRARY_CACHE2k, CACHING_LIBRARY_GUAVA) + .defaultValue(CACHING_LIBRARY_CAFFEINE.getValue()) + .required(true) + .build(); + + static final PropertyDescriptor CACHE_SIZE = + new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache size") + .description("Size of the lookup cache.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); String sqlQuery; Integer queryTimeout; @@ -100,7 +107,6 @@ public Optional lookup(Map coordinates) throws LookupFailureE abstract Optional cacheLookup(Map coordinates) throws LookupFailureException; - @Override public Set getRequiredKeys() { return Collections.emptySet(); diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLLookupService.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLLookupService.java index cd88b44..d4b6ce7 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLLookupService.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLLookupService.java @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import com.mrcsparker.nifi.sqllookup.cache.Cache2kAdapter; import com.mrcsparker.nifi.sqllookup.cache.CaffeineAdapter; +import com.mrcsparker.nifi.sqllookup.cache.GuavaAdapter; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; @@ -39,14 +41,13 @@ public class SQLLookupService extends AbstractSQLLookupService { public static final PropertyDescriptor LOOKUP_VALUE_COLUMN = - new PropertyDescriptor.Builder() - .name("lookup-value-column") - .displayName("Lookup Value Column") - .description("Lookup value column.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); + new PropertyDescriptor.Builder() + .name("lookup-value-column") + .displayName("Lookup Value Column") + .description("Lookup value column.") + .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); static final Logger LOG = LoggerFactory.getLogger(SQLLookupService.class); private final List propertyDescriptors; private String lookupValue; @@ -70,6 +71,7 @@ protected List getSupportedPropertyDescriptors() { @Override Optional databaseLookup(Map coordinates) throws LookupFailureException { final DataSource dataSource = new BasicDataSource() { + @Override public Connection getConnection() throws SQLException { return dbcpService.getConnection(); @@ -130,8 +132,10 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE if (cachingLibrary.equals("Caffeine")) { cache = new CaffeineAdapter<>(cacheSize); - } else { + } else if (cachingLibrary.equals("Cache2k")) { cache = new Cache2kAdapter<>(cacheSize, String.class); + } else { + cache = new GuavaAdapter<>(cacheSize); } } diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLNamedParameterJdbcTemplate.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLNamedParameterJdbcTemplate.java index b5c93ad..31628d1 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLNamedParameterJdbcTemplate.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLNamedParameterJdbcTemplate.java @@ -7,6 +7,7 @@ import javax.sql.DataSource; public class SQLNamedParameterJdbcTemplate extends NamedParameterJdbcTemplate { + public SQLNamedParameterJdbcTemplate(DataSource dataSource) { super(dataSource); } diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLRecordLookupService.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLRecordLookupService.java index 3b5abcc..229ed20 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLRecordLookupService.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLRecordLookupService.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import com.mrcsparker.nifi.sqllookup.cache.Cache2kAdapter; @@ -44,25 +45,24 @@ import java.sql.SQLException; import java.util.*; -@Tags({"dbcp", "database", "lookup", "record", "sql", "cache"}) -@CapabilityDescription( - "Provides a lookup service based around DBCP." -) +@Tags({ "dbcp", "database", "lookup", "record", "sql", "cache" }) +@CapabilityDescription("Provides a lookup service based around DBCP.") public class SQLRecordLookupService extends AbstractSQLLookupService { static final Logger LOG = LoggerFactory.getLogger(SQLRecordLookupService.class); static final PropertyDescriptor USE_JDBC_TYPES = - new PropertyDescriptor.Builder() - .name("use-jdbc-types") - .displayName("Use JDBC types") - .description("Use Built-in JDBC to Record type conversion.\n" + - "If this is not selected it will use the NIFI ResultRecordSet to convert into a Record.\n" + - "Use this if you are returning array types from a SQL query.") - .defaultValue("false") - .allowableValues("true", "false") - .required(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .build(); + new PropertyDescriptor.Builder() + .name("use-jdbc-types") + .displayName("Use JDBC types") + .description("Use Built-in JDBC to Record type conversion.\n" + + "If this is not selected it will use the NIFI ResultRecordSet to convert into a Record.\n" + + "Use this if you are returning array types from a SQL query.") + .defaultValue("false") + .allowableValues("true", "false") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + private final List propertyDescriptors; public SQLRecordLookupService() { @@ -93,6 +93,7 @@ public Class getValueType() { private PreparedStatementCreator setupPreparedStatementCreator(Map coordinates) { final DataSource dataSource = new BasicDataSource() { + @Override public Connection getConnection() throws SQLException { return dbcpService.getConnection(); @@ -111,17 +112,19 @@ public Connection getConnection() throws SQLException { @Override Optional databaseLookup(Map coordinates) throws LookupFailureException { - if (useJDBCTypes) { + if (Boolean.TRUE.equals(useJDBCTypes)) { return jdbcDatabaseLookup(coordinates); } return resultRecordSetDatabaseLookup(coordinates); } - private Optional resultRecordSetDatabaseLookup(Map coordinates) throws LookupFailureException { + private Optional resultRecordSetDatabaseLookup(Map coordinates) + throws LookupFailureException { PreparedStatementCreator preparedStatementCreator = setupPreparedStatementCreator(coordinates); - try (final Connection connection = dbcpService.getConnection(); - final PreparedStatement preparedStatement = preparedStatementCreator.createPreparedStatement(connection)) { + try (final Connection connection = dbcpService + .getConnection(); final PreparedStatement preparedStatement = preparedStatementCreator + .createPreparedStatement(connection)) { preparedStatement.setQueryTimeout(queryTimeout); preparedStatement.execute(); @@ -134,7 +137,7 @@ private Optional resultRecordSetDatabaseLookup(Map coord } } catch (final ProcessException | SQLException e) { - getLogger().error("Error during lookup: {}", new Object[]{coordinates.toString()}, e); + getLogger().error("Error during lookup: {}", new Object[] { coordinates.toString() }, e); throw new LookupFailureException(e); } catch (final NullPointerException | IOException e) { return Optional.empty(); @@ -144,8 +147,9 @@ private Optional resultRecordSetDatabaseLookup(Map coord private Optional jdbcDatabaseLookup(Map coordinates) throws LookupFailureException { PreparedStatementCreator preparedStatementCreator = setupPreparedStatementCreator(coordinates); - try (final Connection connection = dbcpService.getConnection(); - final PreparedStatement preparedStatement = preparedStatementCreator.createPreparedStatement(connection)) { + try (final Connection connection = dbcpService + .getConnection(); final PreparedStatement preparedStatement = preparedStatementCreator + .createPreparedStatement(connection)) { preparedStatement.setQueryTimeout(queryTimeout); preparedStatement.execute(); @@ -158,7 +162,7 @@ private Optional jdbcDatabaseLookup(Map coordinates) thr } } catch (final ProcessException | SQLException e) { - getLogger().error("Error during lookup: {}", new Object[]{coordinates.toString()}, e); + getLogger().error("Error during lookup: {}", new Object[] { coordinates.toString() }, e); throw new LookupFailureException(e); } catch (final NullPointerException | IOException e) { return Optional.empty(); diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLResultSetRecordSet.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLResultSetRecordSet.java index b3a0cc8..b4f84ac 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLResultSetRecordSet.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/SQLResultSetRecordSet.java @@ -7,14 +7,26 @@ import java.io.Closeable; import java.io.IOException; +import java.math.BigDecimal; import java.math.BigInteger; import java.sql.*; +import java.util.Date; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; public class SQLResultSetRecordSet implements RecordSet, Closeable { + private static final Logger logger = LoggerFactory.getLogger(SQLResultSetRecordSet.class); + + private static final String STRING_CLASS_NAME = String.class.getName(); + private static final String INT_CLASS_NAME = Integer.class.getName(); + private static final String LONG_CLASS_NAME = Long.class.getName(); + private static final String DATE_CLASS_NAME = Date.class.getName(); + private static final String DOUBLE_CLASS_NAME = Double.class.getName(); + private static final String FLOAT_CLASS_NAME = Float.class.getName(); + private static final String BIGDECIMAL_CLASS_NAME = BigDecimal.class.getName(); + private final ResultSet rs; private final RecordSchema schema; private final Set rsColumnNames; @@ -55,7 +67,8 @@ private static RecordSchema createSchema(final ResultSet rs, final RecordSchema return new SimpleRecordSchema(fields); } - private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, final RecordSchema readerSchema) throws SQLException { + private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex, + final RecordSchema readerSchema) throws SQLException { switch (sqlType) { case Types.ARRAY: // The JDBC API does not allow us to know what the base type of an array is through the metadata. @@ -78,6 +91,10 @@ private static DataType getDataType(final int sqlType, final ResultSet rs, final case Types.LONGVARBINARY: case Types.VARBINARY: return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case Types.NUMERIC: + case Types.DECIMAL: + return RecordFieldType.DECIMAL.getDecimalDataType(rs.getMetaData().getPrecision(columnIndex), + rs.getMetaData().getScale(columnIndex)); case Types.OTHER: { // If we have no records to inspect, we can't really know its schema so we simply use the default data type. if (rs.isAfterLast()) { @@ -95,11 +112,13 @@ private static DataType getDataType(final int sqlType, final ResultSet rs, final final Object obj = rs.getObject(columnIndex); if (!(obj instanceof Record)) { - final List dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE, - RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME, - RecordFieldType.TIMESTAMP) - .map(RecordFieldType::getDataType) - .collect(Collectors.toList()); + final List dataTypes = Stream + .of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, + RecordFieldType.CHAR, RecordFieldType.DATE, RecordFieldType.DECIMAL, + RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, + RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, + RecordFieldType.TIME, RecordFieldType.TIMESTAMP) + .map(RecordFieldType::getDataType).collect(Collectors.toList()); return RecordFieldType.CHOICE.getChoiceDataType(dataTypes); } @@ -118,7 +137,15 @@ private static DataType getDataType(final int sqlType, final ResultSet rs, final } } - return getFieldType(sqlType).getDataType(); + final RecordFieldType fieldType = getFieldType(sqlType, + rs.getMetaData().getColumnClassName(columnIndex)); + + if (RecordFieldType.DECIMAL.equals(fieldType)) { + final BigDecimal bigDecimalValue = rs.getBigDecimal(columnIndex); + return fieldType.getDecimalDataType(bigDecimalValue.precision(), bigDecimalValue.scale()); + } else { + return fieldType.getDataType(); + } } } } @@ -128,6 +155,7 @@ private static DataType getArrayBaseType(final Array array) throws SQLException if (arrayValue == null) { return RecordFieldType.STRING.getDataType(); } + if (arrayValue instanceof byte[]) { return RecordFieldType.BYTE.getDataType(); } @@ -143,6 +171,9 @@ private static DataType getArrayBaseType(final Array array) throws SQLException if (arrayValue instanceof short[]) { return RecordFieldType.SHORT.getDataType(); } + if (arrayValue instanceof byte[]) { + return RecordFieldType.BYTE.getDataType(); + } if (arrayValue instanceof float[]) { return RecordFieldType.FLOAT.getDataType(); } @@ -190,6 +221,10 @@ private static DataType getArrayBaseType(final Array array) throws SQLException if (valueToLookAt instanceof Double) { return RecordFieldType.DOUBLE.getDataType(); } + if (valueToLookAt instanceof BigDecimal) { + final BigDecimal bigDecimal = (BigDecimal) valueToLookAt; + return RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), bigDecimal.scale()); + } if (valueToLookAt instanceof Boolean) { return RecordFieldType.BOOLEAN.getDataType(); } @@ -199,6 +234,9 @@ private static DataType getArrayBaseType(final Array array) throws SQLException if (valueToLookAt instanceof BigInteger) { return RecordFieldType.BIGINT.getDataType(); } + if (valueToLookAt instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } if (valueToLookAt instanceof java.sql.Time) { return RecordFieldType.TIME.getDataType(); } @@ -217,7 +255,7 @@ private static DataType getArrayBaseType(final Array array) throws SQLException return RecordFieldType.STRING.getDataType(); } - private static RecordFieldType getFieldType(final int sqlType) { + private static RecordFieldType getFieldType(final int sqlType, final String valueClassName) { switch (sqlType) { case Types.BIGINT: case Types.ROWID: @@ -229,9 +267,10 @@ private static RecordFieldType getFieldType(final int sqlType) { return RecordFieldType.CHAR; case Types.DATE: return RecordFieldType.DATE; + case Types.NUMERIC: case Types.DECIMAL: + return RecordFieldType.DECIMAL; case Types.DOUBLE: - case Types.NUMERIC: case Types.REAL: return RecordFieldType.DOUBLE; case Types.FLOAT: @@ -251,6 +290,28 @@ private static RecordFieldType getFieldType(final int sqlType) { return RecordFieldType.STRING; case Types.OTHER: case Types.JAVA_OBJECT: + if (STRING_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.STRING; + } + if (INT_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.INT; + } + if (LONG_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.LONG; + } + if (DATE_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.DATE; + } + if (FLOAT_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.FLOAT; + } + if (DOUBLE_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.DOUBLE; + } + if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) { + return RecordFieldType.DECIMAL; + } + return RecordFieldType.RECORD; case Types.TIME: case Types.TIME_WITH_TIMEZONE: diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/Cache2kAdapter.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/Cache2kAdapter.java index 9e9756f..a138840 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/Cache2kAdapter.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/Cache2kAdapter.java @@ -5,13 +5,12 @@ import java.util.concurrent.ConcurrentMap; -public class Cache2kAdapter extends CacheAdapter { +public class Cache2kAdapter implements CacheAdapter { private final Cache cache; public Cache2kAdapter(Integer cacheSize, Class valueType) { - cache = Cache2kBuilder.of(String.class, valueType) - .entryCapacity(cacheSize).build(); + cache = Cache2kBuilder.of(String.class, valueType).entryCapacity(cacheSize).build(); } @Override diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CacheAdapter.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CacheAdapter.java index 9b833ab..f4e57d6 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CacheAdapter.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CacheAdapter.java @@ -2,16 +2,17 @@ import java.util.concurrent.ConcurrentMap; -public abstract class CacheAdapter { - public abstract T get(String key); +public interface CacheAdapter { - public abstract void set(String key, T value); + T get(String key); - public abstract void delete(String key); + void set(String key, T value); - public abstract long estimatedSize(); + void delete(String key); - public abstract ConcurrentMap asMap(); + long estimatedSize(); - public abstract void cleanUp(); + ConcurrentMap asMap(); + + void cleanUp(); } diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CaffeineAdapter.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CaffeineAdapter.java index 41a4585..547c96b 100644 --- a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CaffeineAdapter.java +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/CaffeineAdapter.java @@ -5,7 +5,7 @@ import java.util.concurrent.ConcurrentMap; -public class CaffeineAdapter extends CacheAdapter { +public class CaffeineAdapter implements CacheAdapter { private final Cache cache; diff --git a/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/GuavaAdapter.java b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/GuavaAdapter.java new file mode 100644 index 0000000..50c943f --- /dev/null +++ b/nifi-sqllookup-services/src/main/java/com/mrcsparker/nifi/sqllookup/cache/GuavaAdapter.java @@ -0,0 +1,46 @@ +package com.mrcsparker.nifi.sqllookup.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.ConcurrentMap; + +public class GuavaAdapter implements CacheAdapter { + + Cache cache; + + public GuavaAdapter(Integer cacheSize) { + cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build(); + } + + @Override + public T get(String key) { + return cache.getIfPresent(key); + } + + @Override + public void set(String key, T value) { + cache.put(key, value); + } + + @Override + public void delete(String key) { + cache.invalidate(key); + } + + @Override + public long estimatedSize() { + return cache.size(); + } + + @Override + public ConcurrentMap asMap() { + return cache.asMap(); + } + + @Override + public void cleanUp() { + cache.invalidateAll(); + cache.cleanUp(); + } +} diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupServiceTest.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupServiceTest.java index 56743ab..32a30e2 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupServiceTest.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/AbstractSQLLookupServiceTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.controller.AbstractControllerService; @@ -46,7 +47,6 @@ void setupDB() throws Exception { dbLocation.delete(); } - // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcpService")).getConnection(); Statement stmt = con.createStatement(); @@ -57,15 +57,13 @@ void setupDB() throws Exception { sqle.printStackTrace(); } - stmt.execute("CREATE TABLE IF NOT EXISTS TEST_LOOKUP_DB " + - "( " + - " id serial not null constraint test_lookup_db_pk primary key, " + - " name VARCHAR(30), " + - " value VARCHAR(255), " + - " period INT DEFAULT 1, " + - " address VARCHAR(255), " + - " price FLOAT(52) DEFAULT 0.00 " + - ")"); + stmt.execute("CREATE TABLE IF NOT EXISTS TEST_LOOKUP_DB ( " + + " id serial not null constraint test_lookup_db_pk primary key, " + + " name VARCHAR(30), " + + " value VARCHAR(255), " + + " period INT DEFAULT 1, " + + " address VARCHAR(255), " + + " price FLOAT(52) DEFAULT 0.00) "); stmt.execute("insert into TEST_LOOKUP_DB (name, value, period, address, price) VALUES ('495304346258559', 'Wildfire at Midnight', 7, '94384 Stroman Pike', 48.66)"); stmt.execute("insert into TEST_LOOKUP_DB (name, value, period, address, price) VALUES ('456148015917293', 'The Wealth of Nations', 9, '3743 Amanda Mountain', 359.92)"); diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestProcessor.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestProcessor.java index a93e7db..494e6e5 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestProcessor.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestProcessor.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.components.PropertyDescriptor; @@ -27,6 +28,7 @@ import java.util.List; class TestProcessor extends AbstractProcessor { + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } @@ -34,12 +36,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro @Override protected List getSupportedPropertyDescriptors() { List properties = new ArrayList<>(); - properties.add(new PropertyDescriptor.Builder() - .name("LookupService test processor") - .description("LookupService test processor") - .identifiesControllerService(LookupService.class) - .required(true) - .build()); + properties.add(new PropertyDescriptor.Builder().name("LookupService test processor") + .description("LookupService test processor").identifiesControllerService(LookupService.class) + .required(true).build()); return properties; } } diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupService.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupService.java index d4f5ed7..09e3abd 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupService.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupService.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -53,7 +54,8 @@ public void before() throws Exception { sqlLookupService = new SQLLookupService(); runner.addControllerService("SQLRecordLookupService", sqlLookupService); runner.setProperty(sqlLookupService, SQLLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); runner.setProperty(sqlLookupService, SQLLookupService.LOOKUP_VALUE_COLUMN, "VALUE"); runner.enableControllerService(dbcpService); runner.enableControllerService(sqlLookupService); @@ -104,7 +106,8 @@ public void testSimpleLookup2() throws Exception { @Test public void testMultiValueLookup0() throws Exception { runner.disableControllerService(sqlLookupService); - runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name AND address = :address"); + runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name AND address = :address"); runner.assertValid(sqlLookupService); runner.enableControllerService(sqlLookupService); diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceInQuery.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceInQuery.java index b0acf74..62fb087 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceInQuery.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceInQuery.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertTrue; public class TestSQLLookupServiceInQuery extends AbstractSQLLookupServiceTest { + static final Logger LOG = LoggerFactory.getLogger(TestSQLLookupService.class); private SQLLookupService sqlNamedLookupService; @@ -33,7 +34,8 @@ public void before() throws Exception { sqlNamedLookupService = new SQLLookupService(); runner.addControllerService("SQLRecordLookupService", sqlNamedLookupService); runner.setProperty(sqlNamedLookupService, SQLLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlNamedLookupService, SQLLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name IN (:name) ORDER BY name ASC"); + runner.setProperty(sqlNamedLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name IN (:name) ORDER BY name ASC"); runner.setProperty(sqlNamedLookupService, SQLLookupService.LOOKUP_VALUE_COLUMN, "VALUE"); runner.enableControllerService(dbcpService); runner.enableControllerService(sqlNamedLookupService); diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache.java index b9a60f3..3fab9a5 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -29,6 +30,7 @@ import static org.junit.Assert.*; public class TestSQLLookupServiceWithCache extends AbstractSQLLookupServiceTest { + private SQLLookupService sqlLookupService; @Before @@ -47,7 +49,8 @@ public void before() throws Exception { sqlLookupService = new SQLLookupService(); runner.addControllerService("SQLRecordLookupService", sqlLookupService); runner.setProperty(sqlLookupService, SQLLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); runner.setProperty(sqlLookupService, SQLLookupService.LOOKUP_VALUE_COLUMN, "VALUE"); runner.setProperty(sqlLookupService, SQLLookupService.CACHE_SIZE, "10"); runner.enableControllerService(dbcpService); @@ -125,5 +128,4 @@ public void testRecordLookupEmpty() throws Exception { assertFalse(key.isPresent()); } - } diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache2k.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache2k.java index 1ca283d..6c0729b 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache2k.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithCache2k.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -29,6 +30,7 @@ import static org.junit.Assert.*; public class TestSQLLookupServiceWithCache2k extends AbstractSQLLookupServiceTest { + private SQLLookupService sqlLookupService; @Before @@ -47,9 +49,11 @@ public void before() throws Exception { sqlLookupService = new SQLLookupService(); runner.addControllerService("SQLRecordLookupService", sqlLookupService); runner.setProperty(sqlLookupService, SQLLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); runner.setProperty(sqlLookupService, SQLLookupService.LOOKUP_VALUE_COLUMN, "VALUE"); - runner.setProperty(sqlLookupService, SQLLookupService.CACHING_LIBRARY, SQLLookupService.CACHING_LIBRARY_CACHE2k); + runner.setProperty(sqlLookupService, SQLLookupService.CACHING_LIBRARY, + SQLLookupService.CACHING_LIBRARY_CACHE2k); runner.setProperty(sqlLookupService, SQLLookupService.CACHE_SIZE, "10"); runner.enableControllerService(dbcpService); runner.enableControllerService(sqlLookupService); @@ -126,5 +130,4 @@ public void testRecordLookupEmpty() throws Exception { assertFalse(key.isPresent()); } - } diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithGuava.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithGuava.java new file mode 100644 index 0000000..91249a7 --- /dev/null +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLLookupServiceWithGuava.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mrcsparker.nifi.sqllookup; + +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.*; + +public class TestSQLLookupServiceWithGuava extends AbstractSQLLookupServiceTest { + + private SQLLookupService sqlLookupService; + + @Before + public void before() throws Exception { + TestProcessor testProcessor = new TestProcessor(); + runner = TestRunners.newTestRunner(testProcessor); + + // setup mock DBCP Service + DBCPService dbcpService = new DBCPServiceSimpleImpl(); + Map dbcpProperties = new HashMap<>(); + + runner.addControllerService("dbcpService", dbcpService, dbcpProperties); + runner.assertValid(dbcpService); + + // setup SQLRecordLookupService + sqlLookupService = new SQLLookupService(); + runner.addControllerService("SQLRecordLookupService", sqlLookupService); + runner.setProperty(sqlLookupService, SQLLookupService.CONNECTION_POOL, "dbcpService"); + runner.setProperty(sqlLookupService, SQLLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlLookupService, SQLRecordLookupService.CACHING_LIBRARY, + SQLRecordLookupService.CACHING_LIBRARY_GUAVA); + runner.setProperty(sqlLookupService, SQLLookupService.LOOKUP_VALUE_COLUMN, "VALUE"); + runner.setProperty(sqlLookupService, SQLLookupService.CACHING_LIBRARY, + SQLLookupService.CACHING_LIBRARY_CACHE2k); + runner.setProperty(sqlLookupService, SQLLookupService.CACHE_SIZE, "10"); + runner.enableControllerService(dbcpService); + runner.enableControllerService(sqlLookupService); + + setupDB(); + } + + @Test + public void testOnDisabled() throws Exception { + sqlLookupService.onDisabled(); + assertEquals(sqlLookupService.cache.asMap().size(), 0); + } + + @Test + public void testRecordLookup() throws Exception { + Map criteria = new HashMap<>(); + criteria.put("name", "458006613841984"); + + assertEquals(sqlLookupService.getCacheSize(), 0); + + for (int i = 0; i <= 10; i++) { + final Optional get1 = sqlLookupService.lookup(criteria); + assertTrue(get1.isPresent()); + assertEquals("The Glory and the Dream", get1.get()); + + assertEquals(sqlLookupService.getCacheSize(), 1); + } + } + + @Test + public void testRecordLookupMaxCaches() throws Exception { + assertEquals(sqlLookupService.getCacheSize(), 0); + + sqlLookupService.lookup(Collections.singletonMap("name", "458006613841984")); + assertEquals(sqlLookupService.getCacheSize(), 1); + + sqlLookupService.lookup(Collections.singletonMap("name", "456148015917293")); + assertEquals(sqlLookupService.getCacheSize(), 2); + + sqlLookupService.lookup(Collections.singletonMap("name", "526924199146123")); + assertEquals(sqlLookupService.getCacheSize(), 3); + + sqlLookupService.lookup(Collections.singletonMap("name", "860683959429897")); + assertEquals(sqlLookupService.getCacheSize(), 4); + + sqlLookupService.lookup(Collections.singletonMap("name", "528661513839698")); + assertEquals(sqlLookupService.getCacheSize(), 5); + + sqlLookupService.lookup(Collections.singletonMap("name", "355663598958946")); + assertEquals(sqlLookupService.getCacheSize(), 6); + + sqlLookupService.lookup(Collections.singletonMap("name", "911753660676323")); + assertEquals(sqlLookupService.getCacheSize(), 7); + + sqlLookupService.lookup(Collections.singletonMap("name", "997417069743624")); + assertEquals(sqlLookupService.getCacheSize(), 8); + + sqlLookupService.lookup(Collections.singletonMap("name", "986873446696583")); + assertEquals(sqlLookupService.getCacheSize(), 9); + + sqlLookupService.lookup(Collections.singletonMap("name", "990409804141864")); + assertEquals(sqlLookupService.getCacheSize(), 10); + + sqlLookupService.lookup(Collections.singletonMap("name", "990409804141864")); + assertEquals(sqlLookupService.getCacheSize(), 10); + } + + @Test + public void testRecordLookupEmpty() throws Exception { + Optional key = sqlLookupService.lookup(Collections.singletonMap("name", "is-a-null")); + assertFalse(key.isPresent()); + + key = sqlLookupService.lookup(Collections.singletonMap("name", "is-a-null")); + assertFalse(key.isPresent()); + } + +} diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupService.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupService.java index ab5762f..5dc8233 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupService.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupService.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -52,7 +53,8 @@ public void before() throws Exception { sqlRecordLookupService = new SQLRecordLookupService(); runner.addControllerService("SQLRecordLookupService", sqlRecordLookupService); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); runner.enableControllerService(dbcpService); runner.enableControllerService(sqlRecordLookupService); @@ -77,21 +79,24 @@ public void testOnDisabled() throws Exception { @Test public void testSimpleLookup0() throws Exception { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "547897511298456")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "547897511298456")); assertTrue(get1.isPresent()); assertEquals("Consider the Lilies", get1.get().getAsString("VALUE")); } @Test public void testSimpleLookup1() throws Exception { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "867142279069316")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "867142279069316")); assertTrue(get1.isPresent()); assertEquals("The Needles Eye", get1.get().getAsString("VALUE")); } @Test public void testSimpleLookup2() throws Exception { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "443771414357476")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "443771414357476")); assertTrue(get1.isPresent()); assertEquals("Françoise Sagan", get1.get().getAsString("VALUE")); } @@ -110,7 +115,8 @@ public void testInvalidLookup() throws Exception { @Test public void testRecordLookup() throws Exception { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "443771414357476")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "443771414357476")); assertTrue(get1.isPresent()); assertEquals("443771414357476", get1.get().getAsString("NAME")); assertEquals("Françoise Sagan", get1.get().getAsString("VALUE")); @@ -123,18 +129,16 @@ public void testRecordLookup() throws Exception { public void testArrayLookup() throws Exception { runner.disableControllerService(sqlRecordLookupService); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.USE_JDBC_TYPES, "true"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, "SELECT array_agg(VALUE ORDER BY NAME DESC) AS LOTSA FROM TEST_LOOKUP_DB WHERE NAME IN(:name)"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT array_agg(VALUE ORDER BY NAME DESC) AS LOTSA FROM TEST_LOOKUP_DB WHERE NAME IN(:name)"); runner.assertValid(sqlRecordLookupService); runner.enableControllerService(sqlRecordLookupService); Map criteria = new HashMap<>(); - criteria.put("name", Arrays.asList( - "990192861112958", - "012470853914233", - "912066265194017" - )); - String[] stringArray = {"Cabbages and Kings", "A Handful of Dust", "In Death Ground"}; - Object[] resultArray = DataTypeUtils.convertRecordArrayToJavaArray(stringArray, RecordFieldType.STRING.getDataType()); + criteria.put("name", Arrays.asList("990192861112958", "012470853914233", "912066265194017")); + String[] stringArray = { "Cabbages and Kings", "A Handful of Dust", "In Death Ground" }; + Object[] resultArray = DataTypeUtils + .convertRecordArrayToJavaArray(stringArray, RecordFieldType.STRING.getDataType()); final Optional get1 = sqlRecordLookupService.lookup(criteria); assertTrue(get1.isPresent()); @@ -145,11 +149,13 @@ public void testArrayLookup() throws Exception { @Test public void testExpressionLanguage() throws Exception { runner.disableControllerService(sqlRecordLookupService); - runner.setProperty(sqlRecordLookupService, SQLLookupService.SQL_QUERY, "${literal(\"SELECT * FROM TEST_LOOKUP_DB WHERE name = \"):append(\":name\"):append(\";\")}"); + runner.setProperty(sqlRecordLookupService, SQLLookupService.SQL_QUERY, + "${literal(\"SELECT * FROM TEST_LOOKUP_DB WHERE name = \"):append(\":name\"):append(\";\")}"); runner.assertValid(sqlRecordLookupService); runner.enableControllerService(sqlRecordLookupService); - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "443771414357476")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "443771414357476")); assertTrue(get1.isPresent()); assertEquals("443771414357476", get1.get().getAsString("NAME")); assertEquals("Françoise Sagan", get1.get().getAsString("VALUE")); diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache.java index a4e66f0..d1d36c7 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -50,7 +51,8 @@ public void before() throws Exception { sqlRecordLookupService = new SQLRecordLookupService(); runner.addControllerService("SQLRecordLookupService", sqlRecordLookupService); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHE_SIZE, "10"); runner.enableControllerService(dbcpService); @@ -65,7 +67,8 @@ public void testRecordLookup() throws Exception { assertEquals(sqlRecordLookupService.getCacheSize(), 0); for (int i = 0; i <= 10; i++) { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "458006613841984")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "458006613841984")); assertTrue(get1.isPresent()); assertEquals("458006613841984", get1.get().getAsString("NAME")); assertEquals("The Glory and the Dream", get1.get().getAsString("VALUE")); @@ -119,5 +122,4 @@ public void testRecordLookupMaxCaches() throws Exception { assertEquals(sqlRecordLookupService.getCacheSize(), 10); } - } diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache2k.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache2k.java index 63a38c0..b2fb4d2 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache2k.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithCache2k.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.mrcsparker.nifi.sqllookup; import org.apache.nifi.dbcp.DBCPService; @@ -50,8 +51,10 @@ public void before() throws Exception { sqlRecordLookupService = new SQLRecordLookupService(); runner.addControllerService("SQLRecordLookupService", sqlRecordLookupService); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHING_LIBRARY, SQLRecordLookupService.CACHING_LIBRARY_CACHE2k); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHING_LIBRARY, + SQLRecordLookupService.CACHING_LIBRARY_CACHE2k); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHE_SIZE, "10"); runner.enableControllerService(dbcpService); @@ -63,10 +66,11 @@ public void before() throws Exception { @Test public void testRecordLookup() throws Exception { - assertEquals(sqlRecordLookupService.getCacheSize(), 0); + assertEquals(0, sqlRecordLookupService.getCacheSize()); for (int i = 0; i <= 10; i++) { - final Optional get1 = sqlRecordLookupService.lookup(Collections.singletonMap("name", "458006613841984")); + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "458006613841984")); assertTrue(get1.isPresent()); assertEquals("458006613841984", get1.get().getAsString("NAME")); assertEquals("The Glory and the Dream", get1.get().getAsString("VALUE")); @@ -74,51 +78,50 @@ public void testRecordLookup() throws Exception { assertEquals("84164 Gleason Branch", get1.get().getAsString("ADDRESS")); assertEquals(300.34, get1.get().getAsDouble("PRICE"), 1.0); - assertEquals(sqlRecordLookupService.getCacheSize(), 1); + assertEquals(1, sqlRecordLookupService.getCacheSize()); } } @Test public void testRecordLookupMaxCaches() throws Exception { - assertEquals(sqlRecordLookupService.getCacheSize(), 0); + assertEquals(0, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "458006613841984")); - assertEquals(sqlRecordLookupService.getCacheSize(), 1); + assertEquals(1, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "456148015917293")); - assertEquals(sqlRecordLookupService.getCacheSize(), 2); + assertEquals(2, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "526924199146123")); - assertEquals(sqlRecordLookupService.getCacheSize(), 3); + assertEquals(3, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "860683959429897")); - assertEquals(sqlRecordLookupService.getCacheSize(), 4); + assertEquals(4, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "528661513839698")); - assertEquals(sqlRecordLookupService.getCacheSize(), 5); + assertEquals(5, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "355663598958946")); - assertEquals(sqlRecordLookupService.getCacheSize(), 6); + assertEquals(6, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "911753660676323")); - assertEquals(sqlRecordLookupService.getCacheSize(), 7); + assertEquals(7, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "997417069743624")); - assertEquals(sqlRecordLookupService.getCacheSize(), 8); + assertEquals(8, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "986873446696583")); - assertEquals(sqlRecordLookupService.getCacheSize(), 9); + assertEquals(9, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); - assertEquals(sqlRecordLookupService.getCacheSize(), 10); + assertEquals(10, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); - assertEquals(sqlRecordLookupService.getCacheSize(), 10); + assertEquals(10, sqlRecordLookupService.getCacheSize()); sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); - assertEquals(sqlRecordLookupService.getCacheSize(), 10); + assertEquals(10, sqlRecordLookupService.getCacheSize()); } - } diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithGuava.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithGuava.java new file mode 100644 index 0000000..833d43c --- /dev/null +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithGuava.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mrcsparker.nifi.sqllookup; + +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSQLRecordLookupServiceWithGuava extends AbstractSQLLookupServiceTest { + + private SQLRecordLookupService sqlRecordLookupService; + + @Before + public void before() throws Exception { + TestProcessor testProcessor = new TestProcessor(); + runner = TestRunners.newTestRunner(testProcessor); + + // setup mock DBCP Service + DBCPService dbcpService = new DBCPServiceSimpleImpl(); + Map dbcpProperties = new HashMap<>(); + + runner.addControllerService("dbcpService", dbcpService, dbcpProperties); + runner.assertValid(dbcpService); + + // setup SQLRecordLookupService + sqlRecordLookupService = new SQLRecordLookupService(); + runner.addControllerService("SQLRecordLookupService", sqlRecordLookupService); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CONNECTION_POOL, "dbcpService"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT * FROM TEST_LOOKUP_DB WHERE name = :name"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHING_LIBRARY, + SQLRecordLookupService.CACHING_LIBRARY_GUAVA); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CACHE_SIZE, "10"); + + runner.enableControllerService(dbcpService); + runner.enableControllerService(sqlRecordLookupService); + + setupDB(); + } + + @Test + public void testRecordLookup() throws Exception { + + assertEquals(sqlRecordLookupService.getCacheSize(), 0); + + for (int i = 0; i <= 10; i++) { + final Optional get1 = sqlRecordLookupService + .lookup(Collections.singletonMap("name", "458006613841984")); + assertTrue(get1.isPresent()); + assertEquals("458006613841984", get1.get().getAsString("NAME")); + assertEquals("The Glory and the Dream", get1.get().getAsString("VALUE")); + assertEquals(2, get1.get().getAsInt("PERIOD").intValue()); + assertEquals("84164 Gleason Branch", get1.get().getAsString("ADDRESS")); + assertEquals(300.34, get1.get().getAsDouble("PRICE"), 1.0); + + assertEquals(sqlRecordLookupService.getCacheSize(), 1); + } + } + + @Test + public void testRecordLookupMaxCaches() throws Exception { + + assertEquals(sqlRecordLookupService.getCacheSize(), 0); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "458006613841984")); + assertEquals(sqlRecordLookupService.getCacheSize(), 1); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "456148015917293")); + assertEquals(sqlRecordLookupService.getCacheSize(), 2); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "526924199146123")); + assertEquals(sqlRecordLookupService.getCacheSize(), 3); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "860683959429897")); + assertEquals(sqlRecordLookupService.getCacheSize(), 4); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "528661513839698")); + assertEquals(sqlRecordLookupService.getCacheSize(), 5); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "355663598958946")); + assertEquals(sqlRecordLookupService.getCacheSize(), 6); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "911753660676323")); + assertEquals(sqlRecordLookupService.getCacheSize(), 7); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "997417069743624")); + assertEquals(sqlRecordLookupService.getCacheSize(), 8); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "986873446696583")); + assertEquals(sqlRecordLookupService.getCacheSize(), 9); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); + assertEquals(sqlRecordLookupService.getCacheSize(), 10); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); + assertEquals(sqlRecordLookupService.getCacheSize(), 10); + + sqlRecordLookupService.lookup(Collections.singletonMap("name", "990409804141864")); + assertEquals(sqlRecordLookupService.getCacheSize(), 10); + } + +} diff --git a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithSchema.java b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithSchema.java index 1c525d9..8fd1ad3 100644 --- a/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithSchema.java +++ b/nifi-sqllookup-services/src/test/java/com/mrcsparker/nifi/sqllookup/TestSQLRecordLookupServiceWithSchema.java @@ -31,18 +31,22 @@ public void setup() throws Exception { runner = TestRunners.newTestRunner(LookupRecord.class); runner.setProperty("period", "/period"); - final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/period-names.avsc"))); + final String inputSchemaText = new String( + Files.readAllBytes(Paths.get("src/test/resources/period-names.avsc"))); JsonTreeReader recordReader = new JsonTreeReader(); runner.addControllerService("reader", recordReader); - runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, + SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); runner.assertValid(recordReader); runner.enableControllerService(recordReader); - final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/period-names.avsc"))); + final String outputSchemaText = new String( + Files.readAllBytes(Paths.get("src/test/resources/period-names.avsc"))); JsonRecordSetWriter recordWriter = new JsonRecordSetWriter(); runner.addControllerService("writer", recordWriter); - runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, + SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(recordWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); runner.setProperty(recordWriter, "Pretty Print JSON", "true"); runner.setProperty(recordWriter, "Schema Write Strategy", "full-schema-attribute"); @@ -61,7 +65,8 @@ public void setup() throws Exception { sqlRecordLookupService = new SQLRecordLookupService(); runner.addControllerService("SQLRecordLookupService", sqlRecordLookupService); runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.CONNECTION_POOL, "dbcpService"); - runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, "SELECT array_agg(VALUE ORDER BY NAME DESC) AS names FROM TEST_LOOKUP_DB WHERE PERIOD IN(:period)"); + runner.setProperty(sqlRecordLookupService, SQLRecordLookupService.SQL_QUERY, + "SELECT array_agg(VALUE ORDER BY NAME DESC) AS names FROM TEST_LOOKUP_DB WHERE PERIOD IN(:period)"); runner.assertValid(sqlRecordLookupService); runner.enableControllerService(sqlRecordLookupService); @@ -70,7 +75,7 @@ public void setup() throws Exception { @Test public void testCorrectKeys() throws Exception { - assertEquals(sqlRecordLookupService.getRequiredKeys(), Collections.emptySet()); + assertEquals(Collections.emptySet(), sqlRecordLookupService.getRequiredKeys()); } @Test diff --git a/pom.xml b/pom.xml index e91357a..9388342 100644 --- a/pom.xml +++ b/pom.xml @@ -19,30 +19,31 @@ org.apache.nifi nifi-nar-bundles - 1.10.0 + 1.12.0 1.8 1.8 3.1.0 - 1.10.0 - 2.8.1 - 1.2.4.Final + 1.12.0 + 2.8.5 + 1.4.0.Final + 28.0-jre 1.4.200 - 4.12 + 4.13 1.7.30 - 0.8.5 - 3.2.4 + 0.8.6 + 3.5.11 2.7.0 - 5.2.3.RELEASE - 2.4.1 + 5.2.9.RELEASE + 2.5.1 com.mrcsparker nifi-sqllookup-services-bundle pom - 1.10.0 + 1.12.0 nifi-sqllookup-services