1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "server_executor/include/future_factory.h"
17 
18 #include "platform/time/include/time.h"
19 #include "protocol/data_channel/include/i_request.h"
20 #include "protocol/data_channel/include/i_response.h"
21 #include "protocol/retcode_inner/aie_retcode_inner.h"
22 #include "server_executor/include/future.h"
23 #include "server_executor/include/i_future_listener.h"
24 #include "utils/log/aie_log.h"
25 
26 namespace OHOS {
27 namespace AI {
namespace {
// Upper bound on the number of futures tracked at the same time.
constexpr int MAX_NUM_FUTURES = 1024;
// Sentinel returned when no sequence id can be issued.
constexpr int INVALID_SEQUENCE_ID = -1;
// Smallest sequence id that is ever handed out.
constexpr int MIN_SEQUENCE_ID = 1;
} // anonymous namespace
33 
34 std::mutex FutureFactory::mutex_;
35 FutureFactory *FutureFactory::instance_ = nullptr;
36 
GetInstance()37 FutureFactory *FutureFactory::GetInstance()
38 {
39     CHK_RET(instance_ != nullptr, instance_);
40 
41     std::lock_guard<std::mutex> lock(mutex_);
42     CHK_RET(instance_ != nullptr, instance_);
43 
44     AIE_NEW(instance_, FutureFactory);
45 
46     return instance_;
47 }
48 
ReleaseInstance()49 void FutureFactory::ReleaseInstance()
50 {
51     std::lock_guard<std::mutex> lock(mutex_);
52     AIE_DELETE(instance_);
53 }
54 
FutureFactory()55 FutureFactory::FutureFactory() : sequenceId_(0)
56 {
57 }
58 
~FutureFactory()59 FutureFactory::~FutureFactory()
60 {
61     HILOGI("[FutureFactory]Begin to release FutureFactory.");
62     for (auto &iter : listeners_) {
63         AIE_DELETE(iter.second);
64     }
65     listeners_.clear();
66     for (auto &iter : futures_) {
67         AIE_DELETE(iter.second);
68     }
69     futures_.clear();
70 }
71 
FindSequenceId()72 long long FutureFactory::FindSequenceId()
73 {
74     std::lock_guard<std::mutex> lock(innerMutex_);
75 
76     if (futures_.size() > MAX_NUM_FUTURES) {
77         HILOGE("[FutureFactory]Num of valid futures reaches max.");
78         return INVALID_SEQUENCE_ID;
79     }
80 
81     do {
82         ++sequenceId_;
83         if (sequenceId_ < MIN_SEQUENCE_ID) {
84             HILOGI("[FutureFactory]The sequenceId_ is smaller than MIN_SEQUENCE_ID.");
85             sequenceId_ = MIN_SEQUENCE_ID;
86         }
87     } while (futures_.find(sequenceId_) != futures_.end());
88 
89     return sequenceId_;
90 }
91 
CreateFuture(IRequest * request)92 int FutureFactory::CreateFuture(IRequest *request)
93 {
94     long long sequenceId = FindSequenceId();
95     if (sequenceId == INVALID_SEQUENCE_ID) {
96         HILOGE("[FutureFactory]Invalid sequence id generated.");
97         return RETCODE_NULL_PARAM;
98     }
99 
100     if (request == nullptr) {
101         HILOGE("[FutureFactory]Param request is nullptr.");
102         return RETCODE_NULL_PARAM;
103     }
104     Request *req = reinterpret_cast<Request*>(request);
105     req->SetInnerSequenceId(sequenceId);
106 
107     Future *future = nullptr;
108     AIE_NEW(future, Future(request, sequenceId, request->GetTransactionId()));
109     CHK_RET(future == nullptr, RETCODE_OUT_OF_MEMORY);
110 
111     AddFuture(sequenceId, future);
112     return RETCODE_SUCCESS;
113 }
114 
AddFuture(long long sequenceId,Future * future)115 void FutureFactory::AddFuture(long long sequenceId, Future* future)
116 {
117     std::lock_guard<std::mutex> lock(innerMutex_);
118 
119     futures_[sequenceId] = future;
120 }
121 
Release(long long sequenceId)122 void FutureFactory::Release(long long sequenceId)
123 {
124     DeleteFuture(sequenceId);
125 }
126 
DeleteFuture(long long sequenceId)127 void FutureFactory::DeleteFuture(long long sequenceId)
128 {
129     std::lock_guard<std::mutex> lock(innerMutex_);
130     auto iter = futures_.find(sequenceId);
131     if (iter != futures_.end()) {
132         delete iter->second;
133         iter->second = nullptr;
134         futures_.erase(sequenceId);
135     }
136 }
137 
RegisterListener(IFutureListener * listener,long long transactionId)138 void FutureFactory::RegisterListener(IFutureListener *listener, long long transactionId)
139 {
140     std::lock_guard<std::mutex> lock(innerMutex_);
141     listeners_[transactionId] = listener;
142 }
143 
UnregisterListener(long long transactionId)144 void FutureFactory::UnregisterListener(long long transactionId)
145 {
146     std::lock_guard<std::mutex> lock(innerMutex_);
147     auto iter = listeners_.find(transactionId);
148     if (iter != listeners_.end()) {
149         delete iter->second;
150         iter->second = nullptr;
151         listeners_.erase(transactionId);
152     }
153 }
154 
ProcessResponse(PluginEvent event,IResponse * response)155 int FutureFactory::ProcessResponse(PluginEvent event, IResponse *response)
156 {
157     CHK_RET(response == nullptr, RETCODE_NULL_PARAM);
158     HILOGI("[FutureFactory]Begin to Process Response.");
159     Response *res = reinterpret_cast<Response *>(response);
160     Future *future = FetchFuture(res);
161     if (!future) {
162         HILOGE("[FutureFactory][transactionId:%lld]No matched future found, seqId=%lld.",
163             res->GetTransactionId(), res->GetInnerSequenceId());
164         return RETCODE_NULL_PARAM;
165     }
166 
167     FutureStatus status = Future::ConvertPluginStatus(event);
168     future->SetResponse(status, response);
169 
170     IFutureListener *listener = FindListener(response->GetTransactionId());
171     if (listener == nullptr) {
172         HILOGE("[FutureFactory][transactionId:%lld]No matched listener found.", response->GetTransactionId());
173         return RETCODE_NO_LISTENER_FOUND;
174     }
175 
176     listener->OnReply(future);
177     future->DetachResponse();
178     DeleteFuture(future->GetSequenceId());
179 
180     return RETCODE_SUCCESS;
181 }
182 
FetchFuture(Response * response)183 Future *FutureFactory::FetchFuture(Response *response)
184 {
185     long long sequenceId = response->GetInnerSequenceId();
186     std::lock_guard<std::mutex> lock(innerMutex_);
187 
188     auto findProc = futures_.find(sequenceId);
189     CHK_RET(findProc == futures_.end(), nullptr);
190 
191     return findProc->second;
192 }
193 
FindListener(long long transactionId)194 IFutureListener *FutureFactory::FindListener(long long transactionId)
195 {
196     std::lock_guard<std::mutex> lock(innerMutex_);
197 
198     auto findProc = listeners_.find(transactionId);
199     CHK_RET(findProc == listeners_.end(), nullptr);
200 
201     return findProc->second;
202 }
203 } // namespace AI
204 } // namespace OHOS