Class | MCollective::Client |
In: |
lib/mcollective/client.rb
|
Parent: | Object |
Helpers for writing clients that can talk to agents, do discovery and so forth
options | [RW] | |
stats | [RW] |
# File lib/mcollective/client.rb, line 6
# Sets up a client: loads +configfile+ into the shared Config instance unless
# it is already configured, fetches the connector and security plugins from
# the PluginManager, flags the security plugin as client-initiated and
# finally connects the connector.
def initialize(configfile)
  # Plain state first; nothing below reads these.
  @options = nil
  @subscriptions = {}

  @config = Config.instance
  unless @config.configured
    @config.loadconfig(configfile)
  end

  # Plugins are looked up connector first, then security.
  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @connection.connect
end
Returns the collective specified in the options, or the configured main collective if no specific collective was specified
# File lib/mcollective/client.rb, line 22
# Returns the collective named in @options[:collective], falling back to
# @config.main_collective when that entry is nil.
#
# @options is nil until a caller assigns it (initialize sets it to nil), so a
# missing options hash is treated like a missing :collective entry rather
# than raising NoMethodError.
def collective
  chosen = @options ? @options[:collective] : nil
  return @config.main_collective if chosen.nil?

  chosen
end
Disconnects cleanly from the middleware
# File lib/mcollective/client.rb, line 31
# Writes a debug log entry and then tells the connector to disconnect.
def disconnect
  Log.debug("Disconnecting from the middleware")

  @connection.disconnect
end
Performs a discovery of nodes matching the filter passed and returns an array of nodes
An integer limit can be supplied; this will have the effect of the discovery being cancelled as soon as it has reached the requested limit of hosts
# File lib/mcollective/client.rb, line 113
# Sends a "ping" request to the "discovery" agent using +filter+ and collects
# the :senderid of every reply received within +timeout+ seconds.
#
# filter  - passed straight through to sendreq
# timeout - number of seconds handed to Timeout.timeout
# limit   - Integer; when greater than 0 the method returns as soon as that
#           many hosts have replied
#
# Returns the sender ids sorted when the timeout ends discovery, or in
# arrival order when the limit ends it early. Raises RuntimeError when
# +limit+ is not an Integer. The "discovery" reply subscription is released
# on every exit path.
def discover(filter, timeout, limit=0)
  # Integer rather than Fixnum: Fixnum no longer exists on current Rubies,
  # while Integer is its superclass on the older ones.
  raise "Limit has to be an integer" unless limit.is_a?(Integer)

  hosts = []

  begin
    Timeout.timeout(timeout) do
      reqid = sendreq("ping", "discovery", filter)
      Log.debug("Waiting #{timeout} seconds for discovery replies to request #{reqid}")

      loop do
        reply = receive(reqid)
        Log.debug("Got discovery reply from #{reply.payload[:senderid]}")
        hosts << reply.payload[:senderid]

        # Enough hosts found: stop waiting for the rest of the window.
        return hosts if limit > 0 && hosts.size == limit
      end
    end
  rescue Timeout::Error
    # The discovery window closing is the normal way out of the loop above.
  ensure
    unsubscribe("discovery", :reply)
  end

  hosts.sort
end
Performs a discovery and then sends a request, performing the passed block for each response
times = discovered_req("status", "mcollectived", options) {|resp| pp resp }
It returns a hash of times. The timeouts for discovery and for the total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 200
# Runs a discovery with options[:filter] / options[:disctimeout], then sends
# +body+ to +agent+ and yields the payload of each reply until as many
# replies as discovered hosts have arrived or options[:timeout] expires.
#
# body    - request body handed to sendreq
# agent   - agent name handed to sendreq
# options - options hash; falls back to @options when false/nil
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses, :responsesfrom, :noresponsefrom and
# :discovered. Raises RuntimeError "No matching clients found" when discovery
# finds nothing. An Interrupt during discovery prints a notice and exits the
# process; an Interrupt or timeout while collecting replies just ends the
# collection.
def discovered_req(body, agent, options=false)
  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  print("Determining the amount of hosts matching filter for #{options[:disctimeout]} seconds .... ")

  begin
    discovered_hosts = discover(options[:filter], options[:disctimeout])
    discovered = discovered_hosts.size
    hosts_responded = []
    # Same array as discovered_hosts: hosts are removed from it as they reply.
    hosts_not_responded = discovered_hosts

    stat[:discoverytime] = Time.now.to_f - stat[:starttime]

    puts("#{discovered}\n\n")
  rescue Interrupt
    puts("Discovery interrupted.")
    exit!
  end

  raise("No matching clients found") if discovered == 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      discovered.times do
        resp = receive(reqid)
        sender = resp.payload[:senderid]

        hosts_responded << sender
        hosts_not_responded.delete(sender)

        yield(resp.payload)
      end
    end
  rescue Interrupt
    # Deliberately ignored: report whatever was collected so far.
  rescue Timeout::Error
    # Deliberately ignored: missing hosts end up in :noresponsefrom.
  ensure
    # Release the reply subscription that sendreq created, as req does.
    unsubscribe(agent, :reply)
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded.size
  stat[:responsesfrom] = hosts_responded
  stat[:noresponsefrom] = hosts_not_responded
  stat[:discovered] = discovered

  @stats = stat
  return stat
