栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 软件开发 > 后端开发 > C/C++/C#

基于Inotify封装的C++事件类

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于Inotify封装的C++事件类

目前在试验过程中发现上层用户使用我们A/B缓冲区的实时数据时因为采用了固定频率轮训的方式从缓冲区拿数据,那么平均下来,数据获取的延迟为1000/freq/2 ms。这种情况在仿真环境中倒是没啥问题,但是到了实物环境中,这点延迟也是要进行优化的。

针对上述需求,可以调整为更短的轮训周期,但是这种方式不太靠谱。为了从根本上解决该问题,我们设计采用事件触发的方式来及时通知用户数据有更新。

这里借鉴的是inotify机制,它是基于inode的一种消息链通知机制,可以有效跟踪文件(目录)粒度的创建、删除、更改、移动等事件。在多进程共同操作一个文件时,主要是读和写在不同的进程中这种生产者/消费者模式,消费者可以通过调用inotify的相关函数来感知到生产者已经将数据准备好。但是,在编程实现方面,这又得借助while()循环来实现,当时间没有来的时候,inotify机制的函数会阻塞,当事件产生后,才能往后执行。

那么,有没有可能通过回调函数来实现上述方式呢,当事件发生时,自动调用回调函数进行处理,这种被动处理比主动轮训可能更节约资源,也更便于理解。

头文件:

#ifndef __SCEvent_H__
#define __SCEvent_H__

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include  
//SPDlog是需要独立安装的轻量级日志系统(so文件和.h文件分别安装到了/usr/local/lib和/usr/local/include,并在.bashrc中export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib),这里采用的是只包含头文件的方式加载


//监控事件定义,对于使用A/B缓冲区的上层应用,这里主要使用_MODIFY_EVENT即可
#define _ACCESS_EVENT      "IN_ACCESS"         //文件/目录被访问
#define _ATTRIB_EVENT      "IN_ATTRIB"         //文件元数据被修改
#define _CLOSE_WR_EVENT    "IN_CLOSE_WRITE"    //以只写方式打开的文件被关闭
#define _CLOSE_NOWR_EVENT  "IN_CLOSE_NOWRITE"  //以只读方式打开的文件被关闭
#define _CREATE_EVNT       "IN_CREATE"         //一个文件/目录 被创建
#define _DELETe_SELF_EVENT "IN_DELETE_SELF"    //一个文件/目录 被删除
#define _MODIFY_EVENT      "IN_MODIFY"         //一个文件被修改
#define _MOVE_SELF_EVENT   "IN_MOVE_SELF"      //被监控的目录/文件本身被移动
#define _MOVED_FROM_EVENT  "IN_MOVED_FROM"     //文件移除到受监控的目录之外
#define _MOVED_TO_EVENT    "IN_MOVED_TO"       //一个文件被一入受监控目录
#define _OPEN_EVNET        "IN_OPEN"           //文件被打开
#define _IGNORED_EVENT     "IN_IGNORED"        //受监控的项目被移除
#define _ISDIR_EVENT       "IN_ISDIR"          //name 返回的是一个目录
#define _UNMOUNT_EVENT     "IN_UNMOUNT"        //受监控文件被卸载
    
//单个监控文件结构体的大小,这里我们只监控一个文件。对于A/B缓冲区而言,它只是一个mmap文件。这个定义将来可以考虑移动到外面去。    
#define A_FILE_SIZE ((sizeof(struct inotify_event) + NAME_MAX + 1))   

//回调函数的别称,eventPrint和eventUnmount都是回调函数,当某事件发生后,该函数自动调用。
//将来的obtainNew或者其封装也要弄成这个样子
typedef void (*EventHandle) (const std::string, void *);

//场景事件类
class SCEvent
{
public:
    
    SCEvent(void);

    
    ~SCEvent(void);

    
    size_t InitWatchFile(std::vector &watch_name, void *par, const uint32_t mask = IN_ALL_EVENTS);


    
    int RmWatchFile(const std::string &rm_file);

    
    
