1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "KvStoreObserverProxy"
17
18 #include "ikvstore_observer.h"
19
20 #include <cinttypes>
21 #include <ipc_skeleton.h>
22 #include "kv_types_util.h"
23 #include "itypes_util.h"
24 #include "log_print.h"
25 #include "message_parcel.h"
26 namespace OHOS {
27 namespace DistributedKv {
28 using namespace std::chrono;
29
30 enum {
31 CLOUD_ONCHANGE,
32 ONCHANGE,
33 };
34
KvStoreObserverProxy(const sptr<IRemoteObject> & impl)35 KvStoreObserverProxy::KvStoreObserverProxy(const sptr<IRemoteObject> &impl) : IRemoteProxy<IKvStoreObserver>(impl)
36 {
37 }
38
GetBufferSize(const std::vector<Entry> & entries)39 int64_t GetBufferSize(const std::vector<Entry> &entries)
40 {
41 int64_t bufferSize = 0;
42 for (const auto &item : entries) {
43 bufferSize += item.key.RawSize() + item.value.RawSize();
44 }
45 return bufferSize;
46 }
47
OnChange(const ChangeNotification & changeNotification)48 void KvStoreObserverProxy::OnChange(const ChangeNotification &changeNotification)
49 {
50 MessageParcel data;
51 MessageParcel reply;
52 if (!data.WriteInterfaceToken(KvStoreObserverProxy::GetDescriptor())) {
53 ZLOGE("write descriptor failed");
54 return;
55 }
56 int64_t insertSize = ITypesUtil::GetTotalSize(changeNotification.GetInsertEntries());
57 int64_t updateSize = ITypesUtil::GetTotalSize(changeNotification.GetUpdateEntries());
58 int64_t deleteSize = ITypesUtil::GetTotalSize(changeNotification.GetDeleteEntries());
59 int64_t totalSize = insertSize + updateSize + deleteSize + sizeof(uint32_t);
60 if (insertSize < 0 || updateSize < 0 || deleteSize < 0 || !data.WriteInt32(totalSize)) {
61 ZLOGE("Write ChangeNotification buffer size to parcel failed.");
62 return;
63 }
64 ZLOGD("I(%" PRId64 ") U(%" PRId64 ") D(%" PRId64 ") T(%" PRId64 ")", insertSize, updateSize, deleteSize, totalSize);
65 if (totalSize < SWITCH_RAW_DATA_SIZE) {
66 if (!ITypesUtil::Marshal(data, changeNotification)) {
67 ZLOGW("Write ChangeNotification to parcel failed.");
68 return;
69 }
70 } else {
71 if (!ITypesUtil::Marshal(data, changeNotification.GetDeviceId(), uint32_t(changeNotification.IsClear())) ||
72 !ITypesUtil::MarshalToBuffer(changeNotification.GetInsertEntries(), insertSize, data) ||
73 !ITypesUtil::MarshalToBuffer(changeNotification.GetUpdateEntries(), updateSize, data) ||
74 !ITypesUtil::MarshalToBuffer(changeNotification.GetDeleteEntries(), deleteSize, data)) {
75 ZLOGE("WriteChangeList to Parcel by buffer failed");
76 return;
77 }
78 }
79
80 MessageOption mo{ MessageOption::TF_WAIT_TIME };
81 int error = Remote()->SendRequest(ONCHANGE, data, reply, mo);
82 if (error != 0) {
83 ZLOGE("SendRequest failed, error %d", error);
84 }
85 }
86
OnChange(const DataOrigin & origin,Keys && keys)87 void KvStoreObserverProxy::OnChange(const DataOrigin &origin, Keys &&keys)
88 {
89 MessageParcel data;
90 MessageParcel reply;
91 if (!data.WriteInterfaceToken(KvStoreObserverProxy::GetDescriptor())) {
92 ZLOGE("write descriptor failed");
93 return;
94 }
95 if (!ITypesUtil::Marshal(data, origin.store, keys[OP_INSERT], keys[OP_UPDATE], keys[OP_DELETE])) {
96 ZLOGE("WriteChangeInfo to Parcel failed.");
97 return;
98 }
99
100 MessageOption mo{ MessageOption::TF_WAIT_TIME };
101 int error = Remote()->SendRequest(CLOUD_ONCHANGE, data, reply, mo);
102 if (error != 0) {
103 ZLOGE("SendRequest failed, error %d", error);
104 }
105 }
106
OnRemoteRequest(uint32_t code,MessageParcel & data,MessageParcel & reply,MessageOption & option)107 int32_t KvStoreObserverStub::OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply,
108 MessageOption &option)
109 {
110 ZLOGD("code:%{public}u, callingPid:%{public}d", code, IPCSkeleton::GetCallingPid());
111 const int errorResult = -1;
112 if (KvStoreObserverStub::GetDescriptor() != data.ReadInterfaceToken()) {
113 ZLOGE("local descriptor is not equal to remote");
114 return errorResult;
115 }
116 switch (code) {
117 case ONCHANGE: {
118 if (data.ReadInt32() < SWITCH_RAW_DATA_SIZE) {
119 ChangeNotification notification({}, {}, {}, "", false);
120 if (!ITypesUtil::Unmarshal(data, notification)) {
121 ZLOGE("changeNotification is nullptr");
122 return errorResult;
123 }
124 OnChange(notification);
125 } else {
126 std::string deviceId;
127 uint32_t clear = 0;
128 std::vector<Entry> inserts;
129 std::vector<Entry> updates;
130 std::vector<Entry> deletes;
131 if (!ITypesUtil::Unmarshal(data, deviceId, clear) || !ITypesUtil::UnmarshalFromBuffer(data, inserts) ||
132 !ITypesUtil::UnmarshalFromBuffer(data, updates) ||
133 !ITypesUtil::UnmarshalFromBuffer(data, deletes)) {
134 ZLOGE("WriteChangeList to Parcel by buffer failed");
135 return errorResult;
136 }
137 ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), deviceId,
138 clear != 0);
139 OnChange(change);
140 }
141 return 0;
142 }
143 case CLOUD_ONCHANGE: {
144 std::string store;
145 Keys keys;
146 if (!ITypesUtil::Unmarshal(data, store, keys[OP_INSERT], keys[OP_UPDATE], keys[OP_DELETE])) {
147 ZLOGE("ReadChangeList from Parcel failed");
148 return errorResult;
149 }
150 OnChange({ .store = store }, std::move(keys));
151 return 0;
152 }
153 default:
154 return IPCObjectStub::OnRemoteRequest(code, data, reply, option);
155 }
156 }
157 } // namespace DistributedKv
158 } // namespace OHOS
159