Skip to content

Commit

Permalink
interrupt polling sleep on CRD changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wr0ngway committed Apr 29, 2021
1 parent 4f91947 commit d3aaaa5
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 33 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ the standard ways, e.g. `kubectl edit projectmapping kubetruth-root`. The
its `project_selector` pattern against the CloudTruth project names already
selected by the root `project_selector`.

Note that Kubetruth watches for changes to ProjectMappings, so touching any of
them wakes it up from a polling sleep. This makes it quick and easy to test out
configuration changes without having a short polling interval.

### Example Config

The `projectmapping` resource has a shortname of `pm` for convenience when using kubectl.
Expand All @@ -138,6 +142,8 @@ spec:
skip: true
EOF
```
Note that project imports are non-recursive, so if A imports B and B imports C,
then A will only get B's parameters.

To override the naming of kubernetes Resources on a per-Project basis:
```
Expand Down
2 changes: 1 addition & 1 deletion helm/kubetruth/crds/projectmapping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ spec:
type: array
items:
type: string
description: Include the parameters from other projects into the selected ones
description: Include the parameters from other projects into the selected ones. This is non-recursive, so if A imports B and B imports C, then A will only get B's parameters
required:
- scope
additionalPrinterColumns:
Expand Down
19 changes: 3 additions & 16 deletions lib/kubetruth/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,10 @@ def execute
api_url: kube_url
}

etl = ETL.new(ct_context: ct_context, kube_context: kube_context)

while true

begin
etl.apply(dry_run: dry_run?)
rescue => e
logger.log_exception(e, "Failure while applying config transforms")
end

logger.debug("Poller sleeping for #{polling_interval}")
if dry_run?
break
else
sleep polling_interval
end
etl = ETL.new(ct_context: ct_context, kube_context: kube_context, dry_run: dry_run?)

etl.with_polling(polling_interval) do
etl.apply
end

end
Expand Down
57 changes: 52 additions & 5 deletions lib/kubetruth/etl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ class ETL
# From kubernetes error message
DNS_VALIDATION_RE = /^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$/

def initialize(ct_context:, kube_context:)
def initialize(ct_context:, kube_context:, dry_run: false)
@ct_context = ct_context
@kube_context = kube_context
@dry_run = dry_run
@kubeapis = {}
end

Expand All @@ -30,12 +31,60 @@ def kubeapi(namespace)
@kubeapis[namespace] ||= KubeApi.new(**@kube_context.merge(namespace: namespace))
end

def interruptible_sleep(interval)
@sleeper = Thread.current
Kernel.sleep interval
end

def interrupt_sleep
Thread.new { @sleeper&.run }
end

def with_polling(interval, &block)
while true

begin
watcher = kubeapi(@kube_context[:namespace]).watch_project_mappings

begin
thr = Thread.new do
logger.debug "Created watcher for CRD"
watcher.each do |notice|
logger.debug {"Interrupting polling sleep, CRD watcher woke up for: #{notice}"}
interrupt_sleep
break
end
logger.debug "CRD watcher exiting"
end

begin
block.call
rescue => e
logger.log_exception(e, "Failure while applying config transforms")
end

logger.debug("Poller sleeping for #{interval}")
interruptible_sleep(interval)
ensure
watcher.finish
thr.join
end

rescue => e
logger.log_exception(e, "Failure in watch/polling logic")
end

end
end

def load_config
mappings = kubeapi(@kube_context[:namespace]).get_project_mappings
Kubetruth::Config.new(mappings)
end

def apply(dry_run: false)
def apply
logger.warn("Performing dry-run") if @dry_run

config = load_config

projects = ctapi.project_names
Expand Down Expand Up @@ -99,9 +148,7 @@ def apply(dry_run: false)
config_param_hash = params_to_hash(config_params)
secret_param_hash = params_to_hash(secret_params)

if dry_run
logger.info("Performing dry-run")

if @dry_run
logger.info("Config maps that would be created are:")
logger.info(config_params.pretty_print_inspect)

Expand Down
7 changes: 7 additions & 0 deletions lib/kubetruth/kubeapi.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,12 @@ def delete_secret(name)
def get_project_mappings
crdclient.get_project_mappings(namespace: namespace).collect(&:spec).collect(&:to_h)
end

def watch_project_mappings(&block)
existing = crdclient.get_project_mappings(namespace: namespace)
collection_version = existing.resourceVersion
crdclient.watch_project_mappings(namespace: namespace, resource_version: collection_version, &block)
end

end
end
14 changes: 4 additions & 10 deletions spec/kubetruth/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def all_usage(clazz, path=[])
--kube-token kt
--kube-url ku
--dry-run
--polling-interval 27
]
etl = double(ETL)
expect(ETL).to receive(:new).with(ct_context: {
Expand All @@ -105,19 +106,12 @@ def all_usage(clazz, path=[])
namespace: "kn",
token: "kt",
api_url: "ku"
}).and_return(etl)
expect(etl).to receive(:apply).with(dry_run: true)
},
dry_run: true).and_return(etl)
expect(etl).to receive(:with_polling).with(27)
cli.run(args)
end

it "polls at interval" do
etl = double(ETL)
expect(ETL).to receive(:new).and_return(etl)
expect(etl).to receive(:apply)
expect(cli).to receive(:sleep).with(27).and_raise(SystemExit)
expect { cli.run(%w[--api-key abc123 --polling-interval 27]) }.to raise_error(SystemExit)
end

