1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20 
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24     int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25     if (errCode != E_OK) {
26         return errCode;
27     }
28     auto callback = [this] { SchemaChangeCallback(); };
29     return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30         RegisterSchemaChangedCallback(callback);
31 }
32 
Sync(const SyncParma & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParma &param, uint64_t connectionId)
34 {
35     if (QuerySyncPreCheck(param) != E_OK) {
36         return -E_NOT_SUPPORT;
37     }
38     return GenericSyncer::Sync(param, connectionId);
39 }
40 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)41 int SingleVerRelationalSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
42 {
43     const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
44     std::vector<QuerySyncObject> tablesQuery;
45     if (param.isQuerySync) {
46         tablesQuery.push_back(param.syncQuery);
47     } else {
48         tablesQuery = syncInterface->GetTablesQuery();
49     }
50     std::set<uint32_t> subSyncIdSet;
51     int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
52     if (errCode != E_OK) {
53         DoRollBack(subSyncIdSet);
54         return errCode;
55     }
56     if (param.wait) {
57         bool connectionClose = false;
58         {
59             std::lock_guard<std::mutex> lockGuard(syncIdLock_);
60             connectionClose = connectionIdMap_.find(connectionId) == connectionIdMap_.end();
61         }
62         if (!connectionClose) {
63             DoOnComplete(param, syncId);
64         }
65     }
66     return E_OK;
67 }
68 
GenerateEachSyncTask(const SyncParma & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)69 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma &param, uint32_t syncId,
70     const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
71 {
72     SyncParma subParam = param;
73     subParam.isQuerySync = true;
74     int errCode = E_OK;
75     for (const QuerySyncObject &table : tablesQuery) {
76         uint32_t subSyncId = GenerateSyncId();
77         std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
78         LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
79             subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
80         subParam.syncQuery = table;
81         subParam.onComplete = [this, subSyncId, syncId, param](const std::map<std::string, int> &devicesMap) {
82             DoOnSubSyncComplete(subSyncId, syncId, param, devicesMap);
83         };
84         {
85             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
86             fullSyncIdMap_[syncId].insert(subSyncId);
87         }
88         errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
89         if (errCode != E_OK) {
90             LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
91             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
92             fullSyncIdMap_[syncId].erase(subSyncId);
93             break;
94         }
95         subSyncIdSet.insert(subSyncId);
96     }
97     return errCode;
98 }
99 
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParma & param,const std::map<std::string,int> & devicesMap)100 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
101     const SyncParma &param, const std::map<std::string, int> &devicesMap)
102 {
103     bool allFinish = true;
104     {
105         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
106         fullSyncIdMap_[syncId].erase(subSyncId);
107         allFinish = fullSyncIdMap_[syncId].empty();
108         for (const auto &item : devicesMap) {
109             resMap_[syncId][item.first][param.syncQuery.GetRelationTableName()] = static_cast<int>(item.second);
110         }
111     }
112     // block sync do callback in sync function
113     if (allFinish && !param.wait) {
114         DoOnComplete(param, syncId);
115     }
116 }
117 
DoRollBack(std::set<uint32_t> & subSyncIdSet)118 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
119 {
120     for (const auto &removeId : subSyncIdSet) {
121         int retCode = RemoveSyncOperation(static_cast<int>(removeId));
122         if (retCode != E_OK) {
123             LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
124         }
125     }
126 }
127 
DoOnComplete(const SyncParma & param,uint32_t syncId)128 void SingleVerRelationalSyncer::DoOnComplete(const SyncParma &param, uint32_t syncId)
129 {
130     if (!param.relationOnComplete) {
131         return;
132     }
133     std::map<std::string, std::vector<TableStatus>> syncRes;
134     std::map<std::string, std::map<std::string, int>> tmpMap;
135     {
136         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
137         tmpMap = resMap_[syncId];
138     }
139     for (const auto &devicesRes : tmpMap) {
140         for (const auto &tableRes : devicesRes.second) {
141             syncRes[devicesRes.first].push_back(
142                 {tableRes.first, static_cast<DBStatus>(tableRes.second)});
143         }
144     }
145     param.relationOnComplete(syncRes);
146     {
147         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
148         resMap_.erase(syncId);
149         fullSyncIdMap_.erase(syncId);
150     }
151 }
152 
EnableAutoSync(bool enable)153 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
154 {
155     (void)enable;
156 }
157 
LocalDataChanged(int notifyEvent)158 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
159 {
160     (void)notifyEvent;
161 }
162 
SchemaChangeCallback()163 void SingleVerRelationalSyncer::SchemaChangeCallback()
164 {
165     if (syncEngine_ == nullptr) {
166         return;
167     }
168     syncEngine_->SchemaChange();
169     int errCode = UpgradeSchemaVerInMeta();
170     if (errCode != E_OK) {
171         LOGE("[SingleVerRelationalSyncer] upgrade schema version in meta failed:%d", errCode);
172     }
173 }
174 
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const175 int SingleVerRelationalSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine,
176     ISyncInterface *storage) const
177 {
178     if (!param.isQuerySync) {
179         return E_OK;
180     }
181     QuerySyncObject query = param.syncQuery;
182     int errCode = static_cast<RelationalDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
183     if (errCode != E_OK) {
184         LOGE("[SingleVerRelationalSyncer] QuerySyncObject check failed");
185         return errCode;
186     }
187     if (param.mode == SUBSCRIBE_QUERY) {
188         return -E_NOT_SUPPORT;
189     }
190     return E_OK;
191 }
192 
QuerySyncPreCheck(const SyncParma & param) const193 int SingleVerRelationalSyncer::QuerySyncPreCheck(const SyncParma &param) const
194 {
195     if (!param.syncQuery.GetRelationTableNames().empty()) {
196         LOGE("[SingleVerRelationalSyncer] sync with not support query");
197         return -E_NOT_SUPPORT;
198     }
199     if (!param.isQuerySync) {
200         return E_OK;
201     }
202     if (param.mode == SYNC_MODE_PUSH_PULL) {
203         LOGE("[SingleVerRelationalSyncer] sync with not support push_pull mode");
204         return -E_NOT_SUPPORT;
205     }
206     if (param.syncQuery.GetRelationTableName().empty()) {
207         LOGE("[SingleVerRelationalSyncer] sync with empty table");
208         return -E_NOT_SUPPORT;
209     }
210     return E_OK;
211 }
212 }
213 #endif