1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "snapuserd.h"
18 
19 #include <csignal>
20 #include <optional>
21 #include <set>
22 
23 #include <snapuserd/snapuserd_client.h>
24 
25 namespace android {
26 namespace snapshot {
27 
28 using namespace android;
29 using namespace android::dm;
30 using android::base::unique_fd;
31 
32 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
33 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
34 
WorkerThread(const std::string & cow_device,const std::string & backing_device,const std::string & control_device,const std::string & misc_name,std::shared_ptr<Snapuserd> snapuserd)35 WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
36                            const std::string& control_device, const std::string& misc_name,
37                            std::shared_ptr<Snapuserd> snapuserd) {
38     cow_device_ = cow_device;
39     backing_store_device_ = backing_device;
40     control_device_ = control_device;
41     misc_name_ = misc_name;
42     snapuserd_ = snapuserd;
43     exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
44 }
45 
InitializeFds()46 bool WorkerThread::InitializeFds() {
47     backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
48     if (backing_store_fd_ < 0) {
49         SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
50         return false;
51     }
52 
53     cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
54     if (cow_fd_ < 0) {
55         SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
56         return false;
57     }
58 
59     ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
60     if (ctrl_fd_ < 0) {
61         SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
62         return false;
63     }
64 
65     return true;
66 }
67 
InitReader()68 bool WorkerThread::InitReader() {
69     reader_ = snapuserd_->CloneReaderForWorker();
70 
71     if (!reader_->InitForMerge(std::move(cow_fd_))) {
72         return false;
73     }
74     return true;
75 }
76 
77 // Construct kernel COW header in memory
78 // This header will be in sector 0. The IO
79 // request will always be 4k. After constructing
80 // the header, zero out the remaining block.
ConstructKernelCowHeader()81 void WorkerThread::ConstructKernelCowHeader() {
82     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
83 
84     memset(buffer, 0, BLOCK_SZ);
85 
86     struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);
87 
88     dh->magic = SNAP_MAGIC;
89     dh->valid = SNAPSHOT_VALID;
90     dh->version = SNAPSHOT_DISK_VERSION;
91     dh->chunk_size = CHUNK_SIZE;
92 }
93 
94 // Start the replace operation. This will read the
95 // internal COW format and if the block is compressed,
96 // it will be de-compressed.
ProcessReplaceOp(const CowOperation * cow_op)97 bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
98     if (!reader_->ReadData(*cow_op, &bufsink_)) {
99         SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
100         return false;
101     }
102 
103     return true;
104 }
105 
// Read one 4k block for |cow_op| from the backing (base) device into the
// payload buffer. Returns false if the buffer cannot be obtained or the
// read fails.
bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
                    << " Source: " << cow_op->source;
    // For copy ops, |source| is a block number and must be scaled to a byte
    // offset; for other op types it is used as a byte offset directly.
    uint64_t offset = cow_op->source;
    if (cow_op->type == kCowCopyOp) {
        offset *= BLOCK_SZ;
    }
    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
        SNAP_PLOG(ERROR) << "Copy op failed. Read from backing store: " << backing_store_device_
                         << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
        return false;
    }

    return true;
}
126 
GetReadAheadPopulatedBuffer(const CowOperation * cow_op)127 bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
128     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
129     if (buffer == nullptr) {
130         SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
131         return false;
132     }
133 
134     if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
135         return false;
136     }
137 
138     return true;
139 }
140 
141 // Start the copy operation. This will read the backing
142 // block device which is represented by cow_op->source.
ProcessCopyOp(const CowOperation * cow_op)143 bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
144     if (!GetReadAheadPopulatedBuffer(cow_op)) {
145         SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
146                         << " new_block: " << cow_op->new_block;
147         if (!ReadFromBaseDevice(cow_op)) {
148             return false;
149         }
150     }
151 
152     return true;
153 }
154 
ProcessZeroOp()155 bool WorkerThread::ProcessZeroOp() {
156     // Zero out the entire block
157     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
158     if (buffer == nullptr) {
159         SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
160         return false;
161     }
162 
163     memset(buffer, 0, BLOCK_SZ);
164     return true;
165 }
166 
ProcessCowOp(const CowOperation * cow_op)167 bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
168     if (cow_op == nullptr) {
169         SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
170         return false;
171     }
172 
173     switch (cow_op->type) {
174         case kCowReplaceOp: {
175             return ProcessReplaceOp(cow_op);
176         }
177 
178         case kCowZeroOp: {
179             return ProcessZeroOp();
180         }
181 
182         case kCowCopyOp: {
183             return ProcessCopyOp(cow_op);
184         }
185 
186         default: {
187             SNAP_LOG(ERROR) << "Unsupported operation-type found: " << cow_op->type;
188         }
189     }
190     return false;
191 }
192 
// Serve a read whose starting sector is not 4k-block aligned.
//
// |it| points at the chunk_vec entry whose block contains |sector|. The full
// 4k block for that COW op is materialized into the payload buffer; the
// sectors preceding |sector| are then discarded by shifting the payload
// down. Returns the number of bytes made available (at most one block minus
// the skipped prefix, capped at |size|), or -1 on failure.
int WorkerThread::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;

    // Materialize the entire 4k block for this COW op into the payload.
    if (!ProcessCowOp(it->second)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
                        << " Aligned sector: " << it->first;
        return -1;
    }

    // Sectors between the block's base sector and the requested sector.
    int num_sectors_skip = sector - it->first;

    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        // Skipping a full block would mean |sector| lies outside the block
        // mapped by |it| - reject the request.
        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
            return -1;
        }

        // Shift the tail of the block to the start of the payload so the
        // response begins exactly at the requested sector.
        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }

    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}