    int StartWatchThread(std::map &funcMap, pthread_t &_pid);
private:
    
    
    static void *watch_file(void *self);

    
    int AddWatchFile(std::vector &watch_name, const uint32_t mask = IN_ALL_EVENTS);

    
    void judgeEventStr(uint32_t mask, std::string &eventDescription);
    
    void judgeEventName(uint32_t mask, std::string &eventName);


    
    int findWD(const std::string &file);

    //重新监控文件
    void Rewatch(const std::string &watch_name);

    


   
    static void eventUnmount(const std::string event_file, void *flags);

    
    static void eventPrint(const std::string, void *);
private:
    int                                  m_inotifyFD;          //句柄
    bool                                 m_isContinue;         //监控进程是否继续执行
    void                                *m_pParameter;         //回调函数扩展参数
    size_t                               m_watchNum;           //监控文件数量
    std::map           m_watchFiles;         //监控文件和 wd 之间的映射
    std::map   m_handleMap;          //事件处理回调函数集合
    std::shared_ptr m_logger; //spdlog日志对象,将来在整个pm框架中之保留一个即可
    bool logEnable;
   // static int count = 0;
};

#endif //__SCEvent_H__

 cpp文件:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 

#include "uInotify.h"
//#include "Tools/logTools.h"

#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_DEBUG
using namespace std;

SCEvent::SCEvent(void) : m_inotifyFD(0), m_isContinue(true), m_pParameter(NULL)
{
    try
    {
        //在logs/basic.txt中写日志
        m_logger = spdlog::basic_logger_mt("sbasic_logger", "logs/basic.txt");
        logEnable = true;
        //my_logger->info("Hello {}", "world");
    }
    catch (const spdlog::spdlog_ex &ex)
    {
        logEnable = false;
        m_logger->~logger();
        std::cout << "Log initialization failed: " << ex.what() << std::endl;
    }
}

SCEvent::~SCEvent(void)
{
}

//初始话监控文件列表
size_t SCEvent::InitWatchFile(std::vector &watch_name, void *par, const uint32_t mask )
{
    m_inotifyFD = inotify_init(); //创建 initify 实例
    if (m_inotifyFD == -1)
    {
        SPDLOG_TRACE(m_logger, "initalize inotify failed.");
        return -1;
    }

    //初始化回调函数集
    m_handleMap[_IGNORED_EVENT] = eventUnmount;
    m_handleMap[_ATTRIB_EVENT] = eventUnmount;
    m_handleMap[_MODIFY_EVENT] = eventPrint;
    m_pParameter = par;

    return AddWatchFile(watch_name, mask);
}

//添加监控文件
int SCEvent::AddWatchFile(std::vector &watch_name, const uint32_t mask )
{
    vector::iterator it = watch_name.begin();
    int wd;
    while (it != watch_name.end())
    {
        string file_buf = *it++;
        wd = inotify_add_watch(m_inotifyFD, file_buf.c_str(), mask);
        if (wd == -1)
        {
            SPDLOG_TRACE("add watch failed.");
            return -1;
        }
        m_watchFiles[wd] = file_buf;
    }
    m_watchNum = m_watchFiles.size();

    return m_watchNum;
}

//删除监控文件
int SCEvent::RmWatchFile(const std::string &rm_file)
{
    int wd = findWD(rm_file);

    return inotify_rm_watch(m_inotifyFD, wd);
}

//开始监控
int SCEvent::StartWatchThread(std::map &func, pthread_t &_pid)
{
    map::iterator it = func.begin();
    while (it != func.end())
    {
        m_handleMap[it->first] = it->second;
        it++;
    }

    _pid = 0;
    return pthread_create(&_pid, NULL, watch_file, (void *)this); //启动监控线程
}

