C++多进程并发框架(1)(2)
如何注册服务和接口
来看一下Echo 服务的实现:
- struct echo_service_t
- {
- public:
- void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_)
- {
- logtrace((FF, "echo_service_t::echo done value<%s>", in_msg_.value.c_str()));
- echo_t::out_t out;
- out.value = in_msg_.value;
- cb_(out);
- }
- };
- int main(int argc, char* argv[])
- {
- int g_index = 1;
- if (argc > 1)
- {
- g_index = atoi(argv[1]);
- }
- char buff[128];
- snprintf(buff, sizeof(buff), "tcp://%s:%s", "127.0.0.1", "10241");
- msg_bus_t msg_bus;
- assert(0 == singleton_t<msg_bus_t>::instance().open("tcp://127.0.0.1:10241") && "can't connnect to broker");
- echo_service_t f;
- singleton_t<msg_bus_t>::instance().create_service_group("echo");
- singleton_t<msg_bus_t>::instance().create_service("echo", g_index)
- .bind_service(&f)
- .reg(&echo_service_t::echo);
- signal_helper_t::wait();
- singleton_t<msg_bus_t>::instance().close();
- //usleep(1000);
- cout <<"\noh end\n";
- return 0;
- }
- create_service_group 创建一个服务group,一个服务组可能有多个并行的实例
- create_service 以特定的id 创建一个服务实例
- reg 为该服务注册接口
- 接口的定义规范为void echo(echo_t::in_t& in_msg_, rpc_callcack_t<echo_t::out_t>& cb_),第一个参数为输入的消息struct,第二个参数为回调函数的模板特例,模板参数为返回消息的struct 类型。接口无需知道发送消息等细节,只需将结果callback 即可。
- 注册到Broker 后,所有Client都可获取该服务
消息定义的规范
我们约定每个接口(远程或本地都应满足)都包含一个输入消息和一个结果消息。来看一下echo 服务的消息定义:
- struct echo_t
- {
- struct in_t: public msg_i
- {
- in_t():
- msg_i("echo_t::in_t")
- {}
- virtual string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- virtual void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- string value;
- };
- struct out_t: public msg_i
- {
- out_t():
- msg_i("echo_t::out_t")
- {}
- virtual string encode()
- {
- return (init_encoder() << value).get_buff();
- }
- virtual void decode(const string& src_buff_)
- {
- init_decoder(src_buff_) >> value;
- }
- string value;
- };
- };
- 每个接口必须包含in_t消息和out_t消息,并且他们定义在接口名(如echo _t)的内部
- 所有消息都继承于msg_i, 其封装了二进制的序列化、反序列化等。构造时赋予类型名作为消息的名称。
- 每个消息必须实现encode 和 decode 函数
这里需要指出的是,FFLIB 中不需要为每个消息定义对应的CMD。当接口如echo向Broker 注册时,reg接口通过C++ 模板的类型推断会自动将该msg name 注册给Broker, Broker为每个msg name 分配唯一的msg_id。Msg_bus 中自动维护了msg_name 和msg_id 的映射。Msg_i 的定义如下:
- struct msg_i : public codec_i
- {
- msg_i(const char* msg_name_):
- cmd(0),
- uuid(0),
- service_group_id(0),
- service_id(0),
- msg_id(0),
- msg_name(msg_name_)
- {}
- void set(uint16_t group_id, uint16_t id_, uint32_t uuid_, uint16_t msg_id_)
- {
- service_group_id = group_id;
- service_id = id_;
- uuid = uuid_;
- msg_id = msg_id_;
- }
- uint16_t cmd;
- uint16_t get_group_id() const{ return service_group_id; }
- uint16_t get_service_id() const{ return service_id; }
- uint32_t get_uuid() const{ return uuid; }
- uint16_t get_msg_id() const{ return msg_id; }
- const string& get_name() const
- {
- if (msg_name.empty() == false)
- {
- return msg_name;
- }
- return singleton_t<msg_name_store_t>::instance().id_to_name(this->get_msg_id());
- }
- void set_uuid(uint32_t id_) { uuid = id_; }
- void set_msg_id(uint16_t id_) { msg_id = id_;}
- void set_sgid(uint16_t sgid_) { service_group_id = sgid_;}
- void set_sid(uint16_t sid_) { service_id = sid_; }
- uint32_t uuid;
- uint16_t service_group_id;
- uint16_t service_id;
- uint16_t msg_id;
- string msg_name;
- virtual string encode(uint16_t cmd_)
- {
- this->cmd = cmd_;
- return encode();
- }
- virtual string encode() = 0;
- bin_encoder_t& init_encoder()
- {
- return encoder.init(cmd) << uuid << service_group_id << service_id<< msg_id;
- }
- bin_encoder_t& init_encoder(uint16_t cmd_)
- {
- return encoder.init(cmd_) << uuid << service_group_id << service_id << msg_id;
- }
- bin_decoder_t& init_decoder(const string& buff_)
- {
- return decoder.init(buff_) >> uuid >> service_group_id >> service_id >> msg_id;
- }
- bin_decoder_t decoder;
- bin_encoder_t encoder;
- };
关于性能
由于远程接口的调用必须通过Broker, Broker会为每个接口自动生成性能统计数据,并每10分钟输出到perf.txt 文件中。文件格式为CSV,参见:
http://www.cnblogs.com/zhiranok/archive/2012/06/06/cpp_perf.html
总结
FFLIB框架拥有如下的特点:
- 使用多进程并发。Broker 把Client 和Service 的位置透明化
- Service 的接口要注册到Broker, 所有连接Broker的Client 都可以调用(publisher/ subscriber)
- 远程调用必须绑定回调函数
- 利用future 模式实现同步,从而支持单元测试
- 消息定义规范简单直接高效
- 所有service的接口性能监控数据自动生成,免费的午餐
- Service 单线程话,更simplicity
源代码:
Svn co http://ffown.googlecode.com/svn/trunk/
运行示例:
- Cd example/broker && make && ./app_broker –l http://127.0.0.1:10241
- Cd example/echo_server && make && ./app_echo_server
- Cd example/echo_client && make && ./app_echo_client
【编辑推荐】