Write SimpleRPC Client
编写SimpleRPC客户端
正如在SimpleRPC中所指出一样,可以使用mco rpc命令行工具来调用代理并让它们尽力以合理方式显示结果。如果这些不够可以写自己的代理。
如果你能坚持SimpleRPC的规则,SimpleRPC客户端能够做正常客户端所做的大多数事情并能使其变得很简单。
基本客户端
客户端大多数都是一个在代码中采用Ruby Mixin的帮助函数的分支,它提供以下:
准命令行选项来解析帮助输出
能够添加命令行选项
能够访问到代理和它的动作
帮助输出结果的工具
输出统计的工具
构建自己的过滤器的工具
在保留MCollective::Client功能的前提下附加其他功能
能够很简单也能很复杂
简单客户端示例
下面是一个为之前Helloworld代理所写的客户端,它调用helloworld代理并打印结果。
#!/usr/bin/ruby
require 'mcollective'
include MCollective::RPC
mc = rpcclient("helloworld")
printrpc mc.echo(:msg => "Welcome")
printrpcstats
mc.disconnect
将这个文件以hello.rb保存在libdir/mcollective/application/目录下,以mco hello执行即可。
[root@master application]# mco hello
* [ ============================================================> ] 1 / 1
Finished processing 1 / 1 hosts in 61.16 ms
这个执行之前说过正常比如说在1000台机器上它只会返回执行的进度和错误信息,如果想看到正确执行信息只需-v即可。
下面对上面的代码逐行解释:
include MCollective::RPC
mc = rpcclient("helloworld")
第一行很熟系的include很多帮助函数,我在查看源码时看到关于MCollective::RPC的介绍,这是一个使用RPC框架创建客户端和代理的标准接口工具集,标准兼容代理能够很容易创建像web 接口这样的通用客户端。
代码首先加载了很多其他模块,这些则是具体功能模块
autoload :Client, "mcollective/rpc/client"
autoload :Agent, "mcollective/rpc/agent"
autoload :Reply, "mcollective/rpc/reply"
autoload :Request, "mcollective/rpc/request"
autoload :Audit, "mcollective/rpc/audit"
autoload :Progress, "mcollective/rpc/progress"
autoload :Stats, "mcollective/rpc/stats"
autoload :DDL, "mcollective/rpc/ddl"
autoload :Result, "mcollective/rpc/result"
autoload :Helpers, "mcollective/rpc/helpers"
autoload :ActionRunner, "mcollective/rpc/actionrunner"
另外还有几个函数是我们写客户端需要用到的函数:
rpcoptions,rpcclient,printrpcstats,printrpc,empty_filter?,request,reply
第二行就为代理'helloworld'创建了新的client,之后你可已通过mc变量来访问。
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
printrpcstats
先是使用mc.echo调用echo这个特定动作,给它传入:msg的参数并将它返回出来。这个参数会随着不同动作而不同,可以看具体的源码。它返回一个简单的数组作为结果。可以以任意形式打印,在后面会讨论。
printrpc,printrpcstats是用于分别打印结果和统计的函数。
mc.disconnect
将客户端从中间件断开连接。如果不这样像ACtiveMQ这样的中间件会记录异常日志。
改变输出
输出冗余结果(详细信息)
mc = rpcclient("helloworld")
mc.discover :verbose => true
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC"), :verbose => true
前面看到如果正常执行,则没有任何输出。但如果加了verbose标志后,不需使用-v选项也能输出所有结果:
[root@master application]# mco hello
Determining the amount of hosts matching filter for 2 seconds .... 1
* [ ============================================================> ] 1 / 1
master.example.com : OK
{:msg=>"Welcome", :out=>nil, :err=>""}
---- rpc stats ----
Nodes: 1 / 1
Pass / Fail: 1 / 0
Start Time: Wed Jul 25 11:13:26 -0400 2012
Discovery Time: 2003.30ms
Agent Time: 119.64ms
Total Time: 2122.94ms
如果你想与所有的客户端交互都输出详细信息,可以:
mc = rpcclient("helloworld")
mc.verbose = true
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
禁用进度条
mc = rpcclient("helloworld")
mc.progress = false
默认会动态显示多台机器执行进度,如果想禁用可以如上操作。
结果保存变量而不打印
stats = mc.echo(:msg => "Welcome to MCollective Simple RPC").stats
report = stats.report
这样标准输出就不会有,但你同样可以查看到该变量的值。
可编程应用过滤器
不仅可以在命令行传入--with-*参数来做为过滤,也可以直接通过编程来实现: mc = rpcclient("helloworld")
mc.class_filter /dev_server/
mc.fact_filter "country", "uk"
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
还可以设置其他过滤器:agentfilter, identityfilter 和 compound_filter
fact_filter还支持其他几种形式:
mc.fact_filter "country=uk"
mc.fact_filter "physicalprocessorcount", "4", ">="
这会限制所有的机器都在UK而且不能超过3个处理器
重置过滤器为空
mc = rpcclient("helloworld")
mc.class_filter /dev_server/
mc.reset_filter
这样所有被包括的代理的过滤器都被重置为空。
批量处理代理
默认情况下客户端会同时与所有机器通信,这在有些情况下可能不是你所想的,比如处理DOS或相关组件。
可以指定客户端以批量的形式来与远程代理交互,在每个批量处理过程中sleep。
客户端都有--batch和--batch-sleep-time命令行选项在程序中也可以编程实现:
mc = rpcclient("helloworld")
mc.batch_size = 10
mc.batch_sleep_time = 5
mc.echo(:msg => "hello world")
或者
mc = rpcclient("helloworld")
mc.echo(:msg => "hello world", :batch_size => 10, :batch_sleep_time => 5)
强制重新发现
默认情况下一个脚本只能执行一次discovery,并重用发现结果。可以通过编程实现使用不同的filter时重新发现:
mc = rpcclient("helloworld")
mc.class_filter /dev_server/
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
mc.reset
mc.fact_filter "country", "uk"
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
这里执行一次mco echo调用会进行两次discovery,并且都是根据不同过滤条件来执行discovery。
应用自己的发现信息
新的消息支持直接消息模式,这时候可以使用自己的发现信息。这用于比如你在向集群部署某个应用时,你知道某个已知的异常在一些机器上会发生,想更快的确定是那个机器发生问题时可这么做,此外经常还设置TTL。
mc = rpcclient("helloworld")
mc.ttl = 3600
mc.discover(:nodes => ["host1", "host2", "host3"]
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
仅向部分发现机器发送请求
默认情况是向所有执行发现之后的机器发送请求。但是如果想把MCollective看成HA服务一样,你可已只针对其中的某些发送请求包括随机与不随机。
前10%选择时:
mc = rpcclient("helloworld")
mc.limit_targets = "10%"
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
随机选择时:
mc = rpcclient("helloworld")
mc.limit_targets = "10%"
mc.limit_method = :random
printrpc mc.echo(:msg => "Welcome to MCollective Simple RPC")
直接处理结果
结果与异常对应表:
状态码 描述 异常类
0 OK
1 OK failed(所有数据解析正确,有某个动作未执行完) RPCAborted
2 Unknown action 未知操作 UnknownRPCAction
3 Missing data 缺失数据 MissingRPCData
4 Invalid data 无效数据 InvalidRPCData
5 Other error 其他错误 UnknownRPCError
SimpleRPC风格结果
SimpleRPC提供由基本客户端库结果修剪的的输出版本,你可以根据需要选择需要返回的结果。这仅仅是ruby语言提供的功能。可以自定义为以下格式:
mc.echo(:msg => "hello world").each do |resp|
printf("%-40s: %s\n", resp[:sender], resp[:data][:msg])
end
输出就像:
dev1.you.net : hello world
dev2.you.net : hello world
dev3.you.net : hello world
echo循环结果数组,并选择需要输出的两项打印,结果是一个哈希数组。
[{:statusmsg=>"OK",
:sender=>"dev1.your.net",
:data=>{:msg => "hello world"},
:statuscode=>0},
{:statusmsg=>"OK",
:sender=>"dev2.your.net",
:data=>{:msg => "hello world"},
:statuscode=>0}]
这里的statuscode与上表中的返回码对应。
获取MCollective::Client访问权限#req 结果
可以实时获取每个结果,这时需要处理异常,这会得到一个不同风格的结果集,这个结果集是所有客户端提供的结果的全部。
mc.echo(:msg => "hello world") do |resp|
begin
printf("%-40s: %s\n", resp[:senderid], resp[:body][:data])
rescue RPCError => e
puts "The RPC agent returned an error: #{e}"
end
end
打印的结果与之前一样,但是结果集却不同。
{:msgtarget=>"/topic/mcollective.helloworld.reply",
:senderid=>"dev2.your.net",
:msgtime=>1261696663,
:hash=>"2d37daf690c4bcef5b5380b1e0c55f0c",
:body=>{:statusmsg=>"OK", :statuscode=>0, :data=>{:msg => "hello world"}},
:requestid=>"2884afb0b52cb38ea4d4a3146d18ef5f",
:senderagent=>"helloworld"}
当然也可以获取SimpleRPC风格结果而显示原生客户端结果集。
mc.echo(:msg => "hello world") do |resp, simpleresp|
begin
printf("%-40s: %s\n", simpleresp[:sender], simpleresp[:data][:msg])
rescue RPCError => e
puts "The RPC agent returned an error: #{e}"
end
end
添加自定义命令行选项
#!/usr/bin/ruby
require 'mcollective'
include MCollective::RPC
options = rpcoptions do |parser, options|
parser.define_head "Generic Echo Client"
parser.banner = "Usage: hello [options] [filters] --msg MSG"
parser.on('-m', '--msg MSG', 'Message to pass') do |v|
options[:msg] = v
end
end
unless options.include?(:msg)
puts("You need to specify a message with --msg")
exit! 1
end
mc = rpcclient("helloworld", :options => options)
mc.echo(:msg => options[:msg]).each do |resp|
printf("%-40s: %s\n", resp[:sender], resp[:data][:msg])
end
执行结果:
[root@master application]# mco hello --msg "we"
* [ ============================================================> ] 1 / 1
master.example.com : we
并且帮助信息:
[root@master application]# mco hello --help
Usage: hello [options] [filters] --msg MSG
Generic Echo Client
-m, --msg MSG Message to pass
--np, --no-progress Do not show the progress bar
-1, --one Send request to only one discovered nodes
--batch SIZE Do requests in batches
......
此外客户端还支持以下功能:
禁用命令行解析提供默认选项
向SimpleRPC发送请求不需发现和阻止
使用自定义发现策略(discovery)
blog comments powered by Disqus