//监控线程
void *SCEvent::watch_file(void *self)
{
    SCEvent *pUCI = (SCEvent *)self;
    ssize_t numRead;

    while (pUCI->m_isContinue)
    {
        //申请特点大小的缓存
        char buf[pUCI->m_watchNum * A_FILE_SIZE];
        bzero(buf, sizeof(buf));
        numRead = read(pUCI->m_inotifyFD, buf, sizeof(buf));
        if (numRead < 0)
        {
            SPDLOG_TRACE("Read inotify failed. what:{} ", strerror(errno));
            return NULL;
        }
        char *p = NULL;
        struct inotify_event *event = NULL;
        for (p = buf; p < (buf + numRead);)
        {
            event = (struct inotify_event *)p;

            string whatEvent, eventName, watchFile;
            pUCI->judgeEventStr(event->mask, whatEvent);  //获取事件描述
            pUCI->judgeEventName(event->mask, eventName); //获取事件名称
            //获取文件名称
            watchFile = pUCI->m_watchFiles[event->wd];
            SPDLOG_TRACE("The event is: {}, the file is: {}", eventName, watchFile);
            if (pUCI->m_handleMap.find(eventName) != pUCI->m_handleMap.end())
            {
                SPDLOG_TRACE("The event have handle is:{},the file is:{} ", eventName, watchFile);
                if ((strcmp(eventName.c_str(), _IGNORED_EVENT) == 0) ||
                    strcmp(eventName.c_str(), _ATTRIB_EVENT) == 0)
                {
                    pUCI->m_handleMap[eventName](watchFile, pUCI); //监控对象自己定义的回调函数
                }
                else
                {
                    pUCI->m_handleMap[eventName](watchFile, pUCI->m_pParameter); //用户提供的回调函数
                }
            }

            p += sizeof(struct inotify_event) + event->len;
        }
    }

    return NULL;
}

//识别事件
void SCEvent::judgeEventStr(uint32_t mask, std::string &event)
{
    if (mask & IN_ACCESS)
        event = "File access";
    if (mask & IN_ATTRIB)
        event = "The file metadata is modified.";
    if (mask & IN_CLOSE_WRITE)
        event = "Close the file open for writing.";
    if (mask & IN_CLOSE_NOWRITE)
        event = "Close the opened the file read-only.";
    if (mask & IN_CREATE)
        event = "Create a file or directory.";
    if (mask & IN_DELETe_SELF)
        event = "Delete a file or directory.";
    if (mask & IN_MODIFY)
        event = "The file was modified.";
    if (mask & IN_MOVE_SELF)
        event = "Move the monitored directory or file itself.";
    if (mask & IN_MOVED_FROM)
        event = "File moved outside of the monitored directory.";
    if (mask & IN_MOVED_TO)
        event = "File moved to of the monitored directory.";
    if (mask & IN_OPEN)
        event = "The file has been opened.";
    if (mask & IN_IGNORED)
        event = "Monitored item is unload.";
    if (mask & IN_ISDIR)
        event = "The name is directory.";
    if (mask & IN_UNMOUNT)
        event = "Target file unload.";
}

void SCEvent::judgeEventName(uint32_t mask, std::string &event)
{
    if (mask & IN_ACCESS)
        event = "IN_ACCESS";
    if (mask & IN_ATTRIB)
        event = "IN_ATTRIB";
    if (mask & IN_CLOSE_WRITE)
        event = "IN_CLOSE_WRITE";
    if (mask & IN_CLOSE_NOWRITE)
        event = "IN_CLOSE_NOWRITE";
    if (mask & IN_CREATE)
        event = "IN_CREATE";
    if (mask & IN_DELETe_SELF)
        event = "IN_DELETE_SELF";
    if (mask & IN_MODIFY)
        event = "IN_MODIFY";
    if (mask & IN_MOVE_SELF)
        event = "IN_MOVE_SELF";
    if (mask & IN_MOVED_FROM)
        event = "IN_MOVED_FROM";
    if (mask & IN_MOVED_TO)
        event = "IN_MOVED_TO";
    if (mask & IN_OPEN)
        event = "IN_OPEN";
    if (mask & IN_IGNORED)
        event = "IN_IGNORED";
    if (mask & IN_ISDIR)
        event = "IN_ISDIR";
    if (mask & IN_UNMOUNT)
        event = "IN_UNMOUNT";
}

