1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "prefetcher/minijail.h"
16 #include "common/cmd_utils.h"
17 #include "prefetcher/prefetcher_daemon.h"
18 #include "prefetcher/session_manager.h"
19 #include "prefetcher/session.h"
20
21 #include <android-base/logging.h>
22 #include <android-base/properties.h>
23
24 #include <deque>
25 #include <iomanip>
26 #include <string>
27 #include <sstream>
28 #include <vector>
29
30 #include <fcntl.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <sys/un.h>
36 #include <unistd.h>
37
38 namespace iorap::prefetcher {
39
40 // Gate super-spammy IPC logging behind a property.
41 // This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower.
LogVerboseIpc()42 static bool LogVerboseIpc() {
43 static bool initialized = false;
44 static bool verbose_ipc;
45
46 if (initialized == false) {
47 initialized = true;
48
49 verbose_ipc =
50 ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false);
51 }
52
53 return verbose_ipc;
54 }
55
56 static const bool kInstallMiniJail =
57 ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true);
58
59 static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd";
60
61 static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size
62
63 using ArgString = const char*;
64
// Streams a short lowercase name for the read-ahead strategy:
// "fadvise", "mmap" (kMmapLocked) or "mlock"; any other value prints "<invalid>".
std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) {
  const char* label = "<invalid>";
  switch (ps) {
    case ReadAheadKind::kFadvise:
      label = "fadvise";
      break;
    case ReadAheadKind::kMmapLocked:
      label = "mmap";
      break;
    case ReadAheadKind::kMlock:
      label = "mlock";
      break;
    default:
      break;
  }
  return os << label;
}
81
operator <<(std::ostream & os,CommandChoice choice)82 std::ostream& operator<<(std::ostream& os, CommandChoice choice) {
83 switch (choice) {
84 case CommandChoice::kRegisterFilePath:
85 os << "kRegisterFilePath";
86 break;
87 case CommandChoice::kUnregisterFilePath:
88 os << "kUnregisterFilePath";
89 break;
90 case CommandChoice::kReadAhead:
91 os << "kReadAhead";
92 break;
93 case CommandChoice::kExit:
94 os << "kExit";
95 break;
96 case CommandChoice::kCreateSession:
97 os << "kCreateSession";
98 break;
99 case CommandChoice::kDestroySession:
100 os << "kDestroySession";
101 break;
102 case CommandChoice::kDumpSession:
103 os << "kDumpSession";
104 break;
105 case CommandChoice::kDumpEverything:
106 os << "kDumpEverything";
107 break;
108 case CommandChoice::kCreateFdSession:
109 os << "kCreateFdSession";
110 break;
111 default:
112 CHECK(false) << "forgot to handle this choice";
113 break;
114 }
115 return os;
116 }
117
operator <<(std::ostream & os,const Command & command)118 std::ostream& operator<<(std::ostream& os, const Command& command) {
119 os << "Command{";
120 os << "choice=" << command.choice << ",";
121
122 bool has_session_id = true;
123 bool has_id = true;
124 switch (command.choice) {
125 case CommandChoice::kDumpEverything:
126 case CommandChoice::kExit:
127 has_session_id = false;
128 FALLTHROUGH_INTENDED;
129 case CommandChoice::kCreateFdSession:
130 case CommandChoice::kCreateSession:
131 case CommandChoice::kDestroySession:
132 case CommandChoice::kDumpSession:
133 has_id = false;
134 break;
135 default:
136 break;
137 }
138
139 if (has_session_id) {
140 os << "sid=" << command.session_id << ",";
141 }
142
143 if (has_id) {
144 os << "id=" << command.id << ",";
145 }
146
147 switch (command.choice) {
148 case CommandChoice::kRegisterFilePath:
149 os << "file_path=";
150
151 if (command.file_path) {
152 os << *(command.file_path);
153 } else {
154 os << "(nullopt)";
155 }
156 break;
157 case CommandChoice::kUnregisterFilePath:
158 break;
159 case CommandChoice::kReadAhead:
160 os << "read_ahead_kind=" << command.read_ahead_kind << ",";
161 os << "length=" << command.length << ",";
162 os << "offset=" << command.offset << ",";
163 break;
164 case CommandChoice::kExit:
165 break;
166 case CommandChoice::kCreateFdSession:
167 os << "fd=";
168 if (command.fd.has_value()) {
169 os << command.fd.value();
170 } else {
171 os << "(nullopt)";
172 }
173 os << ",";
174 FALLTHROUGH_INTENDED;
175 case CommandChoice::kCreateSession:
176 os << "description=";
177 if (command.file_path) {
178 os << "'" << *(command.file_path) << "'";
179 } else {
180 os << "(nullopt)";
181 }
182 break;
183 case CommandChoice::kDestroySession:
184 break;
185 case CommandChoice::kDumpSession:
186 break;
187 case CommandChoice::kDumpEverything:
188 break;
189 default:
190 CHECK(false) << "forgot to handle this choice";
191 break;
192 }
193
194 os << "}";
195
196 return os;
197 }
198
199 template <typename T>
200 struct ParseResult {
201 T value;
202 char* next_token;
203 size_t stream_size;
204
ParseResultiorap::prefetcher::ParseResult205 ParseResult() : value{}, next_token{nullptr}, stream_size{} {
206 }
207
208 constexpr operator bool() const {
209 return next_token != nullptr;
210 }
211 };
212
213 // Very spammy: Keep it off by default. Set to true if changing this code.
214 static constexpr bool kDebugParsingRead = false;
215
216 #define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead "
217
218
219
220 // Parse a strong type T from a buffer stream.
221 // If there's insufficient space left to parse the value, an empty ParseResult is returned.
222 template <typename T>
ParsingRead(char * stream,size_t stream_size)223 ParseResult<T> ParsingRead(char* stream, size_t stream_size) {
224 if (stream == nullptr) {
225 DEBUG_PREAD << "stream was null";
226 return {};
227 }
228
229 if constexpr (std::is_same_v<T, std::string>) {
230 ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size);
231
232 if (!length) {
233 DEBUG_PREAD << "could not find length";
234 // Not enough bytes left?
235 return {};
236 }
237
238 ParseResult<std::string> string_result;
239 string_result.value.reserve(length);
240
241 stream = length.next_token;
242 stream_size = length.stream_size;
243
244 for (size_t i = 0; i < length.value; ++i) {
245 ParseResult<char> char_result = ParsingRead<char>(stream, stream_size);
246
247 stream = char_result.next_token;
248 stream_size = char_result.stream_size;
249
250 if (!char_result) {
251 DEBUG_PREAD << "too few chars in stream, expected length: " << length.value;
252 // Not enough bytes left?
253 return {};
254 }
255
256 string_result.value += char_result.value;
257
258 DEBUG_PREAD << "string preliminary is : " << string_result.value;
259 }
260
261 DEBUG_PREAD << "parsed string to: " << string_result.value;
262 string_result.next_token = stream;
263 return string_result;
264 } else {
265 if (sizeof(T) > stream_size) {
266 return {};
267 }
268
269 ParseResult<T> result;
270 result.next_token = stream + sizeof(T);
271 result.stream_size = stream_size - sizeof(T);
272
273 memcpy(&result.value, stream, sizeof(T));
274
275 return result;
276 }
277 }
278
279 // Convenience overload to chain multiple ParsingRead together.
280 template <typename T, typename U>
ParsingRead(ParseResult<U> result)281 ParseResult<T> ParsingRead(ParseResult<U> result) {
282 return ParsingRead<T>(result.next_token, result.stream_size);
283 }
284
285 class CommandParser {
286 public:
CommandParser(PrefetcherForkParameters params)287 CommandParser(PrefetcherForkParameters params) {
288 params_ = params;
289 }
290
ParseSocketCommands(bool & eof)291 std::vector<Command> ParseSocketCommands(bool& eof) {
292 eof = false;
293
294 std::vector<Command> commands_vec;
295
296 std::vector<char> buf_vector;
297 buf_vector.resize(1024*1024); // 1MB.
298 char* buf = &buf_vector[0];
299
300 // Binary only parsing. The higher level code can parse text
301 // with ifstream if it really wants to.
302 char* stream = &buf[0];
303 size_t stream_size = buf_vector.size();
304
305 while (true) {
306 if (stream_size == 0) {
307 // TODO: reply with an overflow command.
308 LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
309 stream = &buf[0];
310 stream_size = buf_vector.size();
311 memset(&buf[0], /*c*/0, buf_vector.size());
312 }
313
314 if (LogVerboseIpc()) {
315 LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")";
316 }
317
318 ssize_t count;
319 struct msghdr hdr;
320 memset(&hdr, 0, sizeof(hdr));
321
322 {
323 union {
324 struct cmsghdr cmh;
325 char control[CMSG_SPACE(sizeof(int))];
326 } control_un;
327 memset(&control_un, 0, sizeof(control_un));
328
329 /* Set 'control_un' to describe ancillary data that we want to receive */
330 control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */
331 control_un.cmh.cmsg_level = SOL_SOCKET;
332 control_un.cmh.cmsg_type = SCM_CREDENTIALS;
333
334 // the regular message data will be read into stream
335 struct iovec iov;
336 memset(&iov, 0, sizeof(iov));
337 iov.iov_base = stream;
338 iov.iov_len = stream_size;
339
340 /* Set hdr fields to describe 'control_un' */
341 hdr.msg_control = control_un.control;
342 hdr.msg_controllen = sizeof(control_un.control);
343 hdr.msg_iov = &iov;
344 hdr.msg_iovlen = 1;
345 hdr.msg_name = nullptr; /* no peer address */
346 hdr.msg_namelen = 0;
347
348 count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0));
349 }
350
351 if (LogVerboseIpc()) {
352 LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size;
353 }
354
355 if (count < 0) {
356 PLOG(ERROR) << "failed to recvmsg from input fd";
357 break;
358 // TODO: let the daemon be restarted by higher level code?
359 } else if (count == 0) {
360 LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
361 eof = true;
362 break;
363 // TODO: let the daemon be restarted by higher level code?
364 }
365
366 {
367 /* Extract fd from ancillary data if present */
368 struct cmsghdr* hp;
369 hp = CMSG_FIRSTHDR(&hdr);
370 if (hp &&
371 // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing?
372 // (hp->cmsg_len == CMSG_LEN(sizeof(int))) &&
373 (hp->cmsg_level == SOL_SOCKET) &&
374 (hp->cmsg_type == SCM_RIGHTS)) {
375
376 int passed_fd = *(int*) CMSG_DATA(hp);
377 if (LogVerboseIpc()) {
378 LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd;
379 }
380
381 // tack the FD into our dequeue.
382 // we assume the FDs are sent in-order same as the regular iov are sent in-order.
383 longbuf_fds_.insert(longbuf_fds_.end(), passed_fd);
384 } else if (hp != nullptr) {
385 if (LogVerboseIpc()) {
386 LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS,"
387 << "cmsg_len=" << hp->cmsg_len << ","
388 << "cmsg_level=" << hp->cmsg_level << ","
389 << "cmsg_type=" << hp->cmsg_type;
390 }
391 }
392 }
393
394 longbuf_.insert(longbuf_.end(), stream, stream + count);
395 if (LogVerboseIpc()) {
396 LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
397 }
398
399 // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]*
400 {
401 if (longbuf_.size() == 0) {
402 break;
403 }
404
405 std::vector<char> v(longbuf_.begin(),
406 longbuf_.end());
407
408 std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()};
409
410 if (LogVerboseIpc()) {
411 LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
412 if (WOULD_LOG(VERBOSE)) {
413 std::stringstream dump;
414 dump << std::hex << std::setfill('0');
415 for (size_t i = 0; i < v.size(); ++i) {
416 dump << std::setw(2) << static_cast<unsigned>(v[i]);
417 }
418
419 LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
420 }
421 LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size();
422 if (WOULD_LOG(VERBOSE)) {
423 std::stringstream dump;
424 for (size_t i = 0; i < v_fds.size(); ++i) {
425 dump << v_fds[i] << ", ";
426 }
427
428 LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str();
429 }
430
431 }
432
433 size_t v_fds_off = 0;
434 size_t consumed_fds_total = 0;
435
436 size_t v_off = 0;
437 size_t consumed_bytes = std::numeric_limits<size_t>::max();
438 size_t consumed_total = 0;
439
440 while (true) {
441 std::optional<Command> maybe_command;
442 maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
443 consumed_total += consumed_bytes;
444 // Normal every time we get to the end of a buffer.
445 if (!maybe_command) {
446 if (LogVerboseIpc()) {
447 LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
448 }
449 break;
450 }
451
452 if (maybe_command->RequiresFd()) {
453 if (v_fds_off < v_fds.size()) {
454 maybe_command->fd = v_fds[v_fds_off++];
455 consumed_fds_total++;
456 if (LogVerboseIpc()) {
457 LOG(VERBOSE) << "Append the FD to " << *maybe_command;
458 }
459 } else {
460 LOG(WARNING) << "Failed to acquire FD for " << *maybe_command;
461 }
462 }
463
464 // in the next pass ignore what we already consumed.
465 v_off += consumed_bytes;
466
467 // true as long we don't hit the 'break' above.
468 DCHECK_EQ(v_off, consumed_total);
469 if (LogVerboseIpc()) {
470 LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
471 << "," << *maybe_command;
472
473 // Pretty-print a single command for debugging/testing.
474 LOG(VERBOSE) << *maybe_command;
475 }
476
477 // add to the commands we parsed.
478 commands_vec.push_back(*maybe_command);
479 }
480
481 // erase however many were consumed
482 longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
483
484 // erase however many FDs were consumed.
485 longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total);
486 }
487 break;
488 }
489
490 return commands_vec;
491 }
492
ParseCommands(bool & eof)493 std::vector<Command> ParseCommands(bool& eof) {
494 eof = false;
495
496 std::vector<Command> commands_vec;
497
498 std::vector<char> buf_vector;
499 buf_vector.resize(kPipeBufferSize);
500 char* buf = &buf_vector[0];
501
502 // Binary only parsing. The higher level code can parse text
503 // with ifstream if it really wants to.
504 char* stream = &buf[0];
505 size_t stream_size = buf_vector.size();
506
507 while (true) {
508 if (stream_size == 0) {
509 // TODO: reply with an overflow command.
510 LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
511 stream = &buf[0];
512 stream_size = buf_vector.size();
513 memset(&buf[0], /*c*/0, buf_vector.size());
514 }
515
516 if (LogVerboseIpc()) {
517 LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")";
518 }
519 ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size));
520 if (LogVerboseIpc()) {
521 LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size;
522 }
523
524 if (count < 0) {
525 PLOG(ERROR) << "failed to read from input fd";
526 break;
527 // TODO: let the daemon be restarted by higher level code?
528 } else if (count == 0) {
529 LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
530 eof = true;
531 break;
532 // TODO: let the daemon be restarted by higher level code?
533 }
534
535 longbuf_.insert(longbuf_.end(), stream, stream + count);
536 if (LogVerboseIpc()) {
537 LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
538 }
539
540 std::optional<Command> maybe_command;
541 {
542 if (longbuf_.size() == 0) {
543 break;
544 }
545
546 std::vector<char> v(longbuf_.begin(),
547 longbuf_.end());
548
549 if (LogVerboseIpc()) {
550 LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
551 if (WOULD_LOG(VERBOSE)) {
552 std::stringstream dump;
553 dump << std::hex << std::setfill('0');
554 for (size_t i = 0; i < v.size(); ++i) {
555 dump << std::setw(2) << static_cast<unsigned>(v[i]);
556 }
557
558 LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
559 }
560 }
561
562 size_t v_off = 0;
563 size_t consumed_bytes = std::numeric_limits<size_t>::max();
564 size_t consumed_total = 0;
565
566 while (true) {
567 maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
568 consumed_total += consumed_bytes;
569 // Normal every time we get to the end of a buffer.
570 if (!maybe_command) {
571 if (LogVerboseIpc()) {
572 LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
573 }
574 break;
575 }
576
577 // in the next pass ignore what we already consumed.
578 v_off += consumed_bytes;
579
580 // true as long we don't hit the 'break' above.
581 DCHECK_EQ(v_off, consumed_total);
582 if (LogVerboseIpc()) {
583 LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
584 << "," << *maybe_command;
585
586 // Pretty-print a single command for debugging/testing.
587 LOG(VERBOSE) << *maybe_command;
588 }
589
590 // add to the commands we parsed.
591 commands_vec.push_back(*maybe_command);
592 }
593
594 // erase however many were consumed
595 longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
596 }
597 break;
598 }
599
600 return commands_vec;
601 }
602
603 private:
IsTextMode() const604 bool IsTextMode() const {
605 return params_.format_text;
606 }
607
608 PrefetcherForkParameters params_;
609
610 // A buffer long enough to contain a lot of buffers.
611 // This handles reads that only contain a partial command.
612 std::deque<char> longbuf_;
613
614 // File descriptor buffers.
615 std::deque<int> longbuf_fds_;
616 };
617
618 static constexpr bool kDebugCommandRead = true;
619
620 #define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read "
621
Read(char * buf,size_t buf_size,size_t * consumed_bytes)622 std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) {
623 *consumed_bytes = 0;
624 if (buf == nullptr) {
625 return std::nullopt;
626 }
627
628 Command cmd{}; // zero-initialize any unused fields
629 ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size);
630 cmd.choice = parsed_choice.value;
631
632 if (!parsed_choice) {
633 DEBUG_READ << "no choice";
634 return std::nullopt;
635 }
636
637 switch (parsed_choice.value) {
638 case CommandChoice::kRegisterFilePath: {
639 ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
640 if (!parsed_session_id) {
641 DEBUG_READ << "no parsed session id";
642 return std::nullopt;
643 }
644
645 ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
646 if (!parsed_id) {
647 DEBUG_READ << "no parsed id";
648 return std::nullopt;
649 }
650
651 ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id);
652
653 if (!parsed_file_path) {
654 DEBUG_READ << "no file path";
655 return std::nullopt;
656 }
657 *consumed_bytes = parsed_file_path.next_token - buf;
658
659 cmd.session_id = parsed_session_id.value;
660 cmd.id = parsed_id.value;
661 cmd.file_path = parsed_file_path.value;
662
663 break;
664 }
665 case CommandChoice::kUnregisterFilePath: {
666 ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
667 if (!parsed_session_id) {
668 DEBUG_READ << "no parsed session id";
669 return std::nullopt;
670 }
671
672 ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
673 if (!parsed_id) {
674 DEBUG_READ << "no parsed id";
675 return std::nullopt;
676 }
677 *consumed_bytes = parsed_id.next_token - buf;
678
679 cmd.session_id = parsed_session_id.value;
680 cmd.id = parsed_id.value;
681
682 break;
683 }
684 case CommandChoice::kReadAhead: {
685 ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
686 if (!parsed_session_id) {
687 DEBUG_READ << "no parsed session id";
688 return std::nullopt;
689 }
690
691 ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
692 if (!parsed_id) {
693 DEBUG_READ << "no parsed id";
694 return std::nullopt;
695 }
696
697 ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id);
698 if (!parsed_kind) {
699 DEBUG_READ << "no parsed kind";
700 return std::nullopt;
701 }
702 ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind);
703 if (!parsed_length) {
704 DEBUG_READ << "no parsed length";
705 return std::nullopt;
706 }
707 ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length);
708 if (!parsed_offset) {
709 DEBUG_READ << "no parsed offset";
710 return std::nullopt;
711 }
712 *consumed_bytes = parsed_offset.next_token - buf;
713
714 cmd.session_id = parsed_session_id.value;
715 cmd.id = parsed_id.value;
716 cmd.read_ahead_kind = parsed_kind.value;
717 cmd.length = parsed_length.value;
718 cmd.offset = parsed_offset.value;
719
720 break;
721 }
722 case CommandChoice::kCreateSession:
723 case CommandChoice::kCreateFdSession: {
724 ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
725 if (!parsed_session_id) {
726 DEBUG_READ << "no parsed session id";
727 return std::nullopt;
728 }
729
730 ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id);
731
732 if (!parsed_description) {
733 DEBUG_READ << "no description";
734 return std::nullopt;
735 }
736 *consumed_bytes = parsed_description.next_token - buf;
737
738 cmd.session_id = parsed_session_id.value;
739 cmd.file_path = parsed_description.value;
740
741 break;
742 }
743 case CommandChoice::kDestroySession:
744 case CommandChoice::kDumpSession: {
745 ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
746 if (!parsed_session_id) {
747 DEBUG_READ << "no parsed session id";
748 return std::nullopt;
749 }
750
751 *consumed_bytes = parsed_session_id.next_token - buf;
752
753 cmd.session_id = parsed_session_id.value;
754
755 break;
756 }
757 case CommandChoice::kExit:
758 case CommandChoice::kDumpEverything:
759 *consumed_bytes = parsed_choice.next_token - buf;
760 // Only need to parse the choice.
761 break;
762 default:
763 LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value);
764 break;
765 }
766
767 return cmd;
768 }
769
Write(char * buf,size_t buf_size,size_t * produced_bytes) const770 bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const {
771 *produced_bytes = 0;
772 if (buf == nullptr) {
773 LOG(WARNING) << "null buf, is this expected?";
774 return false;
775 }
776
777 bool has_enough_space = false;
778 size_t space_requirement = std::numeric_limits<size_t>::max();
779
780 space_requirement = sizeof(choice);
781
782 switch (choice) {
783 case CommandChoice::kRegisterFilePath:
784 space_requirement += sizeof(session_id);
785 space_requirement += sizeof(id);
786 space_requirement += sizeof(uint32_t); // string length
787
788 if (!file_path) {
789 LOG(WARNING) << "Missing file path for kRegisterFilePath";
790 return false;
791 }
792
793 space_requirement += file_path->size(); // string contents
794 break;
795 case CommandChoice::kUnregisterFilePath:
796 space_requirement += sizeof(session_id);
797 space_requirement += sizeof(id);
798 break;
799 case CommandChoice::kReadAhead:
800 space_requirement += sizeof(session_id);
801 space_requirement += sizeof(id);
802 space_requirement += sizeof(read_ahead_kind);
803 space_requirement += sizeof(length);
804 space_requirement += sizeof(offset);
805 break;
806 case CommandChoice::kCreateSession:
807 case CommandChoice::kCreateFdSession:
808 space_requirement += sizeof(session_id);
809 space_requirement += sizeof(uint32_t); // string length
810
811 if (!file_path) {
812 LOG(WARNING) << "Missing file path for kCreateSession";
813 return false;
814 }
815
816 space_requirement += file_path->size(); // string contents
817 break;
818 case CommandChoice::kDestroySession:
819 case CommandChoice::kDumpSession:
820 space_requirement += sizeof(session_id);
821 break;
822 case CommandChoice::kExit:
823 case CommandChoice::kDumpEverything:
824 // Only need space for the choice.
825 break;
826 default:
827 LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice);
828 break;
829 }
830
831 if (buf_size < space_requirement) {
832 return false;
833 }
834
835 *produced_bytes = space_requirement;
836
837 // Always write out the choice.
838 size_t buf_offset = 0;
839
840 memcpy(&buf[buf_offset], &choice, sizeof(choice));
841 buf_offset += sizeof(choice);
842
843 switch (choice) {
844 case CommandChoice::kRegisterFilePath:
845 memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
846 buf_offset += sizeof(session_id);
847 memcpy(&buf[buf_offset], &id, sizeof(id));
848 buf_offset += sizeof(id);
849
850 {
851 uint32_t string_length = static_cast<uint32_t>(file_path->size());
852 memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
853 buf_offset += sizeof(string_length);
854 }
855
856 DCHECK(file_path.has_value());
857
858 memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
859 buf_offset += file_path->size();
860 break;
861 case CommandChoice::kUnregisterFilePath:
862 memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
863 buf_offset += sizeof(session_id);
864 memcpy(&buf[buf_offset], &id, sizeof(id));
865 buf_offset += sizeof(id);
866 break;
867 case CommandChoice::kReadAhead:
868 memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
869 buf_offset += sizeof(session_id);
870 memcpy(&buf[buf_offset], &id, sizeof(id));
871 buf_offset += sizeof(id);
872 memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind));
873 buf_offset += sizeof(read_ahead_kind);
874 memcpy(&buf[buf_offset], &length, sizeof(length));
875 buf_offset += sizeof(length);
876 memcpy(&buf[buf_offset], &offset, sizeof(offset));
877 buf_offset += sizeof(offset);
878 break;
879 case CommandChoice::kCreateSession:
880 case CommandChoice::kCreateFdSession:
881 memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
882 buf_offset += sizeof(session_id);
883
884 {
885 uint32_t string_length = static_cast<uint32_t>(file_path->size());
886 memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
887 buf_offset += sizeof(string_length);
888 }
889
890 DCHECK(file_path.has_value());
891
892 memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
893 buf_offset += file_path->size();
894
895 DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size();
896 DCHECK_EQ(buf_offset, *produced_bytes) << *this;
897
898 break;
899 case CommandChoice::kDestroySession:
900 case CommandChoice::kDumpSession:
901 memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
902 buf_offset += sizeof(session_id);
903 break;
904 case CommandChoice::kExit:
905 case CommandChoice::kDumpEverything:
906 // Only need to write out the choice.
907 break;
908 default:
909 LOG(FATAL) << "should have fallen out in the above switch"
910 << static_cast<uint32_t>(choice);
911 break;
912 }
913
914 DCHECK_EQ(buf_offset, space_requirement) << *this;
915 DCHECK_EQ(buf_offset, *produced_bytes) << *this;
916
917 return true;
918 }
919
920 class PrefetcherDaemon::Impl {
921 public:
StartPipesViaFork()922 std::optional<PrefetcherForkParameters> StartPipesViaFork() {
923 int pipefds[2];
924 if (pipe(&pipefds[0]) != 0) {
925 PLOG(FATAL) << "Failed to create read/write pipes";
926 }
927
928 if (WOULD_LOG(VERBOSE)) {
929 long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ));
930 if (pipe_size < 0) {
931 PLOG(ERROR) << "Failed to F_GETPIPE_SZ:";
932 }
933 LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size;
934 }
935
936 for (int i = 0; i < 2; ++i) {
937 // Default pipe size is usually 64KB.
938 // Increase to 1MB so that iorapd has to rarely run during prefetching.
939 if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) {
940 PLOG(FATAL) << "Failed to increase pipe size to max";
941 }
942 }
943
944 pipefd_read_ = pipefds[0];
945 pipefd_write_ = pipefds[1];
946
947 PrefetcherForkParameters params;
948 params.input_fd = pipefd_read_;
949 params.output_fd = pipefd_write_;
950 params.format_text = false;
951 params.use_sockets = false;
952
953 bool res = StartViaFork(params);
954 if (res) {
955 return params;
956 } else {
957 return std::nullopt;
958 }
959 }
960
StartSocketViaFork()961 std::optional<PrefetcherForkParameters> StartSocketViaFork() {
962 int socket_fds[2];
963 if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) {
964 PLOG(FATAL) << "Failed to create read/write socketpair";
965 }
966
967 pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader
968 pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer
969
970 PrefetcherForkParameters params;
971 params.input_fd = pipefd_read_;
972 params.output_fd = pipefd_write_;
973 params.format_text = false;
974 params.use_sockets = true;
975
976 bool res = StartViaFork(params);
977 if (res) {
978 return params;
979 } else {
980 return std::nullopt;
981 }
982 }
983
StartViaFork(PrefetcherForkParameters params)984 bool StartViaFork(PrefetcherForkParameters params) {
985 params_ = params;
986
987 forked_ = true;
988 child_ = fork();
989
990 if (child_ == -1) {
991 LOG(FATAL) << "Failed to fork PrefetcherDaemon";
992 } else if (child_ > 0) { // we are the caller of this function
993 LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_;
994
995 return true;
996 } else {
997 // we are the child that was forked.
998 std::stringstream argv; // for logging
999 std::vector<std::string> argv_vec;
1000
1001 {
1002 std::stringstream s;
1003 s << "--input-fd";
1004 argv_vec.push_back(s.str());
1005
1006 std::stringstream s2;
1007 s2 << params.input_fd;
1008 argv_vec.push_back(s2.str());
1009
1010 argv << " --input-fd" << " " << params.input_fd;
1011 }
1012
1013 {
1014 std::stringstream s;
1015 s << "--output-fd";
1016 argv_vec.push_back(s.str());
1017
1018 std::stringstream s2;
1019 s2 << params.output_fd;
1020 argv_vec.push_back(s2.str());
1021
1022 argv << " --output-fd" << " " << params.output_fd;
1023 }
1024
1025
1026 if (params.use_sockets) {
1027 std::stringstream s;
1028 s << "--use-sockets";
1029 argv_vec.push_back(s.str());
1030
1031 argv << " --use-sockets";
1032 }
1033
1034 if (WOULD_LOG(VERBOSE)) {
1035 std::stringstream s;
1036 s << "--verbose";
1037 argv_vec.push_back(s.str());
1038
1039 argv << " --verbose";
1040 }
1041
1042 std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec);
1043
1044 LOG(DEBUG) << "fork+exec: " << kCommandFileName << " "
1045 << argv.str();
1046 execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr);
1047 // This should never return.
1048 _exit(EXIT_FAILURE);
1049 }
1050
1051 DCHECK(false);
1052 return false;
1053 }
1054
1055 // TODO: Not very useful since this can never return 'true'
1056 // -> in the child we would've already execd which loses all this code.
IsDaemon()1057 bool IsDaemon() {
1058 // In the child the pid is always 0.
1059 return child_ > 0;
1060 }
1061
Main(PrefetcherForkParameters params)1062 bool Main(PrefetcherForkParameters params) {
1063 LOG(VERBOSE) << "PrefetcherDaemon::Main " << params;
1064
1065 CommandParser command_parser{params};
1066
1067 Command next_command{};
1068
1069 std::vector<Command> many_commands;
1070
1071 // Ensure alogd is pre-initialized before installing minijail.
1072 LOG(DEBUG) << "Installing minijail";
1073
1074 // Install seccomp filter using libminijail.
1075 if (kInstallMiniJail) {
1076 MiniJail();
1077 }
1078
1079 while (true) {
1080 bool eof = false;
1081
1082 if (params.use_sockets) {
1083 // use recvmsg(2). supports receiving FDs.
1084 many_commands = command_parser.ParseSocketCommands(/*out*/eof);
1085 } else {
1086 // use read(2). does not support receiving FDs.
1087 many_commands = command_parser.ParseCommands(/*out*/eof);
1088 }
1089
1090 if (eof) {
1091 LOG(WARNING) << "PrefetcherDaemon got EOF, terminating";
1092 return true;
1093 }
1094
1095 for (auto& command : many_commands) {
1096 if (LogVerboseIpc()) {
1097 LOG(VERBOSE) << "PrefetcherDaemon got command: " << command;
1098 }
1099
1100 if (command.choice == CommandChoice::kExit) {
1101 LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating";
1102 return true;
1103 }
1104
1105 if (!ReceiveCommand(command)) {
1106 // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command;
1107 }
1108
1109 // ReceiveCommand should dup to keep the FD. Avoid leaks.
1110 if (command.fd.has_value()) {
1111 close(*command.fd);
1112 }
1113 }
1114 }
1115
1116 LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating";
1117
1118 return true;
1119 // Terminate.
1120 }
1121
// Constructs the implementation backing `daemon` (the pointer is not used here).
// The session manager is always the in-process direct kind; an indirect one
// would create a lifetime cycle (see the note on session_manager_).
Impl(PrefetcherDaemon* daemon)
    : session_manager_(SessionManager::CreateManager(SessionKind::kInProcessDirect)) {
  DCHECK(session_manager_ != nullptr);
}
1126
// Waits for the forked child when running in the parent process.
// Does nothing if 'StartViaFork' was never called (forked_ is false).
~Impl() {
  if (!forked_) {
    return;
  }

  if (IsDaemon()) {
    // The child execve()s right after forking, so this should be unreachable.
    LOG(WARNING) << "execve should have avoided this path";
    // DCHECK(false) << "not possible because the execve would avoid this path";
    return;
  }

  // Block until the child exits. Retry if a signal interrupts the wait (EINTR).
  int status;
  if (TEMP_FAILURE_RETRY(waitpid(child_, /*out*/&status, /*options*/0)) < 0) {
    PLOG(WARNING) << "waitpid failed for PrefetcherDaemon child pid " << child_;
  }
}
1139
SendCommand(const Command & command)1140 bool SendCommand(const Command& command) {
1141 // Only parent is the sender.
1142 DCHECK(forked_);
1143 //DCHECK(!IsDaemon());
1144
1145 char buf[1024];
1146 size_t stream_size;
1147 if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) {
1148 PLOG(ERROR) << "Failed to serialize command: " << command;
1149 return false;
1150 }
1151
1152 if (LogVerboseIpc()) {
1153 LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf
1154 << ", size=" << stream_size<< ")";
1155 }
1156
1157 if (params_.use_sockets) {
1158 /* iov contains the normal message (Command) */
1159 struct iovec iov;
1160 memset(&iov, 0, sizeof(iov));
1161 iov.iov_base = &buf[0];
1162 iov.iov_len = stream_size;
1163
1164 struct msghdr msg;
1165 memset(&msg, 0, sizeof(msg));
1166
1167 /* point to iov to transmit */
1168 msg.msg_iov = &iov;
1169 msg.msg_iovlen = 1;
1170
1171 /* no dest address; socket is connected */
1172 msg.msg_name = nullptr;
1173 msg.msg_namelen = 0;
1174
1175 // append a CMSG with SCM_RIGHTS if we have an FD.
1176 if (command.fd.has_value()) {
1177 union {
1178 struct cmsghdr cmh;
1179 char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */
1180 } control_un;
1181 memset(&control_un, 0, sizeof(control_un));
1182
1183 msg.msg_control = &control_un.control[0];
1184 msg.msg_controllen = sizeof(control_un.control);
1185
1186 struct cmsghdr *hp;
1187 hp = CMSG_FIRSTHDR(&msg);
1188 hp->cmsg_len = CMSG_LEN(sizeof(int));
1189 hp->cmsg_level = SOL_SOCKET;
1190 hp->cmsg_type = SCM_RIGHTS;
1191 *((int *) CMSG_DATA(hp)) = *(command.fd);
1192
1193 DCHECK(command.RequiresFd()) << command;
1194
1195 if (LogVerboseIpc()) {
1196 LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd);
1197 }
1198 }
1199
1200 // TODO: add CMSG for the FD passage.
1201
1202 if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) {
1203 PLOG(ERROR) << "Failed to sendmsg command: " << command;
1204 return false;
1205 }
1206 } else {
1207 if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) {
1208 PLOG(ERROR) << "Failed to write command: " << command;
1209 return false;
1210 }
1211 }
1212
1213 if (LogVerboseIpc()) {
1214 LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf
1215 << ", size=" << stream_size<< ")";
1216 }
1217
1218 // TODO: also read the reply?
1219 return true;
1220 }
1221
ReceiveCommand(const Command & command)1222 bool ReceiveCommand(const Command& command) {
1223 // Only child is the command receiver.
1224 // DCHECK(IsDaemon());
1225
1226 switch (command.choice) {
1227 case CommandChoice::kRegisterFilePath: {
1228 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1229
1230 if (!session) {
1231 LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1232 return false;
1233 }
1234
1235 CHECK(command.file_path.has_value()) << command;
1236 return session->RegisterFilePath(command.id, *command.file_path);
1237 }
1238 case CommandChoice::kUnregisterFilePath: {
1239 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1240
1241 if (!session) {
1242 LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1243 return false;
1244 }
1245
1246 return session->UnregisterFilePath(command.id);
1247 }
1248 case CommandChoice::kReadAhead: {
1249 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1250
1251 if (!session) {
1252 LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1253 return false;
1254 }
1255
1256 return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset);
1257 }
1258 // TODO: unreadahead
1259 case CommandChoice::kExit: {
1260 LOG(WARNING) << "kExit should be handled earlier.";
1261 return true;
1262 }
1263 case CommandChoice::kCreateSession: {
1264 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1265 if (session != nullptr) {
1266 LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
1267 return false;
1268 }
1269 CHECK(command.file_path.has_value()) << command;
1270 if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path)
1271 == nullptr) {
1272 LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command;
1273 return false;
1274 }
1275 return true;
1276 }
1277 case CommandChoice::kDestroySession: {
1278 if (!session_manager_->DestroySession(command.session_id)) {
1279 LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command;
1280 return false;
1281 }
1282 return true;
1283 }
1284 case CommandChoice::kDumpSession: {
1285 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1286
1287 if (!session) {
1288 LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1289 return false;
1290 }
1291
1292 // TODO: Consider doing dumpsys support somehow?
1293 session->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
1294 return true;
1295 }
1296 case CommandChoice::kDumpEverything: {
1297 session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
1298 break;
1299 }
1300 case CommandChoice::kCreateFdSession: {
1301 std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1302 if (session != nullptr) {
1303 LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
1304 return false;
1305 }
1306 CHECK(command.file_path.has_value()) << command;
1307 CHECK(command.fd.has_value()) << command;
1308
1309 LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd);
1310
1311 // TODO: Maybe use CreateFdSession instead?
1312 session =
1313 session_manager_->CreateSession(command.session_id,
1314 /*description*/*command.file_path,
1315 command.fd.value());
1316 if (session == nullptr) {
1317 LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command;
1318 return false;
1319 }
1320
1321 return session->ProcessFd(*command.fd);
1322 }
1323 }
1324
1325 return true;
1326 }
1327
1328 pid_t child_;
1329 bool forked_;
1330 int pipefd_read_;
1331 int pipefd_write_;
1332 PrefetcherForkParameters params_;
1333 // do not ever use an indirect session manager here, as it would cause a lifetime cycle.
1334 std::unique_ptr<SessionManager> session_manager_; // direct only.
1335 };
1336
PrefetcherDaemon()1337 PrefetcherDaemon::PrefetcherDaemon()
1338 : impl_{new Impl{this}} {
1339 LOG(VERBOSE) << "PrefetcherDaemon() constructor";
1340 }
1341
StartViaFork(PrefetcherForkParameters params)1342 bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) {
1343 return impl_->StartViaFork(std::move(params));
1344 }
1345
1346
StartPipesViaFork()1347 std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() {
1348 return impl_->StartPipesViaFork();
1349 }
1350
StartSocketViaFork()1351 std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() {
1352 return impl_->StartSocketViaFork();
1353 }
1354
Main(PrefetcherForkParameters params)1355 bool PrefetcherDaemon::Main(PrefetcherForkParameters params) {
1356 return impl_->Main(params);
1357 }
1358
SendCommand(const Command & command)1359 bool PrefetcherDaemon::SendCommand(const Command& command) {
1360 return impl_->SendCommand(command);
1361 }
1362
~PrefetcherDaemon()1363 PrefetcherDaemon::~PrefetcherDaemon() {
1364 // required for unique_ptr for incomplete types.
1365 }
1366
1367 } // namespace iorap::prefetcher
1368