1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "mock_asset_loader.h"
23 #include "process_system_api_adapter_impl.h"
24 #include "relational_store_client.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "runtime_config.h"
28 #include "sqlite_relational_store.h"
29 #include "sqlite_relational_utils.h"
30 #include "time_helper.h"
31 #include "virtual_asset_loader.h"
32 #include "virtual_cloud_data_translate.h"
33 #include "virtual_cloud_db.h"
34 #include "virtual_communicator_aggregator.h"
35 #include <gtest/gtest.h>
36 #include <iostream>
37 
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 using namespace std;
42 
43 namespace {
44 const string STORE_ID = "Relational_Store_SYNC";
45 const string DB_SUFFIX = ".db";
46 const string ASSETS_TABLE_NAME = "student";
47 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
48 const string NO_PRIMARY_TABLE = "teacher";
49 const string NO_PRIMARY_TABLE_SHARED = "teacher_shared";
50 const string COMPOUND_PRIMARY_TABLE = "worker1";
51 const string COMPOUND_PRIMARY_TABLE_SHARED = "worker1_shared";
52 const string DEVICE_CLOUD = "cloud_dev";
53 const string COL_ID = "id";
54 const string COL_NAME = "name";
55 const string COL_HEIGHT = "height";
56 const string COL_ASSET = "asset";
57 const string COL_ASSETS = "assets";
58 const string COL_AGE = "age";
59 const int64_t SYNC_WAIT_TIME = 600;
60 const int64_t COMPENSATED_SYNC_WAIT_TIME = 5;
61 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
62     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
63     {COL_AGE, TYPE_INDEX<int64_t>}};
64 const std::vector<Field> NO_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>}, {COL_NAME, TYPE_INDEX<std::string>},
65     {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>},
66     {COL_AGE, TYPE_INDEX<int64_t>}};
67 const std::vector<Field> COMPOUND_PRIMARY_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true},
68     {COL_NAME, TYPE_INDEX<std::string>}, {COL_HEIGHT, TYPE_INDEX<double>}, {COL_ASSET, TYPE_INDEX<Asset>},
69     {COL_ASSETS, TYPE_INDEX<Assets>}, {COL_AGE, TYPE_INDEX<int64_t>, true}};
70 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
71     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," +
72     COL_ASSETS + " ASSETS," + COL_AGE + " INT);";
73 const string CREATE_NO_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + NO_PRIMARY_TABLE + "(" + COL_ID +
74     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS +
75     " ASSETS," + COL_AGE + " INT);";
76 const string CREATE_COMPOUND_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + COMPOUND_PRIMARY_TABLE + "(" + COL_ID +
77     " INTEGER," + COL_NAME + " TEXT ," + COL_HEIGHT + " REAL ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS," +
78     COL_AGE + " INT, PRIMARY KEY (id, age));";
79 const Asset ASSET_COPY = {.version = 1,
80     .name = "Phone",
81     .assetId = "0",
82     .subpath = "/local/sync",
83     .uri = "/local/sync",
84     .modifyTime = "123456",
85     .createTime = "",
86     .size = "256",
87     .hash = "ASE"};
88 const Asset ASSET_COPY2 = {.version = 1,
89     .name = "Phone_copy_2",
90     .assetId = "0",
91     .subpath = "/local/sync",
92     .uri = "/local/sync",
93     .modifyTime = "123456",
94     .createTime = "",
95     .size = "256",
96     .hash = "ASE"};
97 const Assets ASSETS_COPY1 = { ASSET_COPY, ASSET_COPY2 };
98 const std::string QUERY_CONSISTENT_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
99     " where flag&0x20=0;";
100 const std::string QUERY_COMPENSATED_SQL = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
101     " where flag&0x10!=0;";
102 
103 string g_storePath;
104 string g_testDir;
105 RelationalStoreObserverUnitTest *g_observer = nullptr;
106 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
107 RelationalStoreDelegate *g_delegate = nullptr;
108 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
109 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
110 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
111 SyncProcess g_syncProcess;
112 std::condition_variable g_processCondition;
113 std::mutex g_processMutex;
114 IRelationalStore *g_store = nullptr;
115 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
116 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
117 
InitDatabase(sqlite3 * & db)118 void InitDatabase(sqlite3 *&db)
119 {
120     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
121     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
122     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_NO_PRIMARY_KEY_TABLE), SQLITE_OK);
123     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_COMPOUND_PRIMARY_KEY_TABLE), SQLITE_OK);
124 }
125 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)126 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
127 {
128     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
129                                      .fields = CLOUD_FIELDS};
130     dataBaseSchema.tables.push_back(assetsTableSchema);
131     assetsTableSchema = {.name = NO_PRIMARY_TABLE, .sharedTableName = NO_PRIMARY_TABLE_SHARED,
132                          .fields = NO_PRIMARY_FIELDS};
133     dataBaseSchema.tables.push_back(assetsTableSchema);
134     assetsTableSchema = {.name = COMPOUND_PRIMARY_TABLE, .sharedTableName = COMPOUND_PRIMARY_TABLE_SHARED,
135                          .fields = COMPOUND_PRIMARY_FIELDS};
136     dataBaseSchema.tables.push_back(assetsTableSchema);
137 }
138 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)139 void GenerateDataRecords(
140     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
141 {
142     for (int64_t i = begin; i < begin + count; i++) {
143         Assets assets;
144         Asset asset = ASSET_COPY;
145         asset.name = ASSET_COPY.name + std::to_string(i);
146         assets.emplace_back(asset);
147         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
148         assets.emplace_back(asset);
149         VBucket data;
150         data.insert_or_assign(COL_ID, i);
151         data.insert_or_assign(COL_NAME, "name" + std::to_string(i));
152         data.insert_or_assign(COL_HEIGHT, 166.0 * i); // 166.0 is random double value
153         data.insert_or_assign(COL_ASSETS, assets);
154         data.insert_or_assign(COL_AGE, 18L + i); // 18 is random int value
155         record.push_back(data);
156 
157         VBucket log;
158         Timestamp now = TimeHelper::GetSysCurrentTime();
159         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
160         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
161         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
162         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
163         extend.push_back(log);
164     }
165 }
166 
InsertLocalData(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull=true)167 void InsertLocalData(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true)
168 {
169     int errCode;
170     std::vector<VBucket> record;
171     std::vector<VBucket> extend;
172     GenerateDataRecords(begin, count, 0, record, extend);
173     const string sql = "insert or replace into " + tableName + " values (?,?,?,?,?,?);";
174     for (VBucket vBucket : record) {
175         sqlite3_stmt *stmt = nullptr;
176         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
177         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
178         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
179         ASSERT_EQ(SQLiteUtils::MapSQLiteErrno(
180             sqlite3_bind_double(stmt, 3, std::get<double>(vBucket[COL_HEIGHT]))), E_OK); // 3 is height
181         if (isAssetNull) {
182             ASSERT_EQ(sqlite3_bind_null(stmt, 4), SQLITE_OK); // 4 is asset
183         } else {
184             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
185             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetBlob, false), E_OK); // 4 is asset
186         }
187         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
188             std::get<Assets>(vBucket[COL_ASSETS]));
189         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 5, assetsBlob, false), E_OK); // 5 is assets
190         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 6, std::get<int64_t>(vBucket[COL_AGE])), E_OK); // 6 is age
191         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
192         SQLiteUtils::ResetStatement(stmt, true, errCode);
193     }
194 }
195 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets)196 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets)
197 {
198     int errCode;
199     std::vector<uint8_t> assetBlob;
200     const string sql = "update " + tableName + " set assets=?;";
201     sqlite3_stmt *stmt = nullptr;
202     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
203     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
204     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
205     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
206     SQLiteUtils::ResetStatement(stmt, true, errCode);
207 }
208 
UpdateLocalData(sqlite3 * & db,const std::string & tableName,const Assets & assets,int32_t begin,int32_t end)209 void UpdateLocalData(sqlite3 *&db, const std::string &tableName, const Assets &assets, int32_t begin, int32_t end)
210 {
211     int errCode;
212     std::vector<uint8_t> assetBlob;
213     const string sql = "update " + tableName + " set assets=? " + "where id>=" + std::to_string(begin) +
214         " and id<=" + std::to_string(end) + ";";
215     sqlite3_stmt *stmt = nullptr;
216     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
217     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
218     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
219     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
220     SQLiteUtils::ResetStatement(stmt, true, errCode);
221 }
222 
DeleteLocalRecord(sqlite3 * & db,int64_t begin,int64_t count,const std::string & tableName)223 void DeleteLocalRecord(sqlite3 *&db, int64_t begin, int64_t count, const std::string &tableName)
224 {
225     ASSERT_NE(db, nullptr);
226     for (int64_t i = begin; i < begin + count; i++) {
227         string sql = "DELETE FROM " + tableName + " WHERE id ='" + std::to_string(i) + "';";
228         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db, sql), E_OK);
229     }
230 }
231 
DeleteCloudDBData(int64_t begin,int64_t count,const std::string & tableName)232 void DeleteCloudDBData(int64_t begin, int64_t count, const std::string &tableName)
233 {
234     for (int64_t i = begin; i < begin + count; i++) {
235         VBucket idMap;
236         idMap.insert_or_assign("#_gid", std::to_string(i));
237         ASSERT_EQ(g_virtualCloudDb->DeleteByGid(tableName, idMap), DBStatus::OK);
238     }
239 }
240 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)241 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
242     const std::string &tableName)
243 {
244     std::this_thread::sleep_for(std::chrono::milliseconds(1));
245     std::vector<VBucket> record;
246     std::vector<VBucket> extend;
247     GenerateDataRecords(begin, count, gidStart, record, extend);
248     for (auto &entry: extend) {
249         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
250     }
251     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
252     std::this_thread::sleep_for(std::chrono::milliseconds(1));
253 }
254 
QueryStatusCallback(void * data,int count,char ** colValue,char ** colName)255 int QueryStatusCallback(void *data, int count, char **colValue, char **colName)
256 {
257     auto status = static_cast<std::vector<int64_t> *>(data);
258     int base = 10;
259     for (int i = 0; i < count; i++) {
260         status->push_back(strtol(colValue[0], nullptr, base));
261     }
262     return 0;
263 }
264 
CheckLockStatus(sqlite3 * db,int startId,int endId,LockStatus lockStatus)265 void CheckLockStatus(sqlite3 *db, int startId, int endId, LockStatus lockStatus)
266 {
267     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
268     std::string sql = "select status from " + logName + " where data_key >=" + std::to_string(startId) +
269         " and data_key <=" +  std::to_string(endId) + ";";
270     std::vector<int64_t> status;
271     char *str = NULL;
272     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryStatusCallback, static_cast<void *>(&status), &str),
273         SQLITE_OK);
274     ASSERT_EQ(static_cast<size_t>(endId - startId + 1), status.size());
275 
276     for (auto stat : status) {
277         ASSERT_EQ(static_cast<int64_t>(lockStatus), stat);
278     }
279 }
280 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)281 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName)
282 {
283     std::vector<VBucket> record;
284     std::vector<VBucket> extend;
285     GenerateDataRecords(begin, count, gidStart, record, extend);
286     if (tableName == ASSETS_TABLE_NAME_SHARED) {
287         for (auto &vBucket: record) {
288             vBucket.insert_or_assign(CloudDbConstant::CLOUD_OWNER, std::string("cloudA"));
289         }
290     }
291     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
292 }
293 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)294 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
295 {
296     std::unique_lock<std::mutex> lock(g_processMutex);
297     bool result = g_processCondition.wait_for(
298         lock, std::chrono::seconds(waitTime), [&syncProcess]() { return syncProcess.process == FINISHED; });
299     ASSERT_EQ(result, true);
300     LOGD("-------------------sync end--------------");
301 }
302 
CallSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,DBStatus errCode=OK)303 void CallSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, DBStatus errCode = OK)
304 {
305     g_syncProcess = {};
306     Query query = Query::Select().FromTable(tableNames);
307     std::vector<SyncProcess> expectProcess;
308     CloudSyncStatusCallback callback = [&errCode](const std::map<std::string, SyncProcess> &process) {
309         ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
310         g_syncProcess = std::move(process.begin()->second);
311         if (g_syncProcess.process == FINISHED) {
312             g_processCondition.notify_one();
313             ASSERT_EQ(g_syncProcess.errCode, errCode);
314         }
315     };
316     CloudSyncOption option;
317     option.devices = {DEVICE_CLOUD};
318     option.mode = mode;
319     option.query = query;
320     option.waitTime = SYNC_WAIT_TIME;
321     option.lockAction = static_cast<LockAction>(0xff); // lock all
322     ASSERT_EQ(g_delegate->Sync(option, callback), dbStatus);
323 
324     if (dbStatus == DBStatus::OK) {
325         WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
326     }
327 }
328 
CheckDownloadForTest001(int index,map<std::string,Assets> & assets)329 void CheckDownloadForTest001(int index, map<std::string, Assets> &assets)
330 {
331     for (auto &item : assets) {
332         for (auto &asset : item.second) {
333             EXPECT_EQ(AssetOperationUtils::EraseBitMask(asset.status), static_cast<uint32_t>(AssetStatus::INSERT));
334             if (index < 4) { // 1-4 is inserted
335                 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::INSERT));
336             }
337             LOGD("asset [name]:%s, [status]:%u, [flag]:%u, [index]:%d", asset.name.c_str(), asset.status, asset.flag,
338                 index);
339         }
340     }
341 }
342 
CheckDownloadFailedForTest002(sqlite3 * & db)343 void CheckDownloadFailedForTest002(sqlite3 *&db)
344 {
345     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME;
346     sqlite3_stmt *stmt = nullptr;
347     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
348     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
349         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
350         Type cloudValue;
351         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
352         std::vector<uint8_t> assetsBlob;
353         Assets assets;
354         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
355         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
356         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
357         for (size_t i = 0; i < assets.size(); ++i) {
358             EXPECT_EQ(assets[i].hash, "");
359             EXPECT_EQ(assets[i].status, AssetStatus::ABNORMAL);
360         }
361     }
362     int errCode;
363     SQLiteUtils::ResetStatement(stmt, true, errCode);
364 }
365 
UpdateAssetsForLocal(sqlite3 * & db,int id,uint32_t status)366 void UpdateAssetsForLocal(sqlite3 *&db, int id, uint32_t status)
367 {
368     Assets assets;
369     Asset asset = ASSET_COPY;
370     asset.name = ASSET_COPY.name + std::to_string(id);
371     asset.status = status;
372     assets.emplace_back(asset);
373     asset.name = ASSET_COPY.name + std::to_string(id) + "_copy";
374     assets.emplace_back(asset);
375     int errCode;
376     std::vector<uint8_t> assetBlob;
377     const string sql = "update " + ASSETS_TABLE_NAME + " set assets=? where id = " + std::to_string(id);
378     sqlite3_stmt *stmt = nullptr;
379     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
380     assetBlob = g_virtualCloudDataTranslate->AssetsToBlob(assets);
381     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
382     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
383     SQLiteUtils::ResetStatement(stmt, true, errCode);
384 }
385 
CheckConsistentCount(sqlite3 * db,int64_t expectCount)386 void CheckConsistentCount(sqlite3 *db, int64_t expectCount)
387 {
388     EXPECT_EQ(sqlite3_exec(db, QUERY_CONSISTENT_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
389         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
390 }
391 
CheckCompensatedCount(sqlite3 * db,int64_t expectCount)392 void CheckCompensatedCount(sqlite3 *db, int64_t expectCount)
393 {
394     EXPECT_EQ(sqlite3_exec(db, QUERY_COMPENSATED_SQL.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
395         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
396 }
397 
CloseDb()398 void CloseDb()
399 {
400     delete g_observer;
401     g_virtualCloudDb = nullptr;
402     if (g_delegate != nullptr) {
403         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
404         g_delegate = nullptr;
405     }
406 }
407 
408 class DistributedDBCloudSyncerDownloadAssetsTest : public testing::Test {
409 public:
410     static void SetUpTestCase(void);
411     static void TearDownTestCase(void);
412     void SetUp();
413     void TearDown();
414 
415 protected:
416     void CheckLocaLAssets(const std::string &tableName, const std::string &expectAssetId,
417         const std::set<int> &failIndex);
418     void CheckLocalAssetIsEmpty(const std::string &tableName);
419     void CheckCursorData(const std::string &tableName, int begin);
420     void WaitForSync(int &syncCount);
421     const RelationalSyncAbleStorage *GetRelationalStore();
422     void InitDataStatusTest(bool needDownload);
423     void DataStatusTest001(bool needDownload);
424     void DataStatusTest003();
425     void DataStatusTest004();
426     void DataStatusTest005();
427     void DataStatusTest006();
428     void DataStatusTest007();
429     sqlite3 *db = nullptr;
430     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
431 };
432 
SetUpTestCase(void)433 void DistributedDBCloudSyncerDownloadAssetsTest::SetUpTestCase(void)
434 {
435     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
436     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
437     LOGI("The test db is:%s", g_storePath.c_str());
438     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
439     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
440 }
441 
TearDownTestCase(void)442 void DistributedDBCloudSyncerDownloadAssetsTest::TearDownTestCase(void) {}
443 
SetUp(void)444 void DistributedDBCloudSyncerDownloadAssetsTest::SetUp(void)
445 {
446     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
447         LOGE("rm test db files error.");
448     }
449     DistributedDBToolsUnitTest::PrintTestCaseInfo();
450     LOGD("Test dir is %s", g_testDir.c_str());
451     db = RelationalTestUtils::CreateDataBase(g_storePath);
452     ASSERT_NE(db, nullptr);
453     InitDatabase(db);
454     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
455     ASSERT_NE(g_observer, nullptr);
456     ASSERT_EQ(
457         g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer}, g_delegate),
458         DBStatus::OK);
459     ASSERT_NE(g_delegate, nullptr);
460     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
461     ASSERT_EQ(g_delegate->CreateDistributedTable(NO_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
462     ASSERT_EQ(g_delegate->CreateDistributedTable(COMPOUND_PRIMARY_TABLE, CLOUD_COOPERATION), DBStatus::OK);
463     g_virtualCloudDb = make_shared<VirtualCloudDb>();
464     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
465     g_syncProcess = {};
466     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
467     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
468     DataBaseSchema dataBaseSchema;
469     GetCloudDbSchema(dataBaseSchema);
470     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
471     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
472     ASSERT_NE(g_cloudStoreHook, nullptr);
473     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
474     ASSERT_TRUE(communicatorAggregator_ != nullptr);
475     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
476 }
477 
TearDown(void)478 void DistributedDBCloudSyncerDownloadAssetsTest::TearDown(void)
479 {
480     RefObject::DecObjRef(g_store);
481     g_virtualCloudDb->ForkUpload(nullptr);
482     CloseDb();
483     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
484     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
485         LOGE("rm test db files error.");
486     }
487     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
488     communicatorAggregator_ = nullptr;
489     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
490 }
491 
CheckLocaLAssets(const std::string & tableName,const std::string & expectAssetId,const std::set<int> & failIndex)492 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocaLAssets(const std::string &tableName,
493     const std::string &expectAssetId, const std::set<int> &failIndex)
494 {
495     std::string sql = "SELECT assets FROM " + tableName + ";";
496     sqlite3_stmt *stmt = nullptr;
497     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
498     int index = 0;
499     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
500         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
501         Type cloudValue;
502         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
503         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
504         for (const auto &asset : assets) {
505             index++;
506             if (failIndex.find(index) != failIndex.end()) {
507                 EXPECT_EQ(asset.assetId, "0");
508             } else {
509                 EXPECT_EQ(asset.assetId, expectAssetId);
510             }
511         }
512     }
513     int errCode = E_OK;
514     SQLiteUtils::ResetStatement(stmt, true, errCode);
515 }
516 
CheckLocalAssetIsEmpty(const std::string & tableName)517 void DistributedDBCloudSyncerDownloadAssetsTest::CheckLocalAssetIsEmpty(const std::string &tableName)
518 {
519     std::string sql = "SELECT asset FROM " + tableName + ";";
520     sqlite3_stmt *stmt = nullptr;
521     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
522     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
523         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
524     }
525     int errCode = E_OK;
526     SQLiteUtils::ResetStatement(stmt, true, errCode);
527 }
528 
CheckCursorData(const std::string & tableName,int begin)529 void DistributedDBCloudSyncerDownloadAssetsTest::CheckCursorData(const std::string &tableName, int begin)
530 {
531     std::string logTableName = DBCommon::GetLogTableName(tableName);
532     std::string sql = "SELECT cursor FROM " + logTableName + ";";
533     sqlite3_stmt *stmt = nullptr;
534     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
535     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
536         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_INTEGER);
537         Type cloudValue;
538         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
539         EXPECT_EQ(std::get<int64_t>(cloudValue), begin);
540         begin++;
541     }
542     int errCode = E_OK;
543     SQLiteUtils::ResetStatement(stmt, true, errCode);
544 }
545 
WaitForSync(int & syncCount)546 void DistributedDBCloudSyncerDownloadAssetsTest::WaitForSync(int &syncCount)
547 {
548     std::unique_lock<std::mutex> lock(g_processMutex);
549     bool result = g_processCondition.wait_for(lock, std::chrono::seconds(COMPENSATED_SYNC_WAIT_TIME),
550         [&syncCount]() { return syncCount == 2; }); // 2 is compensated sync
551     ASSERT_EQ(result, true);
552 }
553 
GetRelationalStore()554 const RelationalSyncAbleStorage* DistributedDBCloudSyncerDownloadAssetsTest::GetRelationalStore()
555 {
556     RelationalDBProperties properties;
557     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
558     int errCode = E_OK;
559     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
560     if (g_store == nullptr) {
561         return nullptr;
562     }
563     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
564 }
565 
InitDataStatusTest(bool needDownload)566 void DistributedDBCloudSyncerDownloadAssetsTest::InitDataStatusTest(bool needDownload)
567 {
568     int cloudCount = 20;
569     int localCount = 10;
570     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
571     if (needDownload) {
572         UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
573     }
574     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
575     std::string sql = "update " + logName + " SET status = 1 where data_key in (1,11);";
576     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
577     sql = "update " + logName + " SET status = 2 where data_key in (2,12);";
578     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
579     sql = "update " + logName + " SET status = 3 where data_key in (3,13);";
580     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
581     std::this_thread::sleep_for(std::chrono::milliseconds(1));
582     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
583     std::this_thread::sleep_for(std::chrono::milliseconds(1));
584     sql = "update " + ASSETS_TABLE_NAME + " set age='666' where id in (4);";
585     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
586     sql = "update " + logName + " SET status = 1 where data_key in (4);";
587     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
588 }
589 
DataStatusTest001(bool needDownload)590 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest001(bool needDownload)
591 {
592     int cloudCount = 20;
593     int count = 0;
594     g_cloudStoreHook->SetSyncFinishHook([&count, cloudCount, this]() {
595         count++;
596         if (count == 1) {
597             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
598                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (11, 4)) or (status = 0)";
599             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
600         }
601         if (count == 2) { // 2 is compensated sync
602             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
603                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
604             CloudDBSyncUtilsTest::CheckCount(db, sql, cloudCount);
605             g_processCondition.notify_one();
606         }
607     });
608     InitDataStatusTest(needDownload);
609     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
610     WaitForSync(count);
611 }
612 
DataStatusTest003()613 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest003()
614 {
615     int count = 0;
616     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
617         count++;
618         if (count == 1) {
619             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
620                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key = 11)";
621             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
622         }
623         if (count == 2) { // 2 is compensated sync
624             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
625                 " (status = 3 and data_key in (0,2,3,12,13) or (status = 0))";
626             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
627             g_processCondition.notify_one();
628         }
629     });
630     int downLoadCount = 0;
631     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
632         downLoadCount++;
633         if (downLoadCount == 1) {
634             std::vector<std::vector<uint8_t>> hashKey;
635             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
636             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
637         }
638     });
639     InitDataStatusTest(true);
640     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
641     WaitForSync(count);
642 }
643 
DataStatusTest004()644 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest004()
645 {
646     int count = 0;
647     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
648         count++;
649         if (count == 1) {
650             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
651                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (-1,11))";
652             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
653         }
654         if (count == 2) { // 2 is compensated sync
655             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
656                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
657             CloudDBSyncUtilsTest::CheckCount(db, sql, 19); // 19 is match count
658             g_processCondition.notify_one();
659         }
660     });
661     int downLoadCount = 0;
662     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
663         downLoadCount++;
664         if (downLoadCount == 1) {
665             std::vector<std::vector<uint8_t>> hashKey;
666             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
667             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
668             std::string sql = "delete from " + ASSETS_TABLE_NAME + " WHERE id=0";
669             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
670         }
671     });
672     int queryIdx = 0;
673     g_virtualCloudDb->ForkQuery([this, &queryIdx](const std::string &, VBucket &) {
674         LOGD("query index:%d", ++queryIdx);
675         if (queryIdx == 4) { // 4 is compensated sync
676             std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
677                 " SET status = 1 where data_key=15;";
678             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
679         }
680     });
681     InitDataStatusTest(true);
682     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
683     WaitForSync(count);
684 }
685 
DataStatusTest005()686 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest005()
687 {
688     int count = 0;
689     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
690         count++;
691         if (count == 1) {
692             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
693                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0 and data_key in (11))";
694             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
695         }
696         if (count == 2) { // 2 is compensated sync
697             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
698                 " (status = 3 and data_key in (0,2,3,12,13)) or (status = 0)";
699             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
700             g_processCondition.notify_one();
701         }
702     });
703     int downLoadCount = 0;
704     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
705         downLoadCount++;
706         if (downLoadCount == 1) {
707             std::vector<std::vector<uint8_t>> hashKey;
708             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
709             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
710             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
711             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
712         }
713     });
714     InitDataStatusTest(true);
715     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
716     WaitForSync(count);
717 }
718 
DataStatusTest006()719 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest006()
720 {
721     int count = 0;
722     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
723         count++;
724         if (count == 1) {
725             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
726                 " (status = 3 and data_key in (2,3,12,13)) or (status = 1 and data_key in (0)) or "
727                 "(status = 0 and data_key in (11))";
728             CloudDBSyncUtilsTest::CheckCount(db, sql, 6); // 6 is match count
729         }
730         if (count == 2) { // 2 is compensated sync
731             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
732                 " (status = 3 and data_key in (2,3,12,13)) or (status = 0)";
733             CloudDBSyncUtilsTest::CheckCount(db, sql, 20); // 20 is match count
734             g_processCondition.notify_one();
735         }
736     });
737     int downLoadCount = 0;
738     g_virtualAssetLoader->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
739         downLoadCount++;
740         if (downLoadCount == 1) {
741             std::vector<std::vector<uint8_t>> hashKey;
742             CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key = 0 ", db, hashKey);
743             EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
744             std::string sql = "update " + ASSETS_TABLE_NAME + " set name='x' WHERE id=0";
745             EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
746             EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
747         }
748     });
749     InitDataStatusTest(true);
750     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
751     WaitForSync(count);
752 }
753 
DataStatusTest007()754 void DistributedDBCloudSyncerDownloadAssetsTest::DataStatusTest007()
755 {
756     int count = 0;
757     g_cloudStoreHook->SetSyncFinishHook([&count, this]() {
758         count++;
759         if (count == 1) {
760             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
761                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
762             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
763         }
764         if (count == 2) { // 2 is compensated sync
765             std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE "
766                 " (status = 3 and data_key in (2,3,13)) or (status = 1 and data_key in (1,11))";
767             CloudDBSyncUtilsTest::CheckCount(db, sql, 5); // 5 is match count
768             g_processCondition.notify_one();
769         }
770     });
771     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
772     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
773     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
774         .WillRepeatedly([](const std::string &, const std::string &gid, const Type &,
775             std::map<std::string, Assets> &assets) {
776             return CLOUD_ERROR;
777         });
778     InitDataStatusTest(true);
779     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
780     WaitForSync(count);
781 }
782 
783 /*
784  * @tc.name: DownloadAssetForDupDataTest001
785  * @tc.desc: Test the download interface call with duplicate data for the same primary key.
786  * @tc.type: FUNC
787  * @tc.require:
788  * @tc.author: liufuchenxing
789  */
790 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest001, TestSize.Level0)
791 {
792     /**
793      * @tc.steps:step1. Mock asset download interface.
794      * @tc.expected: step1. return OK and interface will be called 4 times. delete 1, delete 2, insert 1, insert 2
795      */
796     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
797     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
798     int index = 1;
799     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
800         .Times(2)
801         .WillRepeatedly(
__anon32d7e6601102(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 802             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
803                 LOGD("Download GID:%s", gid.c_str());
804                 CheckDownloadForTest001(index, assets);
805                 index++;
806                 return DBStatus::OK;
807             });
808 
809     /**
810      * @tc.steps:step2. Insert local data [0, 10), sync data
811      * @tc.expected: step2. sync success.
812      */
813     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME);
814     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
815 
816     /**
817      * @tc.steps:step3. delete cloud data [1, 2], then insert cloud data [1,2] with new gid. Finally sync data.
818      * @tc.expected: step3. sync success.
819      */
820     DeleteCloudDBData(1, 2, ASSETS_TABLE_NAME);
821     InsertCloudDBData(1, 2, 10, ASSETS_TABLE_NAME);
822     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
823 }
824 
825 /**
826  * @tc.name: FillAssetId001
827  * @tc.desc: Test if assetId is filled in single primary key table
828  * @tc.type: FUNC
829  * @tc.require:
830  * @tc.author: chenchaohao
831  */
832 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId001, TestSize.Level0)
833 {
834     /**
835      * @tc.steps:step1. local insert assets and sync, check the local assetId.
836      * @tc.expected: step1. return OK.
837      */
838     int localCount = 50;
839     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
840     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
841     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
842 
843     /**
844      * @tc.steps:step2. local update assets and sync ,check the local assetId.
845      * @tc.expected: step2. sync success.
846      */
847     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
848     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
849     CheckLocalAssetIsEmpty(ASSETS_TABLE_NAME);
850     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
851 }
852 
853 /**
854  * @tc.name: FillAssetId002
855  * @tc.desc: Test if assetId is filled in no primary key table
856  * @tc.type: FUNC
857  * @tc.require:
858  * @tc.author: chenchaohao
859  */
860 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId002, TestSize.Level0)
861 {
862     /**
863      * @tc.steps:step1. local insert assets and sync, check the local assetId.
864      * @tc.expected: step1. return OK.
865      */
866     int localCount = 50;
867     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
868     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
869     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
870 
871     /**
872      * @tc.steps:step2. local update assets and sync ,check the local assetId.
873      * @tc.expected: step2. sync success.
874      */
875     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
876     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
877     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
878 }
879 
880 /**
881  * @tc.name: FillAssetId003
882  * @tc.desc: Test if assetId is filled in compound primary key table
883  * @tc.type: FUNC
884  * @tc.require:
885  * @tc.author: chenchaohao
886  */
887 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId003, TestSize.Level0)
888 {
889     /**
890      * @tc.steps:step1. local insert assets and sync, check the local assetId.
891      * @tc.expected: step1. return OK.
892      */
893     int localCount = 50;
894     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
895     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
896     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
897 
898     /**
899      * @tc.steps:step2. local update assets and sync ,check the local assetId.
900      * @tc.expected: step2. sync success.
901      */
902     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
903     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
904     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
905 }
906 
907 /**
908  * @tc.name: FillAssetId004
909  * @tc.desc: Test if assetId is filled in single primary key table when CLOUD_FORCE_PUSH
910  * @tc.type: FUNC
911  * @tc.require:
912  * @tc.author: chenchaohao
913  */
914 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId004, TestSize.Level0)
915 {
916     /**
917      * @tc.steps:step1. local insert assets and sync, check the local assetId.
918      * @tc.expected: step1. return OK.
919      */
920     int localCount = 50;
921     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
922     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
923     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
924 
925     /**
926      * @tc.steps:step2. local update assets and sync ,check the local assetId.
927      * @tc.expected: step2. sync success.
928      */
929     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
930     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
931     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
932 }
933 
934 /**
935  * @tc.name: FillAssetId001
936  * @tc.desc: Test if assetId is filled in no primary key table when CLOUD_FORCE_PUSH
937  * @tc.type: FUNC
938  * @tc.require:
939  * @tc.author: chenchaohao
940  */
941 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId005, TestSize.Level0)
942 {
943     /**
944      * @tc.steps:step1. local insert assets and sync, check the local assetId.
945      * @tc.expected: step1. return OK.
946      */
947     int localCount = 50;
948     InsertLocalData(db, 0, localCount, NO_PRIMARY_TABLE);
949     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
950     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
951 
952     /**
953      * @tc.steps:step2. local update assets and sync ,check the local assetId.
954      * @tc.expected: step2. sync success.
955      */
956     UpdateLocalData(db, NO_PRIMARY_TABLE, ASSETS_COPY1);
957     CallSync({NO_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
958     CheckLocaLAssets(NO_PRIMARY_TABLE, "10", {});
959 }
960 
961 /**
962  * @tc.name: FillAssetId006
963  * @tc.desc: Test if assetId is filled in compound primary key table when CLOUD_FORCE_PUSH
964  * @tc.type: FUNC
965  * @tc.require:
966  * @tc.author: chenchaohao
967  */
968 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId006, TestSize.Level0)
969 {
970     /**
971      * @tc.steps:step1. local insert assets and sync, check the local assetId.
972      * @tc.expected: step1. return OK.
973      */
974     int localCount = 50;
975     InsertLocalData(db, 0, localCount, COMPOUND_PRIMARY_TABLE);
976     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
977     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
978 
979     /**
980      * @tc.steps:step2. local update assets and sync ,check the local assetId.
981      * @tc.expected: step2. sync success.
982      */
983     UpdateLocalData(db, COMPOUND_PRIMARY_TABLE, ASSETS_COPY1);
984     CallSync({COMPOUND_PRIMARY_TABLE}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
985     CheckLocaLAssets(COMPOUND_PRIMARY_TABLE, "10", {});
986 }
987 
988 /**
989  * @tc.name: FillAssetId007
990  * @tc.desc: Test if assetId is filled when extend lack of assets
991  * @tc.type: FUNC
992  * @tc.require:
993  * @tc.author: chenchaohao
994  */
995 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId007, TestSize.Level0)
996 {
997     CloudSyncConfig config;
998     config.maxUploadCount = 200; // max upload 200
999     g_delegate->SetCloudSyncConfig(config);
1000     /**
1001      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1002      * @tc.expected: step1. return OK.
1003      */
1004     int localCount = 50;
1005     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601202(const std::string &tableName, VBucket &extend) 1006     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1007         extend.erase("assets");
1008     });
1009     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1010     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1011 
1012     /**
1013      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1014      * @tc.expected: step2. sync success.
1015      */
1016     int addLocalCount = 10;
1017     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601302(const std::string &tableName, VBucket &extend) 1018     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1019         if (extend.find("assets") != extend.end()) {
1020             for (auto &asset : std::get<Assets>(extend["assets"])) {
1021                 asset.name = "pad";
1022             }
1023         }
1024     });
1025     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1026     int beginFailFillNum = 101;
1027     int endFailFillNum = 120;
1028     std::set<int> index;
1029     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1030         index.insert(i);
1031     }
1032     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1033 
1034     /**
1035      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1036      * @tc.expected: step2. sync success.
1037      */
1038     g_virtualCloudDb->ForkUpload(nullptr);
1039     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1040     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1041 }
1042 
1043 /**
1044  * @tc.name: FillAssetId008
1045  * @tc.desc: Test if assetId is filled when extend lack of assetId
1046  * @tc.type: FUNC
1047  * @tc.require:
1048  * @tc.author: chenchaohao
1049  */
1050 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId008, TestSize.Level0)
1051 {
1052     /**
1053      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1054      * @tc.expected: step1. return OK.
1055      */
1056     int localCount = 50;
1057     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601402(const std::string &tableName, VBucket &extend) 1058     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1059         if (extend.find("assets") != extend.end()) {
1060             for (auto &asset : std::get<Assets>(extend["assets"])) {
1061                 asset.assetId = "";
1062             }
1063         }
1064     });
1065     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1066     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1067 
1068     /**
1069      * @tc.steps:step2. local update assets and sync ,check the local assetId.
1070      * @tc.expected: step2. sync success.
1071      */
1072     g_virtualCloudDb->ForkUpload(nullptr);
1073     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1074     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1075 }
1076 
1077 /**
1078  * @tc.name: FillAssetId009
1079  * @tc.desc: Test if assetId is filled when extend exists useless assets
1080  * @tc.type: FUNC
1081  * @tc.require:
1082  * @tc.author: chenchaohao
1083  */
1084 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId009, TestSize.Level0)
1085 {
1086     /**
1087      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1088      * @tc.expected: step1. return OK.
1089      */
1090     int localCount = 50;
1091     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601502(const std::string &tableName, VBucket &extend) 1092     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1093         if (extend.find("assets") != extend.end()) {
1094             Asset asset = ASSET_COPY2;
1095             Assets &assets = std::get<Assets>(extend["assets"]);
1096             assets.push_back(asset);
1097         }
1098     });
1099     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1100     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1101 }
1102 
1103 /**
1104  * @tc.name: FillAssetId010
1105  * @tc.desc: Test if assetId is filled when some success and some fail
1106  * @tc.type: FUNC
1107  * @tc.require:
1108  * @tc.author: chenchaohao
1109  */
1110 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId010, TestSize.Level0)
1111 {
1112     /**
1113      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1114      * @tc.expected: step1. return OK.
1115      */
1116     int localCount = 30;
1117     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1118     g_virtualCloudDb->SetInsertFailed(1);
1119     std::atomic<int> count = 0;
__anon32d7e6601602(const std::string &tableName, VBucket &extend) 1120     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1121         if (extend.find("assets") != extend.end() && count == 0) {
1122             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1123             count++;
1124         }
1125     });
1126     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1127     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1128 }
1129 
1130 /**
1131  * @tc.name: FillAssetId011
1132  * @tc.desc: Test if assetId is null when removedevicedata in FLAG_ONLY
1133  * @tc.type: FUNC
1134  * @tc.require:
1135  * @tc.author: chenchaohao
1136  */
1137 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId011, TestSize.Level0)
1138 {
1139     /**
1140      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1141      * @tc.expected: step1. return OK.
1142      */
1143     int localCount = 50;
1144     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1145     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1146     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1147 
1148     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1149     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1150     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1151     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1152 }
1153 
1154 /**
1155  * @tc.name: FillAssetId012
1156  * @tc.desc: Test if assetid is filled when extend size is not equal to record size
1157  * @tc.type: FUNC
1158  * @tc.require:
1159  * @tc.author: chenchaohao
1160  */
1161 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId012, TestSize.Level0)
1162 {
1163     /**
1164      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1165      * @tc.expected: step1. return OK.
1166      */
1167     int localCount = 50;
1168     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1169     std::atomic<int> count = 1;
1170     g_virtualCloudDb->SetClearExtend(count);
1171     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1172     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1173 
1174     /**
1175      * @tc.steps:step2. set extend size normal then sync, check the asseid.
1176      * @tc.expected: step2. return OK.
1177      */
1178     g_virtualCloudDb->SetClearExtend(0);
1179     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1180     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1181 
1182     /**
1183      * @tc.steps:step3. set extend size large then sync, check the asseid.
1184      * @tc.expected: step3. return OK.
1185      */
1186     count = -1; // -1 means extend push a empty vBucket
1187     g_virtualCloudDb->SetClearExtend(count);
1188     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1189     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1190 }
1191 
1192 /**
1193  * @tc.name: FillAssetId013
1194  * @tc.desc: Test fill assetId and removedevicedata when data is delete
1195  * @tc.type: FUNC
1196  * @tc.require:
1197  * @tc.author: chenchaohao
1198  */
1199 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId013, TestSize.Level0)
1200 {
1201     /**
1202      * @tc.steps:step1. local insert data and sync, then delete local data and insert new data
1203      * @tc.expected: step1. return OK.
1204      */
1205     int localCount = 20;
1206     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1207     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1208     int deleteLocalCount = 10;
1209     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1210     int addLocalCount = 30;
1211     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1212     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1213 
1214     /**
1215      * @tc.steps:step2. RemoveDeviceData.
1216      * @tc.expected: step2. return OK.
1217      */
1218     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1219     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1220 }
1221 
1222 /**
1223  * @tc.name: FillAssetId014
1224  * @tc.desc: Test if asset status is reset when removedevicedata in FLAG_ONLY
1225  * @tc.type: FUNC
1226  * @tc.require:
1227  * @tc.author: bty
1228  */
1229 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId014, TestSize.Level0)
1230 {
1231     /**
1232      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1233      * @tc.expected: step1. return OK.
1234      */
1235     int localCount = 50;
1236     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1237     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1238     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1239 
1240     /**
1241      * @tc.steps:step2. RemoveDeviceData
1242      * @tc.expected: step2. return OK.
1243      */
1244     Assets assets;
1245     std::vector<AssetStatus> statusVec = {
1246         AssetStatus::INSERT, AssetStatus::UPDATE, AssetStatus::DELETE, AssetStatus::NORMAL,
1247         AssetStatus::ABNORMAL, AssetStatus::DOWNLOADING, AssetStatus::DOWNLOAD_WITH_NULL
1248     };
1249     for (auto &status : statusVec) {
1250         Asset temp = ASSET_COPY;
1251         temp.name += std::to_string(status);
1252         temp.status = status | AssetStatus::UPLOADING;
1253         assets.emplace_back(temp);
1254     }
1255     UpdateLocalData(db, ASSETS_TABLE_NAME, assets);
1256     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1257     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1258 
1259     /**
1260      * @tc.steps:step3. check status
1261      * @tc.expected: step3. return OK.
1262      */
1263     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
1264     sqlite3_stmt *stmt = nullptr;
1265     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1266     int index = 0;
1267     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1268         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1269         Type cloudValue;
1270         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1271         Assets newAssets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
1272         for (const auto &ast : newAssets) {
1273             EXPECT_EQ(ast.status, statusVec[index++ % statusVec.size()]);
1274         }
1275     }
1276     int errCode = E_OK;
1277     SQLiteUtils::ResetStatement(stmt, true, errCode);
1278 }
1279 
1280 /**
1281  * @tc.name: FillAssetId015
1282  * @tc.desc: Test if fill assetId when upload return cloud network error
1283  * @tc.type: FUNC
1284  * @tc.require:
1285  * @tc.author: chenchaohao
1286  */
1287 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId015, TestSize.Level0)
1288 {
1289     /**
1290      * @tc.steps:step1. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync
1291      * @tc.expected: step1. return OK, errcode is CLOUD_NETWORK_ERROR.
1292      */
1293     int localCount = 20;
1294     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1295     g_virtualCloudDb->SetCloudNetworkError(true);
1296     std::atomic<int> count = 0;
__anon32d7e6601702(const std::string &tableName, VBucket &extend) 1297     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1298         if (extend.find("assets") != extend.end() && count == 0) {
1299             extend["#_error"] = static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR);
1300             count++;
1301         }
1302     });
1303     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_NETWORK_ERROR);
1304     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", { 1, 2 }); // 1st, 2nd asset do not fill
1305     g_virtualCloudDb->SetCloudNetworkError(false);
1306     g_virtualCloudDb->ForkUpload(nullptr);
1307     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1308     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1309 
1310     /**
1311      * @tc.steps:step2. local insert data and fork batchinsert return CLOUD_NETWORK_ERROR, then sync.
1312      * @tc.expected: step2. return OK, errcode is CLOUD_ERROR.
1313      */
1314     int addLocalCount = 10;
1315     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
1316     std::atomic<int> num = 0;
__anon32d7e6601802(const std::string &tableName, VBucket &extend) 1317     g_virtualCloudDb->ForkUpload([&num](const std::string &tableName, VBucket &extend) {
1318         if (extend.find("assets") != extend.end() && num == 0) {
1319             for (auto &asset : std::get<Assets>(extend["assets"])) {
1320                 asset.name = "pad";
1321                 break;
1322             }
1323             num++;
1324         }
1325     });
1326     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1327     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {41}); // // 41th asset do not fill
1328 }
1329 
1330 /**
1331  * @tc.name: FillAssetId016
1332  * @tc.desc: Test fill assetId and removedevicedata when last data is delete
1333  * @tc.type: FUNC
1334  * @tc.require:
1335  * @tc.author: chenchaohao
1336  */
1337 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId016, TestSize.Level0)
1338 {
1339     /**
1340      * @tc.steps:step1. local insert data and sync, then delete last local data
1341      * @tc.expected: step1. return OK.
1342      */
1343     int localCount = 20;
1344     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1345     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1346     int deletLocalCount = 10;
1347     DeleteLocalRecord(db, deletLocalCount, deletLocalCount, ASSETS_TABLE_NAME);
1348     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1349 
1350     /**
1351      * @tc.steps:step2. RemoveDeviceData.
1352      * @tc.expected: step2. return OK.
1353      */
1354     g_delegate->RemoveDeviceData("", FLAG_ONLY);
1355     CheckLocaLAssets(ASSETS_TABLE_NAME, "", {});
1356 }
1357 
1358 /**
1359  * @tc.name: FillAssetId017
1360  * @tc.desc: Test cursor when download not change
1361  * @tc.type: FUNC
1362  * @tc.require:
1363  * @tc.author: chenchaohao
1364  */
1365 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId017, TestSize.Level0)
1366 {
1367     /**
1368      * @tc.steps:step1. local insert data and sync,check cursor.
1369      * @tc.expected: step1. return OK.
1370      */
1371     int localCount = 20;
1372     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1373     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1374     CheckCursorData(ASSETS_TABLE_NAME, 1);
1375 
1376     /**
1377      * @tc.steps:step2. sync again and optype is not change, check cursor.
1378      * @tc.expected: step2. return OK.
1379      */
1380     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1381     CheckCursorData(ASSETS_TABLE_NAME, 1);
1382 }
1383 
1384 /**
1385  * @tc.name: FillAssetId018
1386  * @tc.desc: Test if assetId is filled when contains "#_error"
1387  * @tc.type: FUNC
1388  * @tc.require:
1389  * @tc.author: zhaoliang
1390  */
1391 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId018, TestSize.Level0)
1392 {
1393     /**
1394      * @tc.steps:step1. local insert assets and sync, check the local assetId.
1395      * @tc.expected: step1. return OK.
1396      */
1397     int localCount = 30;
1398     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1399     std::atomic<int> count = 0;
__anon32d7e6601902(const std::string &tableName, VBucket &extend) 1400     g_virtualCloudDb->ForkUpload([&count](const std::string &tableName, VBucket &extend) {
1401         if (extend.find("assets") != extend.end() && count == 0) {
1402             extend["#_error"] = std::string("test");
1403             count++;
1404         }
1405     });
1406     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1407     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1408 }
1409 
1410 /**
1411  * @tc.name: DownloadAssetForDupDataTest002
1412  * @tc.desc: Test download failed
1413  * @tc.type: FUNC
1414  * @tc.require:
1415  * @tc.author: bty
1416  */
1417 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest002, TestSize.Level0)
1418 {
1419     /**
1420      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1421      * @tc.expected: step1. return OK
1422      */
1423     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1424     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1425     int index = 0;
1426     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1427         .WillRepeatedly(
__anon32d7e6601a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1428             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1429                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1430                 return DBStatus::CLOUD_ERROR;
1431             });
1432 
1433     /**
1434      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1435      * @tc.expected: step2. sync success.
1436      */
1437     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1438     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1439 
1440     /**
1441      * @tc.steps:step3. check if the hash of assets in db is empty
1442      * @tc.expected: step3. OK
1443      */
1444     CheckDownloadFailedForTest002(db);
1445 }
1446 
1447 /**
1448  * @tc.name: DownloadAssetForDupDataTest003
1449  * @tc.desc: Test download failed and flag was modified
1450  * @tc.type: FUNC
1451  * @tc.require:
1452  * @tc.author: bty
1453  */
1454 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest003, TestSize.Level0)
1455 {
1456     /**
1457      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1458      * @tc.expected: step1. return OK
1459      */
1460     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1461     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1462     int index = 0;
1463     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1464         .WillRepeatedly(
__anon32d7e6601b02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1465             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1466                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1467                 for (auto &item : assets) {
1468                     for (auto &asset : item.second) {
1469                         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
1470                     }
1471                 }
1472                 return DBStatus::CLOUD_ERROR;
1473             });
1474 
1475     /**
1476      * @tc.steps:step2. Insert cloud data [0, 10), sync data
1477      * @tc.expected: step2. sync success.
1478      */
1479     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME);
1480     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1481 
1482     /**
1483      * @tc.steps:step3. check if the hash of assets in db is empty
1484      * @tc.expected: step3. OK
1485      */
1486     CheckDownloadFailedForTest002(db);
1487 }
1488 
1489 /**
1490  * @tc.name: DownloadAssetForDupDataTest004
1491  * @tc.desc: test sync with deleted assets
1492  * @tc.type: FUNC
1493  * @tc.require:
1494  * @tc.author: bty
1495  */
1496 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest004, TestSize.Level0)
1497 {
1498     /**
1499      * @tc.steps:step1. Mock asset download return CLOUD_ERROR.
1500      * @tc.expected: step1. return OK
1501      */
1502     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1503     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1504     int index = 0;
1505     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1506         .WillRepeatedly(
__anon32d7e6601c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1507             [&](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1508                 LOGD("Download GID:%s, index:%d", gid.c_str(), ++index);
1509                 return DBStatus::OK;
1510             });
1511 
1512     /**
1513      * @tc.steps:step2. insert local data, update assets status to delete, then insert cloud data
1514      * @tc.expected: step2. return OK
1515      */
1516     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1517     UpdateAssetsForLocal(db, 1, AssetStatus::DELETE); // 1 is id
1518     UpdateAssetsForLocal(db, 2, AssetStatus::DELETE | AssetStatus::UPLOADING); // 2 is id
1519     InsertCloudDBData(0, 10, 0, ASSETS_TABLE_NAME); // 10 is num
1520 
1521     /**
1522      * @tc.steps:step3. sync, check download num
1523      * @tc.expected: step3. return OK
1524      */
1525     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1526     EXPECT_GE(index, 2); // 2 is download num
1527 }
1528 
1529 /**
1530  * @tc.name: DownloadAssetForDupDataTest005
1531  * @tc.desc: test DOWNLOADING status of asset after uploading
1532  * @tc.type: FUNC
1533  * @tc.require:
1534  * @tc.author: bty
1535  */
1536 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetForDupDataTest005, TestSize.Level0)
1537 {
1538     /**
1539      * @tc.steps:step1. init data and sync
1540      * @tc.expected: step1. return OK
1541      */
1542     InsertLocalData(db, 0, 10, ASSETS_TABLE_NAME); // 10 is num
1543     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1544     UpdateAssetsForLocal(db, 6,  AssetStatus::DOWNLOADING); // 6 is id
1545     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1546 
1547     /**
1548      * @tc.steps:step2. check asset status
1549      * @tc.expected: step2. return OK
1550      */
1551     std::string sql = "SELECT assets from " + ASSETS_TABLE_NAME + " where id = 6;";
1552     sqlite3_stmt *stmt = nullptr;
1553     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
1554     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1555         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
1556         Type cloudValue;
1557         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
1558         std::vector<uint8_t> assetsBlob;
1559         Assets assets;
1560         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
1561         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
1562         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
1563         for (size_t i = 0; i < assets.size(); ++i) {
1564             EXPECT_EQ(assets[i].hash, ASSET_COPY.hash);
1565             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
1566         }
1567     }
1568     int errCode;
1569     SQLiteUtils::ResetStatement(stmt, true, errCode);
1570 }
1571 
1572 /**
1573  * @tc.name: FillAssetId019
1574  * @tc.desc: Test the stability of cleaning asset id
1575  * @tc.type: FUNC
1576  * @tc.require:
1577  * @tc.author: bty
1578  */
1579 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId019, TestSize.Level0)
1580 {
1581     /**
1582      * @tc.steps:step1. local insert assets and sync.
1583      * @tc.expected: step1. return OK.
1584      */
1585     int localCount = 20;
1586     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1587     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1588 
1589     /**
1590      * @tc.steps:step2. construct multiple abnormal data_key, then RemoveDeviceData.
1591      * @tc.expected: step2. return OK.
1592      */
1593     std::string sql = "update " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME)
1594         + " set data_key='999' where data_key>'10';";
1595     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
1596     EXPECT_EQ(g_delegate->RemoveDeviceData("", FLAG_ONLY), OK);
1597 }
1598 
1599 /**
1600  * @tc.name: FillAssetId020
1601  * @tc.desc: Test if assetId is filled when extend(lack of assets/assetId is empty/modify asset info)
1602  * @tc.type: FUNC
1603  * @tc.require:
1604  * @tc.author: zhangtao
1605  */
1606 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId020, TestSize.Level0)
1607 {
1608     CloudSyncConfig config;
1609     config.maxUploadCount = 200; // max upload 200
1610     g_delegate->SetCloudSyncConfig(config);
1611 
1612     /**
1613      * @tc.steps:step1. local insert assets and erase assets extends
1614      * @tc.expected: step1. return OK.
1615      */
1616     int localCount = 50;
1617     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
__anon32d7e6601d02(const std::string &tableName, VBucket &extend) 1618     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1619         extend.erase("assets");
1620     });
1621     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1622     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1623 
1624     /**
1625      * @tc.steps:step2. local insert assets and modify assetId to empty
1626      * @tc.expected: step2. return OK.
1627      */
1628     int addLocalCount = 10;
1629     InsertLocalData(db, localCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601e02(const std::string &tableName, VBucket &extend) 1630     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1631         if (extend.find("assets") != extend.end()) {
1632             for (auto &asset : std::get<Assets>(extend["assets"])) {
1633                 asset.assetId = "";
1634             }
1635         }
1636     });
1637     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1638     int beginFailFillNum = 101;
1639     int endFailFillNum = 120;
1640     std::set<int> index;
1641     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1642         index.insert(i);
1643     }
1644     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1645 
1646     /**
1647      * @tc.steps:step3. local insert assets and modify assetId info such as asset.name
1648      * @tc.expected: step3. return OK.
1649      */
1650     InsertLocalData(db, localCount + addLocalCount, addLocalCount, ASSETS_TABLE_NAME);
__anon32d7e6601f02(const std::string &tableName, VBucket &extend) 1651     g_virtualCloudDb->ForkUpload([](const std::string &tableName, VBucket &extend) {
1652         if (extend.find("assets") != extend.end()) {
1653             for (auto &asset : std::get<Assets>(extend["assets"])) {
1654                 asset.name = "mod_pat";
1655             }
1656         }
1657     });
1658     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1659     beginFailFillNum = 121;
1660     endFailFillNum = 140;
1661     std::set<int> newIndex;
1662     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1663         newIndex.insert(i);
1664     }
1665     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", newIndex);
1666 
1667     /**
1668      * @tc.steps:step4. local update assets and sync, check the local assetId.
1669      * @tc.expected: step4. sync success.
1670      */
1671     g_virtualCloudDb->ForkUpload(nullptr);
1672     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1673     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1674 }
1675 
1676 /**
1677  * @tc.name: FillAssetId021
1678  * @tc.desc: Test if local assets missing, one records's assets missing will not mark the whole sync progress failure
1679  * @tc.type: FUNC
1680  * @tc.require:
1681  * @tc.author: zhangtao
1682  */
1683 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId021, TestSize.Level0)
1684 {
1685     CloudSyncConfig config;
1686     config.maxUploadCount = 200; // max upload 200
1687     g_delegate->SetCloudSyncConfig(config);
1688 
1689     /**
1690      * @tc.steps:step1. local insert assets and erase assets extends
1691      * @tc.expected: step1. return OK.
1692      */
1693     int localCount = 50;
1694     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1695 
1696     /**
1697      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1698      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1699      */
1700     int uploadFailId = 0;
1701     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602002(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1702         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1703         uploadFailId++;
1704         if (uploadFailId == 25) { // 25 is the middle record
1705             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1706             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1707         }
1708         return OK;
1709     });
1710 
1711     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1712     int beginFailFillNum = 49;
1713     int endFailFillNum = 50;
1714     std::set<int> index;
1715     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1716         index.insert(i);
1717     }
1718     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1719     g_virtualCloudDb->ForkUpload(nullptr);
1720 }
1721 
1722 /**
1723  * @tc.name: FillAssetId023
1724  * @tc.desc: Test if BatchUpdate with local assets missing
1725  * @tc.type: FUNC
1726  * @tc.require:
1727  * @tc.author: zhangtao
1728  */
1729 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId023, TestSize.Level0)
1730 {
1731     /**
1732      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1733      * @tc.expected: step1. return OK.
1734      */
1735     int localCount = 50;
1736     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1737     std::atomic<int> count = 1;
1738     g_virtualCloudDb->SetClearExtend(count);
1739     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1740     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1741 
1742     /**
1743      * @tc.steps:step2. set extend size normal and BatchUpdate with local assets missing then sync, check the asseid.
1744      * @tc.expected: step2. return OK.
1745      */
1746     g_virtualCloudDb->SetClearExtend(0);
1747 
1748     int uploadFailId = 0;
1749     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602102(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1750         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1751         uploadFailId++;
1752         if (uploadFailId == 25) { // 25 is the middle record
1753             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1754             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1755         }
1756         return OK;
1757     });
1758     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1759     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1760 }
1761 
1762 /**
1763  * @tc.name: FillAssetId024
1764  * @tc.desc: Test if BatchUpdate with multiple local assets missing
1765  * @tc.type: FUNC
1766  * @tc.require:
1767  * @tc.author: zhangtao
1768  */
1769 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId024, TestSize.Level0)
1770 {
1771     /**
1772      * @tc.steps:step1. set extend size missing then sync, check the asseid.
1773      * @tc.expected: step1. return OK.
1774      */
1775     int localCount = 50;
1776     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1777     std::atomic<int> count = 1;
1778     g_virtualCloudDb->SetClearExtend(count);
1779     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
1780     CheckLocaLAssets(ASSETS_TABLE_NAME, "0", {});
1781 
1782     /**
1783      * @tc.steps:step2. set extend size normal and BatchUpdate with 3 local assets missing then sync, check the asseid.
1784      * @tc.expected: step2. return OK.
1785      */
1786     g_virtualCloudDb->SetClearExtend(0);
1787 
1788     int uploadFailId = 0;
1789     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602202(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1790         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1791         uploadFailId++;
1792         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1793             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1794             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1795         }
1796         return OK;
1797     });
1798     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1799     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", {});
1800 }
1801 
1802 /**
1803  * @tc.name: FillAssetId022
1804  * @tc.desc: Test if local assets missing, many records's assets missing will not mark the whole sync progress failure
1805  * @tc.type: FUNC
1806  * @tc.require:
1807  * @tc.author: zhangtao
1808  */
1809 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, FillAssetId022, TestSize.Level0)
1810 {
1811     CloudSyncConfig config;
1812     config.maxUploadCount = 200; // max upload 200
1813     g_delegate->SetCloudSyncConfig(config);
1814 
1815     /**
1816      * @tc.steps:step1. local insert assets and erase assets extends
1817      * @tc.expected: step1. return OK.
1818      */
1819     int localCount = 50;
1820     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME);
1821 
1822     /**
1823      * @tc.steps:step2. ForkInsertConflict, make one record assets missing during batch insert
1824      * @tc.expected: step2. SyncProgress return OK. One record's assets missing will not block other progress.
1825      */
1826     int uploadFailId = 0;
1827     g_virtualCloudDb->ForkInsertConflict([&uploadFailId](const std::string &tableName, VBucket &extend, VBucket &record,
__anon32d7e6602302(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 1828         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
1829         uploadFailId++;
1830         if (uploadFailId >= 25 && uploadFailId <= 27) { // 25-27 is the middle record
1831             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::LOCAL_ASSET_NOT_FOUND);
1832             return DBStatus::LOCAL_ASSET_NOT_FOUND;
1833         }
1834         return OK;
1835     });
1836 
1837     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::OK);
1838     int beginFailFillNum = 49;
1839     int endFailFillNum = 54;
1840     std::set<int> index;
1841     for (int i = beginFailFillNum; i <= endFailFillNum; i++) {
1842         index.insert(i);
1843     }
1844     CheckLocaLAssets(ASSETS_TABLE_NAME, "10", index);
1845     g_virtualCloudDb->ForkUpload(nullptr);
1846 }
1847 
1848 /**
1849  * @tc.name: ConsistentFlagTest001
1850  * @tc.desc:Assets are the different, check the 0x20 bit of flag after sync
1851  * @tc.type: FUNC
1852  * @tc.require:
1853  * @tc.author: bty
1854  */
1855 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest001, TestSize.Level0)
1856 {
1857     /**
1858      * @tc.steps:step1. init data for the different asset, sync and check flag
1859      * @tc.expected: step1. return OK.
1860      */
1861     int localCount = 10; // 10 is num of local
1862     int cloudCount = 20; // 20 is num of cloud
1863     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1864     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1865     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1866     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1867     CheckConsistentCount(db, cloudCount);
1868 
1869     /**
1870      * @tc.steps:step2. update local data, sync and check flag
1871      * @tc.expected: step2. return OK.
1872      */
1873     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1874     DeleteCloudDBData(1, 1, ASSETS_TABLE_NAME);
1875     CheckConsistentCount(db, 0L);
1876     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1877     CheckConsistentCount(db, cloudCount);
1878 }
1879 
1880 /**
1881  * @tc.name: ConsistentFlagTest002
1882  * @tc.desc: Assets are the same, check the 0x20 bit of flag after sync
1883  * @tc.type: FUNC
1884  * @tc.require:
1885  * @tc.author: bty
1886  */
1887 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest002, TestSize.Level0)
1888 {
1889     /**
1890      * @tc.steps:step1. init data for the same asset, sync and check flag
1891      * @tc.expected: step1. return OK.
1892      */
1893     int cloudCount = 20; // 20 is num of cloud
1894     InsertLocalData(db, 0, cloudCount, ASSETS_TABLE_NAME, true);
1895     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1896     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1897     CheckConsistentCount(db, cloudCount);
1898 
1899     /**
1900      * @tc.steps:step2. update local data, sync and check flag
1901      * @tc.expected: step2. return OK.
1902      */
1903     int deleteLocalCount = 5;
1904     DeleteLocalRecord(db, 0, deleteLocalCount, ASSETS_TABLE_NAME);
1905     CheckConsistentCount(db, cloudCount - deleteLocalCount);
1906     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1907     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1908     CheckConsistentCount(db, cloudCount);
1909 }
1910 
1911 /**
1912  * @tc.name: ConsistentFlagTest003
1913  * @tc.desc: Download returns a conflict, check the 0x20 bit of flag after sync
1914  * @tc.type: FUNC
1915  * @tc.require:
1916  * @tc.author: bty
1917  */
1918 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest003, TestSize.Level0)
1919 {
1920     /**
1921      * @tc.steps:step1. init data
1922      * @tc.expected: step1. return OK.
1923      */
1924     int localCount = 20; // 20 is num of local
1925     int cloudCount = 10; // 10 is num of cloud
1926     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
1927     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
1928     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
1929 
1930     /**
1931      * @tc.steps:step2. fork download, return CLOUD_RECORD_EXIST_CONFLICT once
1932      * @tc.expected: step2. return OK.
1933      */
1934     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1935     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1936     int index = 0;
1937     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1938         .WillRepeatedly(
__anon32d7e6602402(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 1939             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
1940                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
1941                 if (index == 1) { // 1 is first download
1942                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
1943                 }
1944                 return DBStatus::OK;
1945             });
1946 
1947     /**
1948      * @tc.steps:step3. fork upload, check consistent count
1949      * @tc.expected: step3. return OK.
1950      */
1951     int upIdx = 0;
__anon32d7e6602502(const std::string &tableName, VBucket &extend) 1952     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
1953         LOGD("upload index:%d", ++upIdx);
1954         if (upIdx == 1) { // 1 is first upload
1955             CheckConsistentCount(db, localCount - cloudCount - 1);
1956         }
1957     });
1958 
1959     /**
1960      * @tc.steps:step4. fork query, check consistent count
1961      * @tc.expected: step4. return OK.
1962      */
1963     int queryIdx = 0;
__anon32d7e6602602(const std::string &, VBucket &) 1964     g_virtualCloudDb->ForkQuery([this, localCount, &queryIdx](const std::string &, VBucket &) {
1965         LOGD("query index:%d", ++queryIdx);
1966         if (queryIdx == 3) { // 3 is the last query
1967             CheckConsistentCount(db, localCount - 1);
1968         }
1969     });
1970     int count = 0;
__anon32d7e6602702() 1971     g_cloudStoreHook->SetSyncFinishHook([&count]() {
1972         count++;
1973         if (count == 2) { // 2 is compensated sync
1974             g_processCondition.notify_one();
1975         }
1976     });
1977     /**
1978      * @tc.steps:step5. sync, check consistent count
1979      * @tc.expected: step5. return OK.
1980      */
1981     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1982     WaitForSync(count);
1983     CheckConsistentCount(db, localCount);
1984 }
1985 
1986 /**
1987  * @tc.name: ConsistentFlagTest004
1988  * @tc.desc: Upload returns error, check the 0x20 bit of flag after sync
1989  * @tc.type: FUNC
1990  * @tc.require:
1991  * @tc.author: bty
1992  */
1993 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest004, TestSize.Level0)
1994 {
1995     /**
1996      * @tc.steps:step1. init data
1997      * @tc.expected: step1. return OK.
1998      */
1999     int localCount = 20; // 20 is num of local
2000     int cloudCount = 10; // 10 is num of cloud
2001     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2002     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2003     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2004 
2005     /**
2006      * @tc.steps:step2. fork upload, not return error filed of CLOUD_NETWORK_ERROR
2007      * @tc.expected: step2. return OK.
2008      */
2009     int upIdx = 0;
__anon32d7e6602802(const std::string &tableName, VBucket &extend) 2010     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2011         LOGD("upload index:%d", ++upIdx);
2012         if (upIdx == 1) {
2013             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, static_cast<int64_t>(DBStatus::CLOUD_NETWORK_ERROR));
2014         }
2015     });
2016 
2017     /**
2018      * @tc.steps:step3. sync, check consistent count
2019      * @tc.expected: step3. return OK.
2020      */
2021     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2022     CheckConsistentCount(db, localCount - 1);
2023 
2024     /**
2025      * @tc.steps:step4. update local data, fork upload, return error filed of type int64_t
2026      * @tc.expected: step4. return OK.
2027      */
2028     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2029     upIdx = 0;
__anon32d7e6602902(const std::string &tableName, VBucket &extend) 2030     g_virtualCloudDb->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
2031         LOGD("upload index:%d", ++upIdx);
2032         if (upIdx == 1) {
2033             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2034             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2035         }
2036         if (upIdx == 2) {
2037             int64_t err = DBStatus::CLOUD_RECORD_EXIST_CONFLICT + 1;
2038             extend.insert_or_assign(CloudDbConstant::ERROR_FIELD, err);
2039         }
2040     });
2041 
2042     /**
2043      * @tc.steps:step5. sync, check consistent count
2044      * @tc.expected: step5. return OK.
2045      */
2046     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2047     CheckConsistentCount(db, localCount - 2);
2048 }
2049 
2050 /**
2051  * @tc.name: ConsistentFlagTest005
2052  * @tc.desc: Local data changes during download, check the 0x20 bit of flag after sync
2053  * @tc.type: FUNC
2054  * @tc.require:
2055  * @tc.author: bty
2056  */
2057 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest005, TestSize.Level0)
2058 {
2059     /**
2060      * @tc.steps:step1. init data
2061      * @tc.expected: step1. return OK.
2062      */
2063     int localCount = 20; // 20 is num of local
2064     int cloudCount = 10; // 10 is num of cloud
2065     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, false);
2066     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2067     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2068 
2069     /**
2070      * @tc.steps:step2. fork download, update local assets where id=2
2071      * @tc.expected: step2. return OK.
2072      */
2073     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2074     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2075     int index = 0;
2076     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2077         .WillRepeatedly(
2078             [this, &index](const std::string &, const std::string &gid, const Type &,
__anon32d7e6602a02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2079                 std::map<std::string, Assets> &assets) {
2080                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2081                 if (index == 1) { // 1 is first download
2082                     std::string sql = "UPDATE " + ASSETS_TABLE_NAME + " SET assets=NULL where id=2;";
2083                     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
2084                 }
2085                 return DBStatus::OK;
2086             });
2087 
2088     /**
2089      * @tc.steps:step3. fork upload, check consistent count
2090      * @tc.expected: step3. return OK.
2091      */
2092     int upIdx = 0;
__anon32d7e6602b02(const std::string &tableName, VBucket &extend) 2093     g_virtualCloudDb->ForkUpload([this, localCount, cloudCount, &upIdx](const std::string &tableName, VBucket &extend) {
2094         LOGD("upload index:%d", ++upIdx);
2095         if (upIdx == 1) { // 1 is first upload
2096             CheckConsistentCount(db, localCount - cloudCount - 1);
2097         }
2098     });
2099 
2100     /**
2101      * @tc.steps:step4. sync, check consistent count
2102      * @tc.expected: step4. return OK.
2103      */
2104     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2105     CheckConsistentCount(db, localCount);
2106 }
2107 
2108 /**
2109  * @tc.name: ConsistentFlagTest006
2110  * @tc.desc:
2111  * @tc.type: FUNC
2112  * @tc.require:
2113  * @tc.author: bty
2114  */
2115 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, ConsistentFlagTest006, TestSize.Level0)
2116 {
2117     /**
2118      * @tc.steps:step1. init data
2119      * @tc.expected: step1. return OK.
2120      */
2121     int cloudCount = 10; // 10 is num of cloud
2122     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2123     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2124 
2125     /**
2126      * @tc.steps:step2. fork download, update local assets where id=2
2127      * @tc.expected: step2. return OK.
2128      */
2129     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1);
2130     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2131     int delCount = 3; // 3 is num of cloud
2132     DeleteCloudDBData(1, delCount, ASSETS_TABLE_NAME);
2133     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2134     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2135     int index = 0;
2136     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2137         .WillRepeatedly(
2138             [&index](const std::string &, const std::string &gid, const Type &,
__anon32d7e6602c02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2139                 std::map<std::string, Assets> &assets) {
2140                 LOGD("download gid:%s, index:%d", gid.c_str(), ++index);
2141                 if (index == 1) { // 1 is first download
2142                     return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
2143                 }
2144                 return DBStatus::OK;
2145             });
2146 
2147     /**
2148      * @tc.steps:step3. fork upload, check consistent count
2149      * @tc.expected: step3. return OK.
2150      */
2151     int upIdx = 0;
__anon32d7e6602d02(const std::string &tableName, VBucket &extend) 2152     g_virtualCloudDb->ForkUpload([this, delCount, &upIdx](const std::string &tableName, VBucket &extend) {
2153         LOGD("upload index:%d", ++upIdx);
2154         if (upIdx == 1) { // 1 is first upload
2155             CheckConsistentCount(db, delCount);
2156             CheckCompensatedCount(db, 0L);
2157         }
2158     });
2159 
2160     /**
2161      * @tc.steps:step4. sync, check consistent count
2162      * @tc.expected: step4. return OK.
2163      */
2164     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2165     CheckConsistentCount(db, cloudCount);
2166 }
2167 
2168 /**
2169  * @tc.name: SyncDataStatusTest001
2170  * @tc.desc: No need to download asset, check status after sync
2171  * @tc.type: FUNC
2172  * @tc.require:
2173  * @tc.author: bty
2174  */
2175 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest001, TestSize.Level0)
2176 {
2177     DataStatusTest001(false);
2178 }
2179 
2180 /**
2181  * @tc.name: SyncDataStatusTest002
2182  * @tc.desc: Need to download asset, check status after sync
2183  * @tc.type: FUNC
2184  * @tc.require:
2185  * @tc.author: bty
2186  */
2187 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest002, TestSize.Level0)
2188 {
2189     DataStatusTest001(true);
2190 }
2191 
2192 /**
2193  * @tc.name: SyncDataStatusTest003
2194  * @tc.desc: Lock during download and check status
2195  * @tc.type: FUNC
2196  * @tc.require:
2197  * @tc.author: bty
2198  */
2199 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest003, TestSize.Level0)
2200 {
2201     DataStatusTest003();
2202 }
2203 
2204 /**
2205  * @tc.name: SyncDataStatusTest004
2206  * @tc.desc: Lock and delete during download, check status
2207  * @tc.type: FUNC
2208  * @tc.require:
2209  * @tc.author: bty
2210  */
2211 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest004, TestSize.Level0)
2212 {
2213     DataStatusTest004();
2214 }
2215 
2216 /**
2217  * @tc.name: SyncDataStatusTest005
2218  * @tc.desc: Lock and update during download, check status
2219  * @tc.type: FUNC
2220  * @tc.require:
2221  * @tc.author: bty
2222  */
2223 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest005, TestSize.Level0)
2224 {
2225     DataStatusTest005();
2226 }
2227 
2228 /**
2229  * @tc.name: SyncDataStatusTest006
2230  * @tc.desc: Lock and update and Unlock during download, check status
2231  * @tc.type: FUNC
2232  * @tc.require:
2233  * @tc.author: bty
2234  */
2235 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest006, TestSize.Level0)
2236 {
2237     DataStatusTest006();
2238 }
2239 
2240 /**
2241  * @tc.name: SyncDataStatusTest007
2242  * @tc.desc: Download return error, check status
2243  * @tc.type: FUNC
2244  * @tc.require:
2245  * @tc.author: bty
2246  */
2247 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest007, TestSize.Level0)
2248 {
2249     DataStatusTest007();
2250 }
2251 
2252 /**
2253  * @tc.name: SyncDataStatusTest008
2254  * @tc.desc: Test upload process when data locked
2255  * @tc.type: FUNC
2256  * @tc.require:
2257  * @tc.author: bty
2258  */
2259 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, SyncDataStatusTest008, TestSize.Level0)
2260 {
2261     /**
2262      * @tc.steps:step1. init local data
2263      * @tc.expected: step1. return OK.
2264      */
2265     int localCount = 40;
2266     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2267     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2268     std::string sql = "update " + logName + " SET status = 2 where data_key >=20;";
2269     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2270 
2271     /**
2272      * @tc.steps:step2. sync and check process
2273      * @tc.expected: step2. return OK.
2274      */
2275     g_syncProcess = {};
2276     Query query = Query::Select().FromTable({ ASSETS_TABLE_NAME });
2277     std::vector<TableProcessInfo> expectProcess = {
2278         { PROCESSING, { 0, 0, 0, 0 }, { 0, 0, 0, 0 } },
2279         { FINISHED, { 0, 0, 0, 0 }, { 1, 40, 40, 0 } } // 1 is index, 40 is count
2280     };
2281     int index = 0;
2282     CloudSyncConfig config;
2283     config.maxUploadCount = 100; // max upload 100
2284     g_delegate->SetCloudSyncConfig(config);
__anon32d7e6602e02(const std::map<std::string, SyncProcess> &process) 2285     CloudSyncStatusCallback callback = [&index, &expectProcess](const std::map<std::string, SyncProcess> &process) {
2286         g_syncProcess = std::move(process.begin()->second);
2287         ASSERT_LT(index, 2);
2288         for (const auto &[tableName, info]: g_syncProcess.tableProcess) {
2289             EXPECT_EQ(info.process, expectProcess[index].process);
2290             EXPECT_EQ(info.upLoadInfo.batchIndex, expectProcess[index].upLoadInfo.batchIndex);
2291             EXPECT_EQ(info.upLoadInfo.total, expectProcess[index].upLoadInfo.total);
2292             EXPECT_EQ(info.upLoadInfo.successCount, expectProcess[index].upLoadInfo.successCount);
2293             EXPECT_EQ(tableName, ASSETS_TABLE_NAME);
2294         }
2295         index++;
2296         if (g_syncProcess.process == FINISHED) {
2297             g_processCondition.notify_one();
2298             ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2299         }
2300     };
2301     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), OK);
2302     WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
2303 }
2304 
2305 /**
2306  * @tc.name: DownloadAssetTest001
2307  * @tc.desc: Test the asset status after the share table sync
2308  * @tc.type: FUNC
2309  * @tc.require:
2310  * @tc.author: bty
2311  */
2312 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest001, TestSize.Level0)
2313 {
2314     /**
2315      * @tc.steps:step1. init data and sync
2316      * @tc.expected: step1. return OK.
2317      */
2318     int cloudCount = 10; // 10 is num of cloud
2319     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME_SHARED);
2320     CallSync({ASSETS_TABLE_NAME_SHARED}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2321 
2322     /**
2323      * @tc.steps:step2. check asset status
2324      * @tc.expected: step2. return OK.
2325      */
2326     SqlCondition condition;
2327     condition.sql = "select assets from " + ASSETS_TABLE_NAME_SHARED + " where _rowid_ = 1;";
2328     condition.readOnly = true;
2329     std::vector<VBucket> records;
2330     EXPECT_EQ(g_delegate->ExecuteSql(condition, records), OK);
2331     for (const auto &data: records) {
2332         Assets assets;
2333         CloudStorageUtils::GetValueFromVBucket(COL_ASSETS, data, assets);
2334         for (const auto &asset: assets) {
2335             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2336         }
2337     }
2338 }
2339 
2340 /**
2341  * @tc.name: DownloadAssetTest002
2342  * @tc.desc: Test asset download failed and re download
2343  * @tc.type: FUNC
2344  * @tc.require:
2345  * @tc.author: liaoyonghuang
2346  */
2347 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, DownloadAssetTest002, TestSize.Level0)
2348 {
2349     /**
2350      * @tc.steps:step1. init data
2351      * @tc.expected: step1. return OK.
2352      */
2353     int cloudCount = 10; // 10 is num of cloud
2354     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2355 
2356     /**
2357      * @tc.steps:step2. Set asset download status error and sync
2358      * @tc.expected: step2. sync successful but download assets fail.
2359      */
2360     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2361     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2362 
2363     /**
2364      * @tc.steps:step3. Set asset download status OK and sync
2365      * @tc.expected: step3. return OK.
2366      */
2367     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2368     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2369 
2370     /**
2371      * @tc.steps:step4. Check assets status
2372      * @tc.expected: step4. status is NORMAL.
2373      */
2374     std::string sql = "SELECT assets FROM " + ASSETS_TABLE_NAME + ";";
2375     sqlite3_stmt *stmt = nullptr;
2376     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2377     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2378         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
2379         Type cloudValue;
2380         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
2381         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(cloudValue));
2382         for (const auto &asset : assets) {
2383             EXPECT_EQ(asset.status, AssetStatus::NORMAL);
2384         }
2385     }
2386     int errCode = E_OK;
2387     SQLiteUtils::ResetStatement(stmt, true, errCode);
2388     EXPECT_EQ(errCode, E_OK);
2389 }
2390 
2391 /**
2392  * @tc.name: RecordLockFuncTest001
2393  * @tc.desc: UNLOCKING->UNLOCKING Synchronous download failure wholly.
2394  * @tc.type: FUNC
2395  * @tc.author: lijun
2396  */
2397 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest001, TestSize.Level0)
2398 {
2399     /**
2400      * @tc.steps:step1. init local data
2401      * @tc.expected: step1. return OK.
2402      */
2403     int localCount = 100;
2404     int cloudCount = 100;
2405     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2406     std::string logName = DBCommon::GetLogTableName(ASSETS_TABLE_NAME);
2407     std::string sql = "update " + logName + " SET status = 2 where data_key >=70;";
2408     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), E_OK);
2409     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2410     CheckLockStatus(db, 70, 99, LockStatus::LOCK);
2411     DeleteLocalRecord(db, 70, 30, ASSETS_TABLE_NAME);
2412 
2413     /**
2414      * @tc.steps:step2. init cloud data
2415      * @tc.expected: step2. return OK.
2416      */
2417     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
2418     UpdateCloudDBData(0, 70, 0, 0, ASSETS_TABLE_NAME);
2419 
2420     std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
2421     ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
2422     int index = 0;
2423     EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
2424         .WillRepeatedly(
__anon32d7e6602f02(const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) 2425             [&index](const std::string &, const std::string &gid, const Type &, std::map<std::string, Assets> &assets) {
2426                 LOGD("Download GID:%s  %d", gid.c_str(), index);
2427                 index++;
2428                 if (index <= 30) {
2429                     return DBStatus::CLOUD_ERROR;
2430                 } else {
2431                     return DBStatus::OK;
2432                 }
2433 
2434             });
2435 
2436     std::mutex mtx;
2437     std::condition_variable cv;
2438     int queryIdx = 0;
__anon32d7e6603002(const std::string &, VBucket &) 2439     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2440         LOGD("query index:%d", ++queryIdx);
2441         if (queryIdx == 2) { // 2 is compensated sync
2442             mtx.lock();
2443             cv.notify_one();
2444             mtx.unlock();
2445             std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2446         }
2447     });
2448     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_ERROR);
2449     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK, DBStatus::CLOUD_ERROR);
2450 
2451     {
2452         std::unique_lock<std::mutex> lock(mtx);
2453         cv.wait(lock);
2454     }
2455     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
2456 
2457     /**
2458      * @tc.steps:step3. check before compensated sync
2459      * @tc.expected: 70-99 is UNLOCKING.
2460      */
2461     CheckLockStatus(db, 0, 69, LockStatus::UNLOCK);
2462     CheckLockStatus(db, 70, 99, LockStatus::UNLOCKING);
2463 
2464     std::this_thread::sleep_for(std::chrono::seconds(3));
2465     /**
2466      * @tc.steps:step4. check after compensated sync
2467      * @tc.expected: all is UNLOCKING.
2468      */
2469     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2470 }
2471 
2472 /**
2473  * @tc.name: RecordLockFuncTest002
2474  * @tc.desc: Compensated synchronization, Locked data has not been synchronized. The first synchronization data is
2475  * based on the cloud, and the last synchronization data is based on the device.
2476  * @tc.type: FUNC
2477  * @tc.author: lijun
2478  */
2479 HWTEST_F(DistributedDBCloudSyncerDownloadAssetsTest, RecordLockFuncTest002, TestSize.Level0)
2480 {
2481     /**
2482      * @tc.steps:step1. init local data, modify data Status and initiate synchronization
2483      * @tc.expected: step1. return OK.
2484      */
2485     int localCount = 120;
2486     InsertLocalData(db, 0, localCount, ASSETS_TABLE_NAME, true);
2487     std::vector<std::vector<uint8_t>> hashKey;
2488     CloudDBSyncUtilsTest::GetHashKey(ASSETS_TABLE_NAME, " data_key >=100 ", db, hashKey);
2489     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2490     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2491     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2492     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PULL, DBStatus::OK);
2493 
2494     /**
2495      * @tc.steps:step2. Check the synchronization result and log table status
2496      * @tc.expected: step2.100-109 is LOCK_CHANGE.
2497      */
2498     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2499     CheckLockStatus(db, 100, 119, LockStatus::LOCK);
2500     UpdateLocalData(db, ASSETS_TABLE_NAME, ASSETS_COPY1, 100, 109);
2501     CheckLockStatus(db, 100, 109, LockStatus::LOCK_CHANGE);
2502     CheckLockStatus(db, 110, 119, LockStatus::LOCK);
2503 
2504     /**
2505      * @tc.steps:step3. Synchronize and check the lock_change data status
2506      * @tc.expected: step3.100-119 is LOCK_CHANGE.
2507      */
2508     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2509     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2510     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2511 
2512     /**
2513      * @tc.steps:step4. Unlock,the lock_change data status changes to unlocking
2514      * @tc.expected: step4.100-119 is UNLOCKING.
2515      */
2516     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2517     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2518     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2519 
2520     /**
2521      * @tc.steps:step5. Lock,the unlocking data status changes to lock_change
2522      * @tc.expected: step5.100-119 is LOCK_CHANGE.
2523      */
2524     EXPECT_EQ(Lock(ASSETS_TABLE_NAME, hashKey, db), OK);
2525     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2526     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2527 
2528     /**
2529      * @tc.steps:step6. Synchronize and check the lock_change data status
2530      * @tc.expected: step6.100-119 is LOCK_CHANGE.
2531      */
2532     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2533     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2534     CheckLockStatus(db, 100, 119, LockStatus::LOCK_CHANGE);
2535 
2536     /**
2537      * @tc.steps:step7. Unlock,the lock_change data status changes to unlocking
2538      * @tc.expected: step7.100-119 is UNLOCKING.
2539      */
2540     EXPECT_EQ(UnLock(ASSETS_TABLE_NAME, hashKey, db), WAIT_COMPENSATED_SYNC);
2541     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2542 
2543     /**
2544      * @tc.steps:step8. Synchronize data
2545      * @tc.expected: step8.return OK.
2546      */
2547     std::mutex mtx;
2548     std::condition_variable cv;
2549     int queryIdx = 0;
__anon32d7e6603102(const std::string &, VBucket &) 2550     g_virtualCloudDb->ForkQuery([&](const std::string &, VBucket &) {
2551         LOGD("query index:%d", ++queryIdx);
2552         if (queryIdx == 5) { // 5 is compensated sync
2553             mtx.lock();
2554             cv.notify_one();
2555             mtx.unlock();
2556             std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2557         }
2558     });
2559     CallSync({ASSETS_TABLE_NAME}, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2560     {
2561         std::unique_lock<std::mutex> lock(mtx);
2562         cv.wait(lock);
2563     }
2564 
2565     /**
2566      * @tc.steps:step9. check before compensated sync
2567      * @tc.expected: 100-119 is UNLOCKING.
2568      */
2569     CheckLockStatus(db, 0, 99, LockStatus::UNLOCK);
2570     CheckLockStatus(db, 100, 119, LockStatus::UNLOCKING);
2571 
2572     std::this_thread::sleep_for(std::chrono::seconds(3));
2573     /**
2574      * @tc.steps:step10. check after compensated sync
2575      * @tc.expected: all is UNLOCK.
2576      */
2577     CheckLockStatus(db, 0, 119, LockStatus::UNLOCK);
2578 }
2579 } // namespace
2580 #endif // RELATIONAL_STORE
2581