1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_result_set_impl.h"
17
18 #include <mutex>
19
20 #include "kv_store_errno.h"
21
22 namespace DistributedDB {
IsValid() const23 inline bool RelationalResultSetImpl::IsValid() const
24 {
25 return !isClosed_ && cacheDataSet_.empty();
26 }
27
IsValid(int64_t index) const28 inline bool RelationalResultSetImpl::IsValid(int64_t index) const
29 {
30 return !isClosed_ && cacheDataSet_.empty() && index >= 0 && index < dataSet_.GetSize();
31 }
32
GetCount() const33 int RelationalResultSetImpl::GetCount() const
34 {
35 std::shared_lock<std::shared_mutex> readLock(mutex_);
36 if (IsValid()) {
37 return dataSet_.GetSize();
38 }
39 return 0;
40 }
41
GetPosition() const42 int RelationalResultSetImpl::GetPosition() const
43 {
44 std::shared_lock<std::shared_mutex> readLock(mutex_);
45 return index_;
46 }
47
MoveToFirst()48 bool RelationalResultSetImpl::MoveToFirst()
49 {
50 return MoveToPosition(0);
51 }
52
MoveToLast()53 bool RelationalResultSetImpl::MoveToLast()
54 {
55 return MoveToPosition(dataSet_.GetSize() - 1);
56 }
57
MoveToNext()58 bool RelationalResultSetImpl::MoveToNext()
59 {
60 return Move(1);
61 }
62
MoveToPrevious()63 bool RelationalResultSetImpl::MoveToPrevious()
64 {
65 return Move(-1);
66 }
67
Move(int offset)68 bool RelationalResultSetImpl::Move(int offset)
69 {
70 return MoveToPosition(index_ + offset);
71 }
72
MoveToPosition(int position)73 bool RelationalResultSetImpl::MoveToPosition(int position)
74 {
75 std::unique_lock<std::shared_mutex> writeLock(mutex_);
76 int64_t index = position;
77 if (IsValid(index)) {
78 index_ = index;
79 return true;
80 }
81 if (index < 0) {
82 index_ = -1;
83 } else if (index >= dataSet_.GetSize()) {
84 index_ = dataSet_.GetSize();
85 }
86 return false;
87 }
88
IsFirst() const89 bool RelationalResultSetImpl::IsFirst() const
90 {
91 std::shared_lock<std::shared_mutex> readLock(mutex_);
92 return IsValid(index_) && index_ == 0;
93 }
94
IsLast() const95 bool RelationalResultSetImpl::IsLast() const
96 {
97 std::shared_lock<std::shared_mutex> readLock(mutex_);
98 return IsValid(index_) && index_ == dataSet_.GetSize() - 1;
99 }
100
IsBeforeFirst() const101 bool RelationalResultSetImpl::IsBeforeFirst() const
102 {
103 std::shared_lock<std::shared_mutex> readLock(mutex_);
104 return dataSet_.GetSize() == 0 || index_ <= -1;
105 }
106
IsAfterLast() const107 bool RelationalResultSetImpl::IsAfterLast() const
108 {
109 std::shared_lock<std::shared_mutex> readLock(mutex_);
110 return dataSet_.GetSize() == 0 || index_ >= dataSet_.GetSize();
111 }
112
IsClosed() const113 bool RelationalResultSetImpl::IsClosed() const
114 {
115 std::shared_lock<std::shared_mutex> readLock(mutex_);
116 return isClosed_;
117 }
118
Close()119 void RelationalResultSetImpl::Close()
120 {
121 std::unique_lock<std::shared_mutex> writeLock(mutex_);
122 if (isClosed_) {
123 return;
124 }
125 isClosed_ = true;
126 index_ = -1;
127 cacheDataSet_.clear();
128 colNames_.clear();
129 dataSet_.Clear();
130 }
131
GetEntry(Entry & entry) const132 DBStatus RelationalResultSetImpl::GetEntry(Entry &entry) const
133 {
134 return NOT_SUPPORT;
135 }
136
GetColumnNames(std::vector<std::string> & columnNames) const137 void RelationalResultSetImpl::GetColumnNames(std::vector<std::string> &columnNames) const
138 {
139 std::shared_lock<std::shared_mutex> readLock(mutex_);
140 columnNames = dataSet_.GetColNames();
141 }
142
143 namespace {
144 struct ColumnTypePair {
145 int index_;
146 ResultSet::ColumnType type_;
147 };
148
GetColType(int index,const ColumnTypePair * colMap,int32_t len)149 ResultSet::ColumnType GetColType(int index, const ColumnTypePair *colMap, int32_t len)
150 {
151 int32_t head = 0;
152 int32_t end = len - 1;
153 while (head <= end) {
154 int32_t mid = (head + end) / 2;
155 if (colMap[mid].index_ < index) {
156 head = mid + 1;
157 continue;
158 }
159 if (colMap[mid].index_ > index) {
160 end = mid - 1;
161 continue;
162 }
163 return colMap[mid].type_;
164 }
165 return ResultSet::ColumnType::INVALID_TYPE;
166 }
167 }
168
GetColumnType(int columnIndex,ColumnType & columnType) const169 DBStatus RelationalResultSetImpl::GetColumnType(int columnIndex, ColumnType &columnType) const
170 {
171 static constexpr ColumnTypePair mappingTbl[] = {
172 { static_cast<int>(StorageType::STORAGE_TYPE_NONE), ColumnType::INVALID_TYPE },
173 { static_cast<int>(StorageType::STORAGE_TYPE_NULL), ColumnType::NULL_VALUE },
174 { static_cast<int>(StorageType::STORAGE_TYPE_INTEGER), ColumnType::INT64 },
175 { static_cast<int>(StorageType::STORAGE_TYPE_REAL), ColumnType::DOUBLE },
176 { static_cast<int>(StorageType::STORAGE_TYPE_TEXT), ColumnType::STRING },
177 { static_cast<int>(StorageType::STORAGE_TYPE_BLOB), ColumnType::BLOB },
178 };
179
180 std::shared_lock<std::shared_mutex> readLock(mutex_);
181 const RelationalRowData *rowData = dataSet_.Get(index_);
182 if (rowData == nullptr) {
183 return NOT_FOUND;
184 }
185 auto type = StorageType::STORAGE_TYPE_NONE;
186 int errCode = rowData->GetType(columnIndex, type);
187 if (errCode == E_OK) {
188 columnType = GetColType(static_cast<int>(type), mappingTbl, sizeof(mappingTbl) / sizeof(ColumnTypePair));
189 }
190 return TransferDBErrno(errCode);
191 }
192
GetColumnIndex(const std::string & columnName,int & columnIndex) const193 DBStatus RelationalResultSetImpl::GetColumnIndex(const std::string &columnName, int &columnIndex) const
194 {
195 if (colNames_.empty()) {
196 std::unique_lock<std::shared_mutex> writeLock(mutex_);
197 if (colNames_.empty()) {
198 for (size_t i = 0; i < dataSet_.GetColNames().size(); ++i) {
199 colNames_[dataSet_.GetColNames().at(i)] = static_cast<int>(i);
200 }
201 }
202 }
203 std::shared_lock<std::shared_mutex> readLock(mutex_);
204 if (!IsValid(index_)) {
205 return NOT_FOUND;
206 }
207 auto iter = colNames_.find(columnName);
208 if (iter == colNames_.end()) {
209 return NONEXISTENT;
210 }
211 columnIndex = iter->second;
212 return OK;
213 }
214
GetColumnName(int columnIndex,std::string & columnName) const215 DBStatus RelationalResultSetImpl::GetColumnName(int columnIndex, std::string &columnName) const
216 {
217 std::shared_lock<std::shared_mutex> readLock(mutex_);
218 if (!IsValid(index_)) {
219 return NOT_FOUND;
220 }
221 const auto &colNames = dataSet_.GetColNames();
222 if (columnIndex < 0 || columnIndex >= static_cast<int>(colNames.size())) {
223 return NONEXISTENT;
224 }
225 columnName = colNames.at(columnIndex);
226 return OK;
227 }
228
Get(int columnIndex,std::vector<uint8_t> & value) const229 DBStatus RelationalResultSetImpl::Get(int columnIndex, std::vector<uint8_t> &value) const
230 {
231 std::shared_lock<std::shared_mutex> readLock(mutex_);
232 const RelationalRowData *rowData = dataSet_.Get(index_);
233 if (rowData == nullptr) {
234 return NOT_FOUND;
235 }
236 return TransferDBErrno(rowData->Get(columnIndex, value));
237 }
238
Get(int columnIndex,std::string & value) const239 DBStatus RelationalResultSetImpl::Get(int columnIndex, std::string &value) const
240 {
241 std::shared_lock<std::shared_mutex> readLock(mutex_);
242 const RelationalRowData *rowData = dataSet_.Get(index_);
243 if (rowData == nullptr) {
244 return NOT_FOUND;
245 }
246 return TransferDBErrno(rowData->Get(columnIndex, value));
247 }
248
Get(int columnIndex,int64_t & value) const249 DBStatus RelationalResultSetImpl::Get(int columnIndex, int64_t &value) const
250 {
251 std::shared_lock<std::shared_mutex> readLock(mutex_);
252 const RelationalRowData *rowData = dataSet_.Get(index_);
253 if (rowData == nullptr) {
254 return NOT_FOUND;
255 }
256 return TransferDBErrno(rowData->Get(columnIndex, value));
257 }
258
Get(int columnIndex,double & value) const259 DBStatus RelationalResultSetImpl::Get(int columnIndex, double &value) const
260 {
261 std::shared_lock<std::shared_mutex> readLock(mutex_);
262 const RelationalRowData *rowData = dataSet_.Get(index_);
263 if (rowData == nullptr) {
264 return NOT_FOUND;
265 }
266 return TransferDBErrno(rowData->Get(columnIndex, value));
267 }
268
IsColumnNull(int columnIndex,bool & isNull) const269 DBStatus RelationalResultSetImpl::IsColumnNull(int columnIndex, bool &isNull) const
270 {
271 std::shared_lock<std::shared_mutex> readLock(mutex_);
272 const RelationalRowData *rowData = dataSet_.Get(index_);
273 if (rowData == nullptr) {
274 return NOT_FOUND;
275 }
276 auto type = StorageType::STORAGE_TYPE_NONE;
277 int errCode = rowData->GetType(columnIndex, type);
278 if (errCode == E_OK) {
279 isNull = type == StorageType::STORAGE_TYPE_NULL;
280 }
281 return TransferDBErrno(errCode);
282 }
283
284 // This func is not API. Impossible concurrency. There is no need to hold mutex. columnIndex must be valid
GetData(int64_t columnIndex) const285 VariantData RelationalResultSetImpl::GetData(int64_t columnIndex) const
286 {
287 auto columnType = ColumnType::INVALID_TYPE;
288 (void)GetColumnType(columnIndex, columnType);
289 switch (columnType) {
290 case INT64: {
291 int64_t value = 0;
292 (void)Get(columnIndex, value);
293 return value;
294 }
295 case DOUBLE: {
296 double value = 0;
297 (void)Get(columnIndex, value);
298 return value;
299 }
300 case STRING: {
301 std::string value;
302 (void)Get(columnIndex, value);
303 return value;
304 }
305 case BLOB: {
306 std::vector<uint8_t> value;
307 (void)Get(columnIndex, value);
308 return value;
309 }
310 case NULL_VALUE:
311 default:
312 return VariantData();
313 }
314 }
315
GetRow(std::map<std::string,VariantData> & data) const316 DBStatus RelationalResultSetImpl::GetRow(std::map<std::string, VariantData> &data) const
317 {
318 data.clear();
319 std::shared_lock<std::shared_mutex> readLock(mutex_);
320 if (!IsValid(index_)) {
321 return NOT_FOUND;
322 }
323
324 for (int columnIndex = 0; columnIndex < static_cast<int>(dataSet_.GetColNames().size()); ++columnIndex) {
325 data[dataSet_.GetColNames().at(columnIndex)] = GetData(columnIndex);
326 }
327 return OK;
328 }
329
330 // This func is not API. Impossible concurrency. There is no need to hold mutex.
Put(const DeviceID & deviceName,uint32_t sequenceId,RelationalRowDataSet && data)331 int RelationalResultSetImpl::Put(const DeviceID &deviceName, uint32_t sequenceId, RelationalRowDataSet &&data)
332 {
333 if (sequenceId == 0) {
334 LOGE("[RelationalResultSetImpl] Invalid sequenceId");
335 return -E_INVALID_ARGS;
336 }
337 cacheDataSet_[sequenceId - 1] = std::move(data);
338 for (auto iter = cacheDataSet_.begin(); iter != cacheDataSet_.end();) {
339 if (iter->first != static_cast<uint32_t>(dataSetSize_)) {
340 break;
341 }
342 int errCode = dataSet_.Merge(std::move(iter->second)); // pay attention, this is rvalue.
343 if (errCode != E_OK) {
344 return errCode;
345 }
346 iter = cacheDataSet_.erase(iter);
347 dataSetSize_++;
348 }
349 return E_OK;
350 }
351 }
352 #endif