1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "runtime_config.h"
28 #include "sqlite_relational_store.h"
29 #include "sqlite_relational_utils.h"
30 #include "time_helper.h"
31 #include "virtual_asset_loader.h"
32 #include "virtual_cloud_data_translate.h"
33 #include "virtual_cloud_db.h"
34 #include "virtual_communicator_aggregator.h"
35 #include <gtest/gtest.h>
36 #include <iostream>
37
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 using namespace std;
42
43 namespace {
44 const string STORE_ID = "Relational_Store_SYNC";
45 const string DB_SUFFIX = ".db";
46 const string ASSETS_TABLE_NAME = "student";
47 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
48 const string NO_PRIMARY_TABLE = "teacher";
49 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
50 const string COMPOUND_PRIMARY_TABLE = "worker1";
51 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
52 const string DEVICE_CLOUD = "cloud_dev";
53 const string COL_ID = "id";
54 const string COL_NAME = "name";
55 const string COL_HEIGHT = "height";
56 const string COL_ASSET = "asset";
57 const string COL_ASSETS = "assets";
58 const string COL_AGE = "age";
59 const int64_t SYNC_WAIT_TIME = 600;
60 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
61 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
62 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
63 {COL_AGE, TYPE_INDEX<int64_t>}};
64 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
65 {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
66 {COL_AGE, TYPE_INDEX<int64_t>}};
67 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
68 {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
69 {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
70 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
71 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
72 COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
73 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
74 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
75 " ASSETS," + COL_AGE + " INT);";
76 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
77 " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
78 COL_AGE + " INT, PRIMARY KEY (id, age));";
79 const Asset ASSET_COPY = {.version = 1,
80 .name = "Phone",
81 .assetId = "0",
82 .subpath = "/local/sync",
83 .uri = "/local/sync",
84 .modifyTime = "123456",
85 .createTime = "",
86 .size = "256",
87 .hash = "ASE"};
88 const Asset ASSET_COPY2 = {.version = 1,
89 .name = "Phone_copy_2",
90 .assetId = "0",
91 .subpath = "/local/sync",
92 .uri = "/local/sync",
93 .modifyTime = "123456",
94 .createTime = "",
95 .size = "256",
96 .hash = "ASE"};
97 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
98 const std::string QUERY_CONSISTENT_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
99 " where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
101 " where flag&0x10!=0;";
102
103 string g_storePath;
104 string g_testDir;
105 RelationalStoreObserverUnitTest *g_observer = nullptr;
106 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
107 RelationalStoreDelegate *g_delegate = nullptr;
108 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
109 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
110 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
111 SyncProcess g_syncProcess;
112 std::condition_variable g_processCondition;
113 std::mutex g_processMutex;
114 IRelationalStore *g_store = nullptr;
115 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
116 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
117
InitDatabase(sqlite3 * & db)118 void InitDatabase(sqlite3 *&db)
119 {
120 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
121 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
122 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
123 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
124 }
125
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)126 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
127 {
128 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
129 .fields = CLOUD_FIELDS};
130 dataBaseSchema.tables.push_back(assetsTableSchema);
131 assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
132 .fields = NO_PRIMARY_FIELDS};
133 dataBaseSchema.tables.push_back(assetsTableSchema);
134 assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
135 .fields = COMPOUND_PRIMARY_FIELDS};
136 dataBaseSchema.tables.push_back(assetsTableSchema);
137 }
138
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)139 void GenerateDataRecords(
140 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
141 {
142 for (int64_t i = begin; i < begin + count; i++) {
143 Assets assets;
144 Asset asset = ASSET_COPY;
145 asset.name = ASSET_COPY.name + std::to_string(i);
146 assets.emplace_back(asset);
147 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
148 assets.emplace_back(asset);
149 VBucket data;
150 data.insert_or_assign(COL_ID, i);
151 data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
152 data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
153 data.insert_or_assign(COL_ASSETS, assets);
154 data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
155 record.push_back(data);
156
157 VBucket log;
158 Timestamp now = TimeHelper::GetSysCurrentTime();
159 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
161 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
162 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
163 extend.push_back(log);
164 }
165 }
166
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)167 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
168 {
169 int errCode;
170 std::vector<VBucket> record;
171 std::vector<VBucket> extend;
172 GenerateDataRecords(begin, count, 0, record, extend);
173 const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
174 for (VBucket vBucket : record) {
175 sqlite3_stmt *stmt = nullptr;
176 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
177 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
178 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
179 ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
180 sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
181 if (isAssetNull) {
182 ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
183 } else {
184 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
185 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
186 }
187 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
188 std::get<Assets>(vBucket[COL_ASSETS]));
189 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
190 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
191 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
192 SQLiteUtils::ResetStatement(stmt, true, errCode);
193 }
194 }
195
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets)196 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets)
197 {
198 int errCode;
199 std::vector<uint8_t> assetBlob;
200 const string sql = "update " + tableName + " set assets=?;";
201 sqlite3_stmt *stmt = nullptr;
202 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
203 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
204 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
205 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
206 SQLiteUtils::ResetStatement(stmt, true, errCode);
207 }
208
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)209 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
210 {
211 int errCode;
212 std::vector<uint8_t> assetBlob;
213 const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
214 " and id<=" + std::to_string(end) + ";";
215 sqlite3_stmt *stmt = nullptr;
216 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
217 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
218 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
219 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
220 SQLiteUtils::ResetStatement(stmt, true, errCode);
221 }
222
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)223 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
224 {
225 ASSERT_NE(db, nullptr);
226 for (int64_t i = begin; i < begin + count; i++) {
227 string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
228 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
229 }
230 }
231
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)232 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
233 {
234 for (int64_t i = begin; i < begin + count; i++) {
235 VBucket idMap;
236 idMap.insert_or_assign("#_gid", std::to_string(i));
237 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
238 }
239 }
240
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)241 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
242 const std::string &tableName)
243 {
244 std::this_thread::sleep_for(std::chrono::milliseconds(1));
245 std::vector<VBucket> record;
246 std::vector<VBucket> extend;
247 GenerateDataRecords(begin, count, gidStart, record, extend);
248 for (auto &entry: extend) {
249 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
250 }
251 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
252 std::this_thread::sleep_for(std::chrono::milliseconds(1));
253 }
254
// sqlite3_exec row callback that collects integer column values.
//   data     : must point to a std::vector<int64_t>; every non-NULL column of the row is
//              parsed as a base-10 integer and appended to it.
//   count    : number of columns in the row.
//   colValue : column texts; an entry is a null pointer when the column is SQL NULL.
//   colName  : column names (unused).
// Always returns 0 so that sqlite3_exec continues with the next row.
int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
{
    (void)colName;
    auto *status = static_cast<std::vector<int64_t> *>(data);
    constexpr int base = 10;
    for (int i = 0; i < count; i++) {
        // Read column i (not column 0 every time) and skip SQL NULLs, which arrive as null
        // pointers and must not be handed to the parser.
        if (colValue[i] == nullptr) {
            continue;
        }
        // strtoll rather than strtol: long may be narrower than the int64_t being stored.
        status->push_back(std::strtoll(colValue[i], nullptr, base));
    }
    return 0;
}
264
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)265 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
266 {
267 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
268 std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
269 " and data_key <=" + std::to_string(endId) + ";";
270 std::vector<int64_t> status;
271 char *str = NULL;
272 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
273 SQLITE_OK);
274 ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
275
276 for (auto stat : status) {
277 ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
278 }
279 }
280
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)281 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
282 {
283 std::vector<VBucket> record;
284 std::vector<VBucket> extend;
285 GenerateDataRecords(begin, count, gidStart, record, extend);
286 if (tableName == ASSETS_TABLE_NAME_SHARED) {
287 for (auto &vBucket: record) {
288 vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
289 }
290 }
291 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
292 }
293
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)294 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
295 {
296 std::unique_lock<std::mutex> lock(g_processMutex);
297 bool result = g_processCondition.wait_for(
298 lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
299 ASSERT_EQ(result, true);
300 LOGD("-------------------sync end--------------");
301 }
302
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK)303 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK)
304 {
305 g_syncProcess = {};
306 Query query = Query::Select().FromTable(tableNames);
307 std::vector<SyncProcess> expectProcess;
308 CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
309 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
310 g_syncProcess = std::move(process.begin()->second);
311 if (g_syncProcess.process == FINISHED) {
312 g_processCondition.notify_one();
313 ASSERT_EQ(g_syncProcess.errCode, errCode);
314 }
315 };
316 CloudSyncOption option;
317 option.devices = {DEVICE_CLOUD};
318 option.mode = mode;
319 option.query = query;
320 option.waitTime = SYNC_WAIT_TIME;
321 option.lockAction = static_cast<LockAction>(0xff); // lock all
322 ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
323
324 if (dbStatus == DBStatus::OK) {
325 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
326 }
327 }
328
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)329 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
330 {
331 for (auto &item : assets) {
332 for (auto &asset : item.second) {
333 EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
334 if (index < 4) { // 1-4 is inserted
335 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
336 }
337 LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
338 index);
339 }
340 }
341 }
342
CheckDownloadFailedForTest002(sqlite3 * & db)343 void CheckDownloadFailedForTest002(sqlite3 *&db)
344 {
345 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
346 sqlite3_stmt *stmt = nullptr;
347 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
348 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
349 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
350 Type cloudValue;
351 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
352 std::vector<uint8_t> assetsBlob;
353 Assets assets;
354 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
355 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
356 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
357 for (size_t i = 0; i < assets.size(); ++i) {
358 EXPECT_EQ(assets[i].hash, "");
359 EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
360 }
361 }
362 int errCode;
363 SQLiteUtils::ResetStatement(stmt, true, errCode);
364 }
365
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)366 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
367 {
368 Assets assets;
369 Asset asset = ASSET_COPY;
370 asset.name = ASSET_COPY.name + std::to_string(id);
371 asset.status = status;
372 assets.emplace_back(asset);
373 asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
374 assets.emplace_back(asset);
375 int errCode;
376 std::vector<uint8_t> assetBlob;
377 const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
378 sqlite3_stmt *stmt = nullptr;
379 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
380 assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
381 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
382 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
383 SQLiteUtils::ResetStatement(stmt, true, errCode);
384 }
385
CheckConsistentCount(sqlite3 * db,int64_t expectCount)386 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
387 {
388 EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
389 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
390 }
391
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)392 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
393 {
394 EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
395 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
396 }
397
CloseDb()398 void CloseDb()
399 {
400 delete g_observer;
401 g_virtualCloudDb = nullptr;
402 if (g_delegate != nullptr) {
403 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
404 g_delegate = nullptr;
405 }
406 }
407
408 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
409 public:
410 static void SetUpTestCase(void);
411 static void TearDownTestCase(void);
412 void SetUp();
413 void TearDown();
414
415 protected:
416 void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
417 const std::set<int> &failIndex);
418 void CheckLocalAssetIsEmpty(const std::string &tableName);
419 void CheckCursorData(const std::string &tableName, int begin);
420 void WaitForSync(int &syncCount);
421 const RelationalSyncAbleStorage *GetRelationalStore();
422 void InitDataStatusTest(bool needDownload);
423 void DataStatusTest001(bool needDownload);
424 void DataStatusTest003();
425 void DataStatusTest004();
426 void DataStatusTest005();
427 void DataStatusTest006();
428 void DataStatusTest007();
429 sqlite3 *db = nullptr;
430 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
431 };
432
SetUpTestCase(void)433 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
434 {
435 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
436 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
437 LOGI("The test db is:%s", g_storePath.c_str());
438 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
439 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
440 }
441
TearDownTestCase(void)442 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
443
SetUp(void)444 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
445 {
446 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
447 LOGE("rm test db files error.");
448 }
449 DistributedDBToolsUnitTest::PrintTestCaseInfo();
450 LOGD("Test dir is %s", g_testDir.c_str());
451 db = RelationalTestUtils::CreateDataBase(g_storePath);
452 ASSERT_NE(db, nullptr);
453 InitDatabase(db);
454 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
455 ASSERT_NE(g_observer, nullptr);
456 ASSERT_EQ(
457 g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
458 DBStatus::OK);
459 ASSERT_NE(g_delegate, nullptr);
460 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
461 ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
462 ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
463 g_virtualCloudDb = make_shared<VirtualCloudDb>();
464 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
465 g_syncProcess = {};
466 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
467 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
468 DataBaseSchema dataBaseSchema;
469 GetCloudDbSchema(dataBaseSchema);
470 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
471 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
472 ASSERT_NE(g_cloudStoreHook, nullptr);
473 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
474 ASSERT_TRUE(communicatorAggregator_ != nullptr);
475 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
476 }
477
TearDown(void)478 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
479 {
480 RefObject::DecObjRef(g_store);
481 g_virtualCloudDb->ForkUpload(nullptr);
482 CloseDb();
483 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
484 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
485 LOGE("rm test db files error.");
486 }
487 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
488 communicatorAggregator_ = nullptr;
489 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
490 }
491
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)492 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
493 const std::string &expectAssetId, const std::set<int> &failIndex)
494 {
495 std::string sql = "SELECT assets FROM " + tableName + ";";
496 sqlite3_stmt *stmt = nullptr;
497 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
498 int index = 0;
499 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
500 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
501 Type cloudValue;
502 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
503 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
504 for (const auto &asset : assets) {
505 index++;
506 if (failIndex.find(index) != failIndex.end()) {
507 EXPECT_EQ(asset.assetId, "0");
508 } else {
509 EXPECT_EQ(asset.assetId, expectAssetId);
510 }
511 }
512 }
513 int errCode = E_OK;
514 SQLiteUtils::ResetStatement(stmt, true, errCode);
515 }
516
CheckLocalAssetIsEmpty(const std::string & tableName)517 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
518 {
519 std::string sql = "SELECT asset FROM " + tableName + ";";
520 sqlite3_stmt *stmt = nullptr;
521 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
522 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
523 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
524 }
525 int errCode = E_OK;
526 SQLiteUtils::ResetStatement(stmt, true, errCode);
527 }
528
CheckCursorData(const std::string & tableName,int begin)529 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
530 {
531 std::string logTableName = DBCommon::GetLogTableName(tableName);
532 std::string sql = "SELECT cursor FROM " + logTableName + ";";
533 sqlite3_stmt *stmt = nullptr;
534 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
535 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
536 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
537 Type cloudValue;
538 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
539 EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
540 begin++;
541 }
542 int errCode = E_OK;
543 SQLiteUtils::ResetStatement(stmt, true, errCode);
544 }
545
WaitForSync(int & syncCount)546 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
547 {
548 std::unique_lock<std::mutex> lock(g_processMutex);
549 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
550 [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
551 ASSERT_EQ(result, true);
552 }
553
GetRelationalStore()554 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
555 {
556 RelationalDBProperties properties;
557 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
558 int errCode = E_OK;
559 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
560 if (g_store == nullptr) {
561 return nullptr;
562 }
563 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
564 }
565
InitDataStatusTest(bool needDownload)566 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
567 {
568 int cloudCount = 20;
569 int localCount = 10;
570 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
571 if (needDownload) {
572 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
573 }
574 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
575 std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
576 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
577 sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
578 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
579 sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
580 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
581 std::this_thread::sleep_for(std::chrono::milliseconds(1));
582 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
583 std::this_thread::sleep_for(std::chrono::milliseconds(1));
584 sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
585 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
586 sql = "update " + logName + " SET status = 1 where data_key in (4);";
587 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
588 }
589
DataStatusTest001(bool needDownload)590 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
591 {
592 int cloudCount = 20;
593 int count = 0;
594 g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
595 count++;
596 if (count == 1) {
597 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
598 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
599 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
600 }
601 if (count == 2) { // 2 is compensated sync
602 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
603 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
604 CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
605 g_processCondition.notify_one();
606 }
607 });
608 InitDataStatusTest(needDownload);
609 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
610 WaitForSync(count);
611 }
612
DataStatusTest003()613 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
614 {
615 int count = 0;
616 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
617 count++;
618 if (count == 1) {
619 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
620 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
621 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
622 }
623 if (count == 2) { // 2 is compensated sync
624 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
625 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
626 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
627 g_processCondition.notify_one();
628 }
629 });
630 int downLoadCount = 0;
631 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
632 downLoadCount++;
633 if (downLoadCount == 1) {
634 std::vector<std::vector<uint8_t>> hashKey;
635 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
636 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
637 }
638 });
639 InitDataStatusTest(true);
640 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
641 WaitForSync(count);
642 }
643
DataStatusTest004()644 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
645 {
646 int count = 0;
647 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
648 count++;
649 if (count == 1) {
650 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
651 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
652 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
653 }
654 if (count == 2) { // 2 is compensated sync
655 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
656 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
657 CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
658 g_processCondition.notify_one();
659 }
660 });
661 int downLoadCount = 0;
662 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
663 downLoadCount++;
664 if (downLoadCount == 1) {
665 std::vector<std::vector<uint8_t>> hashKey;
666 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
667 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
668 std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
669 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
670 }
671 });
672 int queryIdx = 0;
673 g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
674 LOGD("query index:%d", ++queryIdx);
675 if (queryIdx == 4) { // 4 is compensated sync
676 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
677 " SET status = 1 where data_key=15;";
678 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
679 }
680 });
681 InitDataStatusTest(true);
682 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
683 WaitForSync(count);
684 }
685
DataStatusTest005()686 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
687 {
688 int count = 0;
689 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
690 count++;
691 if (count == 1) {
692 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
693 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
694 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
695 }
696 if (count == 2) { // 2 is compensated sync
697 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
698 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
699 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
700 g_processCondition.notify_one();
701 }
702 });
703 int downLoadCount = 0;
704 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
705 downLoadCount++;
706 if (downLoadCount == 1) {
707 std::vector<std::vector<uint8_t>> hashKey;
708 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
709 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
710 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
711 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
712 }
713 });
714 InitDataStatusTest(true);
715 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
716 WaitForSync(count);
717 }
718
DataStatusTest006()719 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
720 {
721 int count = 0;
722 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
723 count++;
724 if (count == 1) {
725 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
726 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
727 "(status = 0 and data_key in (11))";
728 CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
729 }
730 if (count == 2) { // 2 is compensated sync
731 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
732 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
733 CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
734 g_processCondition.notify_one();
735 }
736 });
737 int downLoadCount = 0;
738 g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
739 downLoadCount++;
740 if (downLoadCount == 1) {
741 std::vector<std::vector<uint8_t>> hashKey;
742 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
743 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
744 std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
745 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
746 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
747 }
748 });
749 InitDataStatusTest(true);
750 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
751 WaitForSync(count);
752 }
753
DataStatusTest007()754 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
755 {
756 int count = 0;
757 g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
758 count++;
759 if (count == 1) {
760 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
761 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
762 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
763 }
764 if (count == 2) { // 2 is compensated sync
765 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
766 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
767 CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
768 g_processCondition.notify_one();
769 }
770 });
771 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
772 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
773 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
774 .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
775 std::map<std::string, Assets> &assets) {
776 return CLOUD_ERROR;
777 });
778 InitDataStatusTest(true);
779 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
780 WaitForSync(count);
781 }
782
783 /*
784 * @tc.name: DownloadAssetForDupDataTest001
785 * @tc.desc: Test the download interface call with duplicate data for the same primary key.
786 * @tc.type: FUNC
787 * @tc.require:
788 * @tc.author: liufuchenxing
789 */
790 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level0)
791 {
792 /**
793 * @tc.steps:step1. Mock asset download interface.
794 * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
795 */
796 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
797 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
798 int index = 1;
799 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
800 .Times(2)
801 .WillRepeatedly(
__anon32d7e6601102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 802 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
803 LOGD("Download GID:%s", gid.c_str());
804 CheckDownloadForTest001(index, assets);
805 index++;
806 return DBStatus::OK;
807 });
808
809 /**
810 * @tc.steps:step2. Insert local data [0, 10), sync data
811 * @tc.expected: step2. sync success.
812 */
813 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
814 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
815
816 /**
817 * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
818 * @tc.expected: step3. sync success.
819 */
820 DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
821 InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
822 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
823 }
824
825 /**
826 * @tc.name: FillAssetId001
827 * @tc.desc: Test if assetId is filled in single primary key table
828 * @tc.type: FUNC
829 * @tc.require:
830 * @tc.author: chenchaohao
831 */
832 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level0)
833 {
834 /**
835 * @tc.steps:step1. local insert assets and sync, check the local assetId.
836 * @tc.expected: step1. return OK.
837 */
838 int localCount = 50;
839 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
840 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
841 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
842
843 /**
844 * @tc.steps:step2. local update assets and sync ,check the local assetId.
845 * @tc.expected: step2. sync success.
846 */
847 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
848 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
849 CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
850 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
851 }
852
853 /**
854 * @tc.name: FillAssetId002
855 * @tc.desc: Test if assetId is filled in no primary key table
856 * @tc.type: FUNC
857 * @tc.require:
858 * @tc.author: chenchaohao
859 */
860 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level0)
861 {
862 /**
863 * @tc.steps:step1. local insert assets and sync, check the local assetId.
864 * @tc.expected: step1. return OK.
865 */
866 int localCount = 50;
867 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
868 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
869 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
870
871 /**
872 * @tc.steps:step2. local update assets and sync ,check the local assetId.
873 * @tc.expected: step2. sync success.
874 */
875 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
876 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
877 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
878 }
879
880 /**
881 * @tc.name: FillAssetId003
882 * @tc.desc: Test if assetId is filled in compound primary key table
883 * @tc.type: FUNC
884 * @tc.require:
885 * @tc.author: chenchaohao
886 */
887 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level0)
888 {
889 /**
890 * @tc.steps:step1. local insert assets and sync, check the local assetId.
891 * @tc.expected: step1. return OK.
892 */
893 int localCount = 50;
894 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
895 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
896 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
897
898 /**
899 * @tc.steps:step2. local update assets and sync ,check the local assetId.
900 * @tc.expected: step2. sync success.
901 */
902 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
903 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
904 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
905 }
906
907 /**
908 * @tc.name: FillAssetId004
909 * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
910 * @tc.type: FUNC
911 * @tc.require:
912 * @tc.author: chenchaohao
913 */
914 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level0)
915 {
916 /**
917 * @tc.steps:step1. local insert assets and sync, check the local assetId.
918 * @tc.expected: step1. return OK.
919 */
920 int localCount = 50;
921 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
922 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
923 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
924
925 /**
926 * @tc.steps:step2. local update assets and sync ,check the local assetId.
927 * @tc.expected: step2. sync success.
928 */
929 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
930 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
931 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
932 }
933
934 /**
935 * @tc.name: FillAssetId005
936 * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
937 * @tc.type: FUNC
938 * @tc.require:
939 * @tc.author: chenchaohao
940 */
941 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level0)
942 {
943 /**
944 * @tc.steps:step1. local insert assets and sync, check the local assetId.
945 * @tc.expected: step1. return OK.
946 */
947 int localCount = 50;
948 InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
949 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
950 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
951
952 /**
953 * @tc.steps:step2. local update assets and sync ,check the local assetId.
954 * @tc.expected: step2. sync success.
955 */
956 UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
957 CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
958 CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
959 }
960
961 /**
962 * @tc.name: FillAssetId006
963 * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
964 * @tc.type: FUNC
965 * @tc.require:
966 * @tc.author: chenchaohao
967 */
968 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level0)
969 {
970 /**
971 * @tc.steps:step1. local insert assets and sync, check the local assetId.
972 * @tc.expected: step1. return OK.
973 */
974 int localCount = 50;
975 InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
976 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
977 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
978
979 /**
980 * @tc.steps:step2. local update assets and sync ,check the local assetId.
981 * @tc.expected: step2. sync success.
982 */
983 UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
984 CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
985 CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
986 }
987
988 /**
989 * @tc.name: FillAssetId007
990 * @tc.desc: Test if assetId is filled when extend lack of assets
991 * @tc.type: FUNC
992 * @tc.require:
993 * @tc.author: chenchaohao
994 */
995 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level0)
996 {
997 CloudSyncConfig config;
998 config.maxUploadCount = 200; // max upload 200
999 g_delegate->SetCloudSyncConfig(config);
1000 /**
1001 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1002 * @tc.expected: step1. return OK.
1003 */
1004 int localCount = 50;
1005 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601202(const std::string &tableName, VBucket &extend) 1006 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1007 extend.erase("assets");
1008 });
1009 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1010 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1011
1012 /**
1013 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1014 * @tc.expected: step2. sync success.
1015 */
1016 int addLocalCount = 10;
1017 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601302(const std::string &tableName, VBucket &extend) 1018 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1019 if (extend.find("assets") != extend.end()) {
1020 for (auto &asset : std::get<Assets>(extend["assets"])) {
1021 asset.name = "pad";
1022 }
1023 }
1024 });
1025 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1026 int beginFailFillNum = 101;
1027 int endFailFillNum = 120;
1028 std::set<int> index;
1029 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1030 index.insert(i);
1031 }
1032 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1033
1034 /**
1035 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1036 * @tc.expected: step2. sync success.
1037 */
1038 g_virtualCloudDb->ForkUpload(nullptr);
1039 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1040 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1041 }
1042
1043 /**
1044 * @tc.name: FillAssetId008
1045 * @tc.desc: Test if assetId is filled when extend lack of assetId
1046 * @tc.type: FUNC
1047 * @tc.require:
1048 * @tc.author: chenchaohao
1049 */
1050 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level0)
1051 {
1052 /**
1053 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1054 * @tc.expected: step1. return OK.
1055 */
1056 int localCount = 50;
1057 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601402(const std::string &tableName, VBucket &extend) 1058 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1059 if (extend.find("assets") != extend.end()) {
1060 for (auto &asset : std::get<Assets>(extend["assets"])) {
1061 asset.assetId = "";
1062 }
1063 }
1064 });
1065 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1066 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1067
1068 /**
1069 * @tc.steps:step2. local update assets and sync ,check the local assetId.
1070 * @tc.expected: step2. sync success.
1071 */
1072 g_virtualCloudDb->ForkUpload(nullptr);
1073 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1074 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1075 }
1076
1077 /**
1078 * @tc.name: FillAssetId009
1079 * @tc.desc: Test if assetId is filled when extend exists useless assets
1080 * @tc.type: FUNC
1081 * @tc.require:
1082 * @tc.author: chenchaohao
1083 */
1084 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level0)
1085 {
1086 /**
1087 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1088 * @tc.expected: step1. return OK.
1089 */
1090 int localCount = 50;
1091 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601502(const std::string &tableName, VBucket &extend) 1092 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1093 if (extend.find("assets") != extend.end()) {
1094 Asset asset = ASSET_COPY2;
1095 Assets &assets = std::get<Assets>(extend["assets"]);
1096 assets.push_back(asset);
1097 }
1098 });
1099 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1100 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1101 }
1102
1103 /**
1104 * @tc.name: FillAssetId010
1105 * @tc.desc: Test if assetId is filled when some success and some fail
1106 * @tc.type: FUNC
1107 * @tc.require:
1108 * @tc.author: chenchaohao
1109 */
1110 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level0)
1111 {
1112 /**
1113 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1114 * @tc.expected: step1. return OK.
1115 */
1116 int localCount = 30;
1117 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1118 g_virtualCloudDb->SetInsertFailed(1);
1119 std::atomic<int> count = 0;
__anon32d7e6601602(const std::string &tableName, VBucket &extend) 1120 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1121 if (extend.find("assets") != extend.end() && count == 0) {
1122 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1123 count++;
1124 }
1125 });
1126 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1127 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1128 }
1129
1130 /**
1131 * @tc.name: FillAssetId011
1132 * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1133 * @tc.type: FUNC
1134 * @tc.require:
1135 * @tc.author: chenchaohao
1136 */
1137 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level0)
1138 {
1139 /**
1140 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1141 * @tc.expected: step1. return OK.
1142 */
1143 int localCount = 50;
1144 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1145 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1146 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1147
1148 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1149 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1150 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1151 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1152 }
1153
1154 /**
1155 * @tc.name: FillAssetId012
1156 * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1157 * @tc.type: FUNC
1158 * @tc.require:
1159 * @tc.author: chenchaohao
1160 */
1161 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level0)
1162 {
1163 /**
1164 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1165 * @tc.expected: step1. return OK.
1166 */
1167 int localCount = 50;
1168 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1169 std::atomic<int> count = 1;
1170 g_virtualCloudDb->SetClearExtend(count);
1171 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1172 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1173
1174 /**
1175 * @tc.steps:step2. set extend size normal then sync, check the asseid.
1176 * @tc.expected: step2. return OK.
1177 */
1178 g_virtualCloudDb->SetClearExtend(0);
1179 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1180 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1181
1182 /**
1183 * @tc.steps:step3. set extend size large then sync, check the asseid.
1184 * @tc.expected: step3. return OK.
1185 */
1186 count = -1; // -1 means extend push a empty vBucket
1187 g_virtualCloudDb->SetClearExtend(count);
1188 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1189 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1190 }
1191
1192 /**
1193 * @tc.name: FillAssetId013
1194 * @tc.desc: Test fill assetId and removedevicedata when data is delete
1195 * @tc.type: FUNC
1196 * @tc.require:
1197 * @tc.author: chenchaohao
1198 */
1199 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level0)
1200 {
1201 /**
1202 * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1203 * @tc.expected: step1. return OK.
1204 */
1205 int localCount = 20;
1206 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1207 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1208 int deleteLocalCount = 10;
1209 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1210 int addLocalCount = 30;
1211 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1212 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1213
1214 /**
1215 * @tc.steps:step2. RemoveDeviceData.
1216 * @tc.expected: step2. return OK.
1217 */
1218 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1219 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1220 }
1221
1222 /**
1223 * @tc.name: FillAssetId014
1224 * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1225 * @tc.type: FUNC
1226 * @tc.require:
1227 * @tc.author: bty
1228 */
1229 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level0)
1230 {
1231 /**
1232 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1233 * @tc.expected: step1. return OK.
1234 */
1235 int localCount = 50;
1236 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1237 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1238 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1239
1240 /**
1241 * @tc.steps:step2. RemoveDeviceData
1242 * @tc.expected: step2. return OK.
1243 */
1244 Assets assets;
1245 std::vector<AssetStatus> statusVec = {
1246 AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1247 AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1248 };
1249 for (auto &status : statusVec) {
1250 Asset temp = ASSET_COPY;
1251 temp.name += std::to_string(status);
1252 temp.status = status | AssetStatus::UPLOADING;
1253 assets.emplace_back(temp);
1254 }
1255 UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1256 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1257 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1258
1259 /**
1260 * @tc.steps:step3. check status
1261 * @tc.expected: step3. return OK.
1262 */
1263 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1264 sqlite3_stmt *stmt = nullptr;
1265 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1266 int index = 0;
1267 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1268 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1269 Type cloudValue;
1270 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1271 Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1272 for (const auto &ast : newAssets) {
1273 EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1274 }
1275 }
1276 int errCode = E_OK;
1277 SQLiteUtils::ResetStatement(stmt, true, errCode);
1278 }
1279
1280 /**
1281 * @tc.name: FillAssetId015
1282 * @tc.desc: Test if fill assetId when upload return cloud network error
1283 * @tc.type: FUNC
1284 * @tc.require:
1285 * @tc.author: chenchaohao
1286 */
1287 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level0)
1288 {
1289 /**
1290 * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1291 * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1292 */
1293 int localCount = 20;
1294 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1295 g_virtualCloudDb->SetCloudNetworkError(true);
1296 std::atomic<int> count = 0;
__anon32d7e6601702(const std::string &tableName, VBucket &extend) 1297 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1298 if (extend.find("assets") != extend.end() && count == 0) {
1299 extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1300 count++;
1301 }
1302 });
1303 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1304 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1305 g_virtualCloudDb->SetCloudNetworkError(false);
1306 g_virtualCloudDb->ForkUpload(nullptr);
1307 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1308 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1309
1310 /**
1311 * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1312 * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1313 */
1314 int addLocalCount = 10;
1315 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1316 std::atomic<int> num = 0;
__anon32d7e6601802(const std::string &tableName, VBucket &extend) 1317 g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1318 if (extend.find("assets") != extend.end() && num == 0) {
1319 for (auto &asset : std::get<Assets>(extend["assets"])) {
1320 asset.name = "pad";
1321 break;
1322 }
1323 num++;
1324 }
1325 });
1326 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1327 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1328 }
1329
1330 /**
1331 * @tc.name: FillAssetId016
1332 * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1333 * @tc.type: FUNC
1334 * @tc.require:
1335 * @tc.author: chenchaohao
1336 */
1337 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level0)
1338 {
1339 /**
1340 * @tc.steps:step1. local insert data and sync, then delete last local data
1341 * @tc.expected: step1. return OK.
1342 */
1343 int localCount = 20;
1344 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1345 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1346 int deletLocalCount = 10;
1347 DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1348 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1349
1350 /**
1351 * @tc.steps:step2. RemoveDeviceData.
1352 * @tc.expected: step2. return OK.
1353 */
1354 g_delegate->RemoveDeviceData("", FLAG_ONLY);
1355 CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1356 }
1357
1358 /**
1359 * @tc.name: FillAssetId017
1360 * @tc.desc: Test cursor when download not change
1361 * @tc.type: FUNC
1362 * @tc.require:
1363 * @tc.author: chenchaohao
1364 */
1365 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level0)
1366 {
1367 /**
1368 * @tc.steps:step1. local insert data and sync,check cursor.
1369 * @tc.expected: step1. return OK.
1370 */
1371 int localCount = 20;
1372 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1373 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1374 CheckCursorData(ASSETS_TABLE_NAME, 1);
1375
1376 /**
1377 * @tc.steps:step2. sync again and optype is not change, check cursor.
1378 * @tc.expected: step2. return OK.
1379 */
1380 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1381 CheckCursorData(ASSETS_TABLE_NAME, 1);
1382 }
1383
1384 /**
1385 * @tc.name: FillAssetId018
1386 * @tc.desc: Test if assetId is filled when contains "#_error"
1387 * @tc.type: FUNC
1388 * @tc.require:
1389 * @tc.author: zhaoliang
1390 */
1391 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level0)
1392 {
1393 /**
1394 * @tc.steps:step1. local insert assets and sync, check the local assetId.
1395 * @tc.expected: step1. return OK.
1396 */
1397 int localCount = 30;
1398 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1399 std::atomic<int> count = 0;
__anon32d7e6601902(const std::string &tableName, VBucket &extend) 1400 g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1401 if (extend.find("assets") != extend.end() && count == 0) {
1402 extend["#_error"] = std::string("test");
1403 count++;
1404 }
1405 });
1406 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1407 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1408 }
1409
1410 /**
1411 * @tc.name: DownloadAssetForDupDataTest002
1412 * @tc.desc: Test download failed
1413 * @tc.type: FUNC
1414 * @tc.require:
1415 * @tc.author: bty
1416 */
1417 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level0)
1418 {
1419 /**
1420 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1421 * @tc.expected: step1. return OK
1422 */
1423 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1424 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1425 int index = 0;
1426 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1427 .WillRepeatedly(
__anon32d7e6601a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1428 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1429 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1430 return DBStatus::CLOUD_ERROR;
1431 });
1432
1433 /**
1434 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1435 * @tc.expected: step2. sync success.
1436 */
1437 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1438 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1439
1440 /**
1441 * @tc.steps:step3. check if the hash of assets in db is empty
1442 * @tc.expected: step3. OK
1443 */
1444 CheckDownloadFailedForTest002(db);
1445 }
1446
1447 /**
1448 * @tc.name: DownloadAssetForDupDataTest003
1449 * @tc.desc: Test download failed and flag was modified
1450 * @tc.type: FUNC
1451 * @tc.require:
1452 * @tc.author: bty
1453 */
1454 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level0)
1455 {
1456 /**
1457 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1458 * @tc.expected: step1. return OK
1459 */
1460 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1461 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1462 int index = 0;
1463 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1464 .WillRepeatedly(
__anon32d7e6601b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1465 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1466 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1467 for (auto &item : assets) {
1468 for (auto &asset : item.second) {
1469 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1470 }
1471 }
1472 return DBStatus::CLOUD_ERROR;
1473 });
1474
1475 /**
1476 * @tc.steps:step2. Insert cloud data [0, 10), sync data
1477 * @tc.expected: step2. sync success.
1478 */
1479 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1480 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1481
1482 /**
1483 * @tc.steps:step3. check if the hash of assets in db is empty
1484 * @tc.expected: step3. OK
1485 */
1486 CheckDownloadFailedForTest002(db);
1487 }
1488
1489 /**
1490 * @tc.name: DownloadAssetForDupDataTest004
1491 * @tc.desc: test sync with deleted assets
1492 * @tc.type: FUNC
1493 * @tc.require:
1494 * @tc.author: bty
1495 */
1496 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level0)
1497 {
1498 /**
1499 * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1500 * @tc.expected: step1. return OK
1501 */
1502 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1503 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1504 int index = 0;
1505 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1506 .WillRepeatedly(
__anon32d7e6601c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1507 [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1508 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1509 return DBStatus::OK;
1510 });
1511
1512 /**
1513 * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1514 * @tc.expected: step2. return OK
1515 */
1516 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1517 UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1518 UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1519 InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1520
1521 /**
1522 * @tc.steps:step3. sync, check download num
1523 * @tc.expected: step3. return OK
1524 */
1525 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1526 EXPECT_GE(index, 2); // 2 is download num
1527 }
1528
1529 /**
1530 * @tc.name: DownloadAssetForDupDataTest005
1531 * @tc.desc: test DOWNLOADING status of asset after uploading
1532 * @tc.type: FUNC
1533 * @tc.require:
1534 * @tc.author: bty
1535 */
1536 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level0)
1537 {
1538 /**
1539 * @tc.steps:step1. init data and sync
1540 * @tc.expected: step1. return OK
1541 */
1542 InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1543 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1544 UpdateAssetsForLocal(db, 6, AssetStatus::DOWNLOADING); // 6 is id
1545 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1546
1547 /**
1548 * @tc.steps:step2. check asset status
1549 * @tc.expected: step2. return OK
1550 */
1551 std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1552 sqlite3_stmt *stmt = nullptr;
1553 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1554 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1555 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1556 Type cloudValue;
1557 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1558 std::vector<uint8_t> assetsBlob;
1559 Assets assets;
1560 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1561 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1562 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1563 for (size_t i = 0; i < assets.size(); ++i) {
1564 EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1565 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1566 }
1567 }
1568 int errCode;
1569 SQLiteUtils::ResetStatement(stmt, true, errCode);
1570 }
1571
1572 /**
1573 * @tc.name: FillAssetId019
1574 * @tc.desc: Test the stability of cleaning asset id
1575 * @tc.type: FUNC
1576 * @tc.require:
1577 * @tc.author: bty
1578 */
1579 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level0)
1580 {
1581 /**
1582 * @tc.steps:step1. local insert assets and sync.
1583 * @tc.expected: step1. return OK.
1584 */
1585 int localCount = 20;
1586 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1587 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1588
1589 /**
1590 * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1591 * @tc.expected: step2. return OK.
1592 */
1593 std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1594 + " set data_key='999' where data_key>'10';";
1595 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1596 EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1597 }
1598
1599 /**
1600 * @tc.name: FillAssetId020
1601 * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1602 * @tc.type: FUNC
1603 * @tc.require:
1604 * @tc.author: zhangtao
1605 */
1606 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level0)
1607 {
1608 CloudSyncConfig config;
1609 config.maxUploadCount = 200; // max upload 200
1610 g_delegate->SetCloudSyncConfig(config);
1611
1612 /**
1613 * @tc.steps:step1. local insert assets and erase assets extends
1614 * @tc.expected: step1. return OK.
1615 */
1616 int localCount = 50;
1617 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601d02(const std::string &tableName, VBucket &extend) 1618 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1619 extend.erase("assets");
1620 });
1621 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1622 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1623
1624 /**
1625 * @tc.steps:step2. local insert assets and modify assetId to empty
1626 * @tc.expected: step2. return OK.
1627 */
1628 int addLocalCount = 10;
1629 InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601e02(const std::string &tableName, VBucket &extend) 1630 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1631 if (extend.find("assets") != extend.end()) {
1632 for (auto &asset : std::get<Assets>(extend["assets"])) {
1633 asset.assetId = "";
1634 }
1635 }
1636 });
1637 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1638 int beginFailFillNum = 101;
1639 int endFailFillNum = 120;
1640 std::set<int> index;
1641 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1642 index.insert(i);
1643 }
1644 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1645
1646 /**
1647 * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1648 * @tc.expected: step3. return OK.
1649 */
1650 InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601f02(const std::string &tableName, VBucket &extend) 1651 g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1652 if (extend.find("assets") != extend.end()) {
1653 for (auto &asset : std::get<Assets>(extend["assets"])) {
1654 asset.name = "mod_pat";
1655 }
1656 }
1657 });
1658 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1659 beginFailFillNum = 121;
1660 endFailFillNum = 140;
1661 std::set<int> newIndex;
1662 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1663 newIndex.insert(i);
1664 }
1665 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1666
1667 /**
1668 * @tc.steps:step4. local update assets and sync, check the local assetId.
1669 * @tc.expected: step4. sync success.
1670 */
1671 g_virtualCloudDb->ForkUpload(nullptr);
1672 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1673 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1674 }
1675
1676 /**
1677 * @tc.name: FillAssetId021
1678 * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1679 * @tc.type: FUNC
1680 * @tc.require:
1681 * @tc.author: zhangtao
1682 */
1683 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level0)
1684 {
1685 CloudSyncConfig config;
1686 config.maxUploadCount = 200; // max upload 200
1687 g_delegate->SetCloudSyncConfig(config);
1688
1689 /**
1690 * @tc.steps:step1. local insert assets and erase assets extends
1691 * @tc.expected: step1. return OK.
1692 */
1693 int localCount = 50;
1694 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1695
1696 /**
1697 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1698 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1699 */
1700 int uploadFailId = 0;
1701 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1702 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1703 uploadFailId++;
1704 if (uploadFailId == 25) { // 25 is the middle record
1705 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1706 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1707 }
1708 return OK;
1709 });
1710
1711 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1712 int beginFailFillNum = 49;
1713 int endFailFillNum = 50;
1714 std::set<int> index;
1715 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1716 index.insert(i);
1717 }
1718 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1719 g_virtualCloudDb->ForkUpload(nullptr);
1720 }
1721
1722 /**
1723 * @tc.name: FillAssetId023
1724 * @tc.desc: Test if BatchUpdate with local assets missing
1725 * @tc.type: FUNC
1726 * @tc.require:
1727 * @tc.author: zhangtao
1728 */
1729 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level0)
1730 {
1731 /**
1732 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1733 * @tc.expected: step1. return OK.
1734 */
1735 int localCount = 50;
1736 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1737 std::atomic<int> count = 1;
1738 g_virtualCloudDb->SetClearExtend(count);
1739 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1740 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1741
1742 /**
1743 * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1744 * @tc.expected: step2. return OK.
1745 */
1746 g_virtualCloudDb->SetClearExtend(0);
1747
1748 int uploadFailId = 0;
1749 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1750 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1751 uploadFailId++;
1752 if (uploadFailId == 25) { // 25 is the middle record
1753 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1754 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1755 }
1756 return OK;
1757 });
1758 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1759 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1760 }
1761
1762 /**
1763 * @tc.name: FillAssetId024
1764 * @tc.desc: Test if BatchUpdate with multiple local assets missing
1765 * @tc.type: FUNC
1766 * @tc.require:
1767 * @tc.author: zhangtao
1768 */
1769 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level0)
1770 {
1771 /**
1772 * @tc.steps:step1. set extend size missing then sync, check the asseid.
1773 * @tc.expected: step1. return OK.
1774 */
1775 int localCount = 50;
1776 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1777 std::atomic<int> count = 1;
1778 g_virtualCloudDb->SetClearExtend(count);
1779 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1780 CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1781
1782 /**
1783 * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1784 * @tc.expected: step2. return OK.
1785 */
1786 g_virtualCloudDb->SetClearExtend(0);
1787
1788 int uploadFailId = 0;
1789 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1790 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1791 uploadFailId++;
1792 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1793 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1794 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1795 }
1796 return OK;
1797 });
1798 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1799 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1800 }
1801
1802 /**
1803 * @tc.name: FillAssetId022
1804 * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1805 * @tc.type: FUNC
1806 * @tc.require:
1807 * @tc.author: zhangtao
1808 */
1809 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level0)
1810 {
1811 CloudSyncConfig config;
1812 config.maxUploadCount = 200; // max upload 200
1813 g_delegate->SetCloudSyncConfig(config);
1814
1815 /**
1816 * @tc.steps:step1. local insert assets and erase assets extends
1817 * @tc.expected: step1. return OK.
1818 */
1819 int localCount = 50;
1820 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1821
1822 /**
1823 * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1824 * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1825 */
1826 int uploadFailId = 0;
1827 g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1828 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1829 uploadFailId++;
1830 if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1831 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1832 return DBStatus::LOCAL_ASSET_NOT_FOUND;
1833 }
1834 return OK;
1835 });
1836
1837 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1838 int beginFailFillNum = 49;
1839 int endFailFillNum = 54;
1840 std::set<int> index;
1841 for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1842 index.insert(i);
1843 }
1844 CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1845 g_virtualCloudDb->ForkUpload(nullptr);
1846 }
1847
1848 /**
1849 * @tc.name: ConsistentFlagTest001
1850 * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1851 * @tc.type: FUNC
1852 * @tc.require:
1853 * @tc.author: bty
1854 */
1855 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level0)
1856 {
1857 /**
1858 * @tc.steps:step1. init data for the different asset, sync and check flag
1859 * @tc.expected: step1. return OK.
1860 */
1861 int localCount = 10; // 10 is num of local
1862 int cloudCount = 20; // 20 is num of cloud
1863 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1864 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1865 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1866 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1867 CheckConsistentCount(db, cloudCount);
1868
1869 /**
1870 * @tc.steps:step2. update local data, sync and check flag
1871 * @tc.expected: step2. return OK.
1872 */
1873 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1874 DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1875 CheckConsistentCount(db, 0L);
1876 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1877 CheckConsistentCount(db, cloudCount);
1878 }
1879
1880 /**
1881 * @tc.name: ConsistentFlagTest002
1882 * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1883 * @tc.type: FUNC
1884 * @tc.require:
1885 * @tc.author: bty
1886 */
1887 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level0)
1888 {
1889 /**
1890 * @tc.steps:step1. init data for the same asset, sync and check flag
1891 * @tc.expected: step1. return OK.
1892 */
1893 int cloudCount = 20; // 20 is num of cloud
1894 InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1895 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1896 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1897 CheckConsistentCount(db, cloudCount);
1898
1899 /**
1900 * @tc.steps:step2. update local data, sync and check flag
1901 * @tc.expected: step2. return OK.
1902 */
1903 int deleteLocalCount = 5;
1904 DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1905 CheckConsistentCount(db, cloudCount - deleteLocalCount);
1906 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1907 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1908 CheckConsistentCount(db, cloudCount);
1909 }
1910
1911 /**
1912 * @tc.name: ConsistentFlagTest003
1913 * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
1914 * @tc.type: FUNC
1915 * @tc.require:
1916 * @tc.author: bty
1917 */
1918 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level0)
1919 {
1920 /**
1921 * @tc.steps:step1. init data
1922 * @tc.expected: step1. return OK.
1923 */
1924 int localCount = 20; // 20 is num of local
1925 int cloudCount = 10; // 10 is num of cloud
1926 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1927 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1928 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1929
1930 /**
1931 * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
1932 * @tc.expected: step2. return OK.
1933 */
1934 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1935 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1936 int index = 0;
1937 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1938 .WillRepeatedly(
__anon32d7e6602402(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1939 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1940 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
1941 if (index == 1) { // 1 is first download
1942 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
1943 }
1944 return DBStatus::OK;
1945 });
1946
1947 /**
1948 * @tc.steps:step3. fork upload, check consistent count
1949 * @tc.expected: step3. return OK.
1950 */
1951 int upIdx = 0;
__anon32d7e6602502(const std::string &tableName, VBucket &extend) 1952 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
1953 LOGD("upload index:%d", ++upIdx);
1954 if (upIdx == 1) { // 1 is first upload
1955 CheckConsistentCount(db, localCount - cloudCount - 1);
1956 }
1957 });
1958
1959 /**
1960 * @tc.steps:step4. fork query, check consistent count
1961 * @tc.expected: step4. return OK.
1962 */
1963 int queryIdx = 0;
__anon32d7e6602602(const std::string &, VBucket &) 1964 g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
1965 LOGD("query index:%d", ++queryIdx);
1966 if (queryIdx == 3) { // 3 is the last query
1967 CheckConsistentCount(db, localCount - 1);
1968 }
1969 });
1970 int count = 0;
__anon32d7e6602702() 1971 g_cloudStoreHook->SetSyncFinishHook([&count]() {
1972 count++;
1973 if (count == 2) { // 2 is compensated sync
1974 g_processCondition.notify_one();
1975 }
1976 });
1977 /**
1978 * @tc.steps:step5. sync, check consistent count
1979 * @tc.expected: step5. return OK.
1980 */
1981 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1982 WaitForSync(count);
1983 CheckConsistentCount(db, localCount);
1984 }
1985
1986 /**
1987 * @tc.name: ConsistentFlagTest004
1988 * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
1989 * @tc.type: FUNC
1990 * @tc.require:
1991 * @tc.author: bty
1992 */
1993 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level0)
1994 {
1995 /**
1996 * @tc.steps:step1. init data
1997 * @tc.expected: step1. return OK.
1998 */
1999 int localCount = 20; // 20 is num of local
2000 int cloudCount = 10; // 10 is num of cloud
2001 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2002 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2003 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2004
2005 /**
2006 * @tc.steps:step2. fork upload, not return error filed of CLOUD_NETWORK_ERROR
2007 * @tc.expected: step2. return OK.
2008 */
2009 int upIdx = 0;
__anon32d7e6602802(const std::string &tableName, VBucket &extend) 2010 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2011 LOGD("upload index:%d", ++upIdx);
2012 if (upIdx == 1) {
2013 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2014 }
2015 });
2016
2017 /**
2018 * @tc.steps:step3. sync, check consistent count
2019 * @tc.expected: step3. return OK.
2020 */
2021 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2022 CheckConsistentCount(db, localCount - 1);
2023
2024 /**
2025 * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2026 * @tc.expected: step4. return OK.
2027 */
2028 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2029 upIdx = 0;
__anon32d7e6602902(const std::string &tableName, VBucket &extend) 2030 g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2031 LOGD("upload index:%d", ++upIdx);
2032 if (upIdx == 1) {
2033 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2034 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2035 }
2036 if (upIdx == 2) {
2037 int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2038 extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2039 }
2040 });
2041
2042 /**
2043 * @tc.steps:step5. sync, check consistent count
2044 * @tc.expected: step5. return OK.
2045 */
2046 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2047 CheckConsistentCount(db, localCount - 2);
2048 }
2049
2050 /**
2051 * @tc.name: ConsistentFlagTest005
2052 * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2053 * @tc.type: FUNC
2054 * @tc.require:
2055 * @tc.author: bty
2056 */
2057 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level0)
2058 {
2059 /**
2060 * @tc.steps:step1. init data
2061 * @tc.expected: step1. return OK.
2062 */
2063 int localCount = 20; // 20 is num of local
2064 int cloudCount = 10; // 10 is num of cloud
2065 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2066 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2067 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2068
2069 /**
2070 * @tc.steps:step2. fork download, update local assets where id=2
2071 * @tc.expected: step2. return OK.
2072 */
2073 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2074 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2075 int index = 0;
2076 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2077 .WillRepeatedly(
2078 [this, &index](const std::string &, const std::string &gid, const Type &,
__anon32d7e6602a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2079 std::map<std::string, Assets> &assets) {
2080 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2081 if (index == 1) { // 1 is first download
2082 std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2083 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2084 }
2085 return DBStatus::OK;
2086 });
2087
2088 /**
2089 * @tc.steps:step3. fork upload, check consistent count
2090 * @tc.expected: step3. return OK.
2091 */
2092 int upIdx = 0;
__anon32d7e6602b02(const std::string &tableName, VBucket &extend) 2093 g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2094 LOGD("upload index:%d", ++upIdx);
2095 if (upIdx == 1) { // 1 is first upload
2096 CheckConsistentCount(db, localCount - cloudCount - 1);
2097 }
2098 });
2099
2100 /**
2101 * @tc.steps:step4. sync, check consistent count
2102 * @tc.expected: step4. return OK.
2103 */
2104 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2105 CheckConsistentCount(db, localCount);
2106 }
2107
2108 /**
2109 * @tc.name: ConsistentFlagTest006
2110 * @tc.desc:
2111 * @tc.type: FUNC
2112 * @tc.require:
2113 * @tc.author: bty
2114 */
2115 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level0)
2116 {
2117 /**
2118 * @tc.steps:step1. init data
2119 * @tc.expected: step1. return OK.
2120 */
2121 int cloudCount = 10; // 10 is num of cloud
2122 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2123 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2124
2125 /**
2126 * @tc.steps:step2. fork download, update local assets where id=2
2127 * @tc.expected: step2. return OK.
2128 */
2129 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2130 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2131 int delCount = 3; // 3 is num of cloud
2132 DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2133 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2134 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2135 int index = 0;
2136 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2137 .WillRepeatedly(
2138 [&index](const std::string &, const std::string &gid, const Type &,
__anon32d7e6602c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2139 std::map<std::string, Assets> &assets) {
2140 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2141 if (index == 1) { // 1 is first download
2142 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2143 }
2144 return DBStatus::OK;
2145 });
2146
2147 /**
2148 * @tc.steps:step3. fork upload, check consistent count
2149 * @tc.expected: step3. return OK.
2150 */
2151 int upIdx = 0;
__anon32d7e6602d02(const std::string &tableName, VBucket &extend) 2152 g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2153 LOGD("upload index:%d", ++upIdx);
2154 if (upIdx == 1) { // 1 is first upload
2155 CheckConsistentCount(db, delCount);
2156 CheckCompensatedCount(db, 0L);
2157 }
2158 });
2159
2160 /**
2161 * @tc.steps:step4. sync, check consistent count
2162 * @tc.expected: step4. return OK.
2163 */
2164 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2165 CheckConsistentCount(db, cloudCount);
2166 }
2167
/**
 * @tc.name: SyncDataStatusTest001
 * @tc.desc: No need to download asset, check status after sync
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level0)
{
    // The scenario is implemented by the shared helper DataStatusTest001; this case runs it with `false`.
    // NOTE(review): per the @tc.desc above, `false` presumably selects the "no asset download" variant; confirm
    // against the helper's parameter.
    DataStatusTest001(false);
}
2179
/**
 * @tc.name: SyncDataStatusTest002
 * @tc.desc: Need to download asset, check status after sync
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level0)
{
    // Same shared helper as SyncDataStatusTest001, run with `true`.
    // NOTE(review): per the @tc.desc above, `true` presumably selects the "asset download needed" variant; confirm
    // against the helper's parameter.
    DataStatusTest001(true);
}
2191
/**
 * @tc.name: SyncDataStatusTest003
 * @tc.desc: Lock during download and check status
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level0)
{
    // All steps and assertions of this case live in the fixture helper DataStatusTest003.
    DataStatusTest003();
}
2203
/**
 * @tc.name: SyncDataStatusTest004
 * @tc.desc: Lock and delete during download, check status
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level0)
{
    // All steps and assertions of this case live in the fixture helper DataStatusTest004.
    DataStatusTest004();
}
2215
/**
 * @tc.name: SyncDataStatusTest005
 * @tc.desc: Lock and update during download, check status
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level0)
{
    // All steps and assertions of this case live in the fixture helper DataStatusTest005.
    DataStatusTest005();
}
2227
/**
 * @tc.name: SyncDataStatusTest006
 * @tc.desc: Lock and update and Unlock during download, check status
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level0)
{
    // All steps and assertions of this case live in the fixture helper DataStatusTest006.
    DataStatusTest006();
}
2239
/**
 * @tc.name: SyncDataStatusTest007
 * @tc.desc: Download return error, check status
 * @tc.type: FUNC
 * @tc.require:
 * @tc.author: bty
 */
HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level0)
{
    // All steps and assertions of this case live in the fixture helper DataStatusTest007.
    DataStatusTest007();
}
2251
2252 /**
2253 * @tc.name: SyncDataStatusTest008
2254 * @tc.desc: Test upload process when data locked
2255 * @tc.type: FUNC
2256 * @tc.require:
2257 * @tc.author: bty
2258 */
2259 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level0)
2260 {
2261 /**
2262 * @tc.steps:step1. init local data
2263 * @tc.expected: step1. return OK.
2264 */
2265 int localCount = 40;
2266 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2267 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2268 std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2269 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2270
2271 /**
2272 * @tc.steps:step2. sync and check process
2273 * @tc.expected: step2. return OK.
2274 */
2275 g_syncProcess = {};
2276 Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2277 std::vector<TableProcessInfo> expectProcess = {
2278 { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2279 { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2280 };
2281 int index = 0;
2282 CloudSyncConfig config;
2283 config.maxUploadCount = 100; // max upload 100
2284 g_delegate->SetCloudSyncConfig(config);
__anon32d7e6602e02(const std::map<std::string, SyncProcess> &process) 2285 CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2286 g_syncProcess = std::move(process.begin()->second);
2287 ASSERT_LT(index, 2);
2288 for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2289 EXPECT_EQ(info.process, expectProcess[index].process);
2290 EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2291 EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2292 EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2293 EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2294 }
2295 index++;
2296 if (g_syncProcess.process == FINISHED) {
2297 g_processCondition.notify_one();
2298 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2299 }
2300 };
2301 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2302 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2303 }
2304
2305 /**
2306 * @tc.name: DownloadAssetTest001
2307 * @tc.desc: Test the asset status after the share table sync
2308 * @tc.type: FUNC
2309 * @tc.require:
2310 * @tc.author: bty
2311 */
2312 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level0)
2313 {
2314 /**
2315 * @tc.steps:step1. init data and sync
2316 * @tc.expected: step1. return OK.
2317 */
2318 int cloudCount = 10; // 10 is num of cloud
2319 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2320 CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2321
2322 /**
2323 * @tc.steps:step2. check asset status
2324 * @tc.expected: step2. return OK.
2325 */
2326 SqlCondition condition;
2327 condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2328 condition.readOnly = true;
2329 std::vector<VBucket> records;
2330 EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2331 for (const auto &data: records) {
2332 Assets assets;
2333 CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2334 for (const auto &asset: assets) {
2335 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2336 }
2337 }
2338 }
2339
2340 /**
2341 * @tc.name: DownloadAssetTest002
2342 * @tc.desc: Test asset download failed and re download
2343 * @tc.type: FUNC
2344 * @tc.require:
2345 * @tc.author: liaoyonghuang
2346 */
2347 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level0)
2348 {
2349 /**
2350 * @tc.steps:step1. init data
2351 * @tc.expected: step1. return OK.
2352 */
2353 int cloudCount = 10; // 10 is num of cloud
2354 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2355
2356 /**
2357 * @tc.steps:step2. Set asset download status error and sync
2358 * @tc.expected: step2. sync successful but download assets fail.
2359 */
2360 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2361 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2362
2363 /**
2364 * @tc.steps:step3. Set asset download status OK and sync
2365 * @tc.expected: step3. return OK.
2366 */
2367 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2368 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2369
2370 /**
2371 * @tc.steps:step4. Check assets status
2372 * @tc.expected: step4. status is NORMAL.
2373 */
2374 std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2375 sqlite3_stmt *stmt = nullptr;
2376 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2377 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2378 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2379 Type cloudValue;
2380 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2381 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2382 for (const auto &asset : assets) {
2383 EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2384 }
2385 }
2386 int errCode = E_OK;
2387 SQLiteUtils::ResetStatement(stmt, true, errCode);
2388 EXPECT_EQ(errCode, E_OK);
2389 }
2390
2391 /**
2392 * @tc.name: RecordLockFuncTest001
2393 * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2394 * @tc.type: FUNC
2395 * @tc.author: lijun
2396 */
2397 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level0)
2398 {
2399 /**
2400 * @tc.steps:step1. init local data
2401 * @tc.expected: step1. return OK.
2402 */
2403 int localCount = 100;
2404 int cloudCount = 100;
2405 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2406 std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2407 std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2408 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2409 CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2410 CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2411 DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2412
2413 /**
2414 * @tc.steps:step2. init cloud data
2415 * @tc.expected: step2. return OK.
2416 */
2417 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2418 UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2419
2420 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2421 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2422 int index = 0;
2423 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2424 .WillRepeatedly(
__anon32d7e6602f02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2425 [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2426 LOGD("Download GID:%s %d", gid.c_str(), index);
2427 index++;
2428 if (index <= 30) {
2429 return DBStatus::CLOUD_ERROR;
2430 } else {
2431 return DBStatus::OK;
2432 }
2433
2434 });
2435
2436 std::mutex mtx;
2437 std::condition_variable cv;
2438 int queryIdx = 0;
__anon32d7e6603002(const std::string &, VBucket &) 2439 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2440 LOGD("query index:%d", ++queryIdx);
2441 if (queryIdx == 2) { // 2 is compensated sync
2442 mtx.lock();
2443 cv.notify_one();
2444 mtx.unlock();
2445 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2446 }
2447 });
2448 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2449 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2450
2451 {
2452 std::unique_lock<std::mutex> lock(mtx);
2453 cv.wait(lock);
2454 }
2455 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2456
2457 /**
2458 * @tc.steps:step3. check before compensated sync
2459 * @tc.expected: 70-99 is UNLOCKING.
2460 */
2461 CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2462 CheckLockStatus(db, 70, 99, LockStatus::UNLOCKING);
2463
2464 std::this_thread::sleep_for(std::chrono::seconds(3));
2465 /**
2466 * @tc.steps:step4. check after compensated sync
2467 * @tc.expected: all is UNLOCKING.
2468 */
2469 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2470 }
2471
2472 /**
2473 * @tc.name: RecordLockFuncTest002
2474 * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2475 * based on the cloud, and the last synchronization data is based on the device.
2476 * @tc.type: FUNC
2477 * @tc.author: lijun
2478 */
2479 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level0)
2480 {
2481 /**
2482 * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2483 * @tc.expected: step1. return OK.
2484 */
2485 int localCount = 120;
2486 InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2487 std::vector<std::vector<uint8_t>> hashKey;
2488 CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2489 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2490 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2491 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2492 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2493
2494 /**
2495 * @tc.steps:step2. Check the synchronization result and log table status
2496 * @tc.expected: step2.100-109 is LOCK_CHANGE.
2497 */
2498 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2499 CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2500 UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2501 CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2502 CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2503
2504 /**
2505 * @tc.steps:step3. Synchronize and check the lock_change data status
2506 * @tc.expected: step3.100-119 is LOCK_CHANGE.
2507 */
2508 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2509 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2510 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2511
2512 /**
2513 * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2514 * @tc.expected: step4.100-119 is UNLOCKING.
2515 */
2516 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2517 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2518 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2519
2520 /**
2521 * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2522 * @tc.expected: step5.100-119 is LOCK_CHANGE.
2523 */
2524 EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2525 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2526 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2527
2528 /**
2529 * @tc.steps:step6. Synchronize and check the lock_change data status
2530 * @tc.expected: step6.100-119 is LOCK_CHANGE.
2531 */
2532 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2533 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2534 CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2535
2536 /**
2537 * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2538 * @tc.expected: step7.100-119 is UNLOCKING.
2539 */
2540 EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2541 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2542
2543 /**
2544 * @tc.steps:step8. Synchronize data
2545 * @tc.expected: step8.return OK.
2546 */
2547 std::mutex mtx;
2548 std::condition_variable cv;
2549 int queryIdx = 0;
__anon32d7e6603102(const std::string &, VBucket &) 2550 g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2551 LOGD("query index:%d", ++queryIdx);
2552 if (queryIdx == 5) { // 5 is compensated sync
2553 mtx.lock();
2554 cv.notify_one();
2555 mtx.unlock();
2556 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2557 }
2558 });
2559 CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2560 {
2561 std::unique_lock<std::mutex> lock(mtx);
2562 cv.wait(lock);
2563 }
2564
2565 /**
2566 * @tc.steps:step9. check before compensated sync
2567 * @tc.expected: 100-119 is UNLOCKING.
2568 */
2569 CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2570 CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2571
2572 std::this_thread::sleep_for(std::chrono::seconds(3));
2573 /**
2574 * @tc.steps:step10. check after compensated sync
2575 * @tc.expected: all is UNLOCK.
2576 */
2577 CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2578 }
2579 } // namespace
2580 #endif // RELATIONAL_STORE
2581