Skip to content

Commit

Permalink
upgrade to ruby 3 to use async
Browse files Browse the repository at this point in the history
Use async for concurrency during IO
  • Loading branch information
wr0ngway committed Jul 6, 2021
1 parent f72bbd7 commit 1b82bad
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.7.3
3.0.1
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ruby:2.7-alpine AS base
FROM ruby:3.0-alpine AS base

ENV APP_DIR="/srv/app" \
BUNDLE_PATH="/srv/bundler" \
Expand Down
3 changes: 2 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ gem 'clamp'
gem 'graphql-client'
gem 'kubeclient'
gem 'liquid'
gem 'yaml-safe_load_stream'
gem 'yaml-safe_load_stream', git: "https://github.com/wr0ngway/yaml-safe_load_stream.git", branch: "ruby_3"
gem 'async'
22 changes: 19 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
GIT
remote: https://github.com/wr0ngway/yaml-safe_load_stream.git
revision: 3c8bcd30369ac87c42ef04e6e2f6727f44936233
branch: ruby_3
specs:
yaml-safe_load_stream (0.1.2)

GEM
remote: https://rubygems.org/
specs:
Expand All @@ -9,12 +16,18 @@ GEM
zeitwerk (~> 2.3)
addressable (2.7.0)
public_suffix (>= 2.0.2, < 5.0)
async (1.29.1)
console (~> 1.10)
nio4r (~> 2.3)
timers (~> 4.1)
byebug (11.1.3)
clamp (1.3.2)
codecov (0.5.2)
simplecov (>= 0.15, < 0.22)
coderay (1.1.3)
concurrent-ruby (1.1.9)
console (1.13.1)
fiber-local
crack (0.4.5)
rexml
diff-lcs (1.4.4)
Expand All @@ -25,6 +38,7 @@ GEM
ffi-compiler (1.0.1)
ffi (>= 1.0.0)
rake
fiber-local (1.0.0)
gem_logger (0.3.0)
activesupport
graphql (1.12.12)
Expand Down Expand Up @@ -64,6 +78,7 @@ GEM
minitest (5.14.4)
multi_json (1.15.0)
netrc (0.11.0)
nio4r (2.5.7)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
Expand Down Expand Up @@ -98,6 +113,7 @@ GEM
simplecov_json_formatter (~> 0.1)
simplecov-html (0.12.3)
simplecov_json_formatter (0.1.3)
timers (4.3.3)
tzinfo (2.0.4)
concurrent-ruby (~> 1.0)
unf (0.1.4)
Expand All @@ -108,14 +124,14 @@ GEM
addressable (>= 2.3.6)
crack (>= 0.3.2)
hashdiff (>= 0.4.0, < 2.0.0)
yaml-safe_load_stream (0.1.1)
zeitwerk (2.4.2)

PLATFORMS
ruby

DEPENDENCIES
activesupport
async
clamp
codecov
gem_logger
Expand All @@ -130,7 +146,7 @@ DEPENDENCIES
simplecov
vcr
webmock
yaml-safe_load_stream
yaml-safe_load_stream!

BUNDLED WITH
2.1.4
2.2.15
156 changes: 84 additions & 72 deletions lib/kubetruth/etl.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'benchmark'
require 'yaml'
require 'async'
require 'yaml/safe_load_stream'
using YAMLSafeLoadStream

Expand Down Expand Up @@ -83,96 +84,107 @@ def load_config
primary_mappings = mappings_by_ns.delete(kubeapi.namespace)
raise Error.new("A default set of mappings is required in the namespace kubetruth is installed in (#{kubeapi.namespace})") unless primary_mappings

primary_config = Kubetruth::Config.new(primary_mappings.values)
logger.info {"Processing primary mappings for namespace '#{kubeapi.namespace}'"}
configs << primary_config
yield kubeapi.namespace, primary_config if block_given?
Async(annotation: "Primary Config: #{kubeapi.namespace}") do
primary_config = Kubetruth::Config.new(primary_mappings.values)
logger.info {"Processing primary mappings for namespace '#{kubeapi.namespace}'"}
configs << primary_config
yield kubeapi.namespace, primary_config if block_given?
end