//通过文件名找到 wd
int SCEvent::findWD(const std::string &file)
{
    map::iterator it = m_watchFiles.begin();
    while (it != m_watchFiles.end())
    {
        string temp = it->second;
        if (temp == file)
        {
            return it->first;
        }
        it++;
    }
    return -1;
}

//重新监控文件
void SCEvent::Rewatch(const std::string &watch_name)
{
    vector names;
    names.push_back(watch_name);
    AddWatchFile(names); //重新添加
    SPDLOG_TRACE("rewatch: {}", watch_name);
}


void SCEvent::eventUnmount(const std::string event_file, void *flags)
{
    if (flags == NULL)
        return; //什么都不做

    SCEvent *pUI = (SCEvent *)flags;
    int ret = open(event_file.c_str(), O_EXCL, O_RDONLY);
    if (ret < 0)
    {
        if (errno == EEXIST)
        {
            pUI->Rewatch(event_file);
        }
        else
        {
            SPDLOG_TRACE("open err: {}" << strerror(errno));
        }
    }
    else
    {
        pUI->Rewatch(event_file);
    }
}

void SCEvent::eventPrint(const std::string event_file, void *flags)
{
    // m_logger->info(" Modify event");
    string sTimestamp;
    char acTimestamp[256];

    struct timeval tv;
    struct tm *tm;

    gettimeofday(&tv, NULL);

    tm = localtime(&tv.tv_sec);

    sprintf(acTimestamp, "%04d-%02d-%02d %02d:%02d:%02d.%06d
",
            tm->tm_year + 1900,
            tm->tm_mon + 1,
            tm->tm_mday,
            tm->tm_hour,
            tm->tm_min,
            tm->tm_sec,
            (int)(tv.tv_usec ));

    sTimestamp = acTimestamp;

    cout <<"-th Modify time:"<< sTimestamp << endl;
}

测试文件:

#include "uInotify.h"
#include 
#include 
#include 

using namespace std;

EventHandle func(std::string, void *)
{
    string sTimestamp;
    char acTimestamp[256];

    struct timeval tv;
    struct tm *tm;

    gettimeofday(&tv, NULL);

    tm = localtime(&tv.tv_sec);

    sprintf(acTimestamp,"%04d-%02d-%02d %02d:%02d:%02d.%03d
",
            tm->tm_year + 1900,
            tm->tm_mon + 1,
            tm->tm_mday,
            tm->tm_hour,
            tm->tm_min,
            tm->tm_sec,
            (int) (tv.tv_usec / 1000)
        );

    sTimestamp = acTimestamp;

    cout << sTimestamp << endl;
}

int main()
{
    SCEvent uci;
    pthread_t pid;
    std::map funcMap;
   // funcMap.insert(std::make_pair("IN_MODIFY",func));
    std::vector wn;
    wn.push_back("/tmp/writer.txt");

    uci.InitWatchFile(wn,NULL,IN_MODIFY);
    uci.StartWatchThread(funcMap,pid);
    getchar();
    return 0;
}

这里需要使用前面文章中的生产者、消费者中的生产者程序配合测试(共享内存+inotify机制实现多进程低延迟数据共享_土豆西瓜大芝麻的博客-CSDN博客),它将数据写入mmap之后,会修改/tmp/writer.txt中的内容。我们封装的事件类监控该文件,当发生内容变更时,出发eventPrint操作。结果如下,延迟基本都小于0.1ms。

 

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/1037437.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号