新书推介:《语义网技术体系》
作者:瞿裕忠,胡伟,程龚
   XML论坛     W3CHINA.ORG讨论区     计算机科学论坛     SOAChina论坛     Blog     开放翻译计划     新浪微博  
 
  • 首页
  • 登录
  • 注册
  • 软件下载
  • 资料下载
  • 核心成员
  • 帮助
  •   Add to Google

    >> 操作系统研究。UEFI
    [返回] 中文XML论坛 - 专业的XML技术讨论区计算机理论与工程『 操作系统原理 』 → Heartbeat 通信层结构分析[转帖] 查看新帖用户列表

      发表一个新主题  发表一个新投票  回复主题  (订阅本版) 您是本帖的第 5810 个阅读者浏览上一篇主题  刷新本主题   树形显示贴子 浏览下一篇主题
     * 贴子主题: Heartbeat 通信层结构分析[转帖] 举报  打印  推荐  IE收藏夹 
       本主题类别:     
     hi2008 帅哥哟,离线,有人找我吗?
      
      
      等级:大一新生
      文章:2
      积分:71
      门派:XML.ORG.CN
      注册:2007/4/25

    姓名:(无权查看)
    城市:(无权查看)
    院校:(无权查看)
    给hi2008发送一个短消息 把hi2008加入好友 查看hi2008的个人资料 搜索hi2008在『 操作系统原理 』的所有贴子 引用回复这个贴子 回复这个贴子 查看hi2008的博客楼主
    发贴心情 Heartbeat 通信层结构分析[转帖]

    原贴:
    [url]http://www.gd-linux.org/bbs/showthread.php?t=4915 [/url]

    Heartbeat 通信层结构分析

    广东省Linux公共服务技术支持中心
    蔡强
    前言
      Heartbeat是Linux-HA开源项目发布的用于关键应用环境的HA软件名称。从1999以来到现在, 历经1.2.x, 2.0.x等多个版本,在全球开源HA领域具有举足轻重的知名度, 应用日益广泛, 并且得到了一些主流Linux操作系统厂商的支持。
    而通信层实现无疑是集群软件运行的最基本底层支撑。本文对通过分析Heartbeat源码,对其通信层基本结构和机制进行了分析和阐述。给出了基本数据结构和实现流程。
      所有分析基于Heartbeat 2.0.4版本。
    相关源码:[url]http://www.linux-ha.org/download/heartbeat-2.0.4.tar.gz [/url]


    Heartbeat通信结构概述
      主要分2种:
      1.HBcomm 通信层PLUGIN (节点之间的进程通信)
    实现主要是在各个媒介的Plugin里,通过PILS动态连接库加载。比如支持多播,单播,串口等通信方式。所有节点间通信PLUGIN模块放在lib/plugins/hbcomm/路径下。
      2.Unix Domain Socket (节点内的进程通信)
      /include/clplumbing/Ipc.h, IPC抽象层数据结构定义
      /lib/clplumbing/ocf_ipc.c, IPC底层抽象实现
      /lib/clplumbing/ipcsocket.c, IPC的unix域套接字具体实现

    交叉点:
      节点间和节点内2种通信方式的交接点在heartbeat.c的read_child(), write_child()等函数中, 在这里实现消息的转发。
    Heartbeat API
      是基于ipc抽象层的Unix域实现基础上,用于满足heartbeat和client子模块之间的应用层通信需求。:
      client_lib.c实现了Heartbeat API 客户端部分。
      hb_api.c实现了heartbeat API服务器端部分。
    按此在新窗口浏览图片
     图1:Heartbeat通信结构概图
    上图描述了一个client子模块把消息通过Heartbeat通信机制发送到另一个节点相同模块的过程。
      1. client子模块通过FIFO管道把消息发送到FIFO子进程fifo_child。为什么使用FIFO来进行通信呢,应该是有些进程不能很方便的和Heartbeat主进程建立Unix域IPC通道的关系,比如执行的脚本和集群管理程序, 集群状态查询程序。
      2. FIFO子进程通过msgfromstream()从fifo管道收到消息后,利用事先建立好的和Heartbeat之间的IPC通道转发给Heartbeat主进程
      3. 主进程判断消息是发给自己的则调用process_msg()进行处理,否则调用send_to_all_media()通过各个媒介的wchan通道发送给write_child子进程。
      4. write_child子进程通过ipcmsgfromIPC()从主进程收到消息,调用各个媒介结构hb_media的write函数把消息发送到集群其他节点。
      5. 其他节点的read_child子进程通过各个媒介结构hb_media的read函数读到消息后,使用事先和Heartbeat主进程建立的IPC通道发送消息到Heartbeat主进程
      6. Heartbeat主进程通过msgfromIPC()收到消息后,调用process_clustermsg()函数进行处理。具体为,如果是主进程处理的消息调用HBDoMsgCallback进行处理,否则通过newstartha_monitor发送到各个client子进程


    节点间通信Plugin
      代码在lib/plugins/hbcomm/目录中
      bcast.c /* 广播 */
      mcast.c /* 多播 */
      ucast.c /* 单播 */
      openais.c /* openais */
      serial.c /* 串口 */
      ping.c /* icmp */
      ping_group.c /* ping一组主机 */
      hbaping.c /* 光纤总线适配器ping */

    /* 这个结构的每个函数对应Plugin里的具体函数。*/
    struct hb_media_fns {
    struct hb_media*(*new) (const char * token); /* 建立媒介 */
    int (*parse) (const char * options); /* 读取配置文件参数 */
    int (*open) (struct hb_media *mp); /* 打开 */
    int (*close) (struct hb_media *mp); /* 关闭 */
    void* (*read) (struct hb_media *mp, int *len ); /* 读 */
    int (*write) (struct hb_media *mp , void *msg, int len); /* 写 */
    int (*mtype) (char **buffer); /* 获取媒介类型 */
    int (*descr) (char **buffer); /* 获取媒介描述 */
    int (*isping) (void); /* 是否ping类型媒介 */
    };

    hb_media_fns各功能函数调用之处:
    new(): config.c的add_option函数
    parse(): config.c的parse_config函数
    open(): heartbeat.c的initialize_heartbeat函数
    close(): heartbeat.c的initialize_heartbeat函数
    read(): heartbeat.c的read_child函数
    write(): heartbeat.c的write_child函数
    mtype(): config.c的parse_config函数
    descr(): config.c的parse_config函数
    isping(): 在config.c和hb_api.c被调用

    节点内IPC通信
    IPC通信抽象层(include\clplumbing\ipc.h)
    IPC抽象层数据结构概述:
    (注:缩进的为该数据结构所属元素)
    IPC_AUTH /* 安全认证数据结构 */
    IPC_WAIT_CONNECTION /* 等待连接数据结构 */
    IPC_WAIT_OPS /* 等待连接函数集 */
    IPC_CHANNEL /* 通信管道数据结构 */
    IPC_OPS /* 通信管道函数集 */
    IPC_QUEUE /* 信息队列 */
    ipc_bufpool /* 接收缓冲池,经处理转化为接收队列 */
    IPC_MESSAGE /* IPC通信信息数据结构 */
    IPC_CHANNEL /* 信息所属通信管道 */
    SOCKET_MSG_HEAD /* 信息头数据结构 */


    其中2种主要抽象数据结构:
    /* server端等待客户端的连接 */
    struct IPC_WAIT_CONNECTION{
    int ch_status; /* wait conn. status.*/
    void * ch_private; /* wait conn. private data. */
    IPC_WaitOps *ops; /* wait conn. function table .*/
    };

    /* 活动的通信管道结构 */
    struct IPC_CHANNEL{
    int ch_status; /* 通道状态 */
    pid_t farside_pid; /* 远端 pid */
    void* ch_private; /* channel private data. (may contain conn. info.) */
    IPC_Ops* ops; /* 通道函数集 */
    unsigned int msgpad; /* 信息前缀字节数 */
    unsigned int bytes_remaining; /* 剩余未发送的字节数 */
    gboolean should_send_block; /* */

    /* private: */
    IPC_Queue* send_queue; /* 发送缓冲 */
    IPC_Queue* recv_queue; /* 接收缓冲 */

    /* 接收缓冲池, 经处理后转化为接收信息队列recv_queue */
    struct ipc_bufpool* pool; /* buffer pool */

    /* 发送的流量控制 */
    int high_flow_mark;
    int low_flow_mark;
    void* high_flow_userdata;
    void* low_flow_userdata;
    flow_callback_t high_flow_callback;
    flow_callback_t low_flow_callback;

    int conntype;
    char failreason[MAXFAILREASON];
    };


    IPC抽象层通信
    server端:
    1. 调用ipc_wait_conn_constructor()建立等待连接管道,成功则返回IPC_WaitConnection.
    2. 通过poll/select来轮询客户请求。使用accept_connection接受连接,返回IPC_Channel。

    client端:
    调用ipc_channel_constructor()连接server, 返回IPC_Channel。


    IPC抽象层的UNIX Domain Socket实现

    static struct IPC_OPS socket_ops = {
    destroy: socket_destroy_channel, /* 删除通信管道 */
    initiate_connection: socket_initiate_connection, /* 从client端建立连接 */
    verify_auth: socket_verify_auth, /* 客户端认证信息 */
    assert_auth: socket_assert_auth, /* 断言认证, (未用)*/
    send: socket_send, /* 向管道发送信息 */
    recv: socket_recv, /* 从管道接收信息*/
    waitin: socket_waitin, /* 等待输入信息, (然后读取) */
    waitout: socket_waitout, /* 等待信息输出结束 */
    is_message_pending: socket_is_message_pending, /* 有信息可读或挂断 */
    is_sending_blocked: socket_is_output_pending, /* 输出是否阻塞 */
    resume_io: socket_resume_io, /* 恢复所有可能的ipc操作 */
    get_send_select_fd: socket_get_send_fd, /* 取得发送fd */
    get_recv_select_fd: socket_get_recv_fd, /* 取得接收fd */
    set_send_qlen: socket_set_send_qlen, /* 设置最大发送缓冲长度 */
    set_recv_qlen: socket_set_recv_qlen, /* 设置最大接收缓冲长度*/
    set_high_flow_callback: socket_set_high_flow_callback, /* 高流量callback函数 */
    set_low_flow_callback: socket_set_low_flow_callback, /* 低流量callback函数 */
    new_ipcmsg: socket_new_ipcmsg, /* 返回一个新建立的IPC信息 */
    get_chan_status: socket_get_chan_status, /* 返回管道状态 */
    is_sendq_full: socket_is_sendq_full, /* 发送缓冲是否已满 */
    is_recvq_full: socket_is_recvq_full, /* 接收缓冲是否已满 */
    get_conntype: socket_get_conntype, /* 返回管道类型 */
    /* 可以是IPC_SERVER , IPC_CLIENT , IPC_PEER */
    };


    节点间通信Plugin / 节点内通信交叉点
    主要实现代码在heartbeart.c中

    Heartbeat通信媒介结构
    struct hb_media {
    void * pd; /* 自定义数据结构 */
    const char * name; /* 媒介名 */
    char* type; /* 媒介类型 */
    char* description; /* 媒介描述 */
    const struct hb_media_fns*vf; /* hbcomm媒介处理函数集 */
    IPC_Channel* wchan[2]; /* Unix域写子进程通信管道 */
    IPC_Channel* rchan[2]; /* Unix域读子进程通信管道 */
    };


    /* heartbeat发送信息集群 */
    /* 1.发送消息到write_child子进程 */
    send_cluster_msg{ /* 发送信息到集群 */

    process_outbound_packet{ /* 带包重传控制 */
    send_to_all_media{ /* 发送到所有媒介 */
    for (j=0; j < nummedia; ++j) {
    IPC_Channel* wch = sysmedia[j]->wchan[P_WRITEFD];

    /* 发送到特定传送媒介的写子进程 */
    wrc=wch->ops->send(wch, outmsg);
    }
    }
    }
    }

    /* 2. write_child写子进程发送消息到集群 */
    write_child(){
    IPC_Channel* ourchan = mp->wchan[P_READFD];
    for(; ; ){
    /* write_child通过Unix Domain Socket 接收heartbeat信息 */
    IPC_Message* ipcmsg = ipcmsgfromIPC(ourchan); /* 调用ops->recv() */

    /* 发送到集群其他节点 */
    if (mp->vf->write(mp, ipcmsg->msg_body, ipcmsg->msg_len) != HA_OK) {
    ……
    }
    }
    }


    /* 从集群接收信息 */
    /* 1. read_child读子进程从集群接收消息 */
    Read_child(){
    IPC_Channel* ourchan = mp->rchan[P_READFD];
    For(;; ){
    /* 从hbcomm PLUGIN接收 */
    if ((pkt=mp->vf->read(mp, &pktlen)) == NULL) {
    ……
    }
    if (NULL != imsg){
    /* read_child子进程通过UNIX Domain Socket, 发送到heartbeat */
    rc = ourchan->ops->send(ourchan, imsg);
    rc2 = ourchan->ops->waitout(ourchan);

    }
    }
    }

    /* 2. heartbeat从read_child子进程接收信息并进行处理 */
    s = G_main_add_IPC_Channel(PRI_READPKT
    , sysmedia[j]->rchan[P_WRITEFD], FALSE
    , read_child_dispatch, sysmedia+j, NULL);
    read_child_dispatch(){

    msg = msgfromIPC(source, MSG_NEEDAUTH); /*调用ops->recv()从read_child读 */
    process_clustermsg(msg, lnk); /* 对读到信息进行处理 */
    }

    heartbeat API Server端
    struct api_query_handler query_handler_list[] = {
    {API_SIGNOFF, api_signoff}, /* client登陆 */
    {API_SETFILTER, api_setfilter}, /* 设置消息过滤 */
    {API_SETSIGNAL, api_setsignal}, /* 设置消息到达信号通知 */
    {API_NODELIST, api_nodelist}, /* 获取节点列表 */
    {API_NODESTATUS, api_nodestatus}, /* 查询节点状态 */
    {API_NODETYPE, api_nodetype}, /* 查询节点类型 */
    {API_IFSTATUS, api_ifstatus}, /* 查询心跳状态 */
    {API_IFLIST, api_iflist}, /* 查询心跳列表 */
    {API_CLIENTSTATUS, api_clientstatus}, /* 查询client模块状态 */
    {API_NUMNODES, api_num_nodes}, /* 返回集群普通节点数 */
    {API_GETPARM, api_get_parameter}, /* 返回特定参数值 */
    {API_GETRESOURCES, api_get_resources}, /* 返回资源状态(兼容1.2.x以前版本) */
    {API_GETUUID, api_get_uuid}, /* 取得节点uuid值 */
    {API_GETNAME, api_get_nodename}, /* 取得节点名 */
    {API_SET_SENDQLEN, api_set_sendqlen} /* 设置发送队列长度 */
    };


    heartbeat API client端
    static struct llc_ops heartbeat_ops = {
    signon: hb_api_signon, /* 注册新的heartbeat client */
    signoff: hb_api_signoff, /* 注销一个heartbeat client */
    delete: hb_api_delete, /* 注销结构 */
    set_msg_callback: set_msg_callback, /* 设置某信息类型callback */
    set_nstatus_callback: set_nstatus_callback, /* 设置节点状态类型callback */
    set_ifstatus_callback: set_ifstatus_callback, /* 设置心跳状态类型callback */
    set_cstatus_callback: set_cstatus_callback, /* 设置client状态类型callback */
    init_nodewalk: init_nodewalk, /* 初始化节点遍历 */
    nextnode: nextnode, /* 下一个节点 */
    end_nodewalk: end_nodewalk, /* 结束节点遍历 */
    node_status: get_nodestatus, /* 节点当前状态 */
    node_type: get_nodetype, /* 节点类型 */
    init_ifwalk: init_ifwalk, /* 初始化心跳遍历 */
    nextif: nextif, /* 下一个心跳接口 */
    end_ifwalk: end_ifwalk, /* 结束心跳遍历 */
    if_status: get_ifstatus, /* 心跳当前状态 */
    client_status: get_clientstatus, /* client当前状态 */
    get_uuid_by_name: get_uuid_by_name, /* 根据名字取得uuid */
    get_name_by_uuid: get_name_by_uuid, /* 根据uuid取得名字 */
    sendclustermsg: sendclustermsg, /* 发送消息到cluster中所有成员*/
    sendnodemsg: sendnodemsg, /* 发送消息到特定节点 */
    sendnodemsg_byuuid: sendnodemsg_byuuid, /* 发送消息到特定节点(by uuid)*/
    send_ordered_clustermsg:send_ordered_clustermsg, /* 发送顺序集群信息 */
    send_ordered_nodemsg: send_ordered_nodemsg, /* 发送顺序节点信息 */
    inputfd: get_inputfd, /* 返回和检测信息到达*/
    ipcchan: get_ipcchan, /* 返回IPC_Channel 类型ipc通道 */
    msgready: msgready, /* 当有信息可读时返回true*/
    setmsgsignal: hb_api_setsignal, /* setmsgsignal */
    rcvmsg: rcvmsg, /* 接收msg, 交给callback处理 */
    readmsg: read_msg_w_callbacks, /* 返回没有注册callback的msg */
    setfmode: setfmode, /* setfmode */
    get_parameter: get_parameter,
    get_deadtime: get_deadtime,
    get_keepalive: get_keepalive,
    get_mynodeid: get_mynodeid, /* 取得本地节点名 */
    get_logfacility: get_logfacility, /* suggested logging facility */
    get_resources: get_resources, /* 取得资源当前分布状态 */
    chan_is_connected: chan_is_connected,
    set_sendq_len: set_sendq_len, /* 设置发送缓存区长度 */
    set_send_block_mode: socket_set_send_block_mode,
    errmsg: APIError,
    };
    注:
    Client端API函数集明显比Server端查询处理函数集要多,是因为有些功能不需要通过Server端查询来得到。

    原贴:
    [url]http://www.gd-linux.org/bbs/showthread.php?t=4915 [/url]


       收藏   分享  
    顶(0)
      




    点击查看用户来源及管理<br>发贴IP:*.*.*.* 2007/4/29 11:50:00
     
     GoogleAdSense
      
      
      等级:大一新生
      文章:1
      积分:50
      门派:无门无派
      院校:未填写
      注册:2007-01-01
    给Google AdSense发送一个短消息 把Google AdSense加入好友 查看Google AdSense的个人资料 搜索Google AdSense在『 操作系统原理 』的所有贴子 访问Google AdSense的主页 引用回复这个贴子 回复这个贴子 查看Google AdSense的博客广告
    2024/4/29 2:16:51

    本主题贴数1,分页: [1]

    管理选项修改tag | 锁定 | 解锁 | 提升 | 删除 | 移动 | 固顶 | 总固顶 | 奖励 | 惩罚 | 发布公告
    W3C Contributing Supporter! W 3 C h i n a ( since 2003 ) 旗 下 站 点
    苏ICP备05006046号《全国人大常委会关于维护互联网安全的决定》《计算机信息网络国际联网安全保护管理办法》
    1,925.781ms