diff --git a/Gemfile b/Gemfile index 9c9a05c4d..b5116dd3f 100644 --- a/Gemfile +++ b/Gemfile @@ -16,6 +16,7 @@ gem 'statsd-ruby', '>= 1.3.0', :require => 'statsd' # Test deps group :test do + gem 'mock_redis', '>= 0.17.0' gem 'rack-test', '>= 0.6' gem 'rspec', '>= 3.2' gem 'simplecov', '>= 0.11.2' diff --git a/lib/vmpooler/api/v1.rb b/lib/vmpooler/api/v1.rb index 3c8c8aaa8..22a0ef11a 100644 --- a/lib/vmpooler/api/v1.rb +++ b/lib/vmpooler/api/v1.rb @@ -55,6 +55,7 @@ def return_vm_to_ready_state(template, vm) def account_for_starting_vm(template, vm) backend.sadd('vmpooler__running__' + template, vm) + backend.sadd('vmpooler__migrating__' + template, vm) backend.hset('vmpooler__active__' + template, vm, Time.now) backend.hset('vmpooler__vm__' + vm, 'checkout', Time.now) diff --git a/lib/vmpooler/pool_manager.rb b/lib/vmpooler/pool_manager.rb index 53895bb4a..939d7bc54 100644 --- a/lib/vmpooler/pool_manager.rb +++ b/lib/vmpooler/pool_manager.rb @@ -20,39 +20,58 @@ def initialize(config, logger, redis, metrics) end # Check the state of a VM - def check_pending_vm(vm, pool, timeout) + def check_pending_vm(vm, pool, timeout, vsphere) Thread.new do - _check_pending_vm(vm, pool, timeout) + _check_pending_vm(vm, pool, timeout, vsphere) end end - def _check_pending_vm(vm, pool, timeout) - host = $vsphere[pool].find_vm(vm) - - if host + def open_socket(host, domain=nil, timeout=5, port=22, &block) + Timeout.timeout(timeout) do + target_host = host + target_host = "#{host}.#{domain}" if domain + sock = TCPSocket.new target_host, port begin - Timeout.timeout(5) do - TCPSocket.new vm, 22 - end - move_pending_vm_to_ready(vm, pool, host) - rescue - fail_pending_vm(vm, pool, timeout) + yield sock if block_given? + ensure + sock.close end - else - fail_pending_vm(vm, pool, timeout) end end - def fail_pending_vm(vm, pool, timeout) - clone_stamp = $redis.hget('vmpooler__vm__' + vm, 'clone') + def _check_pending_vm(vm, pool, timeout, vsphere) + host = vsphere.find_vm(vm) + + if ! host + fail_pending_vm(vm, pool, timeout, false) + return + end + open_socket vm + move_pending_vm_to_ready(vm, pool, host) + rescue + fail_pending_vm(vm, pool, timeout) + end - if (clone_stamp) && - (((Time.now - Time.parse(clone_stamp)) / 60) > timeout) + def remove_nonexistent_vm(vm, pool) + $redis.srem("vmpooler__pending__#{pool}", vm) + $logger.log('d', "[!] [#{pool}] '#{vm}' no longer exists. Removing from pending.") + end - $redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm) + def fail_pending_vm(vm, pool, timeout, exists=true) + clone_stamp = $redis.hget("vmpooler__vm__#{vm}", 'clone') + return if ! clone_stamp - $logger.log('d', "[!] [#{pool}] '#{vm}' marked as 'failed' after #{timeout} minutes") + time_since_clone = (Time.now - Time.parse(clone_stamp)) / 60 + if time_since_clone > timeout + if exists + $redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm) + $logger.log('d', "[!] [#{pool}] '#{vm}' marked as 'failed' after #{timeout} minutes") + else + remove_nonexistent_vm(vm, pool) + end end + rescue => err + $logger.log('d', "Fail pending VM failed with an error: #{err}") end def move_pending_vm_to_ready(vm, pool, host) @@ -76,7 +95,7 @@ def move_pending_vm_to_ready(vm, pool, host) end end - def check_ready_vm(vm, pool, ttl) + def check_ready_vm(vm, pool, ttl, vsphere) Thread.new do if ttl > 0 if (((Time.now - host.runtime.bootTime) / 60).to_s[/^\d+\.\d{1}/].to_f) > ttl @@ -94,8 +113,7 @@ def check_ready_vm(vm, pool, ttl) $redis.hset('vmpooler__vm__' + vm, 'check', Time.now) - host = $vsphere[pool].find_vm(vm) || - $vsphere[pool].find_vm_heavy(vm)[vm] + host = vsphere.find_vm(vm) if host if @@ -124,26 +142,26 @@ def check_ready_vm(vm, pool, ttl) end begin - Timeout.timeout(5) do - TCPSocket.new vm, 22 - end + open_socket vm rescue if $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm) $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, removed from 'ready' queue") + else + $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, and failed to remove from 'ready' queue") end end end end end - def check_running_vm(vm, pool, ttl) + def check_running_vm(vm, pool, ttl, vsphere) Thread.new do - _check_running_vm(vm, pool, ttl) + _check_running_vm(vm, pool, ttl, vsphere) end end - def _check_running_vm(vm, pool, ttl) - host = $vsphere[pool].find_vm(vm) + def _check_running_vm(vm, pool, ttl, vsphere) + host = vsphere.find_vm(vm) if host queue_from, queue_to = 'running', 'completed' @@ -167,101 +185,105 @@ def move_vm_queue(pool, vm, queue_from, queue_to, msg) end # Clone a VM - def clone_vm(template, folder, datastore, target) + def clone_vm(template, folder, datastore, target, vsphere) Thread.new do - vm = {} + begin + vm = {} - if template =~ /\// - templatefolders = template.split('/') - vm['template'] = templatefolders.pop - end + if template =~ /\// + templatefolders = template.split('/') + vm['template'] = templatefolders.pop + end - if templatefolders - vm[vm['template']] = $vsphere[vm['template']].find_folder(templatefolders.join('/')).find(vm['template']) - else - fail 'Please provide a full path to the template' - end + if templatefolders + vm[vm['template']] = vsphere.find_folder(templatefolders.join('/')).find(vm['template']) + else + fail 'Please provide a full path to the template' + end - if vm['template'].length == 0 - fail "Unable to find template '#{vm['template']}'!" - end + if vm['template'].length == 0 + fail "Unable to find template '#{vm['template']}'!" + end - # Generate a randomized hostname - o = [('a'..'z'), ('0'..'9')].map(&:to_a).flatten - vm['hostname'] = $config[:config]['prefix'] + o[rand(25)] + (0...14).map { o[rand(o.length)] }.join + # Generate a randomized hostname + o = [('a'..'z'), ('0'..'9')].map(&:to_a).flatten + vm['hostname'] = $config[:config]['prefix'] + o[rand(25)] + (0...14).map { o[rand(o.length)] }.join + + # Add VM to Redis inventory ('pending' pool) + $redis.sadd('vmpooler__pending__' + vm['template'], vm['hostname']) + $redis.hset('vmpooler__vm__' + vm['hostname'], 'clone', Time.now) + $redis.hset('vmpooler__vm__' + vm['hostname'], 'template', vm['template']) + + # Annotate with creation time, origin template, etc. + # Add extraconfig options that can be queried by vmtools + configSpec = RbVmomi::VIM.VirtualMachineConfigSpec( + annotation: JSON.pretty_generate( + name: vm['hostname'], + created_by: $config[:vsphere]['username'], + base_template: vm['template'], + creation_timestamp: Time.now.utc + ), + extraConfig: [ + { key: 'guestinfo.hostname', + value: vm['hostname'] + } + ] + ) + + # Choose a clone target + if target + $clone_target = vsphere.find_least_used_host(target) + elsif $config[:config]['clone_target'] + $clone_target = vsphere.find_least_used_host($config[:config]['clone_target']) + end - # Add VM to Redis inventory ('pending' pool) - $redis.sadd('vmpooler__pending__' + vm['template'], vm['hostname']) - $redis.hset('vmpooler__vm__' + vm['hostname'], 'clone', Time.now) - $redis.hset('vmpooler__vm__' + vm['hostname'], 'template', vm['template']) + # Put the VM in the specified folder and resource pool + relocateSpec = RbVmomi::VIM.VirtualMachineRelocateSpec( + datastore: vsphere.find_datastore(datastore), + host: $clone_target, + diskMoveType: :moveChildMostDiskBacking + ) - # Annotate with creation time, origin template, etc. - # Add extraconfig options that can be queried by vmtools - configSpec = RbVmomi::VIM.VirtualMachineConfigSpec( - annotation: JSON.pretty_generate( - name: vm['hostname'], - created_by: $config[:vsphere]['username'], - base_template: vm['template'], - creation_timestamp: Time.now.utc - ), - extraConfig: [ - { key: 'guestinfo.hostname', - value: vm['hostname'] - } - ] - ) - - # Choose a clone target - if target - $clone_target = $vsphere[vm['template']].find_least_used_host(target) - elsif $config[:config]['clone_target'] - $clone_target = $vsphere[vm['template']].find_least_used_host($config[:config]['clone_target']) - end - - # Put the VM in the specified folder and resource pool - relocateSpec = RbVmomi::VIM.VirtualMachineRelocateSpec( - datastore: $vsphere[vm['template']].find_datastore(datastore), - host: $clone_target, - diskMoveType: :moveChildMostDiskBacking - ) - - # Create a clone spec - spec = RbVmomi::VIM.VirtualMachineCloneSpec( - location: relocateSpec, - config: configSpec, - powerOn: true, - template: false - ) - - # Clone the VM - $logger.log('d', "[ ] [#{vm['template']}] '#{vm['hostname']}' is being cloned from '#{vm['template']}'") + # Create a clone spec + spec = RbVmomi::VIM.VirtualMachineCloneSpec( + location: relocateSpec, + config: configSpec, + powerOn: true, + template: false + ) - begin - start = Time.now - vm[vm['template']].CloneVM_Task( - folder: $vsphere[vm['template']].find_folder(folder), - name: vm['hostname'], - spec: spec - ).wait_for_completion - finish = '%.2f' % (Time.now - start) + # Clone the VM + $logger.log('d', "[ ] [#{vm['template']}] '#{vm['hostname']}' is being cloned from '#{vm['template']}'") + + begin + start = Time.now + vm[vm['template']].CloneVM_Task( + folder: vsphere.find_folder(folder), + name: vm['hostname'], + spec: spec + ).wait_for_completion + finish = '%.2f' % (Time.now - start) - $redis.hset('vmpooler__clone__' + Date.today.to_s, vm['template'] + ':' + vm['hostname'], finish) - $redis.hset('vmpooler__vm__' + vm['hostname'], 'clone_time', finish) + $redis.hset('vmpooler__clone__' + Date.today.to_s, vm['template'] + ':' + vm['hostname'], finish) + $redis.hset('vmpooler__vm__' + vm['hostname'], 'clone_time', finish) - $logger.log('s', "[+] [#{vm['template']}] '#{vm['hostname']}' cloned from '#{vm['template']}' in #{finish} seconds") - rescue - $logger.log('s', "[!] [#{vm['template']}] '#{vm['hostname']}' clone appears to have failed") - $redis.srem('vmpooler__pending__' + vm['template'], vm['hostname']) - end + $logger.log('s', "[+] [#{vm['template']}] '#{vm['hostname']}' cloned from '#{vm['template']}' in #{finish} seconds") + rescue => err + $logger.log('s', "[!] [#{vm['template']}] '#{vm['hostname']}' clone failed with an error: #{err}") + $redis.srem('vmpooler__pending__' + vm['template'], vm['hostname']) + end - $redis.decr('vmpooler__tasks__clone') + $redis.decr('vmpooler__tasks__clone') - $metrics.timing("clone.#{vm['template']}", finish) + $metrics.timing("clone.#{vm['template']}", finish) + rescue => err + $logger.log('s', "[!] [#{vm['template']}] '#{vm['hostname']}' failed while preparing to clone with an error: #{err}") + end end end # Destroy a VM - def destroy_vm(vm, pool) + def destroy_vm(vm, pool, vsphere) Thread.new do $redis.srem('vmpooler__completed__' + pool, vm) $redis.hdel('vmpooler__active__' + pool, vm) @@ -270,8 +292,7 @@ def destroy_vm(vm, pool) # Auto-expire metadata key $redis.expire('vmpooler__vm__' + vm, ($config[:redis]['data_ttl'].to_i * 60 * 60)) - host = $vsphere[pool].find_vm(vm) || - $vsphere[pool].find_vm_heavy(vm)[vm] + host = vsphere.find_vm(vm) if host start = Time.now @@ -294,15 +315,14 @@ def destroy_vm(vm, pool) end end - def create_vm_disk(vm, disk_size) + def create_vm_disk(vm, disk_size, vsphere) Thread.new do - _create_vm_disk(vm, disk_size) + _create_vm_disk(vm, disk_size, vsphere) end end - def _create_vm_disk(vm, disk_size) - host = $vsphere['disk_manager'].find_vm(vm) || - $vsphere['disk_manager'].find_vm_heavy(vm)[vm] + def _create_vm_disk(vm, disk_size, vsphere) + host = vsphere.find_vm(vm) if (host) && ((! disk_size.nil?) && (! disk_size.empty?) && (disk_size.to_i > 0)) $logger.log('s', "[ ] [disk_manager] '#{vm}' is attaching a #{disk_size}gb disk") @@ -319,7 +339,7 @@ def _create_vm_disk(vm, disk_size) end if ((! datastore.nil?) && (! datastore.empty?)) - $vsphere['disk_manager'].add_disk(host, disk_size, datastore) + vsphere.add_disk(host, disk_size, datastore) rdisks = $redis.hget('vmpooler__vm__' + vm, 'disk') disks = rdisks ? rdisks.split(':') : [] @@ -335,15 +355,14 @@ def _create_vm_disk(vm, disk_size) end end - def create_vm_snapshot(vm, snapshot_name) + def create_vm_snapshot(vm, snapshot_name, vsphere) Thread.new do - _create_vm_snapshot(vm, snapshot_name) + _create_vm_snapshot(vm, snapshot_name, vsphere) end end - def _create_vm_snapshot(vm, snapshot_name) - host = $vsphere['snapshot_manager'].find_vm(vm) || - $vsphere['snapshot_manager'].find_vm_heavy(vm)[vm] + def _create_vm_snapshot(vm, snapshot_name, vsphere) + host = vsphere.find_vm(vm) if (host) && ((! snapshot_name.nil?) && (! snapshot_name.empty?)) $logger.log('s', "[ ] [snapshot_manager] '#{vm}' is being snapshotted") @@ -365,18 +384,17 @@ def _create_vm_snapshot(vm, snapshot_name) end end - def revert_vm_snapshot(vm, snapshot_name) + def revert_vm_snapshot(vm, snapshot_name, vsphere) Thread.new do - _revert_vm_snapshot(vm, snapshot_name) + _revert_vm_snapshot(vm, snapshot_name, vsphere) end end - def _revert_vm_snapshot(vm, snapshot_name) - host = $vsphere['snapshot_manager'].find_vm(vm) || - $vsphere['snapshot_manager'].find_vm_heavy(vm)[vm] + def _revert_vm_snapshot(vm, snapshot_name, vsphere) + host = vsphere.find_vm(vm) if host - snapshot = $vsphere['snapshot_manager'].find_snapshot(host, snapshot_name) + snapshot = vsphere.find_snapshot(host, snapshot_name) if snapshot $logger.log('s', "[ ] [snapshot_manager] '#{vm}' is being reverted to snapshot '#{snapshot_name}'") @@ -395,23 +413,23 @@ def _revert_vm_snapshot(vm, snapshot_name) def check_disk_queue $logger.log('d', "[*] [disk_manager] starting worker thread") - $vsphere['disk_manager'] ||= Vmpooler::VsphereHelper.new + $vsphere['disk_manager'] ||= Vmpooler::VsphereHelper.new $config[:vsphere] $threads['disk_manager'] = Thread.new do loop do - _check_disk_queue + _check_disk_queue $vsphere['disk_manager'] sleep(5) end end end - def _check_disk_queue + def _check_disk_queue(vsphere) vm = $redis.spop('vmpooler__tasks__disk') unless vm.nil? begin vm_name, disk_size = vm.split(':') - create_vm_disk(vm_name, disk_size) + create_vm_disk(vm_name, disk_size, vsphere) rescue $logger.log('s', "[!] [disk_manager] disk creation appears to have failed") end @@ -421,23 +439,23 @@ def _check_disk_queue def check_snapshot_queue $logger.log('d', "[*] [snapshot_manager] starting worker thread") - $vsphere['snapshot_manager'] ||= Vmpooler::VsphereHelper.new + $vsphere['snapshot_manager'] ||= Vmpooler::VsphereHelper.new $config[:vsphere] $threads['snapshot_manager'] = Thread.new do loop do - _check_snapshot_queue + _check_snapshot_queue $vsphere['snapshot_manager'] sleep(5) end end end - def _check_snapshot_queue + def _check_snapshot_queue(vsphere) vm = $redis.spop('vmpooler__tasks__snapshot') unless vm.nil? begin vm_name, snapshot_name = vm.split(':') - create_vm_snapshot(vm_name, snapshot_name) + create_vm_snapshot(vm_name, snapshot_name, vsphere) rescue $logger.log('s', "[!] [snapshot_manager] snapshot appears to have failed") end @@ -448,31 +466,100 @@ def _check_snapshot_queue unless vm.nil? begin vm_name, snapshot_name = vm.split(':') - revert_vm_snapshot(vm_name, snapshot_name) + revert_vm_snapshot(vm_name, snapshot_name, vsphere) rescue $logger.log('s', "[!] [snapshot_manager] snapshot revert appears to have failed") end end end + def migration_limit(migration_limit) + # Returns migration_limit setting when enabled + return false if migration_limit == 0 || ! migration_limit + migration_limit if migration_limit >= 1 + end + + def migrate_vm(vm, pool, vsphere) + Thread.new do + _migrate_vm(vm, pool, vsphere) + end + end + + def _migrate_vm(vm, pool, vsphere) + begin + $redis.srem('vmpooler__migrating__' + pool, vm) + vm_object = vsphere.find_vm(vm) + parent_host, parent_host_name = get_vm_host_info(vm_object) + migration_limit = migration_limit $config[:config]['migration_limit'] + migration_count = $redis.scard('vmpooler__migration') + + if ! migration_limit + $logger.log('s', "[ ] [#{pool}] '#{vm}' is running on #{parent_host_name}") + return + else + if migration_count >= migration_limit + $logger.log('s', "[ ] [#{pool}] '#{vm}' is running on #{parent_host_name}. No migration will be evaluated since the migration_limit has been reached") + return + else + $redis.sadd('vmpooler__migration', vm) + host, host_name = vsphere.find_least_used_compatible_host(vm_object) + if host == parent_host + $logger.log('s', "[ ] [#{pool}] No migration required for '#{vm}' running on #{parent_host_name}") + else + finish = migrate_vm_and_record_timing(vm_object, vm, host, vsphere) + $logger.log('s', "[>] [#{pool}] '#{vm}' migrated from #{parent_host_name} to #{host_name} in #{finish} seconds") + end + remove_vmpooler_migration_vm(pool, vm) + end + end + rescue => err + $logger.log('s', "[x] [#{pool}] '#{vm}' migration failed with an error: #{err}") + remove_vmpooler_migration_vm(pool, vm) + end + end + + def get_vm_host_info(vm_object) + parent_host = vm_object.summary.runtime.host + [parent_host, parent_host.name] + end + + def remove_vmpooler_migration_vm(pool, vm) + begin + $redis.srem('vmpooler__migration', vm) + rescue => err + $logger.log('s', "[x] [#{pool}] '#{vm}' removal from vmpooler__migration failed with an error: #{err}") + end + end + + def migrate_vm_and_record_timing(vm_object, vm_name, host, vsphere) + start = Time.now + vsphere.migrate_vm_host(vm_object, host) + finish = '%.2f' % (Time.now - start) + $metrics.timing("migrate.#{vm_name}", finish) + checkout_to_migration = '%.2f' % (Time.now - Time.parse($redis.hget("vmpooler__vm__#{vm_name}", 'checkout'))) + $redis.hset("vmpooler__vm__#{vm_name}", 'migration_time', finish) + $redis.hset("vmpooler__vm__#{vm_name}", 'checkout_to_migration', checkout_to_migration) + finish + end + def check_pool(pool) $logger.log('d', "[*] [#{pool['name']}] starting worker thread") - $vsphere[pool['name']] ||= Vmpooler::VsphereHelper.new + $vsphere[pool['name']] ||= Vmpooler::VsphereHelper.new $config[:vsphere] $threads[pool['name']] = Thread.new do loop do - _check_pool(pool) + _check_pool(pool, $vsphere[pool['name']]) sleep(5) end end end - def _check_pool(pool) + def _check_pool(pool, vsphere) # INVENTORY inventory = {} begin - base = $vsphere[pool['name']].find_folder(pool['folder']) + base = vsphere.find_folder(pool['folder']) base.childEntity.each do |vm| if @@ -480,7 +567,8 @@ def _check_pool(pool) (! $redis.sismember('vmpooler__ready__' + pool['name'], vm['name'])) && (! $redis.sismember('vmpooler__pending__' + pool['name'], vm['name'])) && (! $redis.sismember('vmpooler__completed__' + pool['name'], vm['name'])) && - (! $redis.sismember('vmpooler__discovered__' + pool['name'], vm['name'])) + (! $redis.sismember('vmpooler__discovered__' + pool['name'], vm['name'])) && + (! $redis.sismember('vmpooler__migrating__' + pool['name'], vm['name'])) $redis.sadd('vmpooler__discovered__' + pool['name'], vm['name']) @@ -493,82 +581,97 @@ def _check_pool(pool) end # RUNNING - $redis.smembers('vmpooler__running__' + pool['name']).each do |vm| + $redis.smembers("vmpooler__running__#{pool['name']}").each do |vm| if inventory[vm] begin - check_running_vm(vm, pool['name'], $redis.hget('vmpooler__vm__' + vm, 'lifetime') || $config[:config]['vm_lifetime'] || 12) + vm_lifetime = $redis.hget('vmpooler__vm__' + vm, 'lifetime') || $config[:config]['vm_lifetime'] || 12 + check_running_vm(vm, pool['name'], vm_lifetime, vsphere) rescue end end end # READY - $redis.smembers('vmpooler__ready__' + pool['name']).each do |vm| + $redis.smembers("vmpooler__ready__#{pool['name']}").each do |vm| if inventory[vm] begin - check_ready_vm(vm, pool['name'], pool['ready_ttl'] || 0) + check_ready_vm(vm, pool['name'], pool['ready_ttl'] || 0, vsphere) rescue end end end # PENDING - $redis.smembers('vmpooler__pending__' + pool['name']).each do |vm| + $redis.smembers("vmpooler__pending__#{pool['name']}").each do |vm| + pool_timeout = pool['timeout'] || $config[:config]['timeout'] || 15 if inventory[vm] begin - check_pending_vm(vm, pool['name'], pool['timeout'] || $config[:config]['timeout'] || 15) + check_pending_vm(vm, pool['name'], pool_timeout, vsphere) rescue end + else + fail_pending_vm(vm, pool['name'], pool_timeout, false) end end # COMPLETED - $redis.smembers('vmpooler__completed__' + pool['name']).each do |vm| + $redis.smembers("vmpooler__completed__#{pool['name']}").each do |vm| if inventory[vm] begin - destroy_vm(vm, pool['name']) + destroy_vm(vm, pool['name'], vsphere) rescue $logger.log('s', "[!] [#{pool['name']}] '#{vm}' destroy appears to have failed") - $redis.srem('vmpooler__completed__' + pool['name'], vm) - $redis.hdel('vmpooler__active__' + pool['name'], vm) - $redis.del('vmpooler__vm__' + vm) + $redis.srem("vmpooler__completed__#{pool['name']}", vm) + $redis.hdel("vmpooler__active__#{pool['name']}", vm) + $redis.del("vmpooler__vm__#{vm}") end else $logger.log('s', "[!] [#{pool['name']}] '#{vm}' not found in inventory, removed from 'completed' queue") - $redis.srem('vmpooler__completed__' + pool['name'], vm) - $redis.hdel('vmpooler__active__' + pool['name'], vm) - $redis.del('vmpooler__vm__' + vm) + $redis.srem("vmpooler__completed__#{pool['name']}", vm) + $redis.hdel("vmpooler__active__#{pool['name']}", vm) + $redis.del("vmpooler__vm__#{vm}") end end # DISCOVERED - $redis.smembers('vmpooler__discovered__' + pool['name']).each do |vm| + $redis.smembers("vmpooler__discovered__#{pool['name']}").each do |vm| %w(pending ready running completed).each do |queue| - if $redis.sismember('vmpooler__' + queue + '__' + pool['name'], vm) + if $redis.sismember("vmpooler__#{queue}__#{pool['name']}", vm) $logger.log('d', "[!] [#{pool['name']}] '#{vm}' found in '#{queue}', removed from 'discovered' queue") - $redis.srem('vmpooler__discovered__' + pool['name'], vm) + $redis.srem("vmpooler__discovered__#{pool['name']}", vm) end end - if $redis.sismember('vmpooler__discovered__' + pool['name'], vm) - $redis.smove('vmpooler__discovered__' + pool['name'], 'vmpooler__completed__' + pool['name'], vm) + if $redis.sismember("vmpooler__discovered__#{pool['name']}", vm) + $redis.smove("vmpooler__discovered__#{pool['name']}", "vmpooler__completed__#{pool['name']}", vm) + end + end + + # MIGRATIONS + $redis.smembers("vmpooler__migrating__#{pool['name']}").each do |vm| + if inventory[vm] + begin + migrate_vm(vm, pool['name'], vsphere) + rescue => err + $logger.log('s', "[x] [#{pool['name']}] '#{vm}' failed to migrate: #{err}") + end end end # REPOPULATE - ready = $redis.scard('vmpooler__ready__' + pool['name']) - total = $redis.scard('vmpooler__pending__' + pool['name']) + ready + ready = $redis.scard("vmpooler__ready__#{pool['name']}") + total = $redis.scard("vmpooler__pending__#{pool['name']}") + ready - $metrics.gauge('ready.' + pool['name'], $redis.scard('vmpooler__ready__' + pool['name'])) - $metrics.gauge('running.' + pool['name'], $redis.scard('vmpooler__running__' + pool['name'])) + $metrics.gauge("ready.#{pool['name']}", $redis.scard("vmpooler__ready__#{pool['name']}")) + $metrics.gauge("running.#{pool['name']}", $redis.scard("vmpooler__running__#{pool['name']}")) - if $redis.get('vmpooler__empty__' + pool['name']) + if $redis.get("vmpooler__empty__#{pool['name']}") unless ready == 0 - $redis.del('vmpooler__empty__' + pool['name']) + $redis.del("vmpooler__empty__#{pool['name']}") end else if ready == 0 - $redis.set('vmpooler__empty__' + pool['name'], 'true') + $redis.set("vmpooler__empty__#{pool['name']}", 'true') $logger.log('s', "[!] [#{pool['name']}] is empty") end end @@ -583,10 +686,11 @@ def _check_pool(pool) pool['template'], pool['folder'], pool['datastore'], - pool['clone_target'] + pool['clone_target'], + vsphere ) - rescue - $logger.log('s', "[!] [#{pool['name']}] clone appears to have failed") + rescue => err + $logger.log('s', "[!] [#{pool['name']}] clone failed during check_pool with an error: #{err}") $redis.decr('vmpooler__tasks__clone') end end diff --git a/lib/vmpooler/vsphere_helper.rb b/lib/vmpooler/vsphere_helper.rb index 622b66dba..5d06b64dd 100644 --- a/lib/vmpooler/vsphere_helper.rb +++ b/lib/vmpooler/vsphere_helper.rb @@ -6,22 +6,27 @@ class VsphereHelper DISK_TYPE = 'thin' DISK_MODE = 'persistent' - def initialize(_vInfo = {}) - config_file = File.expand_path('vmpooler.yaml') - vsphere = YAML.load_file(config_file)[:vsphere] - - @connection = RbVmomi::VIM.connect host: vsphere['server'], - user: vsphere['username'], - password: vsphere['password'], - insecure: true + def initialize(credentials) + $credentials = credentials end - def add_disk(vm, size, datastore) + def ensure_connected(connection, credentials) begin - @connection.serviceInstance.CurrentTime + connection.serviceInstance.CurrentTime rescue - initialize + connect_to_vsphere $credentials end + end + + def connect_to_vsphere(credentials) + @connection = RbVmomi::VIM.connect host: credentials['server'], + user: credentials['username'], + password: credentials['password'], + insecure: credentials['insecure'] || true + end + + def add_disk(vm, size, datastore) + ensure_connected @connection, $credentials return false unless size.to_i > 0 @@ -71,22 +76,14 @@ def add_disk(vm, size, datastore) end def find_datastore(datastorename) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials datacenter = @connection.serviceInstance.find_datacenter datacenter.find_datastore(datastorename) end def find_device(vm, deviceName) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials vm.config.hardware.device.each do |device| return device if device.deviceInfo.label == deviceName @@ -96,11 +93,7 @@ def find_device(vm, deviceName) end def find_disk_controller(vm) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials devices = find_disk_devices(vm) @@ -114,11 +107,7 @@ def find_disk_controller(vm) end def find_disk_devices(vm) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials devices = {} @@ -146,11 +135,7 @@ def find_disk_devices(vm) end def find_disk_unit_number(vm, controller) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials used_unit_numbers = [] available_unit_numbers = [] @@ -175,11 +160,7 @@ def find_disk_unit_number(vm, controller) end def find_folder(foldername) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials datacenter = @connection.serviceInstance.find_datacenter base = datacenter.vmFolder @@ -196,39 +177,89 @@ def find_folder(foldername) base end - def find_least_used_host(cluster) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize + # Returns an array containing cumulative CPU and memory utilization of a host, and its object reference + # Params: + # +model+:: CPU arch version to match on + # +limit+:: Hard limit for CPU or memory utilization beyond which a host is excluded for deployments + def get_host_utilization(host, model=nil, limit=90) + if model + return nil unless host_has_cpu_model? host, model end + return nil if host.runtime.inMaintenanceMode + return nil unless host.overallStatus == 'green' + + cpu_utilization = cpu_utilization_for host + memory_utilization = memory_utilization_for host - hosts = {} - hosts_sort = {} + return nil if cpu_utilization > limit + return nil if memory_utilization > limit + [ cpu_utilization + memory_utilization, host ] + end + + def host_has_cpu_model?(host, model) + get_host_cpu_arch_version(host) == model + end + + def get_host_cpu_arch_version(host) + cpu_model = host.hardware.cpuPkg[0].description + cpu_model_parts = cpu_model.split() + arch_version = cpu_model_parts[4] + arch_version + end + + def cpu_utilization_for(host) + cpu_usage = host.summary.quickStats.overallCpuUsage + cpu_size = host.summary.hardware.cpuMhz * host.summary.hardware.numCpuCores + (cpu_usage.to_f / cpu_size.to_f) * 100 + end + + def memory_utilization_for(host) + memory_usage = host.summary.quickStats.overallMemoryUsage + memory_size = host.summary.hardware.memorySize / 1024 / 1024 + (memory_usage.to_f / memory_size.to_f) * 100 + end + + def find_least_used_host(cluster) + ensure_connected @connection, $credentials + + cluster_object = find_cluster(cluster) + target_hosts = get_cluster_host_utilization(cluster_object) + least_used_host = target_hosts.sort[0][1] + least_used_host + end + + def find_cluster(cluster) datacenter = @connection.serviceInstance.find_datacenter - datacenter.hostFolder.children.each do |folder| - next unless folder.name == cluster - folder.host.each do |host| - if - (host.overallStatus == 'green') && - (!host.runtime.inMaintenanceMode) - - hosts[host.name] = host - hosts_sort[host.name] = host.vm.length - end - end + datacenter.hostFolder.children.find { |cluster_object| cluster_object.name == cluster } + end + + def get_cluster_host_utilization(cluster) + cluster_hosts = [] + cluster.host.each do |host| + host_usage = get_host_utilization(host) + cluster_hosts << host_usage if host_usage end + cluster_hosts + end - hosts[hosts_sort.sort_by { |_k, v| v }[0][0]] + def find_least_used_compatible_host(vm) + ensure_connected @connection, $credentials + + source_host = vm.summary.runtime.host + model = get_host_cpu_arch_version(source_host) + cluster = source_host.parent + target_hosts = [] + cluster.host.each do |host| + host_usage = get_host_utilization(host, model) + target_hosts << host_usage if host_usage + end + target_host = target_hosts.sort[0][1] + [target_host, target_host.name] end def find_pool(poolname) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials datacenter = @connection.serviceInstance.find_datacenter base = datacenter.hostFolder @@ -257,21 +288,18 @@ def find_snapshot(vm, snapshotname) end def find_vm(vmname) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials + find_vm_light(vmname) || find_vm_heavy(vmname)[vmname] + end + + def find_vm_light(vmname) + ensure_connected @connection, $credentials @connection.searchIndex.FindByDnsName(vmSearch: true, dnsName: vmname) end def find_vm_heavy(vmname) - begin - @connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials vmname = vmname.is_a?(Array) ? vmname : [vmname] containerView = get_base_vm_container_from @connection @@ -321,11 +349,7 @@ def find_vm_heavy(vmname) end def find_vmdks(vmname, datastore) - begin - connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials disks = [] @@ -344,11 +368,7 @@ def find_vmdks(vmname, datastore) end def get_base_vm_container_from(connection) - begin - connection.serviceInstance.CurrentTime - rescue - initialize - end + ensure_connected @connection, $credentials viewManager = connection.serviceContent.viewManager viewManager.CreateContainerView( @@ -372,6 +392,11 @@ def get_snapshot_list(tree, snapshotname) snapshot end + def migrate_vm_host(vm, host) + relospec = RbVmomi::VIM.VirtualMachineRelocateSpec(host: host) + vm.RelocateVM_Task(spec: relospec).wait_for_completion + end + def close @connection.close end diff --git a/spec/helpers.rb b/spec/helpers.rb index 712cdab0f..9005ec57e 100644 --- a/spec/helpers.rb +++ b/spec/helpers.rb @@ -50,11 +50,21 @@ def create_pending_vm(template, name, token = nil) redis.hset("vmpooler__vm__#{name}", "template", template) end -def create_vm(name, token = nil) - redis.hset("vmpooler__vm__#{name}", 'checkout', Time.now) - if token - redis.hset("vmpooler__vm__#{name}", 'token:token', token) - end +def create_vm(name, token = nil, redis_handle = nil) + redis_db = redis_handle ? redis_handle : redis + redis_db.hset("vmpooler__vm__#{name}", 'checkout', Time.now) + redis_db.hset("vmpooler__vm__#{name}", 'token:token', token) if token +end + +def create_migrating_vm(name, pool, redis_handle = nil) + redis_db = redis_handle ? redis_handle : redis + redis_db.hset("vmpooler__vm__#{name}", 'checkout', Time.now) + redis_db.sadd("vmpooler__migrating__#{pool}", name) +end + +def add_vm_to_migration_set(name, redis_handle = nil) + redis_db = redis_handle ? redis_handle : redis + redis_db.sadd('vmpooler__migration', name) end def fetch_vm(vm) diff --git a/spec/vmpooler/pool_manager_migration_spec.rb b/spec/vmpooler/pool_manager_migration_spec.rb new file mode 100644 index 000000000..9fd491b01 --- /dev/null +++ b/spec/vmpooler/pool_manager_migration_spec.rb @@ -0,0 +1,87 @@ +require 'spec_helper' +require 'mock_redis' +require 'time' + +describe 'Pool Manager' do + let(:logger) { double('logger') } + let(:redis) { MockRedis.new } + let(:metrics) { Vmpooler::DummyStatsd.new } + let(:config) { + { + config: { + 'site_name' => 'test pooler', + 'migration_limit' => 2, + vsphere: { + 'server' => 'vsphere.puppet.com', + 'username' => 'vmpooler@vsphere.local', + 'password' => '', + 'insecure' => true + }, + pools: [ {'name' => 'pool1', 'size' => 5, 'folder' => 'pool1_folder'} ], + statsd: { 'prefix' => 'stats_prefix'}, + pool_names: [ 'pool1' ] + } + } + } + let(:pool) { config[:config][:pools][0]['name'] } + let(:vm) { + { + 'name' => 'vm1', + 'host' => 'host1', + 'template' => pool, + } + } + + describe '#_migrate_vm' do + let(:vsphere) { double(pool) } + let(:pooler) { Vmpooler::PoolManager.new(config, logger, redis, metrics) } + context 'evaluates VM for migration and logs host' do + before do + create_migrating_vm vm['name'], pool, redis + allow(vsphere).to receive(:find_vm).and_return(vm) + allow(pooler).to receive(:get_vm_host_info).and_return([{'name' => 'host1'}, 'host1']) + end + + it 'logs VM host when migration is disabled' do + config[:config]['migration_limit'] = nil + + expect(redis.sismember("vmpooler__migrating__#{pool}", vm['name'])).to be true + expect(logger).to receive(:log).with('s', "[ ] [#{pool}] '#{vm['name']}' is running on #{vm['host']}") + + pooler._migrate_vm(vm['name'], pool, vsphere) + + expect(redis.sismember("vmpooler__migrating__#{pool}", vm['name'])).to be false + end + + it 'verifies that migration_limit greater than or equal to migrations in progress and logs host' do + add_vm_to_migration_set vm['name'], redis + add_vm_to_migration_set 'vm2', redis + + expect(logger).to receive(:log).with('s', "[ ] [#{pool}] '#{vm['name']}' is running on #{vm['host']}. No migration will be evaluated since the migration_limit has been reached") + + pooler._migrate_vm(vm['name'], pool, vsphere) + end + + it 'verifies that migration_limit is less than migrations in progress and logs old host, new host and migration time' do + allow(vsphere).to receive(:find_least_used_compatible_host).and_return([{'name' => 'host2'}, 'host2']) + allow(vsphere).to receive(:migrate_vm_host) + + expect(redis.hget("vmpooler__vm__#{vm['name']}", 'migration_time')) + expect(redis.hget("vmpooler__vm__#{vm['name']}", 'checkout_to_migration')) + expect(logger).to receive(:log).with('s', "[>] [#{pool}] '#{vm['name']}' migrated from #{vm['host']} to host2 in 0.00 seconds") + + pooler._migrate_vm(vm['name'], pool, vsphere) + end + + it 'fails when no suitable host can be found' do + error = 'ArgumentError: No target host found' + allow(vsphere).to receive(:find_least_used_compatible_host) + allow(vsphere).to receive(:migrate_vm_host).and_raise(error) + + expect(logger).to receive(:log).with('s', "[x] [#{pool}] '#{vm['name']}' migration failed with an error: #{error}") + + pooler._migrate_vm(vm['name'], pool, vsphere) + end + end + end +end diff --git a/spec/vmpooler/pool_manager_spec.rb b/spec/vmpooler/pool_manager_spec.rb index c5695788d..6bf00bd72 100644 --- a/spec/vmpooler/pool_manager_spec.rb +++ b/spec/vmpooler/pool_manager_spec.rb @@ -24,10 +24,9 @@ context 'host not in pool' do it 'calls fail_pending_vm' do - allow(pool_helper).to receive(:find_vm).and_return(nil) + allow(vsphere).to receive(:find_vm).and_return(nil) allow(redis).to receive(:hget) - expect(redis).to receive(:hget).with(String, 'clone').once - subject._check_pending_vm(vm, pool, timeout) + subject._check_pending_vm(vm, pool, timeout, vsphere) end end @@ -36,16 +35,14 @@ let(:tcpsocket) { double('TCPSocket') } it 'calls move_pending_vm_to_ready' do - stub_const("TCPSocket", tcpsocket) - - allow(pool_helper).to receive(:find_vm).and_return(vm_finder) + allow(subject).to receive(:open_socket).and_return(true) + allow(vsphere).to receive(:find_vm).and_return(vm_finder) allow(vm_finder).to receive(:summary).and_return(nil) - allow(tcpsocket).to receive(:new).and_return(true) expect(vm_finder).to receive(:summary).once expect(redis).not_to receive(:hget).with(String, 'clone') - subject._check_pending_vm(vm, pool, timeout) + subject._check_pending_vm(vm, pool, timeout, vsphere) end end end @@ -156,16 +153,16 @@ end it 'does nothing with nil host' do - allow(pool_helper).to receive(:find_vm).and_return(nil) + allow(vsphere).to receive(:find_vm).and_return(nil) expect(redis).not_to receive(:smove) - subject._check_running_vm(vm, pool, timeout) + subject._check_running_vm(vm, pool, timeout, vsphere) end context 'valid host' do let(:vm_host) { double('vmhost') } it 'does not move vm when not poweredOn' do - allow(pool_helper).to receive(:find_vm).and_return vm_host + allow(vsphere).to receive(:find_vm).and_return vm_host allow(vm_host).to receive(:runtime).and_return true allow(vm_host).to receive_message_chain(:runtime, :powerState).and_return 'poweredOff' @@ -173,11 +170,11 @@ expect(redis).not_to receive(:smove) expect(logger).not_to receive(:log).with('d', "[!] [#{pool}] '#{vm}' appears to be powered off or dead") - subject._check_running_vm(vm, pool, timeout) + subject._check_running_vm(vm, pool, timeout, vsphere) end it 'moves vm when poweredOn, but past TTL' do - allow(pool_helper).to receive(:find_vm).and_return vm_host + allow(vsphere).to receive(:find_vm).and_return vm_host allow(vm_host).to receive(:runtime).and_return true allow(vm_host).to receive_message_chain(:runtime, :powerState).and_return 'poweredOn' @@ -185,7 +182,7 @@ expect(redis).to receive(:smove) expect(logger).to receive(:log).with('d', "[!] [#{pool}] '#{vm}' reached end of TTL after #{timeout} hours") - subject._check_running_vm(vm, pool, timeout) + subject._check_running_vm(vm, pool, timeout, vsphere) end end end @@ -228,6 +225,7 @@ allow(redis).to receive(:smembers).with('vmpooler__running__pool1').and_return([]) allow(redis).to receive(:smembers).with('vmpooler__completed__pool1').and_return([]) allow(redis).to receive(:smembers).with('vmpooler__discovered__pool1').and_return([]) + allow(redis).to receive(:smembers).with('vmpooler__migrating__pool1').and_return([]) allow(redis).to receive(:set) allow(redis).to receive(:get).with('vmpooler__tasks__clone').and_return(0) allow(redis).to receive(:get).with('vmpooler__empty__pool1').and_return(nil) @@ -240,7 +238,7 @@ allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(0) expect(logger).to receive(:log).with('s', "[!] [pool1] is empty") - subject._check_pool(config[:pools][0]) + subject._check_pool(config[:pools][0], vsphere) end end end @@ -277,7 +275,7 @@ expect(metrics).to receive(:gauge).with('ready.pool1', 1) expect(metrics).to receive(:gauge).with('running.pool1', 5) - subject._check_pool(config[:pools][0]) + subject._check_pool(config[:pools][0], vsphere) end it 'increments metrics when ready with 0 when pool empty' do @@ -288,7 +286,7 @@ expect(metrics).to receive(:gauge).with('ready.pool1', 0) expect(metrics).to receive(:gauge).with('running.pool1', 5) - subject._check_pool(config[:pools][0]) + subject._check_pool(config[:pools][0], vsphere) end end end @@ -307,13 +305,13 @@ let(:vm_host) { double('vmhost') } it 'creates a snapshot' do - expect(pool_helper).to receive(:find_vm).and_return vm_host + expect(vsphere).to receive(:find_vm).and_return vm_host expect(logger).to receive(:log) expect(vm_host).to receive_message_chain(:CreateSnapshot_Task, :wait_for_completion) expect(redis).to receive(:hset).with('vmpooler__vm__testvm', 'snapshot:testsnapshot', Time.now.to_s) expect(logger).to receive(:log) - subject._create_vm_snapshot('testvm', 'testsnapshot') + subject._create_vm_snapshot('testvm', 'testsnapshot', vsphere) end end end @@ -333,13 +331,13 @@ let(:vm_snapshot) { double('vmsnapshot') } it 'reverts a snapshot' do - expect(pool_helper).to receive(:find_vm).and_return vm_host - expect(pool_helper).to receive(:find_snapshot).and_return vm_snapshot + expect(vsphere).to receive(:find_vm).and_return vm_host + expect(vsphere).to receive(:find_snapshot).and_return vm_snapshot expect(logger).to receive(:log) expect(vm_snapshot).to receive_message_chain(:RevertToSnapshot_Task, :wait_for_completion) expect(logger).to receive(:log) - subject._revert_vm_snapshot('testvm', 'testsnapshot') + subject._revert_vm_snapshot('testvm', 'testsnapshot', vsphere) end end end @@ -357,7 +355,7 @@ expect(redis).to receive(:spop).with('vmpooler__tasks__snapshot') expect(redis).to receive(:spop).with('vmpooler__tasks__snapshot-revert') - subject._check_snapshot_queue + subject._check_snapshot_queue(vsphere) end end end diff --git a/vmpooler.yaml.example b/vmpooler.yaml.example index 4e5489123..c83808c12 100644 --- a/vmpooler.yaml.example +++ b/vmpooler.yaml.example @@ -225,6 +225,13 @@ # If set, prefixes all created VMs with this string. This should include # a separator. # (optional; default: '') +# +# - migration_limit +# When set to any value greater than 0 enable VM migration at checkout. +# When enabled this capability will evaluate a VM for migration when it is requested +# in an effort to maintain a more even distribution of load across compute resources. +# The migration_limit ensures that no more than n migrations will be evaluated at any one time +# and greatly reduces the possibilty of VMs ending up bunched together on a particular host. # Example: