1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "snapuserd.h"
18
19 #include <csignal>
20 #include <optional>
21 #include <set>
22
23 #include <libsnapshot/snapuserd_client.h>
24
25 namespace android {
26 namespace snapshot {
27
28 using namespace android;
29 using namespace android::dm;
30 using android::base::unique_fd;
31
32 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
33 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
34
Initialize(size_t size)35 void BufferSink::Initialize(size_t size) {
36 buffer_size_ = size;
37 buffer_offset_ = 0;
38 buffer_ = std::make_unique<uint8_t[]>(size);
39 }
40
GetPayloadBuffer(size_t size)41 void* BufferSink::GetPayloadBuffer(size_t size) {
42 if ((buffer_size_ - buffer_offset_) < size) return nullptr;
43
44 char* buffer = reinterpret_cast<char*>(GetBufPtr());
45 struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
46 return (char*)msg->payload.buf + buffer_offset_;
47 }
48
GetBuffer(size_t requested,size_t * actual)49 void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
50 void* buf = GetPayloadBuffer(requested);
51 if (!buf) {
52 *actual = 0;
53 return nullptr;
54 }
55 *actual = requested;
56 return buf;
57 }
58
GetHeaderPtr()59 struct dm_user_header* BufferSink::GetHeaderPtr() {
60 if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
61 return nullptr;
62 }
63 char* buf = reinterpret_cast<char*>(GetBufPtr());
64 struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
65 return header;
66 }
67
GetPayloadBufPtr()68 void* BufferSink::GetPayloadBufPtr() {
69 char* buffer = reinterpret_cast<char*>(GetBufPtr());
70 struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
71 return msg->payload.buf;
72 }
73
WorkerThread(const std::string & cow_device,const std::string & backing_device,const std::string & control_device,const std::string & misc_name,std::shared_ptr<Snapuserd> snapuserd)74 WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
75 const std::string& control_device, const std::string& misc_name,
76 std::shared_ptr<Snapuserd> snapuserd) {
77 cow_device_ = cow_device;
78 backing_store_device_ = backing_device;
79 control_device_ = control_device;
80 misc_name_ = misc_name;
81 snapuserd_ = snapuserd;
82 exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
83 }
84
InitializeFds()85 bool WorkerThread::InitializeFds() {
86 backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
87 if (backing_store_fd_ < 0) {
88 SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
89 return false;
90 }
91
92 cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
93 if (cow_fd_ < 0) {
94 SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
95 return false;
96 }
97
98 ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
99 if (ctrl_fd_ < 0) {
100 SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
101 return false;
102 }
103
104 return true;
105 }
106
InitReader()107 bool WorkerThread::InitReader() {
108 reader_ = std::make_unique<CowReader>();
109 if (!reader_->InitForMerge(std::move(cow_fd_))) {
110 return false;
111 }
112
113 return true;
114 }
115
116 // Construct kernel COW header in memory
117 // This header will be in sector 0. The IO
118 // request will always be 4k. After constructing
119 // the header, zero out the remaining block.
ConstructKernelCowHeader()120 void WorkerThread::ConstructKernelCowHeader() {
121 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
122
123 memset(buffer, 0, BLOCK_SZ);
124
125 struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);
126
127 dh->magic = SNAP_MAGIC;
128 dh->valid = SNAPSHOT_VALID;
129 dh->version = SNAPSHOT_DISK_VERSION;
130 dh->chunk_size = CHUNK_SIZE;
131 }
132
133 // Start the replace operation. This will read the
134 // internal COW format and if the block is compressed,
135 // it will be de-compressed.
ProcessReplaceOp(const CowOperation * cow_op)136 bool WorkerThread::ProcessReplaceOp(const CowOperation* cow_op) {
137 if (!reader_->ReadData(*cow_op, &bufsink_)) {
138 SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
139 return false;
140 }
141
142 return true;
143 }
144
ReadFromBaseDevice(const CowOperation * cow_op)145 bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
146 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
147 if (buffer == nullptr) {
148 SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
149 return false;
150 }
151 SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
152 << " Source: " << cow_op->source;
153 if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
154 cow_op->source * BLOCK_SZ)) {
155 SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
156 << "at block :" << cow_op->source;
157 return false;
158 }
159
160 return true;
161 }
162
GetReadAheadPopulatedBuffer(const CowOperation * cow_op)163 bool WorkerThread::GetReadAheadPopulatedBuffer(const CowOperation* cow_op) {
164 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
165 if (buffer == nullptr) {
166 SNAP_LOG(ERROR) << "GetReadAheadPopulatedBuffer: Failed to get payload buffer";
167 return false;
168 }
169
170 if (!snapuserd_->GetReadAheadPopulatedBuffer(cow_op->new_block, buffer)) {
171 return false;
172 }
173
174 return true;
175 }
176
177 // Start the copy operation. This will read the backing
178 // block device which is represented by cow_op->source.
ProcessCopyOp(const CowOperation * cow_op)179 bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
180 if (!GetReadAheadPopulatedBuffer(cow_op)) {
181 SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
182 << " new_block: " << cow_op->new_block;
183 if (!ReadFromBaseDevice(cow_op)) {
184 return false;
185 }
186 }
187
188 return true;
189 }
190
ProcessZeroOp()191 bool WorkerThread::ProcessZeroOp() {
192 // Zero out the entire block
193 void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
194 if (buffer == nullptr) {
195 SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
196 return false;
197 }
198
199 memset(buffer, 0, BLOCK_SZ);
200 return true;
201 }
202
ProcessCowOp(const CowOperation * cow_op)203 bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
204 if (cow_op == nullptr) {
205 SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
206 return false;
207 }
208
209 switch (cow_op->type) {
210 case kCowReplaceOp: {
211 return ProcessReplaceOp(cow_op);
212 }
213
214 case kCowZeroOp: {
215 return ProcessZeroOp();
216 }
217
218 case kCowCopyOp: {
219 return ProcessCopyOp(cow_op);
220 }
221
222 default: {
223 SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
224 }
225 }
226 return false;
227 }
228
ReadUnalignedSector(sector_t sector,size_t size,std::vector<std::pair<sector_t,const CowOperation * >>::iterator & it)229 int WorkerThread::ReadUnalignedSector(
230 sector_t sector, size_t size,
231 std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
232 size_t skip_sector_size = 0;
233
234 SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
235 << " Aligned sector: " << it->first;
236
237 if (!ProcessCowOp(it->second)) {
238 SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
239 << " Aligned sector: " << it->first;
240 return -1;
241 }
242
243 int num_sectors_skip = sector - it->first;
244
245 if (num_sectors_skip > 0) {
246 skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
247 char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
248 struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
249
250 if (skip_sector_size == BLOCK_SZ) {
251 SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
252 << " Base-sector: " << it->first;
253 return -1;
254 }
255
256 memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
257 (BLOCK_SZ - skip_sector_size));
258 }
259
260 bufsink_.ResetBufferOffset();
261 return std::min(size, (BLOCK_SZ - skip_sector_size));
262 }
263
264 /*
265 * Read the data for a given COW Operation.
266 *
267 * Kernel can issue IO at a sector granularity.
268 * Hence, an IO may end up with reading partial
269 * data from a COW operation or we may also
270 * end up with interspersed request between
271 * two COW operations.
272 *
273 */
ReadData(sector_t sector,size_t size)274 int WorkerThread::ReadData(sector_t sector, size_t size) {
275 std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
276 std::vector<std::pair<sector_t, const CowOperation*>>::iterator it;
277 /*
278 * chunk_map stores COW operation at 4k granularity.
279 * If the requested IO with the sector falls on the 4k
280 * boundary, then we can read the COW op directly without
281 * any issue.
282 *
283 * However, if the requested sector is not 4K aligned,
284 * then we will have the find the nearest COW operation
285 * and chop the 4K block to fetch the requested sector.
286 */
287 it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
288 Snapuserd::compare);
289
290 bool read_end_of_device = false;
291 if (it == chunk_vec.end()) {
292 // |-------|-------|-------|
293 // 0 1 2 3
294 //
295 // Block 0 - op 1
296 // Block 1 - op 2
297 // Block 2 - op 3
298 //
299 // chunk_vec will have block 0, 1, 2 which maps to relavant COW ops.
300 //
301 // Each block is 4k bytes. Thus, the last block will span 8 sectors
302 // ranging till block 3 (However, block 3 won't be in chunk_vec as
303 // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector
304 // spanning between block 2 and block 3, we need to step back
305 // and get hold of the last element.
306 //
307 // Additionally, dm-snapshot makes sure that I/O request beyond block 3
308 // will not be routed to the daemon. Hence, it is safe to assume that
309 // if a sector is not available in the chunk_vec, the I/O falls in the
310 // end of region.
311 it = std::prev(chunk_vec.end());
312 read_end_of_device = true;
313 }
314
315 // We didn't find the required sector; hence find the previous sector
316 // as lower_bound will gives us the value greater than
317 // the requested sector
318 if (it->first != sector) {
319 if (it != chunk_vec.begin() && !read_end_of_device) {
320 --it;
321 }
322
323 /*
324 * If the IO is spanned between two COW operations,
325 * split the IO into two parts:
326 *
327 * 1: Read the first part from the single COW op
328 * 2: Read the second part from the next COW op.
329 *
330 * Ex: Let's say we have a 1024 Bytes IO request.
331 *
332 * 0 COW OP-1 4096 COW OP-2 8192
333 * |******************|*******************|
334 * |*****|*****|
335 * 3584 4608
336 * <- 1024B - >
337 *
338 * We have two COW operations which are 4k blocks.
339 * The IO is requested for 1024 Bytes which are spanned
340 * between two COW operations. We will split this IO
341 * into two parts:
342 *
343 * 1: IO of size 512B from offset 3584 bytes (COW OP-1)
344 * 2: IO of size 512B from offset 4096 bytes (COW OP-2)
345 */
346 return ReadUnalignedSector(sector, size, it);
347 }
348
349 int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
350 sector_t read_sector = sector;
351 while (num_ops) {
352 // We have to make sure that the reads are
353 // sequential; there shouldn't be a data
354 // request merged with a metadata IO.
355 if (it->first != read_sector) {
356 SNAP_LOG(ERROR) << "Invalid IO request: read_sector: " << read_sector
357 << " cow-op sector: " << it->first;
358 return -1;
359 } else if (!ProcessCowOp(it->second)) {
360 return -1;
361 }
362 num_ops -= 1;
363 read_sector += (BLOCK_SZ >> SECTOR_SHIFT);
364
365 it++;
366
367 if (it == chunk_vec.end() && num_ops) {
368 SNAP_LOG(ERROR) << "Invalid IO request at sector " << sector
369 << " COW ops completed; pending read-request: " << num_ops;
370 return -1;
371 }
372 // Update the buffer offset
373 bufsink_.UpdateBufferOffset(BLOCK_SZ);
374 }
375
376 // Reset the buffer offset
377 bufsink_.ResetBufferOffset();
378 return size;
379 }
380
381 /*
382 * dm-snap does prefetch reads while reading disk-exceptions.
383 * By default, prefetch value is set to 12; this means that
384 * dm-snap will issue 12 areas wherein each area is a 4k page
385 * of disk-exceptions.
386 *
387 * If during prefetch, if the chunk-id seen is beyond the
388 * actual number of metadata page, fill the buffer with zero.
389 * When dm-snap starts parsing the buffer, it will stop
390 * reading metadata page once the buffer content is zero.
391 */
ZerofillDiskExceptions(size_t read_size)392 bool WorkerThread::ZerofillDiskExceptions(size_t read_size) {
393 size_t size = exceptions_per_area_ * sizeof(struct disk_exception);
394
395 if (read_size > size) {
396 return false;
397 }
398
399 void* buffer = bufsink_.GetPayloadBuffer(size);
400 if (buffer == nullptr) {
401 SNAP_LOG(ERROR) << "ZerofillDiskExceptions: Failed to get payload buffer";
402 return false;
403 }
404
405 memset(buffer, 0, size);
406 return true;
407 }
408
409 /*
410 * A disk exception is a simple mapping of old_chunk to new_chunk.
411 * When dm-snapshot device is created, kernel requests these mapping.
412 *
413 * Each disk exception is of size 16 bytes. Thus a single 4k page can
414 * have:
415 *
416 * exceptions_per_area_ = 4096/16 = 256. This entire 4k page
417 * is considered a metadata page and it is represented by chunk ID.
418 *
419 * Convert the chunk ID to index into the vector which gives us
420 * the metadata page.
421 */
ReadDiskExceptions(chunk_t chunk,size_t read_size)422 bool WorkerThread::ReadDiskExceptions(chunk_t chunk, size_t read_size) {
423 uint32_t stride = exceptions_per_area_ + 1;
424 size_t size;
425 const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
426
427 // ChunkID to vector index
428 lldiv_t divresult = lldiv(chunk, stride);
429
430 if (divresult.quot < vec.size()) {
431 size = exceptions_per_area_ * sizeof(struct disk_exception);
432
433 if (read_size != size) {
434 SNAP_LOG(ERROR) << "ReadDiskExceptions: read_size: " << read_size
435 << " does not match with size: " << size;
436 return false;
437 }
438
439 void* buffer = bufsink_.GetPayloadBuffer(size);
440 if (buffer == nullptr) {
441 SNAP_LOG(ERROR) << "ReadDiskExceptions: Failed to get payload buffer of size: " << size;
442 return false;
443 }
444
445 memcpy(buffer, vec[divresult.quot].get(), size);
446 } else {
447 return ZerofillDiskExceptions(read_size);
448 }
449
450 return true;
451 }
452
GetMergeStartOffset(void * merged_buffer,void * unmerged_buffer,int * unmerged_exceptions)453 loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
454 int* unmerged_exceptions) {
455 loff_t offset = 0;
456 *unmerged_exceptions = 0;
457
458 while (*unmerged_exceptions <= exceptions_per_area_) {
459 struct disk_exception* merged_de =
460 reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
461 struct disk_exception* cow_de =
462 reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);
463
464 // Unmerged op by the kernel
465 if (merged_de->old_chunk != 0 || merged_de->new_chunk != 0) {
466 if (!(merged_de->old_chunk == cow_de->old_chunk)) {
467 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->old_chunk: "
468 << merged_de->old_chunk
469 << "cow_de->old_chunk: " << cow_de->old_chunk;
470 return -1;
471 }
472
473 if (!(merged_de->new_chunk == cow_de->new_chunk)) {
474 SNAP_LOG(ERROR) << "GetMergeStartOffset: merged_de->new_chunk: "
475 << merged_de->new_chunk
476 << "cow_de->new_chunk: " << cow_de->new_chunk;
477 return -1;
478 }
479
480 offset += sizeof(struct disk_exception);
481 *unmerged_exceptions += 1;
482 continue;
483 }
484
485 break;
486 }
487
488 SNAP_LOG(DEBUG) << "Unmerged_Exceptions: " << *unmerged_exceptions << " Offset: " << offset;
489 return offset;
490 }
491
GetNumberOfMergedOps(void * merged_buffer,void * unmerged_buffer,loff_t offset,int unmerged_exceptions,bool * copy_op,bool * commit)492 int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
493 int unmerged_exceptions, bool* copy_op, bool* commit) {
494 int merged_ops_cur_iter = 0;
495 std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
496 *copy_op = false;
497 std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
498
499 // Find the operations which are merged in this cycle.
500 while ((unmerged_exceptions + merged_ops_cur_iter) < exceptions_per_area_) {
501 struct disk_exception* merged_de =
502 reinterpret_cast<struct disk_exception*>((char*)merged_buffer + offset);
503 struct disk_exception* cow_de =
504 reinterpret_cast<struct disk_exception*>((char*)unmerged_buffer + offset);
505
506 if (!(merged_de->new_chunk == 0)) {
507 SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid new-chunk: " << merged_de->new_chunk;
508 return -1;
509 }
510
511 if (!(merged_de->old_chunk == 0)) {
512 SNAP_LOG(ERROR) << "GetNumberOfMergedOps: Invalid old-chunk: " << merged_de->old_chunk;
513 return -1;
514 }
515
516 if (cow_de->new_chunk != 0) {
517 merged_ops_cur_iter += 1;
518 offset += sizeof(struct disk_exception);
519 auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
520 std::make_pair(ChunkToSector(cow_de->new_chunk), nullptr),
521 Snapuserd::compare);
522
523 if (!(it != chunk_vec.end())) {
524 SNAP_LOG(ERROR) << "Sector not found: " << ChunkToSector(cow_de->new_chunk);
525 return -1;
526 }
527
528 if (!(it->first == ChunkToSector(cow_de->new_chunk))) {
529 SNAP_LOG(ERROR) << "Invalid sector: " << ChunkToSector(cow_de->new_chunk);
530 return -1;
531 }
532 const CowOperation* cow_op = it->second;
533
534 if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
535 *copy_op = true;
536 // Every single copy operation has to come from read-ahead
537 // cache.
538 if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
539 SNAP_LOG(ERROR)
540 << " Block: " << cow_op->new_block << " not found in read-ahead cache"
541 << " Source: " << cow_op->source;
542 return -1;
543 }
544 // If this is a final block merged in the read-ahead buffer
545 // region, notify the read-ahead thread to make forward
546 // progress
547 if (cow_op->new_block == snapuserd_->GetFinalBlockMerged()) {
548 *commit = true;
549 }
550 }
551
552 // zero out to indicate that operation is merged.
553 cow_de->old_chunk = 0;
554 cow_de->new_chunk = 0;
555 } else if (cow_de->old_chunk == 0) {
556 // Already merged op in previous iteration or
557 // This could also represent a partially filled area.
558 //
559 // If the op was merged in previous cycle, we don't have
560 // to count them.
561 break;
562 } else {
563 SNAP_LOG(ERROR) << "Error in merge operation. Found invalid metadata: "
564 << " merged_de-old-chunk: " << merged_de->old_chunk
565 << " merged_de-new-chunk: " << merged_de->new_chunk
566 << " cow_de-old-chunk: " << cow_de->old_chunk
567 << " cow_de-new-chunk: " << cow_de->new_chunk
568 << " unmerged_exceptions: " << unmerged_exceptions
569 << " merged_ops_cur_iter: " << merged_ops_cur_iter
570 << " offset: " << offset;
571 return -1;
572 }
573 }
574 return merged_ops_cur_iter;
575 }
576
ProcessMergeComplete(chunk_t chunk,void * buffer)577 bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
578 uint32_t stride = exceptions_per_area_ + 1;
579 const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
580 bool copy_op = false;
581 bool commit = false;
582
583 // ChunkID to vector index
584 lldiv_t divresult = lldiv(chunk, stride);
585
586 if (!(divresult.quot < vec.size())) {
587 SNAP_LOG(ERROR) << "ProcessMergeComplete: Invalid chunk: " << chunk
588 << " Metadata-Index: " << divresult.quot << " Area-size: " << vec.size();
589 return false;
590 }
591
592 SNAP_LOG(DEBUG) << "ProcessMergeComplete: chunk: " << chunk
593 << " Metadata-Index: " << divresult.quot;
594
595 int unmerged_exceptions = 0;
596 loff_t offset = GetMergeStartOffset(buffer, vec[divresult.quot].get(), &unmerged_exceptions);
597
598 if (offset < 0) {
599 SNAP_LOG(ERROR) << "GetMergeStartOffset failed: unmerged_exceptions: "
600 << unmerged_exceptions;
601 return false;
602 }
603
604 int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
605 unmerged_exceptions, ©_op, &commit);
606
607 // There should be at least one operation merged in this cycle
608 if (!(merged_ops_cur_iter > 0)) {
609 SNAP_LOG(ERROR) << "Merge operation failed: " << merged_ops_cur_iter;
610 return false;
611 }
612
613 if (copy_op) {
614 if (commit) {
615 // Push the flushing logic to read-ahead thread so that merge thread
616 // can make forward progress. Sync will happen in the background
617 snapuserd_->StartReadAhead();
618 }
619 } else {
620 // Non-copy ops and all ops in older COW format
621 if (!snapuserd_->CommitMerge(merged_ops_cur_iter)) {
622 SNAP_LOG(ERROR) << "CommitMerge failed...";
623 return false;
624 }
625 }
626
627 SNAP_LOG(DEBUG) << "Merge success: " << merged_ops_cur_iter << "chunk: " << chunk;
628 return true;
629 }
630
631 // Read Header from dm-user misc device. This gives
632 // us the sector number for which IO is issued by dm-snapshot device
ReadDmUserHeader()633 bool WorkerThread::ReadDmUserHeader() {
634 if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
635 if (errno != ENOTBLK) {
636 SNAP_PLOG(ERROR) << "Control-read failed";
637 }
638
639 return false;
640 }
641
642 return true;
643 }
644
645 // Send the payload/data back to dm-user misc device.
WriteDmUserPayload(size_t size,bool header_response)646 bool WorkerThread::WriteDmUserPayload(size_t size, bool header_response) {
647 size_t payload_size = size;
648 void* buf = bufsink_.GetPayloadBufPtr();
649 if (header_response) {
650 payload_size += sizeof(struct dm_user_header);
651 buf = bufsink_.GetBufPtr();
652 }
653
654 if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
655 SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
656 return false;
657 }
658
659 return true;
660 }
661
ReadDmUserPayload(void * buffer,size_t size)662 bool WorkerThread::ReadDmUserPayload(void* buffer, size_t size) {
663 if (!android::base::ReadFully(ctrl_fd_, buffer, size)) {
664 SNAP_PLOG(ERROR) << "ReadDmUserPayload failed size: " << size;
665 return false;
666 }
667
668 return true;
669 }
670
DmuserWriteRequest()671 bool WorkerThread::DmuserWriteRequest() {
672 struct dm_user_header* header = bufsink_.GetHeaderPtr();
673
674 // device mapper has the capability to allow
675 // targets to flush the cache when writes are completed. This
676 // is controlled by each target by a flag "flush_supported".
677 // This flag is set by dm-user. When flush is supported,
678 // a number of zero-length bio's will be submitted to
679 // the target for the purpose of flushing cache. It is the
680 // responsibility of the target driver - which is dm-user in this
681 // case, to remap these bio's to the underlying device. Since,
682 // there is no underlying device for dm-user, this zero length
683 // bio's gets routed to daemon.
684 //
685 // Flush operations are generated post merge by dm-snap by having
686 // REQ_PREFLUSH flag set. Snapuser daemon doesn't have anything
687 // to flush per se; hence, just respond back with a success message.
688 if (header->sector == 0) {
689 if (!(header->len == 0)) {
690 SNAP_LOG(ERROR) << "Invalid header length received from sector 0: " << header->len;
691 header->type = DM_USER_RESP_ERROR;
692 } else {
693 header->type = DM_USER_RESP_SUCCESS;
694 }
695
696 if (!WriteDmUserPayload(0, true)) {
697 return false;
698 }
699 return true;
700 }
701
702 std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
703 size_t remaining_size = header->len;
704 size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
705
706 chunk_t chunk = SectorToChunk(header->sector);
707 auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
708 std::make_pair(header->sector, nullptr), Snapuserd::compare);
709
710 bool not_found = (it == chunk_vec.end() || it->first != header->sector);
711
712 if (not_found) {
713 void* buffer = bufsink_.GetPayloadBuffer(read_size);
714 if (buffer == nullptr) {
715 SNAP_LOG(ERROR) << "DmuserWriteRequest: Failed to get payload buffer of size: "
716 << read_size;
717 header->type = DM_USER_RESP_ERROR;
718 } else {
719 header->type = DM_USER_RESP_SUCCESS;
720
721 if (!ReadDmUserPayload(buffer, read_size)) {
722 SNAP_LOG(ERROR) << "ReadDmUserPayload failed for chunk id: " << chunk
723 << "Sector: " << header->sector;
724 header->type = DM_USER_RESP_ERROR;
725 }
726
727 if (header->type == DM_USER_RESP_SUCCESS && !ProcessMergeComplete(chunk, buffer)) {
728 SNAP_LOG(ERROR) << "ProcessMergeComplete failed for chunk id: " << chunk
729 << "Sector: " << header->sector;
730 header->type = DM_USER_RESP_ERROR;
731 }
732 }
733 } else {
734 SNAP_LOG(ERROR) << "DmuserWriteRequest: Invalid sector received: header->sector";
735 header->type = DM_USER_RESP_ERROR;
736 }
737
738 if (!WriteDmUserPayload(0, true)) {
739 return false;
740 }
741
742 return true;
743 }
744
DmuserReadRequest()745 bool WorkerThread::DmuserReadRequest() {
746 struct dm_user_header* header = bufsink_.GetHeaderPtr();
747 size_t remaining_size = header->len;
748 loff_t offset = 0;
749 sector_t sector = header->sector;
750 std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
751 bool header_response = true;
752 do {
753 size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
754
755 int ret = read_size;
756 header->type = DM_USER_RESP_SUCCESS;
757 chunk_t chunk = SectorToChunk(header->sector);
758
759 // Request to sector 0 is always for kernel
760 // representation of COW header. This IO should be only
761 // once during dm-snapshot device creation. We should
762 // never see multiple IO requests. Additionally this IO
763 // will always be a single 4k.
764 if (header->sector == 0) {
765 if (read_size == BLOCK_SZ) {
766 ConstructKernelCowHeader();
767 SNAP_LOG(DEBUG) << "Kernel header constructed";
768 } else {
769 SNAP_LOG(ERROR) << "Invalid read_size: " << read_size << " for sector 0";
770 header->type = DM_USER_RESP_ERROR;
771 }
772 } else {
773 auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
774 std::make_pair(header->sector, nullptr), Snapuserd::compare);
775 bool not_found = (it == chunk_vec.end() || it->first != header->sector);
776 if (!offset && (read_size == BLOCK_SZ) && not_found) {
777 if (!ReadDiskExceptions(chunk, read_size)) {
778 SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
779 << "Sector: " << header->sector;
780 header->type = DM_USER_RESP_ERROR;
781 } else {
782 SNAP_LOG(DEBUG) << "ReadDiskExceptions success for chunk id: " << chunk
783 << "Sector: " << header->sector;
784 }
785 } else {
786 chunk_t num_sectors_read = (offset >> SECTOR_SHIFT);
787
788 ret = ReadData(sector + num_sectors_read, read_size);
789 if (ret < 0) {
790 SNAP_LOG(ERROR) << "ReadData failed for chunk id: " << chunk
791 << " Sector: " << (sector + num_sectors_read)
792 << " size: " << read_size << " header-len: " << header->len;
793 header->type = DM_USER_RESP_ERROR;
794 } else {
795 SNAP_LOG(DEBUG) << "ReadData success for chunk id: " << chunk
796 << "Sector: " << header->sector;
797 }
798 }
799 }
800
801 // Just return the header if it is an error
802 if (header->type == DM_USER_RESP_ERROR) {
803 SNAP_LOG(ERROR) << "IO read request failed...";
804 ret = 0;
805 }
806
807 if (!header_response) {
808 CHECK(header->type == DM_USER_RESP_SUCCESS)
809 << " failed for sector: " << sector << " header->len: " << header->len
810 << " remaining_size: " << remaining_size;
811 }
812
813 // Daemon will not be terminated if there is any error. We will
814 // just send the error back to dm-user.
815 if (!WriteDmUserPayload(ret, header_response)) {
816 return false;
817 }
818
819 if (header->type == DM_USER_RESP_ERROR) {
820 break;
821 }
822
823 remaining_size -= ret;
824 offset += ret;
825 header_response = false;
826 } while (remaining_size > 0);
827
828 return true;
829 }
830
InitializeBufsink()831 void WorkerThread::InitializeBufsink() {
832 // Allocate the buffer which is used to communicate between
833 // daemon and dm-user. The buffer comprises of header and a fixed payload.
834 // If the dm-user requests a big IO, the IO will be broken into chunks
835 // of PAYLOAD_SIZE.
836 size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
837 bufsink_.Initialize(buf_size);
838 }
839
RunThread()840 bool WorkerThread::RunThread() {
841 InitializeBufsink();
842
843 if (!InitializeFds()) {
844 return false;
845 }
846
847 if (!InitReader()) {
848 return false;
849 }
850
851 // Start serving IO
852 while (true) {
853 if (!ProcessIORequest()) {
854 break;
855 }
856 }
857
858 CloseFds();
859 reader_->CloseCowFd();
860
861 return true;
862 }
863
ProcessIORequest()864 bool WorkerThread::ProcessIORequest() {
865 struct dm_user_header* header = bufsink_.GetHeaderPtr();
866
867 if (!ReadDmUserHeader()) {
868 return false;
869 }
870
871 SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
872 SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
873 SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
874 SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
875 SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
876
877 switch (header->type) {
878 case DM_USER_REQ_MAP_READ: {
879 if (!DmuserReadRequest()) {
880 return false;
881 }
882 break;
883 }
884
885 case DM_USER_REQ_MAP_WRITE: {
886 if (!DmuserWriteRequest()) {
887 return false;
888 }
889 break;
890 }
891 }
892
893 return true;
894 }
895
896 } // namespace snapshot
897 } // namespace android
898