/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_core.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

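// Batch up to *pending_ops COW ops whose new_block values are consecutive,
// starting from the current iterator position. *source_offset is set to the
// byte offset of the first block in the run. If replace_zero_vec is null,
// only ordered ops (copy/xor) are considered; otherwise the batched ops are
// collected into the vector. Returns the number of consecutive blocks
// batched; 0 if none.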
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                         std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);

    do {
        if (!cowop_iter_->Done() && num_ops) {
            const CowOperation* cow_op = &cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = cow_op->new_block * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
            }

            cowop_iter_->Next();
            num_ops -= 1;
            nr_consecutive = 1;

            while (!cowop_iter_->Done() && num_ops) {
                const CowOperation* op = &cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = op->new_block * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += 1;
                num_ops -= 1;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

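// Merge replace and zero ops. The payload is staged into bufsink_ by
// ProcessReplaceOp()/ProcessZeroOp() and then written back to the base
// device; merged data is fsync'ed and committed to the COW header every
// ~2MB so a crash only replays a bounded number of already-merged ops.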
bool Worker::MergeReplaceZeroOps() {
    // Flush after merging 2MB. Since all ops are independent and there is no
    // dependency between COW ops, we will flush the data and the number
    // of ops merged in COW block device. If there is a crash, we will
    // end up replaying some of the COW ops which were already merged. That is
    // ok.
    //
    // Increasing this beyond 2MB may help in improving merge times; however,
    // on devices with low memory, it can be problematic when there are
    // multiple merge threads in parallel.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2;
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->Done()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->Done());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type == kCowReplaceOp) {
                if (!ProcessReplaceOp(cow_op)) {
                    SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
                    return false;
                }
            } else {
                CHECK(cow_op->type == kCowZeroOp);
                if (!ProcessZeroOp()) {
                    SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
                    return false;
                }
            }

            bufsink_.UpdateBufferOffset(BLOCK_SZ);
        }

        size_t io_size = linear_blocks * BLOCK_SZ;

        // Merge - Write the contents back to base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || ret != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += linear_blocks;

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR)
                    << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
            return false;
        }
    }

    // Flush and commit any leftover ops not yet committed.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

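// Merge ordered ops (copy/xor) using io_uring. The data was already staged
// in the read-ahead buffer by the RA thread; this thread only writes it back
// to the base device, one merge window at a time, synchronizing with the RA
// thread before and after each window.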
bool Worker::MergeOrderedOpsAsync() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->Done()) {
        const CowOperation* cow_op = &cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            return false;
        }

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important - we need to make sure that the
                // blocks are linked and are written in the same order as
                // populated, because of overlapping block writes.
                //
                // If there were no dependencies, we could optimize this further
                // by allowing parallel writes; but for now, just link all the
                // SQ entries.
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this batch,
                // we need to sync the merged data. We will try to grab an SQE
                // entry and set the FSYNC command; additionally, make sure that
                // the fsync is done after all the I/O operations queued
                // in the ring are completed by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush it later
                // by explicitly calling the fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed during merge: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors; we could retry
                    // by re-populating the SQE entries and resubmitting the I/O
                    // request. However, we don't do that now; instead we fall
                    // back to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

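// Synchronous variant of MergeOrderedOpsAsync() based on pwrite(); used when
// io_uring isn't supported and as the fallback path if the async merge fails.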
bool Worker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->Done()) {
        const CowOperation* cow_op = &cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. Data is already in the RA buffer. Note
            // that XOR ops are already handled by the RA thread; we just write
            // the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                (char*)read_ahead_buffer + offset, io_size,
                                                source_offset));
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

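// Drive the io_uring based merge of ordered ops. On failure, rewind the op
// iterator past the blocks merged in the current group so that the
// synchronous fallback can retry them.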
bool Worker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Reset the iter so that we retry the merge
        while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

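// Drive the synchronous merge of ordered ops.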
bool Worker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

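// Top-level merge entry point. Ordered ops (copy/xor) are merged first,
// asynchronously when io_uring is available (with a synchronous retry on
// failure), followed by replace and zero ops.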
bool Worker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start Async Merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge.
    //
    // If the device doesn't support async merge (e.g., devices with
    // 4.x kernels), we enter here directly.
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. Device will continue to be mounted
            // off snapshots; merge will be retried during
            // next reboot
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

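// Set up an io_uring instance for the async merge path. Returns false if
// io_uring isn't supported or queue initialization fails; merge_async_ is
// set only on success.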
bool Worker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

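// Tear down the io_uring instance if the async merge path was initialized.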
void Worker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

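// Merge thread entry point: wait for the merge to be initiated, lower the
// thread priority, and then run the merge to completion.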
bool Worker::RunMergeThread() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
    if (!snapuserd_->WaitForMergeBegin()) {
        SNAP_LOG(ERROR) << "Merge terminated early...";
        return true;
    }

    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
    }

    SNAP_LOG(INFO) << "Merge starting..";

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android