end

end
Expand Down
100 changes: 99 additions & 1 deletion spec/kubetruth/etl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,103 @@ def kubeapi(ns)

end

describe "#interruptible_sleep" do

it "runs for interval without interruption" do
etl = described_class.new(init_args)
t = Time.now.to_f
etl.interruptible_sleep(0.2)
expect(Time.now.to_f - t).to be >= 0.2
end

it "can be interrupted" do
etl = described_class.new(init_args)
Thread.new do
sleep 0.1
etl.interrupt_sleep
end
t = Time.now.to_f
etl.interruptible_sleep(0.5)
expect(Time.now.to_f - t).to be < 0.2
end

end

describe "#with_polling" do

class ForceExit < Exception; end

it "runs with an interval" do
etl = described_class.new(init_args)

watcher = double()
expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
expect(watcher).to receive(:each).twice
expect(watcher).to receive(:finish).twice
expect(etl).to receive(:apply).twice

count = 0
expect(etl).to receive(:interruptible_sleep).
with(0.2).twice { |m, *args| count += 1; raise ForceExit if count > 1 }

begin
etl.with_polling(0.2) do
etl.apply
end
rescue ForceExit
end
expect(count).to eq(2)

end

it "isolates run loop from block failures" do
etl = described_class.new(init_args)

watcher = double()
expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher).twice
expect(watcher).to receive(:each).twice
expect(watcher).to receive(:finish).twice
expect(etl).to receive(:apply).and_raise("fail").twice

count = 0
expect(etl).to receive(:interruptible_sleep).
with(0.2).twice { |m, *args| count += 1; raise ForceExit if count > 1 }

begin
etl.with_polling(0.2) do
etl.apply
end
rescue ForceExit
end
expect(count).to eq(2)

end

it "interrupts sleep on watch event" do
etl = described_class.new(init_args)

watcher = double()
notice = double("notice", type: "UPDATED", object: double("kube_resource"))
expect(@kubeapi).to receive(:watch_project_mappings).and_return(watcher)
expect(watcher).to receive(:each).and_yield(notice)
expect(watcher).to receive(:finish)
expect(etl).to receive(:apply)
expect(etl).to receive(:interrupt_sleep)

expect(etl).to receive(:interruptible_sleep).
with(0.2) { |m, *args| sleep(0.2); raise ForceExit }

begin
etl.with_polling(0.2) do
etl.apply
end
rescue ForceExit
end

end

end

describe "#load_config" do

it "loads config" do
Expand Down Expand Up @@ -371,12 +468,13 @@ def kubeapi(ns)
Parameter.new(key: "param1", value: "value1", secret: false),
Parameter.new(key: "param2", value: "value2", secret: true)
]
etl = described_class.new(init_args.merge(dry_run: true))
etl.load_config.root_spec.skip_secrets = true
expect(etl.ctapi).to receive(:project_names).and_return(["default"])
expect(etl).to receive(:get_params).and_return(params)
expect(etl).to_not receive(:apply_config_map)
expect(etl).to_not receive(:apply_secret)
etl.apply(dry_run: true)
etl.apply
expect(Logging.contents).to match("Performing dry-run")
end

Expand Down
49 changes: 49 additions & 0 deletions spec/kubetruth/kubeapi_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,55 @@ def apiserver; "https://127.0.0.1"; end
expect(crds.first.keys.sort).to eq(Kubetruth::Config::ProjectSpec.new.to_h.keys.sort)
end

it "can watch project mappings" do
skip("only works when vcr/webmock disabled")

test_mapping_name = "test-mapping-watch"
mapping_data = <<~EOF
apiVersion: kubetruth.cloudtruth.com/v1
kind: ProjectMapping
metadata:
name: #{test_mapping_name}
spec:
scope: override
project_selector: "^notme$"
EOF

# p kubeapi.crdclient.get_project_mappings(namespace: namespace).resourceVersion
# p kubeapi.crdclient.get_project_mappings(namespace: namespace).collect {|r| r.metadata.name }
# p kubeapi.get_project_mappings

watcher = kubeapi.watch_project_mappings
begin
Thread.new do
watcher.each do |notice|
# p notice.type
# p notice.object.metadata.name
# p notice.object
expect(notice.object.metadata.name).to eq(test_mapping_name)
break
end
end

sleep(1)

# need an admin token for this to work or temporarily add to
# projectmappings permissions on installed role
resource = Kubeclient::Resource.new
resource.metadata = {}
resource.metadata.name = test_mapping_name
resource.metadata.namespace = namespace
resource.spec = {scope: "override", project_selector: "^notme$"}
kubeapi.crdclient.create_project_mapping(resource)

# sysrun(%Q[minikube kubectl -- --namespace #{namespace} patch pm kubetruth-test-app-root --type json --patch '[{"op": "replace", "path": "/spec/included_projects", "value": ["Base"]}]'])
# sysrun(%Q[minikube kubectl -- --namespace #{namespace} apply -f -], stdin_data: mapping_data)
sleep(1)
ensure
watcher.finish
end
end

end

end
Expand Down

0 comments on commit d3aaaa5

Please sign in to comment.