mappings_by_ns.each do |namespace, ns_mappings|
secondary_mappings = primary_mappings.deep_merge(ns_mappings)
secondary_config = Kubetruth::Config.new(secondary_mappings.values)
logger.info {"Processing secondary mappings for namespace '#{namespace}'"}
configs << secondary_config
yield namespace, secondary_config if block_given?
Async(annotation: "Secondary Config: #{namespace}") do
secondary_mappings = primary_mappings.deep_merge(ns_mappings)
secondary_config = Kubetruth::Config.new(secondary_mappings.values)
logger.info {"Processing secondary mappings for namespace '#{namespace}'"}
configs << secondary_config
yield namespace, secondary_config if block_given?
end
end

configs
end

def apply
logger.warn("Performing dry-run") if @dry_run

load_config do |namespace, config|
project_collection = ProjectCollection.new

# Load all projects that are used
all_specs = [config.root_spec] + config.override_specs
project_selectors = all_specs.collect(&:project_selector)
included_projects = all_specs.collect(&:included_projects).flatten.uniq

project_collection.names.each do |project_name|
active = included_projects.any? {|p| p == project_name }
active ||= project_selectors.any? {|s| s =~ project_name }
if active
project_spec = config.spec_for_project(project_name)
project_collection.create_project(name: project_name, spec: project_spec)
Async(annotation: "ETL Event Loop") do
logger.warn("Performing dry-run") if @dry_run

load_config do |namespace, config|
project_collection = ProjectCollection.new

# Load all projects that are used
all_specs = [config.root_spec] + config.override_specs
project_selectors = all_specs.collect(&:project_selector)
included_projects = all_specs.collect(&:included_projects).flatten.uniq

project_collection.names.each do |project_name|
active = included_projects.any? {|p| p == project_name }
active ||= project_selectors.any? {|s| s =~ project_name }
if active
project_spec = config.spec_for_project(project_name)
project_collection.create_project(name: project_name, spec: project_spec)
end
end
end

project_collection.projects.values.each do |project|
project_collection.projects.values.each do |project|

match = project.name.match(project.spec.project_selector)
if match.nil?
logger.info "Skipping project '#{project.name}' as it does not match any selectors"
next
end
match = project.name.match(project.spec.project_selector)
if match.nil?
logger.info "Skipping project '#{project.name}' as it does not match any selectors"
next
end

if project.spec.skip
logger.info "Skipping project '#{project.name}'"
next
end
if project.spec.skip
logger.info "Skipping project '#{project.name}'"
next
end

# constructing the hash will cause any overrides to happen in the right
# order (includer wins over last included over first included)
params = project.all_parameters
parts = params.group_by(&:secret)
config_params, secret_params = (parts[false] || []), (parts[true] || [])
config_param_hash = params_to_hash(config_params)
secret_param_hash = params_to_hash(secret_params)

parameter_origins = project.parameter_origins
param_origins_parts = parameter_origins.group_by {|k, v| config_param_hash.has_key?(k) }
config_origins = Hash[param_origins_parts[true] || []]
secret_origins = Hash[param_origins_parts[false] || []]

project.spec.resource_templates.each_with_index do |pair, i|
template_name, template = *pair
logger.debug { "Processing template '#{template_name}' (#{i+1}/#{project.spec.resource_templates.size})" }
resource_yml = template.render(
template: template_name,
kubetruth_namespace: kubeapi.namespace,
mapping_namespace: namespace,
project: project.name,
project_heirarchy: project.heirarchy,
debug: logger.debug?,
parameters: config_param_hash,
parameter_origins: config_origins,
secrets: secret_param_hash,
secret_origins: secret_origins,
context: project.spec.context
)

template_id = "mapping: #{project.spec.name}, mapping_namespace: #{namespace}, project: #{project.name}, template: #{template_name}"
parsed_ymls = YAML.safe_load_stream(resource_yml, template_id)
logger.debug {"Skipping empty template"} if parsed_ymls.empty?
parsed_ymls.each do |parsed_yml|
kube_apply(parsed_yml)
Async(annotation: "Project: #{project.name}") do

# constructing the hash will cause any overrides to happen in the right
# order (includer wins over last included over first included)
params = project.all_parameters
parts = params.group_by(&:secret)
config_params, secret_params = (parts[false] || []), (parts[true] || [])
config_param_hash = params_to_hash(config_params)
secret_param_hash = params_to_hash(secret_params)

