/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd.h"

#include <csignal>
#include <optional>
#include <set>

#include <libsnapshot/snapuserd_client.h>

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "

void BufferSink::Initialize(size_t size) {
    buffer_size_ = size;
    buffer_offset_ = 0;
    buffer_ = std::make_unique<uint8_t[]>(size);
}

void* BufferSink::GetPayloadBuffer(size_t size) {
    if ((buffer_size_ - buffer_offset_) < size) return nullptr;

    char* buffer = reinterpret_cast<char*>(GetBufPtr());
    struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
    return (char*)msg->payload.buf + buffer_offset_;
}

void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
    void* buf = GetPayloadBuffer(requested);
    if (!buf) {
        *actual = 0;
        return nullptr;
    }
    *actual = requested;
    return buf;
}

struct dm_user_header* BufferSink::GetHeaderPtr() {
    if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
        return nullptr;
    }
    char* buf = reinterpret_cast<char*>(GetBufPtr());
    struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
    return header;
}

void* BufferSink::GetPayloadBufPtr() {
    char* buffer = reinterpret_cast<char*>(GetBufPtr());
    struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
    return msg->payload.buf;
}
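
// A sketch of the buffer layout the BufferSink methods above assume, with
// struct dm_user_message coming from the dm-user UAPI:
//
//   buffer_: [ struct dm_user_header | payload bytes .................... ]
//             ^ GetHeaderPtr()         ^ GetPayloadBufPtr()
//
// GetPayloadBuffer(size) hands out payload space at the current offset, e.g.:
//
//   void* first  = sink.GetPayloadBuffer(BLOCK_SZ);  // payload + 0
//   sink.UpdateBufferOffset(BLOCK_SZ);
//   void* second = sink.GetPayloadBuffer(BLOCK_SZ);  // payload + BLOCK_SZ
//
// so successive 4k reads land back-to-back in the payload region.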

WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
                           const std::string& control_device, const std::string& misc_name,
                           std::shared_ptr<Snapuserd> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    control_device_ = control_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
    exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
}

bool WorkerThread::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
    if (ctrl_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
    }

    return true;
}

bool WorkerThread::InitReader() {
    reader_ = std::make_unique<CowReader>();
    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }

    return true;
}

// Construct the kernel COW header in memory.
// This header will be in sector 0. The IO
// request will always be 4k. After constructing
// the header, zero out the remaining block.
void WorkerThread::ConstructKernelCowHeader() {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);

    memset(buffer, 0, BLOCK_SZ);

    struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);

    dh->magic = SNAP_MAGIC;
    dh->valid = SNAPSHOT_VALID;
    dh->version = SNAPSHOT_DISK_VERSION;
    dh->chunk_size = CHUNK_SIZE;
}

// Start the replace operation. This will read the
// internal COW format and, if the block is compressed,
// it will be decompressed.
bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
    if (!reader_->ReadData(*cow_op, &bufsink_)) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
        return false;
    }

    return true;
}

bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
                    << " Source: " << cow_op->source;
    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
                                          cow_op->source * BLOCK_SZ)) {
        SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
                         << " at block: " << cow_op->source;
        return false;
    }

    return true;
}

bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
        return false;
    }

    if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
        return false;
    }

    return true;
}

// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
    if (!GetReadAheadPopulatedBuffer(cow_op)) {
        SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
                        << " new_block: " << cow_op->new_block;
        if (!ReadFromBaseDevice(cow_op)) {
            return false;
        }
    }

    return true;
}

bool WorkerThread::ProcessZeroOp() {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, BLOCK_SZ);
    return true;
}

bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
    }

    switch (cow_op->type) {
        case kCowReplaceOp: {
            return ProcessReplaceOp(cow_op);
        }

        case kCowZeroOp: {
            return ProcessZeroOp();
        }

        case kCowCopyOp: {
            return ProcessCopyOp(cow_op);
        }

        default: {
            SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
        }
    }
    return false;
}

int WorkerThread::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;

    if (!ProcessCowOp(it->second)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
                        << " Aligned sector: " << it->first;
        return -1;
    }

    int num_sectors_skip = sector - it->first;

    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
            return -1;
        }

        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }

    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}
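
// A worked example of the skip arithmetic above, assuming BLOCK_SZ == 4096
// and SECTOR_SHIFT == 9 (512-byte sectors, 8 sectors per block):
//
//   Aligned COW op at sector 0, request arrives for sector 5, size 4096:
//       num_sectors_skip = 5 - 0 = 5
//       skip_sector_size = 5 << 9 = 2560 bytes
//       memmove() shifts the last 4096 - 2560 = 1536 bytes to the front
//       of the payload, and the function returns min(4096, 1536) = 1536.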

/*
 * Read the data for a given COW operation.
 *
 * The kernel can issue IO at sector granularity.
 * Hence, an IO may end up reading partial data
 * from a COW operation, or a request may span
 * two COW operations.
 */
int WorkerThread::ReadData(sector_t sector, size_t size) {
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
    /*
     * chunk_vec stores COW operations at 4k granularity.
     * If the requested sector falls on a 4k
     * boundary, then we can read the COW op directly without
     * any issue.
     *
     * However, if the requested sector is not 4k aligned,
     * then we will have to find the nearest COW operation
     * and chop the 4k block to fetch the requested sector.
     */
    it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
                          Snapuserd::compare);

    bool read_end_of_device = false;
    if (it == chunk_vec.end()) {
        // |-------|-------|-------|
        // 0       1       2       3
        //
        // Block 0 - op 1
        // Block 1 - op 2
        // Block 2 - op 3
        //
        // chunk_vec will have blocks 0, 1, 2 which map to the relevant COW ops.
        //
        // Each block is 4k bytes. Thus, the last block will span 8 sectors,
        // ranging till block 3 (however, block 3 won't be in chunk_vec as
        // it doesn't have any mapping to COW ops). Now, if we get an I/O request for a sector
        // spanning between block 2 and block 3, we need to step back
        // and get hold of the last element.
        //
        // Additionally, dm-snapshot makes sure that I/O requests beyond block 3
        // will not be routed to the daemon. Hence, it is safe to assume that
        // if a sector is not available in chunk_vec, the I/O falls at the
        // end of the region.
        it = std::prev(chunk_vec.end());
        read_end_of_device = true;
    }

    // We didn't find the required sector; hence find the previous sector,
    // as lower_bound gives us the value greater than
    // the requested sector.
    if (it->first != sector) {
        if (it != chunk_vec.begin() && !read_end_of_device) {
            --it;
        }

        /*
         * If the IO spans two COW operations,
         * split the IO into two parts:
         *
         * 1: Read the first part from the first COW op.
         * 2: Read the second part from the next COW op.
         *
         * Ex: Let's say we have a 1024-byte IO request.
         *
         * 0       COW OP-1  4096     COW OP-2  8192
         * |******************|*******************|
         *              |*****|*****|
         *           3584           4608
         *              <- 1024B - >
         *
         * We have two COW operations which are 4k blocks.
         * The 1024-byte IO spans these two COW
         * operations. We will split this IO
         * into two parts:
         *
         * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
         * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
         */
        return ReadUnalignedSector(sector, size, it);
    }

    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
    sector_t read_sector = sector;
    while (num_ops) {
        // We have to make sure that the reads are
        // sequential; there shouldn't be a data
        // request merged with a metadata IO.
        if (it->first != read_sector) {
            SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
                            << " cow-op sector: " << it->first;
            return -1;
        } else if (!ProcessCowOp(it->second)) {
            return -1;
        }
        num_ops -= 1;
        read_sector += (BLOCK_SZ >> SECTOR_SHIFT);

        it++;

        if (it == chunk_vec.end() && num_ops) {
            SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
                            << " COW ops completed; pending read-request: " << num_ops;
            return -1;
        }
        // Update the buffer offset
        bufsink_.UpdateBufferOffset(BLOCK_SZ);
    }

    // Reset the buffer offset
    bufsink_.ResetBufferOffset();
    return size;
}

/*
 * dm-snap does prefetch reads while reading disk exceptions.
 * By default, the prefetch value is set to 12; this means that
 * dm-snap will read 12 areas, wherein each area is a 4k page
 * of disk exceptions.
 *
 * If, during prefetch, the chunk-id seen is beyond the
 * actual number of metadata pages, fill the buffer with zeroes.
 * When dm-snap starts parsing the buffer, it will stop
 * reading metadata pages once the buffer content is zero.
 */
bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
    size_t size = exceptions_per_area_ * sizeof(struct disk_exception);

    if (read_size > size) {
        return false;
    }

    void* buffer = bufsink_.GetPayloadBuffer(size);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, size);
    return true;
}

/*
 * A disk exception is a simple mapping of old_chunk to new_chunk.
 * When a dm-snapshot device is created, the kernel requests these mappings.
 *
 * Each disk exception is 16 bytes in size. Thus a single 4k page can
 * hold:
 *
 * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
 * is considered a metadata page and it is represented by a chunk ID.
 *
 * Convert the chunk ID to an index into the vector which gives us
 * the metadata page.
 */
bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
    uint32_t stride = exceptions_per_area_ + 1;
    size_t size;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (divresult.quot < vec.size()) {
        size = exceptions_per_area_ * sizeof(struct disk_exception);

        if (read_size != size) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
                            << " does not match with size: " << size;
            return false;
        }

        void* buffer = bufsink_.GetPayloadBuffer(size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
            return false;
        }

        memcpy(buffer, vec[divresult.quot].get(), size);
    } else {
        return ZerofillDiskExceptions(read_size);
    }

    return true;
}
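
// A worked example of the chunk-to-index math above, assuming
// exceptions_per_area_ == 256 (i.e. 4096 / sizeof(struct disk_exception)):
//
//   stride = 256 + 1 = 257   // each area: 1 metadata chunk + 256 data chunks
//
//   lldiv(1, 257)   -> quot = 0   // chunk 1 maps to metadata page 0
//   lldiv(258, 257) -> quot = 1   // chunk 258 maps to metadata page 1
//   lldiv(515, 257) -> quot = 2   // chunk 515 maps to metadata page 2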

loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                         int* unmerged_exceptions) {
    loff_t offset = 0;
    *unmerged_exceptions = 0;

    while (*unmerged_exceptions <= exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Unmerged op by the kernel
        if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
            if (!(merged_de->old_chunk == cow_de->old_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
                                << merged_de->old_chunk
                                << " cow_de->old_chunk: " << cow_de->old_chunk;
                return -1;
            }

            if (!(merged_de->new_chunk == cow_de->new_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
                                << merged_de->new_chunk
                                << " cow_de->new_chunk: " << cow_de->new_chunk;
                return -1;
            }

            offset += sizeof(struct disk_exception);
            *unmerged_exceptions += 1;
            continue;
        }

        break;
    }

    SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
    return offset;
}

int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* copy_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *copy_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        if (!(merged_de->new_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (!(merged_de->old_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

        if (cow_de->new_chunk != 0) {
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (!(it != chunk_vec.end())) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
                *copy_op = true;
                // Every single copy operation has to come from the read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is the final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress.
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // Zero out to indicate that the operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already merged op in a previous iteration, or
            // this could also represent a partially filled area.
            //
            // If the op was merged in a previous cycle, we don't have
            // to count it.
            break;
        } else {
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}
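
// A sketch of what the two functions above see in a single 4k metadata area;
// each entry is a struct disk_exception written as {old_chunk, new_chunk},
// and the chunk values are illustrative:
//
//   merged buffer (from kernel):  {10, 2}  {0, 0}   {0, 0}   {0, 0} ...
//   cow buffer (our copy):        {10, 2}  {11, 3}  {12, 4}  {0, 0} ...
//
// GetMergeStartOffset() walks the matching non-zero prefix and reports
// unmerged_exceptions = 1. GetNumberOfMergedOps() then counts the entries
// the kernel has zeroed out but our copy still holds, i.e. 2 merged ops
// in this cycle, zeroing them in the cow buffer as it goes.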

bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
    uint32_t stride = exceptions_per_area_ + 1;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
    bool copy_op = false;
    bool commit = false;

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (!(divresult.quot < vec.size())) {
        SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
                        << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
        return false;
    }

    SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
                    << " Metadata-Index: " << divresult.quot;

    int unmerged_exceptions = 0;
    loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);

    if (offset < 0) {
        SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
                        << unmerged_exceptions;
        return false;
    }

    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
                                                   unmerged_exceptions, &copy_op, &commit);

    // There should be at least one operation merged in this cycle
    if (!(merged_ops_cur_iter > 0)) {
        SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
        return false;
    }

    if (copy_op) {
        if (commit) {
            // Push the flushing logic to the read-ahead thread so that the merge
            // thread can make forward progress. Sync will happen in the background.
            snapuserd_->StartReadAhead();
        }
    } else {
        // Non-copy ops and all ops in the older COW format
        if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
            SNAP_LOG(ERROR) << "CommitMerge failed...";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << " chunk: " << chunk;
    return true;
}

// Read the header from the dm-user misc device. This gives us
// the sector number for which IO is issued by the dm-snapshot device.
bool WorkerThread::ReadDmUserHeader() {
    if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
        if (errno != ENOTBLK) {
            SNAP_PLOG(ERROR) << "Control-read failed";
        }

        return false;
    }

    return true;
}

// Send the payload/data back to the dm-user misc device.
bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
    size_t payload_size = size;
    void* buf = bufsink_.GetPayloadBufPtr();
    if (header_response) {
        payload_size += sizeof(struct dm_user_header);
        buf = bufsink_.GetBufPtr();
    }

    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
        return false;
    }

    return true;
}

bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
    if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
        SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
        return false;
    }

    return true;
}

bool WorkerThread::DmuserWriteRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    // Device mapper has the capability to allow
    // targets to flush the cache when writes are completed. This
    // is controlled by each target by a flag "flush_supported".
    // This flag is set by dm-user. When flush is supported,
    // a number of zero-length bios will be submitted to
    // the target for the purpose of flushing the cache. It is the
    // responsibility of the target driver - which is dm-user in this
    // case - to remap these bios to the underlying device. Since
    // there is no underlying device for dm-user, these zero-length
    // bios get routed to the daemon.
    //
    // Flush operations are generated post-merge by dm-snap with the
    // REQ_PREFLUSH flag set. The snapuserd daemon doesn't have anything
    // to flush per se; hence, just respond back with a success message.
    if (header->sector == 0) {
        if (!(header->len == 0)) {
            SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;
        }

        if (!WriteDmUserPayload(0, true)) {
            return false;
        }
        return true;
    }

    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    size_t remaining_size = header->len;
    size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

    chunk_t chunk = SectorToChunk(header->sector);
    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                               std::make_pair(header->sector, nullptr), Snapuserd::compare);

    bool not_found = (it == chunk_vec.end() || it->first != header->sector);

    if (not_found) {
        void* buffer = bufsink_.GetPayloadBuffer(read_size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
                            << read_size;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;

            if (!ReadDmUserPayload(buffer, read_size)) {
                SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }

            if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
                SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }
        }
    } else {
        SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: " << header->sector;
        header->type = DM_USER_RESP_ERROR;
    }

    if (!WriteDmUserPayload(0, true)) {
        return false;
    }

    return true;
}

bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool header_response = true;
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // A request to sector 0 is always for the kernel
        // representation of the COW header. This IO should happen only
        // once, during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally, this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            } else {
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // The daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}
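
// A worked sketch of how DmuserReadRequest() above chunks a large IO,
// assuming for illustration that PAYLOAD_SIZE is 8 KiB:
//
//   header->len = 20 KiB:
//     iteration 1: read_size = 8 KiB, header_response = true  (header + data)
//     iteration 2: read_size = 8 KiB, header_response = false (data only)
//     iteration 3: read_size = 4 KiB, header_response = false (data only)
//
// Only the first write carries the dm_user_header; subsequent writes stream
// the remaining payload until remaining_size reaches zero.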

void WorkerThread::InitializeBufsink() {
    // Allocate the buffer which is used to communicate between
    // the daemon and dm-user. The buffer comprises a header and a fixed payload.
    // If dm-user requests a big IO, the IO will be broken into chunks
    // of PAYLOAD_SIZE.
    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
    bufsink_.Initialize(buf_size);
}

bool WorkerThread::RunThread() {
    InitializeBufsink();

    if (!InitializeFds()) {
        return false;
    }

    if (!InitReader()) {
        return false;
    }

    // Start serving IO
    while (true) {
        if (!ProcessIORequest()) {
            break;
        }
    }

    CloseFds();
    reader_->CloseCowFd();

    return true;
}

bool WorkerThread::ProcessIORequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    if (!ReadDmUserHeader()) {
        return false;
    }

    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;

    switch (header->type) {
        case DM_USER_REQ_MAP_READ: {
            if (!DmuserReadRequest()) {
                return false;
            }
            break;
        }

        case DM_USER_REQ_MAP_WRITE: {
            if (!DmuserWriteRequest()) {
                return false;
            }
            break;
        }
    }

    return true;
}
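
// A sketch of the request/response cycle each worker runs, in comment
// pseudocode (control_device_ is the dm-user misc device opened as ctrl_fd_):
//
//   loop:
//     read struct dm_user_header from ctrl_fd_        (ReadDmUserHeader)
//     dispatch on header->type:
//       DM_USER_REQ_MAP_READ  -> DmuserReadRequest()
//       DM_USER_REQ_MAP_WRITE -> DmuserWriteRequest()
//     write header [+ payload] back to ctrl_fd_       (WriteDmUserPayload)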

}  // namespace snapshot
}  // namespace android