1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_result_set_impl.h"
17 
18 #include <mutex>
19 
20 #include "kv_store_errno.h"
21 
22 namespace DistributedDB {
IsValid() const23 inline bool RelationalResultSetImpl::IsValid() const
24 {
25     return !isClosed_ && cacheDataSet_.empty();
26 }
27 
IsValid(int64_t index) const28 inline bool RelationalResultSetImpl::IsValid(int64_t index) const
29 {
30     return !isClosed_ && cacheDataSet_.empty() && index >= 0 && index < dataSet_.GetSize();
31 }
32 
GetCount() const33 int RelationalResultSetImpl::GetCount() const
34 {
35     std::shared_lock<std::shared_mutex> readLock(mutex_);
36     if (IsValid()) {
37         return dataSet_.GetSize();
38     }
39     return 0;
40 }
41 
GetPosition() const42 int RelationalResultSetImpl::GetPosition() const
43 {
44     std::shared_lock<std::shared_mutex> readLock(mutex_);
45     return index_;
46 }
47 
MoveToFirst()48 bool RelationalResultSetImpl::MoveToFirst()
49 {
50     return MoveToPosition(0);
51 }
52 
MoveToLast()53 bool RelationalResultSetImpl::MoveToLast()
54 {
55     return MoveToPosition(dataSet_.GetSize() - 1);
56 }
57 
MoveToNext()58 bool RelationalResultSetImpl::MoveToNext()
59 {
60     return Move(1);
61 }
62 
MoveToPrevious()63 bool RelationalResultSetImpl::MoveToPrevious()
64 {
65     return Move(-1);
66 }
67 
Move(int offset)68 bool RelationalResultSetImpl::Move(int offset)
69 {
70     return MoveToPosition(index_ + offset);
71 }
72 
MoveToPosition(int position)73 bool RelationalResultSetImpl::MoveToPosition(int position)
74 {
75     std::unique_lock<std::shared_mutex> writeLock(mutex_);
76     int64_t index = position;
77     if (IsValid(index)) {
78         index_ = index;
79         return true;
80     }
81     if (index < 0) {
82         index_ = -1;
83     } else if (index >= dataSet_.GetSize()) {
84         index_ = dataSet_.GetSize();
85     }
86     return false;
87 }
88 
IsFirst() const89 bool RelationalResultSetImpl::IsFirst() const
90 {
91     std::shared_lock<std::shared_mutex> readLock(mutex_);
92     return IsValid(index_) && index_ == 0;
93 }
94 
IsLast() const95 bool RelationalResultSetImpl::IsLast() const
96 {
97     std::shared_lock<std::shared_mutex> readLock(mutex_);
98     return IsValid(index_) && index_ == dataSet_.GetSize() - 1;
99 }
100 
IsBeforeFirst() const101 bool RelationalResultSetImpl::IsBeforeFirst() const
102 {
103     std::shared_lock<std::shared_mutex> readLock(mutex_);
104     return dataSet_.GetSize() == 0 || index_ <= -1;
105 }
106 
IsAfterLast() const107 bool RelationalResultSetImpl::IsAfterLast() const
108 {
109     std::shared_lock<std::shared_mutex> readLock(mutex_);
110     return dataSet_.GetSize() == 0 || index_ >= dataSet_.GetSize();
111 }
112 
IsClosed() const113 bool RelationalResultSetImpl::IsClosed() const
114 {
115     std::shared_lock<std::shared_mutex> readLock(mutex_);
116     return isClosed_;
117 }
118 
Close()119 void RelationalResultSetImpl::Close()
120 {
121     std::unique_lock<std::shared_mutex> writeLock(mutex_);
122     if (isClosed_) {
123         return;
124     }
125     isClosed_ = true;
126     index_ = -1;
127     cacheDataSet_.clear();
128     colNames_.clear();
129     dataSet_.Clear();
130 }
131 
GetEntry(Entry & entry) const132 DBStatus RelationalResultSetImpl::GetEntry(Entry &entry) const
133 {
134     return NOT_SUPPORT;
135 }
136 
GetColumnNames(std::vector<std::string> & columnNames) const137 void RelationalResultSetImpl::GetColumnNames(std::vector<std::string> &columnNames) const
138 {
139     std::shared_lock<std::shared_mutex> readLock(mutex_);
140     columnNames = dataSet_.GetColNames();
141 }
142 
143 namespace {
144 struct ColumnTypePair {
145     int index_;
146     ResultSet::ColumnType type_;
147 };
148 
GetColType(int index,const ColumnTypePair * colMap,int32_t len)149 ResultSet::ColumnType GetColType(int index, const ColumnTypePair *colMap, int32_t len)
150 {
151     int32_t head = 0;
152     int32_t end = len - 1;
153     while (head <= end) {
154         int32_t mid = (head + end) / 2;
155         if (colMap[mid].index_ < index) {
156             head = mid + 1;
157             continue;
158         }
159         if (colMap[mid].index_ > index) {
160             end = mid - 1;
161             continue;
162         }
163         return colMap[mid].type_;
164     }
165     return ResultSet::ColumnType::INVALID_TYPE;
166 }
167 }
168 
GetColumnType(int columnIndex,ColumnType & columnType) const169 DBStatus RelationalResultSetImpl::GetColumnType(int columnIndex, ColumnType &columnType) const
170 {
171     static constexpr ColumnTypePair mappingTbl[] = {
172         { static_cast<int>(StorageType::STORAGE_TYPE_NONE),    ColumnType::INVALID_TYPE },
173         { static_cast<int>(StorageType::STORAGE_TYPE_NULL),    ColumnType::NULL_VALUE },
174         { static_cast<int>(StorageType::STORAGE_TYPE_INTEGER), ColumnType::INT64 },
175         { static_cast<int>(StorageType::STORAGE_TYPE_REAL),    ColumnType::DOUBLE },
176         { static_cast<int>(StorageType::STORAGE_TYPE_TEXT),    ColumnType::STRING },
177         { static_cast<int>(StorageType::STORAGE_TYPE_BLOB),    ColumnType::BLOB },
178     };
179 
180     std::shared_lock<std::shared_mutex> readLock(mutex_);
181     const RelationalRowData *rowData = dataSet_.Get(index_);
182     if (rowData == nullptr) {
183         return NOT_FOUND;
184     }
185     auto type = StorageType::STORAGE_TYPE_NONE;
186     int errCode = rowData->GetType(columnIndex, type);
187     if (errCode == E_OK) {
188         columnType = GetColType(static_cast<int>(type), mappingTbl, sizeof(mappingTbl) / sizeof(ColumnTypePair));
189     }
190     return TransferDBErrno(errCode);
191 }
192 
GetColumnIndex(const std::string & columnName,int & columnIndex) const193 DBStatus RelationalResultSetImpl::GetColumnIndex(const std::string &columnName, int &columnIndex) const
194 {
195     if (colNames_.empty()) {
196         std::unique_lock<std::shared_mutex> writeLock(mutex_);
197         if (colNames_.empty()) {
198             for (size_t i = 0; i < dataSet_.GetColNames().size(); ++i) {
199                 colNames_[dataSet_.GetColNames().at(i)] = static_cast<int>(i);
200             }
201         }
202     }
203     std::shared_lock<std::shared_mutex> readLock(mutex_);
204     if (!IsValid(index_)) {
205         return NOT_FOUND;
206     }
207     auto iter = colNames_.find(columnName);
208     if (iter == colNames_.end()) {
209         return NONEXISTENT;
210     }
211     columnIndex = iter->second;
212     return OK;
213 }
214 
GetColumnName(int columnIndex,std::string & columnName) const215 DBStatus RelationalResultSetImpl::GetColumnName(int columnIndex, std::string &columnName) const
216 {
217     std::shared_lock<std::shared_mutex> readLock(mutex_);
218     if (!IsValid(index_)) {
219         return NOT_FOUND;
220     }
221     const auto &colNames = dataSet_.GetColNames();
222     if (columnIndex < 0 || columnIndex >= static_cast<int>(colNames.size())) {
223         return NONEXISTENT;
224     }
225     columnName = colNames.at(columnIndex);
226     return OK;
227 }
228 
Get(int columnIndex,std::vector<uint8_t> & value) const229 DBStatus RelationalResultSetImpl::Get(int columnIndex, std::vector<uint8_t> &value) const
230 {
231     std::shared_lock<std::shared_mutex> readLock(mutex_);
232     const RelationalRowData *rowData = dataSet_.Get(index_);
233     if (rowData == nullptr) {
234         return NOT_FOUND;
235     }
236     return TransferDBErrno(rowData->Get(columnIndex, value));
237 }
238 
Get(int columnIndex,std::string & value) const239 DBStatus RelationalResultSetImpl::Get(int columnIndex, std::string &value) const
240 {
241     std::shared_lock<std::shared_mutex> readLock(mutex_);
242     const RelationalRowData *rowData = dataSet_.Get(index_);
243     if (rowData == nullptr) {
244         return NOT_FOUND;
245     }
246     return TransferDBErrno(rowData->Get(columnIndex, value));
247 }
248 
Get(int columnIndex,int64_t & value) const249 DBStatus RelationalResultSetImpl::Get(int columnIndex, int64_t &value) const
250 {
251     std::shared_lock<std::shared_mutex> readLock(mutex_);
252     const RelationalRowData *rowData = dataSet_.Get(index_);
253     if (rowData == nullptr) {
254         return NOT_FOUND;
255     }
256     return TransferDBErrno(rowData->Get(columnIndex, value));
257 }
258 
Get(int columnIndex,double & value) const259 DBStatus RelationalResultSetImpl::Get(int columnIndex, double &value) const
260 {
261     std::shared_lock<std::shared_mutex> readLock(mutex_);
262     const RelationalRowData *rowData = dataSet_.Get(index_);
263     if (rowData == nullptr) {
264         return NOT_FOUND;
265     }
266     return TransferDBErrno(rowData->Get(columnIndex, value));
267 }
268 
IsColumnNull(int columnIndex,bool & isNull) const269 DBStatus RelationalResultSetImpl::IsColumnNull(int columnIndex, bool &isNull) const
270 {
271     std::shared_lock<std::shared_mutex> readLock(mutex_);
272     const RelationalRowData *rowData = dataSet_.Get(index_);
273     if (rowData == nullptr) {
274         return NOT_FOUND;
275     }
276     auto type = StorageType::STORAGE_TYPE_NONE;
277     int errCode = rowData->GetType(columnIndex, type);
278     if (errCode == E_OK) {
279         isNull = type == StorageType::STORAGE_TYPE_NULL;
280     }
281     return TransferDBErrno(errCode);
282 }
283 
284 // This func is not API. Impossible concurrency. There is no need to hold mutex. columnIndex must be valid
GetData(int64_t columnIndex) const285 VariantData RelationalResultSetImpl::GetData(int64_t columnIndex) const
286 {
287     auto columnType = ColumnType::INVALID_TYPE;
288     (void)GetColumnType(columnIndex, columnType);
289     switch (columnType) {
290         case INT64: {
291             int64_t value = 0;
292             (void)Get(columnIndex, value);
293             return value;
294         }
295         case DOUBLE: {
296             double value = 0;
297             (void)Get(columnIndex, value);
298             return value;
299         }
300         case STRING: {
301             std::string value;
302             (void)Get(columnIndex, value);
303             return value;
304         }
305         case BLOB: {
306             std::vector<uint8_t> value;
307             (void)Get(columnIndex, value);
308             return value;
309         }
310         case NULL_VALUE:
311         default:
312             return VariantData();
313     }
314 }
315 
GetRow(std::map<std::string,VariantData> & data) const316 DBStatus RelationalResultSetImpl::GetRow(std::map<std::string, VariantData> &data) const
317 {
318     data.clear();
319     std::shared_lock<std::shared_mutex> readLock(mutex_);
320     if (!IsValid(index_)) {
321         return NOT_FOUND;
322     }
323 
324     for (int columnIndex = 0; columnIndex < static_cast<int>(dataSet_.GetColNames().size()); ++columnIndex) {
325         data[dataSet_.GetColNames().at(columnIndex)] = GetData(columnIndex);
326     }
327     return OK;
328 }
329 
330 // This func is not API. Impossible concurrency. There is no need to hold mutex.
Put(const DeviceID & deviceName,uint32_t sequenceId,RelationalRowDataSet && data)331 int RelationalResultSetImpl::Put(const DeviceID &deviceName, uint32_t sequenceId, RelationalRowDataSet &&data)
332 {
333     if (sequenceId == 0) {
334         LOGE("[RelationalResultSetImpl] Invalid sequenceId");
335         return -E_INVALID_ARGS;
336     }
337     cacheDataSet_[sequenceId - 1] = std::move(data);
338     for (auto iter = cacheDataSet_.begin(); iter != cacheDataSet_.end();) {
339         if (iter->first != static_cast<uint32_t>(dataSetSize_)) {
340             break;
341         }
342         int errCode = dataSet_.Merge(std::move(iter->second));  // pay attention, this is rvalue.
343         if (errCode != E_OK) {
344             return errCode;
345         }
346         iter = cacheDataSet_.erase(iter);
347         dataSetSize_++;
348     }
349     return E_OK;
350 }
351 }
352 #endif