/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd.h"

#include <csignal>
#include <optional>
#include <set>

#include <snapuserd/snapuserd_client.h>

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
34

WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
                           const std::string& control_device, const std::string& misc_name,
                           std::shared_ptr<Snapuserd> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    control_device_ = control_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
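    // Each metadata area holds one 4k page (CHUNK_SIZE sectors << SECTOR_SHIFT bytes)
    // of disk exceptions; at 16 bytes per exception, that is 4096 / 16 = 256 entries.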
    exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
}

bool WorkerThread::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
    if (ctrl_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
        return false;
    }

    return true;
}

bool WorkerThread::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    return true;
}

// Construct the kernel COW header in memory.
// This header lives in sector 0. The IO
// request will always be 4k; after constructing
// the header, zero out the remaining block.
void WorkerThread::ConstructKernelCowHeader() {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);

    memset(buffer, 0, BLOCK_SZ);

    struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);

    dh->magic = SNAP_MAGIC;
    dh->valid = SNAPSHOT_VALID;
    dh->version = SNAPSHOT_DISK_VERSION;
    dh->chunk_size = CHUNK_SIZE;
}

// Start the replace operation. This will read the
// internal COW format and, if the block is compressed,
// decompress it.
bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
    if (!reader_->ReadData(*cow_op, &bufsink_)) {
        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
        return false;
    }

    return true;
}

bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
        return false;
    }
    SNAP_LOG(DEBUG) << "ReadFromBaseDevice: new-block: " << cow_op->new_block
                    << " Source: " << cow_op->source;
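    // For copy ops, |source| is a block number; scale it to a byte offset
    // before reading from the backing device.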
    uint64_t offset = cow_op->source;
    if (cow_op->type == kCowCopyOp) {
        offset *= BLOCK_SZ;
    }
    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
        SNAP_PLOG(ERROR) << "Copy op failed. Read from backing store: " << backing_store_device_
                         << " at block: " << offset / BLOCK_SZ << " offset: " << offset % BLOCK_SZ;
        return false;
    }

    return true;
}

bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
        return false;
    }

    if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
        return false;
    }

    return true;
}

// Start the copy operation. This first checks the read-ahead cache;
// on a miss, it reads the backing block device at cow_op->source.
bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
    if (!GetReadAheadPopulatedBuffer(cow_op)) {
        SNAP_LOG(DEBUG) << "GetReadAheadPopulatedBuffer failed..."
                        << " new_block: " << cow_op->new_block;
        if (!ReadFromBaseDevice(cow_op)) {
            return false;
        }
    }

    return true;
}

bool WorkerThread::ProcessZeroOp() {
    // Zero out the entire block
    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, BLOCK_SZ);
    return true;
}

bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
    if (cow_op == nullptr) {
        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
        return false;
    }

    switch (cow_op->type) {
        case kCowReplaceOp: {
            return ProcessReplaceOp(cow_op);
        }

        case kCowZeroOp: {
            return ProcessZeroOp();
        }

        case kCowCopyOp: {
            return ProcessCopyOp(cow_op);
        }

        default: {
            SNAP_LOG(ERROR) << "Unsupported operation-type found: " << cow_op->type;
        }
    }
    return false;
}

int WorkerThread::ReadUnalignedSector(
        sector_t sector, size_t size,
        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
    size_t skip_sector_size = 0;

    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                    << " Aligned sector: " << it->first;

    if (!ProcessCowOp(it->second)) {
        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed for size: " << size
                        << " Aligned sector: " << it->first;
        return -1;
    }

    int num_sectors_skip = sector - it->first;

    if (num_sectors_skip > 0) {
        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));

        if (skip_sector_size == BLOCK_SZ) {
            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
                            << " Base-sector: " << it->first;
            return -1;
        }

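        // Shift the payload left so it begins at the requested sector rather
        // than at the 4k block boundary the COW op was read into.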
        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
                (BLOCK_SZ - skip_sector_size));
    }

    bufsink_.ResetBufferOffset();
    return std::min(size, (BLOCK_SZ - skip_sector_size));
}

/*
 * Read the data for a given COW Operation.
 *
 * The kernel can issue IO at sector granularity.
 * Hence, an IO may end up reading partial
 * data from a COW operation, or a request
 * may be interspersed between two COW
 * operations.
 */
int WorkerThread::ReadData(sector_t sector, size_t size) {
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
    /*
     * chunk_map stores COW operations at 4k granularity.
     * If the requested IO sector falls on the 4k
     * boundary, then we can read the COW op directly without
     * any issue.
     *
     * However, if the requested sector is not 4k aligned,
     * then we will have to find the nearest COW operation
     * and chop the 4k block to fetch the requested sector.
     */
    it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
                          Snapuserd::compare);

    bool read_end_of_device = false;
    if (it == chunk_vec.end()) {
        // |-------|-------|-------|
        // 0       1       2       3
        //
        // Block 0 - op 1
        // Block 1 - op 2
        // Block 2 - op 3
        //
        // chunk_vec will have blocks 0, 1, 2, which map to the relevant COW ops.
        //
        // Each block is 4k bytes. Thus, the last block will span 8 sectors
        // ranging till block 3 (however, block 3 won't be in chunk_vec as
        // it doesn't have any mapping to COW ops). Now, if we get an I/O request for a sector
        // spanning between block 2 and block 3, we need to step back
        // and get hold of the last element.
        //
        // Additionally, dm-snapshot makes sure that an I/O request beyond block 3
        // will not be routed to the daemon. Hence, it is safe to assume that
        // if a sector is not available in the chunk_vec, the I/O falls at the
        // end of the region.
        it = std::prev(chunk_vec.end());
        read_end_of_device = true;
    }

    // We didn't find the required sector; hence find the previous sector,
    // as lower_bound gives us the value greater than
    // the requested sector.
    if (it->first != sector) {
        if (it != chunk_vec.begin() && !read_end_of_device) {
            --it;
        }

        /*
         * If the IO spans two COW operations,
         * split the IO into two parts:
         *
         * 1: Read the first part from the single COW op
         * 2: Read the second part from the next COW op.
         *
         * Ex: Let's say we have a 1024-byte IO request.
         *
         * 0      COW OP-1  4096     COW OP-2  8192
         * |******************|*******************|
         *                |*****|*****|
         *              3584         4608
         *                <- 1024B ->
         *
         * We have two COW operations which are 4k blocks.
         * The IO is requested for 1024 bytes, spanning
         * two COW operations. We will split this IO
         * into two parts:
         *
         * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
         * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
         */
        return ReadUnalignedSector(sector, size, it);
    }

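    // The request is 4k-aligned from here on; serve it one COW op (one 4k
    // block, i.e. 8 sectors) at a time.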
    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
    sector_t read_sector = sector;
    while (num_ops) {
        // We have to make sure that the reads are
        // sequential; there shouldn't be a data
        // request merged with a metadata IO.
        if (it->first != read_sector) {
            SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
                            << " cow-op sector: " << it->first;
            return -1;
        } else if (!ProcessCowOp(it->second)) {
            return -1;
        }
        num_ops -= 1;
        read_sector += (BLOCK_SZ >> SECTOR_SHIFT);

        it++;

        if (it == chunk_vec.end() && num_ops) {
            SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
                            << " COW ops completed; pending read-request: " << num_ops;
            return -1;
        }
        // Update the buffer offset
        bufsink_.UpdateBufferOffset(BLOCK_SZ);
    }

    // Reset the buffer offset
    bufsink_.ResetBufferOffset();
    return size;
}

/*
 * dm-snap does prefetch reads while reading disk exceptions.
 * By default, the prefetch value is set to 12; this means that
 * dm-snap will issue reads for 12 areas, wherein each area is a 4k page
 * of disk exceptions.
 *
 * If, during prefetch, the chunk-id seen is beyond the
 * actual number of metadata pages, fill the buffer with zeroes.
 * When dm-snap starts parsing the buffer, it will stop
 * reading metadata pages once the buffer content is zero.
 */
bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
    size_t size = exceptions_per_area_ * sizeof(struct disk_exception);

    if (read_size > size) {
        return false;
    }

    void* buffer = bufsink_.GetPayloadBuffer(size);
    if (buffer == nullptr) {
        SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
        return false;
    }

    memset(buffer, 0, size);
    return true;
}

/*
 * A disk exception is a simple mapping of old_chunk to new_chunk.
 * When a dm-snapshot device is created, the kernel requests these mappings.
 *
 * Each disk exception is 16 bytes. Thus a single 4k page can
 * have:
 *
 * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
 * is considered a metadata page, and it is represented by a chunk ID.
 *
 * Convert the chunk ID to an index into the vector, which gives us
 * the metadata page.
 */
bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
    uint32_t stride = exceptions_per_area_ + 1;
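    // Stride: in the on-disk dm-snapshot layout, each metadata chunk is
    // followed by exceptions_per_area_ data chunks, so metadata chunks
    // repeat every (exceptions_per_area_ + 1) chunks.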
    size_t size;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (divresult.quot < vec.size()) {
        size = exceptions_per_area_ * sizeof(struct disk_exception);

        if (read_size != size) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
                            << " does not match with size: " << size;
            return false;
        }

        void* buffer = bufsink_.GetPayloadBuffer(size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
            return false;
        }

        memcpy(buffer, vec[divresult.quot].get(), size);
    } else {
        return ZerofillDiskExceptions(read_size);
    }

    return true;
}

loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                         int* unmerged_exceptions) {
    loff_t offset = 0;
    *unmerged_exceptions = 0;

    while (*unmerged_exceptions <= exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        // Unmerged op by the kernel
        if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
            if (!(merged_de->old_chunk == cow_de->old_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
                                << merged_de->old_chunk
                                << " cow_de->old_chunk: " << cow_de->old_chunk;
                return -1;
            }

            if (!(merged_de->new_chunk == cow_de->new_chunk)) {
                SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
                                << merged_de->new_chunk
                                << " cow_de->new_chunk: " << cow_de->new_chunk;
                return -1;
            }

            offset += sizeof(struct disk_exception);
            *unmerged_exceptions += 1;
            continue;
        }

        break;
    }

    SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
    return offset;
}

int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
                                       int unmerged_exceptions, bool* ordered_op, bool* commit) {
    int merged_ops_cur_iter = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    *ordered_op = false;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();

    // Find the operations which are merged in this cycle.
    while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
        struct disk_exception* merged_de =
                reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
        struct disk_exception* cow_de =
                reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);

        if (!(merged_de->new_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
            return -1;
        }

        if (!(merged_de->old_chunk == 0)) {
            SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
            return -1;
        }

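        // The kernel's copy of this exception is zero at this point. If our
        // unmerged copy still holds a mapping here, the op was merged in
        // this cycle.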
        if (cow_de->new_chunk != 0) {
            merged_ops_cur_iter += 1;
            offset += sizeof(struct disk_exception);
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
                                       Snapuserd::compare);

            if (!(it != chunk_vec.end())) {
                SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }

            if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
                SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
                return -1;
            }
            const CowOperation* cow_op = it->second;

            if (snapuserd_->IsReadAheadFeaturePresent() && IsOrderedOp(*cow_op)) {
                *ordered_op = true;
                // Every single ordered operation has to come from the read-ahead
                // cache.
                if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
                    SNAP_LOG(ERROR)
                            << " Block: " << cow_op->new_block << " not found in read-ahead cache"
                            << " Source: " << cow_op->source;
                    return -1;
                }
                // If this is the final block merged in the read-ahead buffer
                // region, notify the read-ahead thread to make forward
                // progress.
                if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
                    *commit = true;
                }
            }

            // Zero out to indicate that the operation is merged.
            cow_de->old_chunk = 0;
            cow_de->new_chunk = 0;
        } else if (cow_de->old_chunk == 0) {
            // Already-merged op from a previous iteration, or
            // this could also represent a partially filled area.
            //
            // If the op was merged in a previous cycle, we don't have
            // to count it.
            break;
        } else {
            SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
                            << " merged_de-old-chunk: " << merged_de->old_chunk
                            << " merged_de-new-chunk: " << merged_de->new_chunk
                            << " cow_de-old-chunk: " << cow_de->old_chunk
                            << " cow_de-new-chunk: " << cow_de->new_chunk
                            << " unmerged_exceptions: " << unmerged_exceptions
                            << " merged_ops_cur_iter: " << merged_ops_cur_iter
                            << " offset: " << offset;
            return -1;
        }
    }
    return merged_ops_cur_iter;
}

bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
    uint32_t stride = exceptions_per_area_ + 1;
    const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
    bool ordered_op = false;
    bool commit = false;

    // ChunkID to vector index
    lldiv_t divresult = lldiv(chunk, stride);

    if (!(divresult.quot < vec.size())) {
        SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
                        << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
        return false;
    }

    SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
                    << " Metadata-Index: " << divresult.quot;

    int unmerged_exceptions = 0;
    loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);

    if (offset < 0) {
        SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
                        << unmerged_exceptions;
        return false;
    }

    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
                                                   unmerged_exceptions, &ordered_op, &commit);

    // There should be at least one operation merged in this cycle.
    if (!(merged_ops_cur_iter > 0)) {
        SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
        return false;
    }

    if (ordered_op) {
        if (commit) {
            // Push the flushing logic to the read-ahead thread so that the merge
            // thread can make forward progress. Sync will happen in the background.
            snapuserd_->StartReadAhead();
        }
    } else {
        // Non-copy ops and all ops in the older COW format
        if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
            SNAP_LOG(ERROR) << "CommitMerge failed...";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << " chunk: " << chunk;
    return true;
}

// Read the header from the dm-user misc device. This gives
// us the sector number for which IO is issued by the dm-snapshot device.
bool WorkerThread::ReadDmUserHeader() {
    if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
        if (errno != ENOTBLK) {
            SNAP_PLOG(ERROR) << "Control-read failed";
        }

        return false;
    }

    return true;
}

// Send the payload/data back to the dm-user misc device.
bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
    size_t payload_size = size;
    void* buf = bufsink_.GetPayloadBufPtr();
    if (header_response) {
        payload_size += sizeof(struct dm_user_header);
        buf = bufsink_.GetBufPtr();
    }

    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
        return false;
    }

    return true;
}

bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
    if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
        SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
        return false;
    }

    return true;
}

bool WorkerThread::DmuserWriteRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    // Device-mapper has the capability to allow
    // targets to flush the cache when writes are completed. This
    // is controlled by each target by a flag, "flush_supported".
    // This flag is set by dm-user. When flush is supported,
    // a number of zero-length bios will be submitted to
    // the target for the purpose of flushing the cache. It is the
    // responsibility of the target driver - which is dm-user in this
    // case - to remap these bios to the underlying device. Since
    // there is no underlying device for dm-user, these zero-length
    // bios get routed to the daemon.
    //
    // Flush operations are generated post-merge by dm-snap by setting
    // the REQ_PREFLUSH flag. The snapuserd daemon doesn't have anything
    // to flush per se; hence, just respond back with a success message.
    if (header->sector == 0) {
        if (!(header->len == 0)) {
            SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;
        }

        if (!WriteDmUserPayload(0, true)) {
            return false;
        }
        return true;
    }

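    // A write to a non-zero sector carries the kernel's updated exception
    // area after a merge cycle; read it in and reconcile it against our
    // in-memory metadata copy via ProcessMergeComplete().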
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    size_t remaining_size = header->len;
    size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

    chunk_t chunk = SectorToChunk(header->sector);
    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                               std::make_pair(header->sector, nullptr), Snapuserd::compare);

    bool not_found = (it == chunk_vec.end() || it->first != header->sector);

    if (not_found) {
        void* buffer = bufsink_.GetPayloadBuffer(read_size);
        if (buffer == nullptr) {
            SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
                            << read_size;
            header->type = DM_USER_RESP_ERROR;
        } else {
            header->type = DM_USER_RESP_SUCCESS;

            if (!ReadDmUserPayload(buffer, read_size)) {
                SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }

            if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
                SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
                                << " Sector: " << header->sector;
                header->type = DM_USER_RESP_ERROR;
            }
        }
    } else {
        SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: " << header->sector;
        header->type = DM_USER_RESP_ERROR;
    }

    if (!WriteDmUserPayload(0, true)) {
        return false;
    }

    return true;
}

bool WorkerThread::DmuserReadRequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();
    size_t remaining_size = header->len;
    loff_t offset = 0;
    sector_t sector = header->sector;
    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
    bool header_response = true;
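    // Large requests are served in PAYLOAD_SIZE chunks; only the first chunk
    // of the response carries the dm_user_header.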
    do {
        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);

        int ret = read_size;
        header->type = DM_USER_RESP_SUCCESS;
        chunk_t chunk = SectorToChunk(header->sector);

        // A request to sector 0 is always for the kernel
        // representation of the COW header. This IO should happen only
        // once, during dm-snapshot device creation. We should
        // never see multiple IO requests. Additionally, this IO
        // will always be a single 4k.
        if (header->sector == 0) {
            if (read_size == BLOCK_SZ) {
                ConstructKernelCowHeader();
                SNAP_LOG(DEBUG) << "Kernel header constructed";
            } else {
                SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
                header->type = DM_USER_RESP_ERROR;
            }
        } else {
            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                       std::make_pair(header->sector, nullptr), Snapuserd::compare);
            bool not_found = (it == chunk_vec.end() || it->first != header->sector);
            if (!offset && (read_size == BLOCK_SZ) && not_found) {
                if (!ReadDiskExceptions(chunk, read_size)) {
                    SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            } else {
                chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);

                ret = ReadData(sector + num_sectors_read, read_size);
                if (ret < 0) {
                    SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
                                    << " Sector: " << (sector + num_sectors_read)
                                    << " size: " << read_size << " header-len: " << header->len;
                    header->type = DM_USER_RESP_ERROR;
                } else {
                    SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
                                    << " Sector: " << header->sector;
                }
            }
        }

        // Just return the header if it is an error
        if (header->type == DM_USER_RESP_ERROR) {
            SNAP_LOG(ERROR) << "IO read request failed...";
            ret = 0;
        }

        if (!header_response) {
            CHECK(header->type == DM_USER_RESP_SUCCESS)
                    << " failed for sector: " << sector << " header->len: " << header->len
                    << " remaining_size: " << remaining_size;
        }

        // The daemon will not be terminated if there is any error. We will
        // just send the error back to dm-user.
        if (!WriteDmUserPayload(ret, header_response)) {
            return false;
        }

        if (header->type == DM_USER_RESP_ERROR) {
            break;
        }

        remaining_size -= ret;
        offset += ret;
        header_response = false;
    } while (remaining_size > 0);

    return true;
}

void WorkerThread::InitializeBufsink() {
    // Allocate the buffer used to communicate between
    // the daemon and dm-user. The buffer comprises a header and a fixed payload.
    // If dm-user requests a big IO, the IO will be broken into chunks
    // of PAYLOAD_SIZE.
    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
    bufsink_.Initialize(buf_size);
}

bool WorkerThread::RunThread() {
    InitializeBufsink();

    if (!InitializeFds()) {
        return false;
    }

    if (!InitReader()) {
        return false;
    }

    // Start serving IO
    while (true) {
        if (!ProcessIORequest()) {
            break;
        }
    }

    CloseFds();
    reader_->CloseCowFd();

    return true;
}

bool WorkerThread::ProcessIORequest() {
    struct dm_user_header* header = bufsink_.GetHeaderPtr();

    if (!ReadDmUserHeader()) {
        return false;
    }

    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;

    switch (header->type) {
        case DM_USER_REQ_MAP_READ: {
            if (!DmuserReadRequest()) {
                return false;
            }
            break;
        }

        case DM_USER_REQ_MAP_WRITE: {
            if (!DmuserWriteRequest()) {
                return false;
            }
            break;
        }
    }

    return true;
}

}  // namespace snapshot
}  // namespace android