1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24 int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25 if (errCode != E_OK) {
26 return errCode;
27 }
28 auto callback = [this] { SchemaChangeCallback(); };
29 return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30 RegisterSchemaChangedCallback(callback);
31 }
32
Sync(const SyncParma & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
34 {
35 if (QuerySyncPreCheck(param) != E_OK) {
36 return -E_NOT_SUPPORT;
37 }
38 return GenericSyncer::Sync(param, connectionId);
39 }
40
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)41 int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
42 {
43 const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
44 std::vector<QuerySyncObject> tablesQuery;
45 if (param.isQuerySync) {
46 tablesQuery.push_back(param.syncQuery);
47 } else {
48 tablesQuery = syncInterface->GetTablesQuery();
49 }
50 std::set<uint32_t> subSyncIdSet;
51 int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
52 if (errCode != E_OK) {
53 DoRollBack(subSyncIdSet);
54 return errCode;
55 }
56 if (param.wait) {
57 bool connectionClose = false;
58 {
59 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
60 connectionClose = connectionIdMap_.find(connectionId) == connectionIdMap_.end();
61 }
62 if (!connectionClose) {
63 DoOnComplete(param, syncId);
64 }
65 }
66 return E_OK;
67 }
68
GenerateEachSyncTask(const SyncParma & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)69 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId,
70 const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
71 {
72 SyncParma subParam = param;
73 subParam.isQuerySync = true;
74 int errCode = E_OK;
75 for (const QuerySyncObject &table : tablesQuery) {
76 uint32_t subSyncId = GenerateSyncId();
77 std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
78 LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
79 subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
80 subParam.syncQuery = table;
81 subParam.onComplete = [this, subSyncId, syncId, param](const std::map<std::string, int> &devicesMap) {
82 DoOnSubSyncComplete(subSyncId, syncId, param, devicesMap);
83 };
84 {
85 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
86 fullSyncIdMap_[syncId].insert(subSyncId);
87 }
88 errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
89 if (errCode != E_OK) {
90 LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
91 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
92 fullSyncIdMap_[syncId].erase(subSyncId);
93 break;
94 }
95 subSyncIdSet.insert(subSyncId);
96 }
97 return errCode;
98 }
99
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParma & param,const std::map<std::string,int> & devicesMap)100 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
101 const SyncParma ¶m, const std::map<std::string, int> &devicesMap)
102 {
103 bool allFinish = true;
104 {
105 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
106 fullSyncIdMap_[syncId].erase(subSyncId);
107 allFinish = fullSyncIdMap_[syncId].empty();
108 for (const auto &item : devicesMap) {
109 resMap_[syncId][item.first][param.syncQuery.GetRelationTableName()] = static_cast<int>(item.second);
110 }
111 }
112 // block sync do callback in sync function
113 if (allFinish && !param.wait) {
114 DoOnComplete(param, syncId);
115 }
116 }
117
DoRollBack(std::set<uint32_t> & subSyncIdSet)118 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
119 {
120 for (const auto &removeId : subSyncIdSet) {
121 int retCode = RemoveSyncOperation(static_cast<int>(removeId));
122 if (retCode != E_OK) {
123 LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
124 }
125 }
126 }
127
DoOnComplete(const SyncParma & param,uint32_t syncId)128 void SingleVerRelationalSyncer::DoOnComplete(const SyncParma ¶m, uint32_t syncId)
129 {
130 if (!param.relationOnComplete) {
131 return;
132 }
133 std::map<std::string, std::vector<TableStatus>> syncRes;
134 std::map<std::string, std::map<std::string, int>> tmpMap;
135 {
136 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
137 tmpMap = resMap_[syncId];
138 }
139 for (const auto &devicesRes : tmpMap) {
140 for (const auto &tableRes : devicesRes.second) {
141 syncRes[devicesRes.first].push_back(
142 {tableRes.first, static_cast<DBStatus>(tableRes.second)});
143 }
144 }
145 param.relationOnComplete(syncRes);
146 {
147 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
148 resMap_.erase(syncId);
149 fullSyncIdMap_.erase(syncId);
150 }
151 }
152
EnableAutoSync(bool enable)153 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
154 {
155 (void)enable;
156 }
157
LocalDataChanged(int notifyEvent)158 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
159 {
160 (void)notifyEvent;
161 }
162
SchemaChangeCallback()163 void SingleVerRelationalSyncer::SchemaChangeCallback()
164 {
165 if (syncEngine_ == nullptr) {
166 return;
167 }
168 syncEngine_->SchemaChange();
169 int errCode = UpgradeSchemaVerInMeta();
170 if (errCode != E_OK) {
171 LOGE("[SingleVerRelationalSyncer] upgrade schema version in meta failed:%d", errCode);
172 }
173 }
174
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const175 int SingleVerRelationalSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine,
176 ISyncInterface *storage) const
177 {
178 if (!param.isQuerySync) {
179 return E_OK;
180 }
181 QuerySyncObject query = param.syncQuery;
182 int errCode = static_cast<RelationalDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
183 if (errCode != E_OK) {
184 LOGE("[SingleVerRelationalSyncer] QuerySyncObject check failed");
185 return errCode;
186 }
187 if (param.mode == SUBSCRIBE_QUERY) {
188 return -E_NOT_SUPPORT;
189 }
190 return E_OK;
191 }
192
QuerySyncPreCheck(const SyncParma & param) const193 int SingleVerRelationalSyncer::QuerySyncPreCheck(const SyncParma ¶m) const
194 {
195 if (!param.syncQuery.GetRelationTableNames().empty()) {
196 LOGE("[SingleVerRelationalSyncer] sync with not support query");
197 return -E_NOT_SUPPORT;
198 }
199 if (!param.isQuerySync) {
200 return E_OK;
201 }
202 if (param.mode == SYNC_MODE_PUSH_PULL) {
203 LOGE("[SingleVerRelationalSyncer] sync with not support push_pull mode");
204 return -E_NOT_SUPPORT;
205 }
206 if (param.syncQuery.GetRelationTableName().empty()) {
207 LOGE("[SingleVerRelationalSyncer] sync with empty table");
208 return -E_NOT_SUPPORT;
209 }
210 return E_OK;
211 }
212 }
213 #endif