Skip to content

Commit

Permalink
Merge pull request #65 from katamotokosuke/add-auth-method-that-is-ke…
Browse files Browse the repository at this point in the history
…y-pair-authentication

Support key pair authentication
  • Loading branch information
u110 authored Nov 21, 2023
2 parents 41e976a + df15f33 commit a0b186f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ build/
.classpath
.project
config.yml
config.yaml
default_jdbc_driver/
/bin/
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Snowflake output plugin for Embulk loads records to Snowflake.
- **host**: database host name (string, required)
- **user**: database login user name (string, required)
- **password**: database login password (string, default: "")
- **privateKey**: database login using key-pair authentication(string, default: ""). This authentication method requires a 2048-bit (minimum) RSA key pair.
- **warehouse**: destination warehouse name (string, required)
- **database**: destination database name (string, required)
- **schema**: destination schema name (string, default: "public")
Expand Down Expand Up @@ -58,6 +59,10 @@ Snowflake output plugin for Embulk loads records to Snowflake.

## Build

## Not implement
- Passphrase for `privateKey` in key-pair authentication.


```
$ ./gradlew gem # -t to watch change of files and rebuild continuously
```
23 changes: 21 additions & 2 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import java.sql.Types;
import java.util.*;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskSource;
import org.embulk.output.jdbc.*;
import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
import org.embulk.output.snowflake.SnowflakeOutputConnection;
import org.embulk.output.snowflake.SnowflakeOutputConnector;
import org.embulk.output.snowflake.StageIdentifier;
import org.embulk.output.snowflake.StageIdentifierHolder;
import org.embulk.output.snowflake.PrivateKeyReader;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.OutputPlugin;
Expand All @@ -38,6 +40,10 @@ public interface SnowflakePluginTask extends PluginTask {
@ConfigDefault("\"\"")
public String getPassword();

@Config("privateKey")
@ConfigDefault("\"\"")
String getPrivateKey();

@Config("database")
public String getDatabase();

Expand Down Expand Up @@ -92,7 +98,17 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
Properties props = new Properties();

props.setProperty("user", t.getUser());
props.setProperty("password", t.getPassword());
if (!t.getPassword().isEmpty()) {
props.setProperty("password", t.getPassword());
} else if (!t.getPrivateKey().isEmpty()) {
try {
props.put("privateKey", PrivateKeyReader.get(t.getPrivateKey()));
} catch (IOException e) {
// Because the source of newConnection definition does not assume IOException, change it to ConfigException.
throw new ConfigException(e);
}
}

props.setProperty("warehouse", t.getWarehouse());
props.setProperty("db", t.getDatabase());
props.setProperty("schema", t.getSchema());
Expand Down Expand Up @@ -170,11 +186,14 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
@Override
protected void logConnectionProperties(String url, Properties props) {
Properties maskedProps = new Properties();
for (String key : props.stringPropertyNames()) {
for (Object keyObj : props.keySet()) {
String key = (String) keyObj;
if (key.equals("password")) {
maskedProps.setProperty(key, "***");
} else if (key.equals("proxyPassword")) {
maskedProps.setProperty(key, "***");
} else if (key.equals("privateKey")) {
maskedProps.setProperty(key, "***");
} else {
maskedProps.setProperty(key, props.getProperty(key));
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/org/embulk/output/snowflake/PrivateKeyReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.embulk.output.snowflake;

import net.snowflake.client.jdbc.internal.org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import net.snowflake.client.jdbc.internal.org.bouncycastle.jce.provider.BouncyCastleProvider;
import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.PEMParser;
import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;

import java.io.IOException;
import java.io.StringReader;
import java.security.PrivateKey;
import java.security.Security;

// ref: https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-configure#privatekey-property-in-connection-properties
public class PrivateKeyReader
{
public static PrivateKey get(String pemString) throws IOException {
Security.addProvider(new BouncyCastleProvider());
PEMParser pemParser = new PEMParser(new StringReader(pemString));
Object pemObject = pemParser.readObject();
pemParser.close();

PrivateKeyInfo privateKeyInfo;
if (pemObject instanceof PrivateKeyInfo) {
privateKeyInfo = (PrivateKeyInfo) pemObject;
} else {
throw new IllegalArgumentException("Provided PEM does not contain a valid Private Key");
}
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
return converter.getPrivateKey(privateKeyInfo);
}

}

0 comments on commit a0b186f

Please sign in to comment.