1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "server_executor/include/future_factory.h"
17
18 #include "platform/time/include/time.h"
19 #include "protocol/data_channel/include/i_request.h"
20 #include "protocol/data_channel/include/i_response.h"
21 #include "protocol/retcode_inner/aie_retcode_inner.h"
22 #include "server_executor/include/future.h"
23 #include "server_executor/include/i_future_listener.h"
24 #include "utils/log/aie_log.h"
25
26 namespace OHOS {
27 namespace AI {
namespace {
// Cap on the number of entries FindSequenceId() tolerates in futures_.
constexpr int MAX_NUM_FUTURES = 1024;
// Value sequenceId_ is reset to whenever it drops below this bound.
constexpr int MIN_SEQUENCE_ID = 1;
// Returned by FindSequenceId() when no sequence id can be handed out.
constexpr int INVALID_SEQUENCE_ID = -1;
} // anonymous namespace
33
34 std::mutex FutureFactory::mutex_;
35 FutureFactory *FutureFactory::instance_ = nullptr;
36
GetInstance()37 FutureFactory *FutureFactory::GetInstance()
38 {
39 CHK_RET(instance_ != nullptr, instance_);
40
41 std::lock_guard<std::mutex> lock(mutex_);
42 CHK_RET(instance_ != nullptr, instance_);
43
44 AIE_NEW(instance_, FutureFactory);
45
46 return instance_;
47 }
48
ReleaseInstance()49 void FutureFactory::ReleaseInstance()
50 {
51 std::lock_guard<std::mutex> lock(mutex_);
52 AIE_DELETE(instance_);
53 }
54
FutureFactory()55 FutureFactory::FutureFactory() : sequenceId_(0)
56 {
57 }
58
~FutureFactory()59 FutureFactory::~FutureFactory()
60 {
61 HILOGI("[FutureFactory]Begin to release FutureFactory.");
62 for (auto &iter : listeners_) {
63 AIE_DELETE(iter.second);
64 }
65 listeners_.clear();
66 for (auto &iter : futures_) {
67 AIE_DELETE(iter.second);
68 }
69 futures_.clear();
70 }
71
FindSequenceId()72 long long FutureFactory::FindSequenceId()
73 {
74 std::lock_guard<std::mutex> lock(innerMutex_);
75
76 if (futures_.size() > MAX_NUM_FUTURES) {
77 HILOGE("[FutureFactory]Num of valid futures reaches max.");
78 return INVALID_SEQUENCE_ID;
79 }
80
81 do {
82 ++sequenceId_;
83 if (sequenceId_ < MIN_SEQUENCE_ID) {
84 HILOGI("[FutureFactory]The sequenceId_ is smaller than MIN_SEQUENCE_ID.");
85 sequenceId_ = MIN_SEQUENCE_ID;
86 }
87 } while (futures_.find(sequenceId_) != futures_.end());
88
89 return sequenceId_;
90 }
91
CreateFuture(IRequest * request)92 int FutureFactory::CreateFuture(IRequest *request)
93 {
94 long long sequenceId = FindSequenceId();
95 if (sequenceId == INVALID_SEQUENCE_ID) {
96 HILOGE("[FutureFactory]Invalid sequence id generated.");
97 return RETCODE_NULL_PARAM;
98 }
99
100 if (request == nullptr) {
101 HILOGE("[FutureFactory]Param request is nullptr.");
102 return RETCODE_NULL_PARAM;
103 }
104 Request *req = reinterpret_cast<Request*>(request);
105 req->SetInnerSequenceId(sequenceId);
106
107 Future *future = nullptr;
108 AIE_NEW(future, Future(request, sequenceId, request->GetTransactionId()));
109 CHK_RET(future == nullptr, RETCODE_OUT_OF_MEMORY);
110
111 AddFuture(sequenceId, future);
112 return RETCODE_SUCCESS;
113 }
114
AddFuture(long long sequenceId,Future * future)115 void FutureFactory::AddFuture(long long sequenceId, Future* future)
116 {
117 std::lock_guard<std::mutex> lock(innerMutex_);
118
119 futures_[sequenceId] = future;
120 }
121
Release(long long sequenceId)122 void FutureFactory::Release(long long sequenceId)
123 {
124 DeleteFuture(sequenceId);
125 }
126
DeleteFuture(long long sequenceId)127 void FutureFactory::DeleteFuture(long long sequenceId)
128 {
129 std::lock_guard<std::mutex> lock(innerMutex_);
130 auto iter = futures_.find(sequenceId);
131 if (iter != futures_.end()) {
132 delete iter->second;
133 iter->second = nullptr;
134 futures_.erase(sequenceId);
135 }
136 }
137
RegisterListener(IFutureListener * listener,long long transactionId)138 void FutureFactory::RegisterListener(IFutureListener *listener, long long transactionId)
139 {
140 std::lock_guard<std::mutex> lock(innerMutex_);
141 listeners_[transactionId] = listener;
142 }
143
UnregisterListener(long long transactionId)144 void FutureFactory::UnregisterListener(long long transactionId)
145 {
146 std::lock_guard<std::mutex> lock(innerMutex_);
147 auto iter = listeners_.find(transactionId);
148 if (iter != listeners_.end()) {
149 delete iter->second;
150 iter->second = nullptr;
151 listeners_.erase(transactionId);
152 }
153 }
154
ProcessResponse(PluginEvent event,IResponse * response)155 int FutureFactory::ProcessResponse(PluginEvent event, IResponse *response)
156 {
157 CHK_RET(response == nullptr, RETCODE_NULL_PARAM);
158 HILOGI("[FutureFactory]Begin to Process Response.");
159 Response *res = reinterpret_cast<Response *>(response);
160 Future *future = FetchFuture(res);
161 if (!future) {
162 HILOGE("[FutureFactory][transactionId:%lld]No matched future found, seqId=%lld.",
163 res->GetTransactionId(), res->GetInnerSequenceId());
164 return RETCODE_NULL_PARAM;
165 }
166
167 FutureStatus status = Future::ConvertPluginStatus(event);
168 future->SetResponse(status, response);
169
170 IFutureListener *listener = FindListener(response->GetTransactionId());
171 if (listener == nullptr) {
172 HILOGE("[FutureFactory][transactionId:%lld]No matched listener found.", response->GetTransactionId());
173 return RETCODE_NO_LISTENER_FOUND;
174 }
175
176 listener->OnReply(future);
177 future->DetachResponse();
178 DeleteFuture(future->GetSequenceId());
179
180 return RETCODE_SUCCESS;
181 }
182
FetchFuture(Response * response)183 Future *FutureFactory::FetchFuture(Response *response)
184 {
185 long long sequenceId = response->GetInnerSequenceId();
186 std::lock_guard<std::mutex> lock(innerMutex_);
187
188 auto findProc = futures_.find(sequenceId);
189 CHK_RET(findProc == futures_.end(), nullptr);
190
191 return findProc->second;
192 }
193
FindListener(long long transactionId)194 IFutureListener *FutureFactory::FindListener(long long transactionId)
195 {
196 std::lock_guard<std::mutex> lock(innerMutex_);
197
198 auto findProc = listeners_.find(transactionId);
199 CHK_RET(findProc == listeners_.end(), nullptr);
200
201 return findProc->second;
202 }
203 } // namespace AI
204 } // namespace OHOS