1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <sys/types.h>
18 #include <unistd.h>
19 
20 #include <limits>
21 #include <queue>
22 
23 #include <android-base/file.h>
24 #include <android-base/logging.h>
25 #include <android-base/unique_fd.h>
26 #include <brotli/encode.h>
27 #include <libsnapshot/cow_format.h>
28 #include <libsnapshot/cow_reader.h>
29 #include <libsnapshot/cow_writer.h>
30 #include <lz4.h>
31 #include <zlib.h>
32 
33 namespace android {
34 namespace snapshot {
Compress(const void * data,size_t length)35 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
36     return Compress(compression_, data, length);
37 }
38 
Compress(CowCompressionAlgorithm compression,const void * data,size_t length)39 std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
40                                                     const void* data, size_t length) {
41     switch (compression) {
42         case kCowCompressGz: {
43             const auto bound = compressBound(length);
44             std::basic_string<uint8_t> buffer(bound, '\0');
45 
46             uLongf dest_len = bound;
47             auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data),
48                                 length, Z_BEST_COMPRESSION);
49             if (rv != Z_OK) {
50                 LOG(ERROR) << "compress2 returned: " << rv;
51                 return {};
52             }
53             buffer.resize(dest_len);
54             return buffer;
55         }
56         case kCowCompressBrotli: {
57             const auto bound = BrotliEncoderMaxCompressedSize(length);
58             if (!bound) {
59                 LOG(ERROR) << "BrotliEncoderMaxCompressedSize returned 0";
60                 return {};
61             }
62             std::basic_string<uint8_t> buffer(bound, '\0');
63 
64             size_t encoded_size = bound;
65             auto rv = BrotliEncoderCompress(
66                     BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, length,
67                     reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data());
68             if (!rv) {
69                 LOG(ERROR) << "BrotliEncoderCompress failed";
70                 return {};
71             }
72             buffer.resize(encoded_size);
73             return buffer;
74         }
75         case kCowCompressLz4: {
76             const auto bound = LZ4_compressBound(length);
77             if (!bound) {
78                 LOG(ERROR) << "LZ4_compressBound returned 0";
79                 return {};
80             }
81             std::basic_string<uint8_t> buffer(bound, '\0');
82 
83             const auto compressed_size = LZ4_compress_default(
84                     static_cast<const char*>(data), reinterpret_cast<char*>(buffer.data()), length,
85                     buffer.size());
86             if (compressed_size <= 0) {
87                 LOG(ERROR) << "LZ4_compress_default failed, input size: " << length
88                            << ", compression bound: " << bound << ", ret: " << compressed_size;
89                 return {};
90             }
91             // Don't run compression if the compressed output is larger
92             if (compressed_size >= length) {
93                 buffer.resize(length);
94                 memcpy(buffer.data(), data, length);
95             } else {
96                 buffer.resize(compressed_size);
97             }
98             return buffer;
99         }
100         default:
101             LOG(ERROR) << "unhandled compression type: " << compression;
102             break;
103     }
104     return {};
105 }
CompressBlocks(const void * buffer,size_t num_blocks,std::vector<std::basic_string<uint8_t>> * compressed_data)106 bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
107                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
108     return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
109 }
110 
CompressBlocks(CowCompressionAlgorithm compression,size_t block_size,const void * buffer,size_t num_blocks,std::vector<std::basic_string<uint8_t>> * compressed_data)111 bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
112                                     const void* buffer, size_t num_blocks,
113                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
114     const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
115     while (num_blocks) {
116         auto data = Compress(compression, iter, block_size);
117         if (data.empty()) {
118             PLOG(ERROR) << "CompressBlocks: Compression failed";
119             return false;
120         }
121         if (data.size() > std::numeric_limits<uint16_t>::max()) {
122             LOG(ERROR) << "Compressed block is too large: " << data.size();
123             return false;
124         }
125 
126         compressed_data->emplace_back(std::move(data));
127         num_blocks -= 1;
128         iter += block_size;
129     }
130     return true;
131 }
132 
RunThread()133 bool CompressWorker::RunThread() {
134     while (true) {
135         // Wait for work
136         CompressWork blocks;
137         {
138             std::unique_lock<std::mutex> lock(lock_);
139             while (work_queue_.empty() && !stopped_) {
140                 cv_.wait(lock);
141             }
142 
143             if (stopped_) {
144                 return true;
145             }
146 
147             blocks = std::move(work_queue_.front());
148             work_queue_.pop();
149         }
150 
151         // Compress blocks
152         bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
153         blocks.compression_status = ret;
154         {
155             std::lock_guard<std::mutex> lock(lock_);
156             compressed_queue_.push(std::move(blocks));
157         }
158 
159         // Notify completion
160         cv_.notify_all();
161 
162         if (!ret) {
163             LOG(ERROR) << "CompressBlocks failed";
164             return false;
165         }
166     }
167 
168     return true;
169 }
170 
EnqueueCompressBlocks(const void * buffer,size_t num_blocks)171 void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
172     {
173         std::lock_guard<std::mutex> lock(lock_);
174 
175         CompressWork blocks = {};
176         blocks.buffer = buffer;
177         blocks.num_blocks = num_blocks;
178         work_queue_.push(std::move(blocks));
179     }
180     cv_.notify_all();
181 }
182 
GetCompressedBuffers(std::vector<std::basic_string<uint8_t>> * compressed_buf)183 bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
184     {
185         std::unique_lock<std::mutex> lock(lock_);
186         while (compressed_queue_.empty() && !stopped_) {
187             cv_.wait(lock);
188         }
189 
190         if (stopped_) {
191             return true;
192         }
193     }
194 
195     {
196         std::lock_guard<std::mutex> lock(lock_);
197         while (compressed_queue_.size() > 0) {
198             CompressWork blocks = std::move(compressed_queue_.front());
199             compressed_queue_.pop();
200 
201             if (blocks.compression_status) {
202                 compressed_buf->insert(compressed_buf->end(),
203                                        std::make_move_iterator(blocks.compressed_data.begin()),
204                                        std::make_move_iterator(blocks.compressed_data.end()));
205             } else {
206                 LOG(ERROR) << "Block compression failed";
207                 return false;
208             }
209         }
210     }
211 
212     return true;
213 }
214 
Finalize()215 void CompressWorker::Finalize() {
216     {
217         std::unique_lock<std::mutex> lock(lock_);
218         stopped_ = true;
219     }
220     cv_.notify_all();
221 }
222 
CompressWorker(CowCompressionAlgorithm compression,uint32_t block_size)223 CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
224     : compression_(compression), block_size_(block_size) {}
225 
226 }  // namespace snapshot
227 }  // namespace android
228