parameter_origins = project.parameter_origins
param_origins_parts = parameter_origins.group_by {|k, v| config_param_hash.has_key?(k) }
config_origins = Hash[param_origins_parts[true] || []]
secret_origins = Hash[param_origins_parts[false] || []]

project.spec.resource_templates.each_with_index do |pair, i|
template_name, template = *pair
logger.debug { "Processing template '#{template_name}' (#{i+1}/#{project.spec.resource_templates.size})" }
resource_yml = template.render(
template: template_name,
kubetruth_namespace: kubeapi.namespace,
mapping_namespace: namespace,
project: project.name,
project_heirarchy: project.heirarchy,
debug: logger.debug?,
parameters: config_param_hash,
parameter_origins: config_origins,
secrets: secret_param_hash,
secret_origins: secret_origins,
context: project.spec.context
)

template_id = "mapping: #{project.spec.name}, mapping_namespace: #{namespace}, project: #{project.name}, template: #{template_name}"
parsed_ymls = YAML.safe_load_stream(resource_yml, template_id)
logger.debug {"Skipping empty template"} if parsed_ymls.empty?
parsed_ymls.each do |parsed_yml|
Async(annotation: "Apply Template: #{template_id}") do
kube_apply(parsed_yml)
end
end

end
end

end
end
end

end

def params_to_hash(param_list)
Expand Down
22 changes: 11 additions & 11 deletions spec/kubetruth/etl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Kubetruth
let(:init_args) {{
kube_context: {}
}}
let(:etl) { described_class.new(init_args) }
let(:etl) { described_class.new(**init_args) }

def kubeapi
kapi = double(Kubetruth::KubeApi)
Expand Down Expand Up @@ -36,7 +36,7 @@ def kubeapi
end

it "is memoized" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
allow(Kubetruth::KubeApi).to receive(:new)
expect(etl.kubeapi).to equal(etl.kubeapi)
end
Expand All @@ -46,14 +46,14 @@ def kubeapi
describe "#interruptible_sleep" do

it "runs for interval without interruption" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
t = Time.now.to_f
etl.interruptible_sleep(0.2)
expect(Time.now.to_f - t).to be >= 0.2
end

it "can be interrupted" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
Thread.new do
sleep 0.1
etl.interrupt_sleep
Expand All @@ -70,7 +70,7 @@ def kubeapi
class ForceExit < Exception; end

it "runs with an interval" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)

watcher = double()
expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
Expand All @@ -93,7 +93,7 @@ class ForceExit < Exception; end
end

it "isolates run loop from block failures" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)

watcher = double()
expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
Expand All @@ -116,7 +116,7 @@ class ForceExit < Exception; end
end

it "interrupts sleep on watch event" do
etl = described_class.new(init_args)
etl = described_class.new(**init_args)

watcher = double()
notice = double("notice", type: "UPDATED", object: double("kube_resource"))
Expand Down Expand Up @@ -145,7 +145,7 @@ class ForceExit < Exception; end
it "raises if no primary" do
allow(@kubeapi).to receive(:namespace).and_return("primary-ns")
expect(@kubeapi).to receive(:get_project_mappings).and_return({})
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
expect { etl.load_config }.to raise_error(Kubetruth::Error, /A default set of mappings is required/)
end

Expand All @@ -159,7 +159,7 @@ class ForceExit < Exception; end
"override2" => Config::DEFAULT_SPEC.merge(scope: "override", name: "override2")
}
})
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
configs = etl.load_config
expect(configs.size).to eq(1)
expect(configs.first).to be_an_instance_of(Kubetruth::Config)
Expand All @@ -180,7 +180,7 @@ class ForceExit < Exception; end
"override1" => Config::DEFAULT_SPEC.merge(scope: "override", name: "override1")
}
})
etl = described_class.new(init_args)
etl = described_class.new(**init_args)
configs = etl.load_config
expect(configs.size).to eq(2)
expect(configs.first).to be_an_instance_of(Kubetruth::Config)
Expand All @@ -206,7 +206,7 @@ class ForceExit < Exception; end
"myroot" => Config::DEFAULT_SPEC.merge(scope: "root", name: "myroot", environment: "env3"),
}
})
etl = described_class.new(init_args)
etl = described_class.new(**init_args)

nses = ["primary-ns", "other-ns", "yetanother-ns"]
envs = ["default", "otherenv", "env3"]
Expand Down

0 comments on commit 1b82bad

Please sign in to comment.