Use the runner version that supports the CLI
oskarskog committed Jun 6, 2024
1 parent 9060938 commit 0bc2618
Showing 1 changed file with 63 additions and 24 deletions.
87 changes: 63 additions & 24 deletions src/main/java/io/ecraft/SqlRunner.java
@@ -1,6 +1,12 @@
package io.ecraft;

import org.apache.hadoop.fs.FileUtil;
import java.util.zip.*;
import java.util.*;
import java.util.stream.Collectors;
import java.io.*;
import java.nio.file.*;
import org.apache.commons.io.FilenameUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -19,16 +25,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


public class SqlRunner {
private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);

@@ -44,21 +40,22 @@ public static void main(String[] args) throws Exception {
throw new Exception("Exactly one argument is expected.");
}

// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// LOG.info("Checkpoint storage: {}",env.getCheckpointConfig().getCheckpointStorage());
// LOG.info("Checkpointing mode: {}", env.getCheckpointConfig().getCheckpointingMode());
// LOG.info("Checkpointing interval: {}", env.getCheckpointInterval());
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

CatalogStore catalogStore = new FileCatalogStore(System.getenv("TABLE_CATALOG_STORE_FILE_PATH"));
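// (FileCatalogStore persists catalog definitions as files under the path above,
// so catalogs created in one run are visible to the next)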
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.withCatalogStore(catalogStore)
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.useCatalog("hive");
String name = "hive";
String defaultDatabase = "default";
String hiveConfDir = "/conf/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog(name);

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

LOG.debug("Current catalog: {}", tableEnv.getCurrentCatalog());
@@ -69,10 +66,37 @@ public static void main(String[] args) throws Exception {
LOG.debug(" - {}", t);
}

Path sqlPath = new Path(args[0]);
FileSystem fs = sqlPath.getFileSystem();
FSDataInputStream inputStream = fs.open(sqlPath);
InputStreamReader reader = new InputStreamReader(inputStream);
// Read the archive (a zip file) from the Azure blob store to a local file
Path remoteArchivePath = new Path(args[0]);
FileSystem remoteArchiveFs = remoteArchivePath.getFileSystem();
FSDataInputStream remoteArchiveStream = remoteArchiveFs.open(remoteArchivePath);
// We name everything after the full name of the archive without extension (including hashes)
String jobName = FilenameUtils.getBaseName(remoteArchivePath.getName());
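// e.g. a (hypothetical) archive name "myjob-a1b2c3.zip" gives jobName "myjob-a1b2c3"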

// Make sure we have the directory for the job files
Files.createDirectories(Paths.get("/tmp/" + jobName));

// Download the file into the directory
String archiveDownloadPath = "/tmp/" + jobName + "/" + remoteArchivePath.getName();
FileOutputStream fos = new FileOutputStream(archiveDownloadPath);
transferTo(remoteArchiveStream, fos);

// Uncompress the contents of the zip file to a local directory
ZipFile zipFile = new ZipFile(archiveDownloadPath);
Enumeration<? extends ZipEntry> zipFileEntries = zipFile.entries();
while (zipFileEntries.hasMoreElements()) {
ZipEntry entry = zipFileEntries.nextElement();
FileOutputStream zipEntryOutputStream = new FileOutputStream("/tmp/" + jobName + "/" + entry.getName());
InputStream zipInputStream = zipFile.getInputStream(entry);
transferTo(zipInputStream, zipEntryOutputStream);
}
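// Note: the loop above assumes a flat archive; directory entries are never
// created, so a nested entry would fail to open its FileOutputStream.
// A hardened variant is sketched after the diff.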

// Read the sql file
String sqlName = remoteArchivePath.getName().substring(0, remoteArchivePath.getName().lastIndexOf("-")) + ".sql";
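// e.g. the (hypothetical) "myjob-a1b2c3.zip" yields "myjob.sql": everything
// from the last '-' (the hash) onwards is dropped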
Path sqlPath = new Path("/tmp/" + jobName + "/" + sqlName);
FileSystem sqlFs = sqlPath.getFileSystem();
FSDataInputStream sqlInputStream = sqlFs.open(sqlPath);
InputStreamReader reader = new InputStreamReader(sqlInputStream);
String script = new BufferedReader(reader).lines().parallel().collect(Collectors.joining("\n"));
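// .parallel() is safe here (Collectors.joining keeps encounter order) but gains little for a single file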

List<String> statements = parseStatements(script, SqlRunner.loadEnvironment());
@@ -83,6 +107,21 @@ public static void main(String[] args) throws Exception {
}
}

public static void transferTo(InputStream input, OutputStream output) throws IOException {
try {
byte[] buffer = new byte[1024];
int len = input.read(buffer);
while (len != -1) {
output.write(buffer, 0, len);
len = input.read(buffer);
}
input.close();
output.close();
} catch (IOException e) {
LOG.debug("Failed transferTo:\n{}", e.getMessage());
throw e;
}
}
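
Since Java 9 the JDK ships this copy loop as InputStream.transferTo, and try-with-resources closes both streams even when the copy fails partway (the loop above skips its close() calls if read or write throws). A minimal sketch of the helper rewritten that way, assuming a Java 9+ runtime:

public static void transferTo(InputStream input, OutputStream output) throws IOException {
    // try-with-resources on effectively final variables (Java 9+) closes both
    // streams whether or not the copy succeeds
    try (input; output) {
        input.transferTo(output); // built-in buffered copy since Java 9
    }
}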


public static Map<String, String> loadEnvironment() {
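
The extraction loop writes each entry straight into /tmp/<jobName> and trusts entry names as-is. A hardened variant, sketched under the same layout assumptions (the unzip helper name is illustrative, not part of this commit; java.nio.file.Path is written out in full because the file also imports Flink's Path):

private static void unzip(String archivePath, java.nio.file.Path targetDir) throws IOException {
    java.nio.file.Path root = targetDir.normalize();
    // ZipFile is Closeable, so try-with-resources releases it
    try (ZipFile zip = new ZipFile(archivePath)) {
        Enumeration<? extends ZipEntry> entries = zip.entries();
        while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            java.nio.file.Path dest = root.resolve(entry.getName()).normalize();
            // reject entries like "../../etc/passwd" (zip-slip)
            if (!dest.startsWith(root)) {
                throw new IOException("Blocked zip-slip entry: " + entry.getName());
            }
            if (entry.isDirectory()) {
                Files.createDirectories(dest);
            } else {
                Files.createDirectories(dest.getParent());
                try (InputStream in = zip.getInputStream(entry)) {
                    Files.copy(in, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }
}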
