1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <sys/types.h>
18 #include <unistd.h>
19
20 #include <limits>
21 #include <queue>
22
23 #include <android-base/file.h>
24 #include <android-base/logging.h>
25 #include <android-base/unique_fd.h>
26 #include <brotli/encode.h>
27 #include <libsnapshot/cow_format.h>
28 #include <libsnapshot/cow_reader.h>
29 #include <libsnapshot/cow_writer.h>
30 #include <lz4.h>
31 #include <zlib.h>
32
33 namespace android {
34 namespace snapshot {
Compress(const void * data,size_t length)35 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
36 return Compress(compression_, data, length);
37 }
38
Compress(CowCompressionAlgorithm compression,const void * data,size_t length)39 std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
40 const void* data, size_t length) {
41 switch (compression) {
42 case kCowCompressGz: {
43 const auto bound = compressBound(length);
44 std::basic_string<uint8_t> buffer(bound, '\0');
45
46 uLongf dest_len = bound;
47 auto rv = compress2(buffer.data(), &dest_len, reinterpret_cast<const Bytef*>(data),
48 length, Z_BEST_COMPRESSION);
49 if (rv != Z_OK) {
50 LOG(ERROR) << "compress2 returned: " << rv;
51 return {};
52 }
53 buffer.resize(dest_len);
54 return buffer;
55 }
56 case kCowCompressBrotli: {
57 const auto bound = BrotliEncoderMaxCompressedSize(length);
58 if (!bound) {
59 LOG(ERROR) << "BrotliEncoderMaxCompressedSize returned 0";
60 return {};
61 }
62 std::basic_string<uint8_t> buffer(bound, '\0');
63
64 size_t encoded_size = bound;
65 auto rv = BrotliEncoderCompress(
66 BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, length,
67 reinterpret_cast<const uint8_t*>(data), &encoded_size, buffer.data());
68 if (!rv) {
69 LOG(ERROR) << "BrotliEncoderCompress failed";
70 return {};
71 }
72 buffer.resize(encoded_size);
73 return buffer;
74 }
75 case kCowCompressLz4: {
76 const auto bound = LZ4_compressBound(length);
77 if (!bound) {
78 LOG(ERROR) << "LZ4_compressBound returned 0";
79 return {};
80 }
81 std::basic_string<uint8_t> buffer(bound, '\0');
82
83 const auto compressed_size = LZ4_compress_default(
84 static_cast<const char*>(data), reinterpret_cast<char*>(buffer.data()), length,
85 buffer.size());
86 if (compressed_size <= 0) {
87 LOG(ERROR) << "LZ4_compress_default failed, input size: " << length
88 << ", compression bound: " << bound << ", ret: " << compressed_size;
89 return {};
90 }
91 // Don't run compression if the compressed output is larger
92 if (compressed_size >= length) {
93 buffer.resize(length);
94 memcpy(buffer.data(), data, length);
95 } else {
96 buffer.resize(compressed_size);
97 }
98 return buffer;
99 }
100 default:
101 LOG(ERROR) << "unhandled compression type: " << compression;
102 break;
103 }
104 return {};
105 }
CompressBlocks(const void * buffer,size_t num_blocks,std::vector<std::basic_string<uint8_t>> * compressed_data)106 bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
107 std::vector<std::basic_string<uint8_t>>* compressed_data) {
108 return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
109 }
110
CompressBlocks(CowCompressionAlgorithm compression,size_t block_size,const void * buffer,size_t num_blocks,std::vector<std::basic_string<uint8_t>> * compressed_data)111 bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
112 const void* buffer, size_t num_blocks,
113 std::vector<std::basic_string<uint8_t>>* compressed_data) {
114 const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
115 while (num_blocks) {
116 auto data = Compress(compression, iter, block_size);
117 if (data.empty()) {
118 PLOG(ERROR) << "CompressBlocks: Compression failed";
119 return false;
120 }
121 if (data.size() > std::numeric_limits<uint16_t>::max()) {
122 LOG(ERROR) << "Compressed block is too large: " << data.size();
123 return false;
124 }
125
126 compressed_data->emplace_back(std::move(data));
127 num_blocks -= 1;
128 iter += block_size;
129 }
130 return true;
131 }
132
RunThread()133 bool CompressWorker::RunThread() {
134 while (true) {
135 // Wait for work
136 CompressWork blocks;
137 {
138 std::unique_lock<std::mutex> lock(lock_);
139 while (work_queue_.empty() && !stopped_) {
140 cv_.wait(lock);
141 }
142
143 if (stopped_) {
144 return true;
145 }
146
147 blocks = std::move(work_queue_.front());
148 work_queue_.pop();
149 }
150
151 // Compress blocks
152 bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
153 blocks.compression_status = ret;
154 {
155 std::lock_guard<std::mutex> lock(lock_);
156 compressed_queue_.push(std::move(blocks));
157 }
158
159 // Notify completion
160 cv_.notify_all();
161
162 if (!ret) {
163 LOG(ERROR) << "CompressBlocks failed";
164 return false;
165 }
166 }
167
168 return true;
169 }
170
EnqueueCompressBlocks(const void * buffer,size_t num_blocks)171 void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
172 {
173 std::lock_guard<std::mutex> lock(lock_);
174
175 CompressWork blocks = {};
176 blocks.buffer = buffer;
177 blocks.num_blocks = num_blocks;
178 work_queue_.push(std::move(blocks));
179 }
180 cv_.notify_all();
181 }
182
GetCompressedBuffers(std::vector<std::basic_string<uint8_t>> * compressed_buf)183 bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
184 {
185 std::unique_lock<std::mutex> lock(lock_);
186 while (compressed_queue_.empty() && !stopped_) {
187 cv_.wait(lock);
188 }
189
190 if (stopped_) {
191 return true;
192 }
193 }
194
195 {
196 std::lock_guard<std::mutex> lock(lock_);
197 while (compressed_queue_.size() > 0) {
198 CompressWork blocks = std::move(compressed_queue_.front());
199 compressed_queue_.pop();
200
201 if (blocks.compression_status) {
202 compressed_buf->insert(compressed_buf->end(),
203 std::make_move_iterator(blocks.compressed_data.begin()),
204 std::make_move_iterator(blocks.compressed_data.end()));
205 } else {
206 LOG(ERROR) << "Block compression failed";
207 return false;
208 }
209 }
210 }
211
212 return true;
213 }
214
Finalize()215 void CompressWorker::Finalize() {
216 {
217 std::unique_lock<std::mutex> lock(lock_);
218 stopped_ = true;
219 }
220 cv_.notify_all();
221 }
222
CompressWorker(CowCompressionAlgorithm compression,uint32_t block_size)223 CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
224 : compression_(compression), block_size_(block_size) {}
225
226 } // namespace snapshot
227 } // namespace android
228