/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_core.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

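// Scan the COW op iterator for a run of ops whose destination blocks are
// contiguous on the base device, batching at most *pending_ops entries.
// Returns the length of the run (0 if the iterator is exhausted or the next
// op does not qualify) and stores the base-device byte offset of the run in
// *source_offset. When replace_zero_vec is null, the run is restricted to
// ordered (copy/xor) ops; otherwise every op in the run is appended to the
// vector for the replace/zero merge path. Note that *pending_ops is only
// read; callers adjust their own counters by the return value.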
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                         std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);

    do {
        if (!cowop_iter_->Done() && num_ops) {
            const CowOperation* cow_op = &cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = cow_op->new_block * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
            }

            cowop_iter_->Next();
            num_ops -= 1;
            nr_consecutive = 1;

            while (!cowop_iter_->Done() && num_ops) {
                const CowOperation* op = &cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = op->new_block * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += 1;
                num_ops -= 1;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

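// Merge all replace and zero ops. For each run of contiguous blocks, the op
// payloads are materialized in bufsink_ (decompressed for replace ops,
// zero-filled for zero ops) and written back to the base device with a
// single pwrite(); the progress is fsync'd and committed to the COW header
// periodically so that a crash only replays already-merged ops.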
bool Worker::MergeReplaceZeroOps() {
    // Flush after merging 2MB of data. All replace/zero ops are independent
    // of each other, so we only need to flush the data and record the number
    // of ops merged in the COW block device. If there is a crash, we will
    // end up replaying some of the COW ops that were already merged; that is
    // ok.
    //
    // Increasing this beyond 2MB may improve merge times; however, on
    // devices with low memory it can be problematic when multiple merge
    // threads run in parallel.
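    //
    // As a worked example (assuming the usual values of a 1 MiB payload
    // buffer and 4 KiB blocks): (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2 ==
    // 256 * 2 == 512 ops, i.e. one fsync + commit per 2 MiB of merged data.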
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2;
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->Done()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->Done());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type == kCowReplaceOp) {
                if (!ProcessReplaceOp(cow_op)) {
                    SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
                    return false;
                }
            } else {
                CHECK(cow_op->type == kCowZeroOp);
                if (!ProcessZeroOp()) {
                    SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
                    return false;
                }
            }

            bufsink_.UpdateBufferOffset(BLOCK_SZ);
        }

        size_t io_size = linear_blocks * BLOCK_SZ;

        // Merge - Write the contents back to the base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || static_cast<size_t>(ret) != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging"
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += linear_blocks;

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << "Failed to commit the merged blocks in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR)
                    << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
            return false;
        }
    }

    // Flush and commit any leftover ops that were not yet committed.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << "Failed to commit the merged blocks in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

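// Merge the ordered (copy/xor) ops using io_uring. The data to be written
// was already staged in the shared read-ahead buffer by the RA thread; this
// loop batches contiguous runs into linked SQEs, submits them in a single
// syscall, reaps the completions, and commits each merge window to the COW
// header once the data is durable.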
bool Worker::MergeOrderedOpsAsync() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<char*>(mapped_addr) + snapuserd_->GetBufferDataOffset();

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->Done()) {
        const CowOperation* cow_op = &cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for the RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            return false;
        }

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_LOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    static_cast<char*>(read_ahead_buffer) + offset, io_size,
                                    source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important: the writes must be linked so
                // that they complete in the same order as they were queued,
                // because merge windows can contain overlapping block writes.
                //
                // If there were no such dependencies, we could optimize this
                // further by allowing parallel writes; for now, just link all
                // the SQ entries.
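                //
                // (Illustrative, hypothetical block numbers: if one queued
                // write covers blocks [100..104) and a later one covers
                // [102..106), completing the second before the first would
                // leave stale data in the overlapping blocks.)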
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or there are no more COW ops to merge in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this batch,
                // we need to sync the merged data. We will try to grab an SQE
                // entry and queue an FSYNC command; additionally, we make sure
                // that the fsync runs after all the I/O operations queued in
                // the ring have completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush later by
                // explicitly calling the fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // Very unlikely, but continue without failing the
                            // merge; we will flush explicitly later.
                            SNAP_LOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before the fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the I/O for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_LOG(ERROR)
                            << "io_uring_submit failed during merge:"
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors. We could retry
                    // by re-populating the SQE entries and submitting the
                    // requests again; however, we don't do that now. Instead, we
                    // fall back to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << "Failed to fsync merged data";
            return false;
        }

        // Merge is done and the data is on disk. Update the COW header with
        // the merge completion.
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << "Failed to commit the merged blocks in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Notify the RA thread that the merge thread is ready to merge the
        // next window.
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

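// Synchronous variant of MergeOrderedOpsAsync: the same merge-window
// handshake with the RA thread, but the staged data is written back with
// plain pwrite() calls and an explicit fsync() per merge window.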
bool Worker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<char*>(mapped_addr) + snapuserd_->GetBufferDataOffset();

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->Done()) {
        const CowOperation* cow_op = &cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for the RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. The data is already in the RA buffer.
            // Note that XOR ops are already handled by the RA thread; we just
            // write the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                static_cast<char*>(read_ahead_buffer) + offset,
                                                io_size, source_offset));
            if (ret < 0 || static_cast<size_t>(ret) != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging"
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and the data is on disk. Update the COW header with
        // the merge completion.
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << "Failed to commit the merged blocks in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Notify the RA thread that the merge thread is ready to merge the
        // next window.
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

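// Drive the io_uring merge path. On failure, rewind the op iterator past
// the blocks that were queued in the failed group so that the synchronous
// fallback retries them from a consistent position.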
bool Worker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Rewind the iterator so that the synchronous path can retry the
        // merge of this group.
        while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

bool Worker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

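// Top-level merge sequence: ordered (copy/xor) ops are merged first
// (asynchronously when io_uring is available, with a synchronous fallback
// on failure), followed by the independent replace/zero ops.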
bool Worker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start the async merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge.
    //
    // If the device doesn't support async merge, we enter here
    // directly (e.g., devices with 4.x kernels).
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. The device will continue to be mounted
            // off snapshots; the merge will be retried during the
            // next reboot.
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

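// Set up an io_uring instance for the merge thread. Returns false if the
// kernel lacks io_uring support, in which case merge_async_ stays false and
// Merge() takes the synchronous path.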
bool Worker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void Worker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

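// Entry point for the merge thread: wait for the merge-begin signal, adjust
// the thread priority, initialize state and (optionally) io_uring, run the
// merge, and tear everything down.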
bool Worker::RunMergeThread() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
    if (!snapuserd_->WaitForMergeBegin()) {
        SNAP_LOG(ERROR) << "Merge terminated early...";
        return true;
    }

    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
    }

    SNAP_LOG(INFO) << "Merge starting...";

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android