227 
/*
 * Read the data for a given COW Operation.
 *
 * Kernel can issue IO at a sector granularity.
 * Hence, an IO may end up with reading partial
 * data from a COW operation or we may also
 * end up with interspersed request between
 * two COW operations.
 *
 * Returns the number of bytes placed in the payload buffer, or -1 on error.
 */
int WorkerThread::ReadData(sector_t sector, size_t size) {
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
    /*
     * chunk_map stores COW operation at 4k granularity.
     * If the requested IO with the sector falls on the 4k
     * boundary, then we can read the COW op directly without
     * any issue.
     *
     * However, if the requested sector is not 4K aligned,
     * then we will have the find the nearest COW operation
     * and chop the 4K block to fetch the requested sector.
     */
    it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
                          Snapuserd::compare);

    bool read_end_of_device = false;
    if (it == chunk_vec.end()) {
        // |-------|-------|-------|
        // 0       1       2       3
        //
        // Block 0 - op 1
        // Block 1 - op 2
        // Block 2 - op 3
        //
        // chunk_vec will have block 0, 1, 2 which maps to relevant COW ops.
        //
        // Each block is 4k bytes. Thus, the last block will span 8 sectors
        // ranging till block 3 (However, block 3 won't be in chunk_vec as
        // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector
        // spanning between block 2 and block 3, we need to step back
        // and get hold of the last element.
        //
        // Additionally, dm-snapshot makes sure that I/O request beyond block 3
        // will not be routed to the daemon. Hence, it is safe to assume that
        // if a sector is not available in the chunk_vec, the I/O falls in the
        // end of region.
        it = std::prev(chunk_vec.end());
        read_end_of_device = true;
    }

    // We didn't find the required sector; hence find the previous sector
    // as lower_bound will gives us the value greater than
    // the requested sector
    if (it->first != sector) {
        if (it != chunk_vec.begin() && !read_end_of_device) {
            --it;
        }

        /*
         * If the IO is spanned between two COW operations,
         * split the IO into two parts:
         *
         * 1: Read the first part from the single COW op
         * 2: Read the second part from the next COW op.
         *
         * Ex: Let's say we have a 1024 Bytes IO request.
         *
         * 0       COW OP-1  4096     COW OP-2  8192
         * |******************|*******************|
         *              |*****|*****|
         *           3584           4608
         *              <- 1024B - >
         *
         * We have two COW operations which are 4k blocks.
         * The IO is requested for 1024 Bytes which are spanned
         * between two COW operations. We will split this IO
         * into two parts:
         *
         * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
         * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
         */
        return ReadUnalignedSector(sector, size, it);
    }

    // Aligned case: serve |size| bytes as a run of consecutive 4k COW ops,
    // one op per block, advancing the buffer offset after each block.
    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
    sector_t read_sector = sector;
    while (num_ops) {
        // We have to make sure that the reads are
        // sequential; there shouldn't be a data
        // request merged with a metadata IO.
        if (it->first != read_sector) {
            SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
                            << " cow-op sector: " << it->first;
            return -1;
        } else if (!ProcessCowOp(it->second)) {
            return -1;
        }
        num_ops -= 1;
        // Advance by one block's worth of sectors.
        read_sector += (BLOCK_SZ >> SECTOR_SHIFT);

        it++;

        if (it == chunk_vec.end() && num_ops) {
            SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
                            << " COW ops completed; pending read-request: " << num_ops;
            return -1;
        }
        // Update the buffer offset
        bufsink_.UpdateBufferOffset(BLOCK_SZ);
    }

    // Reset the buffer offset
    bufsink_.ResetBufferOffset();
    return size;
}
344 
345 /*
346  * dm-snap does prefetch reads while reading disk-exceptions.
347  * By default, prefetch value is set to 12; this means that
348  * dm-snap will issue 12 areas wherein each area is a 4k page
349  * of disk-exceptions.
350  *
351  * If during prefetch, if the chunk-id seen is beyond the
352  * actual number of metadata page, fill the buffer with zero.
353  * When dm-snap starts parsing the buffer, it will stop
354  * reading metadata page once the buffer content is zero.
355  */
ZerofillDiskExceptions(size_t read_size)356 bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
357     size_t size = exceptions_per_area_ * sizeof(struct disk_exception);
358 
359     if (read_size > size) {
360         return false;
361     }
362 
363     void* buffer = bufsink_.GetPayloadBuffer(size);
364     if (buffer == nullptr) {
365         SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
366         return false;
367     }
368 
369     memset(buffer, 0, size);
370     return true;
371 }
372 
373 /*
374  * A disk exception is a simple mapping of old_chunk to new_chunk.
375  * When dm-snapshot device is created, kernel requests these mapping.
376  *
377  * Each disk exception is of size 16 bytes. Thus a single 4k page can
378  * have:
379  *
380  * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
381  * is considered a metadata page and it is represented by chunk ID.
382  *
383  * Convert the chunk ID to index into the vector which gives us
384  * the metadata page.
385  */
ReadDiskExceptions(chunk_t chunk,size_t read_size)386 bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
387     uint32_t stride = exceptions_per_area_ + 1;
388     size_t size;
389     const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
390 
391     // ChunkID to vector index
392     lldiv_t divresult = lldiv(chunk, stride);
393 
394     if (divresult.quot < vec.size()) {
395         size = exceptions_per_area_ * sizeof(struct disk_exception);
396 
397         if (read_size != size) {
398             SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
399                             << " does not match with size: " << size;
400             return false;
401         }
402 
403         void* buffer = bufsink_.GetPayloadBuffer(size);
404         if (buffer == nullptr) {
405             SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
406             return false;
407         }
408 
409         memcpy(buffer, vec[divresult.quot].get(), size);
410     } else {
411         return ZerofillDiskExceptions(read_size);
412     }
413 
414     return true;
415 }
416 
GetMergeStartOffset(void * merged_buffer,void * unmerged_buffer,int * unmerged_exceptions)417 loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
418                                          int* unmerged_exceptions) {
419     loff_t offset = 0;
420     *unmerged_exceptions = 0;
421 
422     while (*unmerged_exceptions <= exceptions_per_area_) {
423         struct disk_exception* merged_de =
424                 reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
425         struct disk_exception* cow_de =
426                 reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);
427 
428         // Unmerged op by the kernel
429         if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
430             if (!(merged_de->old_chunk == cow_de->old_chunk)) {
431                 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
432                                 << merged_de->old_chunk
433                                 << "cow_de->old_chunk: " << cow_de->old_chunk;
434                 return -1;
435             }
436 
437             if (!(merged_de->new_chunk == cow_de->new_chunk)) {
438                 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
439                                 << merged_de->new_chunk
440                                 << "cow_de->new_chunk: " << cow_de->new_chunk;
441                 return -1;
442             }
443 
444             offset += sizeof(struct disk_exception);
445             *unmerged_exceptions += 1;
446             continue;
447         }
448 
449         break;
450     }
451 
452     SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
453     return offset;
454 }
455 
// Count how many exceptions in this metadata area were merged by the kernel
// in the current cycle.
//
// Starting at |offset| (just past the unmerged prefix located by
// GetMergeStartOffset), every kernel-side entry must already be zeroed; each
// daemon-side (cow_de) entry that is still populated at that position is one
// newly merged operation. Counted entries are zeroed in the daemon's copy so
// the next cycle does not count them again.
//
// Sets |*ordered_op| if any merged op is an ordered op (per IsOrderedOp) when
// the read-ahead feature is enabled, and |*commit| when the final block of
// the read-ahead region has been merged. Returns the merge count, or -1 on
// corrupt metadata.
int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* ordered_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *ordered_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Past the unmerged prefix, every kernel-side entry must be zeroed.
        if (!(merged_de->new_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (!(merged_de->old_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

        if (cow_de->new_chunk != 0) {
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            // Map the merged chunk back to its COW operation.
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (!(it != chunk_vec.end())) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && IsOrderedOp(*cow_op)) {
                *ordered_op = true;
                // Every single ordered operation has to come from read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is a final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // zero out to indicate that operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already merged op in previous iteration or
            // This could also represent a partially filled area.
            //
            // If the op was merged in previous cycle, we don't have
            // to count them.
            break;
        } else {
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}
540 
ProcessMergeComplete(chunk_t chunk,void * buffer)541 bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
542     uint32_t stride = exceptions_per_area_ + 1;
543     const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
544     bool ordered_op = false;
545     bool commit = false;
546 
547     // ChunkID to vector index
548     lldiv_t divresult = lldiv(chunk, stride);
549 
550     if (!(divresult.quot < vec.size())) {
551         SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
552                         << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
553         return false;
554     }
555 
556     SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
557                     << " Metadata-Index: " << divresult.quot;
558 
559     int unmerged_exceptions = 0;
560     loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);
561 
562     if (offset < 0) {
563         SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
564                         << unmerged_exceptions;
565         return false;
566     }
567 
568     int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
569                                                    unmerged_exceptions, &ordered_op, &commit);
570 
571     // There should be at least one operation merged in this cycle
572     if (!(merged_ops_cur_iter > 0)) {
573         SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
574         return false;
575     }
576 
577     if (ordered_op) {
578         if (commit) {
579             // Push the flushing logic to read-ahead thread so that merge thread
580             // can make forward progress. Sync will happen in the background
581             snapuserd_->StartReadAhead();
582         }
583     } else {
584         // Non-copy ops and all ops in older COW format
585         if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
586             SNAP_LOG(ERROR) << "CommitMerge failed...";
587             return false;
588         }
589     }
590 
591     SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << "chunk: " << chunk;
592     return true;
593 }
594 
595 // Read Header from dm-user misc device. This gives
596 // us the sector number for which IO is issued by dm-snapshot device
ReadDmUserHeader()597 bool WorkerThread::ReadDmUserHeader() {
598     if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
599         if (errno != ENOTBLK) {
600             SNAP_PLOG(ERROR) << "Control-read failed";
601         }
602 
603         return false;
604     }
605 
606     return true;
607 }
608 
609 // Send the payload/data back to dm-user misc device.
WriteDmUserPayload(size_t size,bool header_response)610 bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
611     size_t payload_size = size;
612     void* buf = bufsink_.GetPayloadBufPtr();
613     if (header_response) {
614         payload_size += sizeof(struct dm_user_header);
615         buf = bufsink_.GetBufPtr();
616     }
617 
618     if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
619         SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
620         return false;
621     }
622 
623     return true;
624 }
625 
ReadDmUserPayload(void * buffer,size_t size)626 bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
627     if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
628         SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
629         return false;
630     }
631 
632     return true;
633 }
634 
DmuserWriteRequest()635 bool WorkerThread::DmuserWriteRequest() {
636     struct dm_user_header* header = bufsink_.GetHeaderPtr();
637 
638     // device mapper has the capability to allow
639     // targets to flush the cache when writes are completed. This
640     // is controlled by each target by a flag "flush_supported".
641     // This flag is set by dm-user. When flush is supported,
642     // a number of zero-length bio's will be submitted to
643     // the target for the purpose of flushing cache. It is the
644     // responsibility of the target driver - which is dm-user in this
645     // case, to remap these bio's to the underlying device. Since,
646     // there is no underlying device for dm-user, this zero length
647     // bio's gets routed to daemon.
648     //
649     // Flush operations are generated post merge by dm-snap by having
650     // REQ_PREFLUSH flag set. Snapuser daemon doesn't have anything
651     // to flush per se; hence, just respond back with a success message.
652     if (header->sector == 0) {
653         if (!(header->len == 0)) {
654             SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
655             header->type = DM_USER_RESP_ERROR;
656         } else {
657             header->type = DM_USER_RESP_SUCCESS;
658         }
659 
660         if (!WriteDmUserPayload(0, true)) {
661             return false;
662         }
663         return true;
664     }
665 
666     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
667     size_t remaining_size = header->len;
668     size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
669 
670     chunk_t chunk = SectorToChunk(header->sector);
671     auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
672                                std::make_pair(header->sector, nullptr), Snapuserd::compare);
673 
674     bool not_found = (it == chunk_vec.end() || it->first != header->sector);
675 
676     if (not_found) {
677         void* buffer = bufsink_.GetPayloadBuffer(read_size);
678         if (buffer == nullptr) {
679             SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
680                             << read_size;
681             header->type = DM_USER_RESP_ERROR;
682         } else {
683             header->type = DM_USER_RESP_SUCCESS;
684 
685             if (!ReadDmUserPayload(buffer, read_size)) {
686                 SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
687                                 << "Sector: " << header->sector;
688                 header->type = DM_USER_RESP_ERROR;
689             }
690 
691             if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
692                 SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
693                                 << "Sector: " << header->sector;
694                 header->type = DM_USER_RESP_ERROR;
695             }
696         }
697     } else {
698         SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: header->sector";
699         header->type = DM_USER_RESP_ERROR;
700     }
701 
702     if (!WriteDmUserPayload(0, true)) {
703         return false;
704     }
705 
706     return true;
707 }
708 
// Handle a read request from dm-user.
//
// A request may be larger than PAYLOAD_SIZE; it is served in chunks of at
// most PAYLOAD_SIZE bytes each. Only the first response chunk carries the
// dm-user header (|header_response|); subsequent chunks are payload-only.
bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool header_response = true;
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // Request to sector 0 is always for kernel
        // representation of COW header. This IO should be only
        // once during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            // A single 4k read, at the start of the request, for a sector
            // with no COW-op mapping is a metadata (disk-exception) read;
            // everything else is data.
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            } else {
                // Continue a (possibly multi-chunk) data read from where the
                // previous iteration left off.
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << "Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        // Once the header has been sent with a success status, later chunks
        // of the same response must not fail - dm-user cannot be notified of
        // an error mid-stream, so this is fatal.
        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // Daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}
794 
InitializeBufsink()795 void WorkerThread::InitializeBufsink() {
796     // Allocate the buffer which is used to communicate between
797     // daemon and dm-user. The buffer comprises of header and a fixed payload.
798     // If the dm-user requests a big IO, the IO will be broken into chunks
799     // of PAYLOAD_SIZE.
800     size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
801     bufsink_.Initialize(buf_size);
802 }
803 
RunThread()804 bool WorkerThread::RunThread() {
805     InitializeBufsink();
806 
807     if (!InitializeFds()) {
808         return false;
809     }
810 
811     if (!InitReader()) {
812         return false;
813     }
814 
815     // Start serving IO
816     while (true) {
817         if (!ProcessIORequest()) {
818             break;
819         }
820     }
821 
822     CloseFds();
823     reader_->CloseCowFd();
824 
825     return true;
826 }
827 
ProcessIORequest()828 bool WorkerThread::ProcessIORequest() {
829     struct dm_user_header* header = bufsink_.GetHeaderPtr();
830 
831     if (!ReadDmUserHeader()) {
832         return false;
833     }
834 
835     SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
836     SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
837     SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
838     SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
839     SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
840 
841     switch (header->type) {
842         case DM_USER_REQ_MAP_READ: {
843             if (!DmuserReadRequest()) {
844                 return false;
845             }
846             break;
847         }
848 
849         case DM_USER_REQ_MAP_WRITE: {
850             if (!DmuserWriteRequest()) {
851                 return false;
852             }
853             break;
854         }
855     }
856 
857     return true;
858 }
859 
860 }  // namespace snapshot
861 }  // namespace android
862