end
Prints out the stats returned from req and discovered_req in a nice way
# File lib/mcollective/client.rb, line 254
# Prints a summary of a stats hash as produced by req or discovered_req.
#
# stats   - hash with :responses, :blocktime and, for the verbose form,
#           :starttime, :discoverytime and :totaltime; :discovered and
#           :noresponsefrom are optional
# options - options hash; falls back to @options when false/nil. Only
#           options[:verbose] is consulted
# caption - heading used in the verbose form
#
# After the summary, any hosts listed in stats[:noresponsefrom] are printed
# four to a row, each right-aligned in a 30 character column.
def display_stats(stats, options=false, caption="stomp call summary")
  options = @options unless options

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  silent_hosts = stats[:noresponsefrom] || []

  if silent_hosts.size > 0
    puts("\nNo response from:\n")

    # Wrap by position in the list. The entries are host ids, so applying %
    # to an entry would format the string instead of doing arithmetic.
    silent_hosts.each_slice(4) do |row|
      puts
      row.each { |host| printf("%30s", host) }
    end

    puts
  end
end
Blocking call that waits forever for a message to arrive.
If you give it a requestid this means you've previously sent a request with that ID and now you just want replies that match that id; in that case the current connection will just ignore all messages not directed at it and keep waiting for more until it finds a matching message.
# File lib/mcollective/client.rb, line 83
# Blocks on the connector until a reply whose requestid equals +requestid+
# has been received and decoded, then returns that reply.
#
# Replies that fail security validation, or that carry a different request
# id, are logged and skipped and the wait starts over. When the
# MCOLLECTIVE_ANON environment variable is present the payload's :senderid is
# replaced by its MD5 hex digest before the request id is compared.
def receive(requestid = nil)
  begin
    message = @connection.receive
    message.type = :reply
    message.expected_msgid = requestid
    message.decode!

    if ENV.include?("MCOLLECTIVE_ANON")
      message.payload[:senderid] = Digest::MD5.hexdigest(message.payload[:senderid])
    end

    unless message.requestid == requestid
      raise(MsgDoesNotMatchRequestID, "Message reqid #{requestid} does not match our reqid #{message.requestid}")
    end
  rescue SecurityValidationFailed
    Log.warn("Ignoring a message that did not pass security validations")
    retry
  rescue MsgDoesNotMatchRequestID
    Log.debug("Ignoring a message for some other client")
    retry
  end

  message
end
Sends a request and performs the passed block for each response
times = req("status", "mcollectived", options) {|resp|
pp resp
}
It returns a hash of times. The timeout for the total run is taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 148
# Sends a request and yields the payload of every reply until +waitfor+
# replies have arrived or options[:timeout] seconds have passed.
#
# body    - request body, or a Message; for a Message the agent, options and
#           waitfor values are taken from it and replace the arguments
# agent   - agent name handed to sendreq
# options - options hash; falls back to @options when false/nil
# waitfor - number of replies to wait for; 0 means keep receiving until the
#           timeout
#
# Returns (and stores in @stats) a hash with :starttime, :discoverytime,
# :blocktime, :totaltime, :responses and an empty :noresponsefrom. An
# Interrupt or timeout simply ends the collection; the agent's reply
# subscription is released either way.
def req(body, agent=nil, options=false, waitfor=0)
  if body.is_a?(Message)
    agent = body.agent
    options = body.options

    # A message with no discovery data means "no known reply count".
    expected_hosts = body.discovered_hosts
    waitfor = expected_hosts ? expected_hosts.size : 0
  end

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}

  options = @options unless options

  STDOUT.sync = true

  hosts_responded = 0

  begin
    Timeout.timeout(options[:timeout]) do
      reqid = sendreq(body, agent, options[:filter])

      loop do
        resp = receive(reqid)

        hosts_responded += 1

        yield(resp.payload)

        break if waitfor != 0 && hosts_responded >= waitfor
      end
    end
  rescue Interrupt
    # Deliberately ignored: report whatever was collected so far.
  rescue Timeout::Error
    # Deliberately ignored: the timeout is the normal end when waitfor is 0.
  ensure
    unsubscribe(agent, :reply)
  end

  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
  stat[:responses] = hosts_responded
  stat[:noresponsefrom] = []

  @stats = stat
  return stat
end
Sends a request and returns the generated request id; doesn't wait for responses and doesn't execute any passed in code blocks for responses
# File lib/mcollective/client.rb, line 38
# Encodes and publishes a request and returns its request id without waiting
# for any reply.
#
# msg    - either a ready-made Message, used as is (its agent replaces the
#          +agent+ argument), or a body from which a new :request Message is
#          built for the current collective
# agent  - agent name; used for the new Message and for the reply subscription
# filter - filter placed on a newly built Message
#
# The ttl for a newly built Message is @options[:ttl], or @config.ttl when
# that is unset. The agent's :reply target is subscribed before publishing.
def sendreq(msg, agent, filter = {})
  request =
    if msg.is_a?(Message)
      agent = msg.agent
      msg
    else
      ttl = @options[:ttl] || @config.ttl
      settings = {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl}
      Message.new(msg, nil, settings)
    end

  request.encode!

  Log.debug("Sending request #{request.requestid} to the #{request.agent} agent with ttl #{request.ttl} in collective #{request.collective}")

  subscribe(agent, :reply)

  request.publish

  request.requestid
end
# File lib/mcollective/client.rb, line 58
# Subscribes to the +type+ target of +agent+ in the current collective,
# unless @subscriptions already has an entry for that agent. The agent is
# then recorded in @subscriptions.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")

  Util.subscribe(target)
  @subscriptions[agent] = 1
end
# File lib/mcollective/client.rb, line 68
# Unsubscribes from the +type+ target of +agent+ in the current collective
# and removes the agent from @subscriptions. Does nothing when the agent has
# no entry in @subscriptions.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  target = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")

  Util.unsubscribe(target)
  @subscriptions.delete(agent)
end