1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef RS_CORE_PIPELINE_RCD_MESSAGE_BUS_H
17 #define RS_CORE_PIPELINE_RCD_MESSAGE_BUS_H
18 
19 #include <iostream>
20 #include <map>
21 #include <functional>
22 #include <mutex>
23 #include <thread>
24 #include "rs_any.h"
25 #include "rs_notCopyable.h"
26 
27 // @note declaration
28 template<typename... Args>
29 struct Impl;
30 
31 // @note recursion end
32 template<typename T>
33 struct Impl<T> {
34     static std::string name()
35     {
36         return std::to_string(sizeof(T));
37     }
38 };
39 
40 // @note recursion end
41 template<>
42 struct Impl<> {
43     static std::string name()
44     {
45         return "";
46     }
47 };
48 
49 // @note recursion
50 template<typename T, typename... Args>
51 struct Impl<T, Args...> {
52     static std::string name()
53     {
54         return std::to_string(sizeof(T)) + "," + Impl<Args...>::name();
55     }
56 };
57 
58 // @note recursion start
59 template<typename... Args>
60 std::string GetTypeName()
61 {
62     return Impl<Args...>::name();
63 }
64 
65 class RsMessageBus : RsNotCopyable {
66 public:
67     // @brief register message processing functions based on topic and message type
68     // @tparam ...Args  input parameters of funtion TMember
69     // @tparam TObject  object type
70     // @tparam TMember  function member type of object
71     // @param strTopic  message topic
72     // @param Object  object
73     // @param Member  function member of the object
74     template<typename... Args, typename TObject, typename TMember>
75     void RegisterTopic(std::string strTopic, TObject *Object, TMember Member)
76     {
77         std::lock_guard<std::mutex> lock(m_map_mutex);
78         if (Object == nullptr) {
79             return;
80         }
81         std::function<void(Args...)> f = std::function<void(Args...)>([=](Args... arg) {
82             (Object->*Member)(arg...);
83         });
84         m_map.emplace(GetKey<Args...>(strTopic), f);
85     }
86     // @brief Send the message to the bus, After receiving the message,
87     // the messagebus will find and call the corresponding message processing function.
88     // @tparam ...Args  parameter's type
89     // @param strTopic  message topic
90     // @param ...args  parameters
91     template<typename... Args>
92     void SendMsg(std::string strTopic, Args... args)
93     {
94         std::lock_guard<std::mutex> lock(m_map_mutex);
95         auto range = m_map.equal_range(GetKey<Args...>(strTopic));
96         std::multimap<std::string, RsAny>::iterator it;
97         for (it = range.first; it != range.second; it++) {
98             it->second.Send<Args...>(args...);
99         }
100     }
101     // @brief  remove the topic
102     // @tparam ...Args  Parameter's type
103     // @param strTopic  message topic
104     template<typename... Args>
105     void RemoveTopic(std::string strTopic)
106     {
107         std::lock_guard<std::mutex> lock(m_map_mutex);
108         auto it = m_map.find(GetKey<Args...>(strTopic));
109         while (it != m_map.end()) {
110             m_map.erase(it++);
111         }
112     }
113 private:
114     // Calculate key values based on the Topic
115     template<typename... Args>
116     std::string GetKey(std::string &strTopic)
117     {
118         return strTopic + GetTypeName<Args...>();
119     }
120     std::multimap<std::string, RsAny> m_map;
121     std::mutex m_map_mutex;
122 };
123 #endif