1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "prefetcher/minijail.h"
16 #include "common/cmd_utils.h"
17 #include "prefetcher/prefetcher_daemon.h"
18 #include "prefetcher/session_manager.h"
19 #include "prefetcher/session.h"
20 
21 #include <android-base/logging.h>
22 #include <android-base/properties.h>
23 
24 #include <deque>
25 #include <iomanip>
26 #include <string>
27 #include <sstream>
28 #include <vector>
29 
30 #include <fcntl.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <sys/un.h>
36 #include <unistd.h>
37 
38 namespace iorap::prefetcher {
39 
40 // Gate super-spammy IPC logging behind a property.
41 // This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower.
LogVerboseIpc()42 static bool LogVerboseIpc() {
43   static bool initialized = false;
44   static bool verbose_ipc;
45 
46   if (initialized == false) {
47     initialized = true;
48 
49     verbose_ipc =
50         ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false);
51   }
52 
53   return verbose_ipc;
54 }
55 
56 static const bool kInstallMiniJail =
57     ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true);
58 
59 static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd";
60 
61 static constexpr size_t kPipeBufferSize = 1024 * 1024;  // matches /proc/sys/fs/pipe-max-size
62 
63 using ArgString = const char*;
64 
// Writes a short human-readable name for |ps| ("fadvise", "mmap", "mlock");
// any value without a case below is written as "<invalid>".
std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) {
  const char* label = "<invalid>";
  switch (ps) {
    case ReadAheadKind::kFadvise:
      label = "fadvise";
      break;
    case ReadAheadKind::kMmapLocked:
      label = "mmap";
      break;
    case ReadAheadKind::kMlock:
      label = "mlock";
      break;
    default:
      break;
  }
  return os << label;
}
81 
operator <<(std::ostream & os,CommandChoice choice)82 std::ostream& operator<<(std::ostream& os, CommandChoice choice) {
83   switch (choice) {
84     case CommandChoice::kRegisterFilePath:
85       os << "kRegisterFilePath";
86       break;
87     case CommandChoice::kUnregisterFilePath:
88       os << "kUnregisterFilePath";
89       break;
90     case CommandChoice::kReadAhead:
91       os << "kReadAhead";
92       break;
93     case CommandChoice::kExit:
94       os << "kExit";
95       break;
96     case CommandChoice::kCreateSession:
97       os << "kCreateSession";
98       break;
99     case CommandChoice::kDestroySession:
100       os << "kDestroySession";
101       break;
102     case CommandChoice::kDumpSession:
103       os << "kDumpSession";
104       break;
105     case CommandChoice::kDumpEverything:
106       os << "kDumpEverything";
107       break;
108     case CommandChoice::kCreateFdSession:
109       os << "kCreateFdSession";
110       break;
111     default:
112       CHECK(false) << "forgot to handle this choice";
113       break;
114   }
115   return os;
116 }
117 
operator <<(std::ostream & os,const Command & command)118 std::ostream& operator<<(std::ostream& os, const Command& command) {
119   os << "Command{";
120   os << "choice=" << command.choice << ",";
121 
122   bool has_session_id = true;
123   bool has_id = true;
124   switch (command.choice) {
125     case CommandChoice::kDumpEverything:
126     case CommandChoice::kExit:
127       has_session_id = false;
128       FALLTHROUGH_INTENDED;
129     case CommandChoice::kCreateFdSession:
130     case CommandChoice::kCreateSession:
131     case CommandChoice::kDestroySession:
132     case CommandChoice::kDumpSession:
133       has_id = false;
134       break;
135     default:
136       break;
137   }
138 
139   if (has_session_id) {
140     os << "sid=" << command.session_id << ",";
141   }
142 
143   if (has_id) {
144     os << "id=" << command.id << ",";
145   }
146 
147   switch (command.choice) {
148     case CommandChoice::kRegisterFilePath:
149       os << "file_path=";
150 
151       if (command.file_path) {
152         os << *(command.file_path);
153       } else {
154         os << "(nullopt)";
155       }
156       break;
157     case CommandChoice::kUnregisterFilePath:
158       break;
159     case CommandChoice::kReadAhead:
160       os << "read_ahead_kind=" << command.read_ahead_kind << ",";
161       os << "length=" << command.length << ",";
162       os << "offset=" << command.offset << ",";
163       break;
164     case CommandChoice::kExit:
165       break;
166     case CommandChoice::kCreateFdSession:
167       os << "fd=";
168       if (command.fd.has_value()) {
169         os << command.fd.value();
170       } else {
171         os << "(nullopt)";
172       }
173       os << ",";
174     FALLTHROUGH_INTENDED;
175     case CommandChoice::kCreateSession:
176       os << "description=";
177       if (command.file_path) {
178         os << "'" << *(command.file_path) << "'";
179       } else {
180         os << "(nullopt)";
181       }
182       break;
183     case CommandChoice::kDestroySession:
184       break;
185     case CommandChoice::kDumpSession:
186       break;
187     case CommandChoice::kDumpEverything:
188       break;
189     default:
190       CHECK(false) << "forgot to handle this choice";
191       break;
192   }
193 
194   os << "}";
195 
196   return os;
197 }
198 
199 template <typename T>
200 struct ParseResult {
201   T value;
202   char* next_token;
203   size_t stream_size;
204 
ParseResultiorap::prefetcher::ParseResult205   ParseResult() : value{}, next_token{nullptr}, stream_size{} {
206   }
207 
208   constexpr operator bool() const {
209     return next_token != nullptr;
210   }
211 };
212 
213 // Very spammy: Keep it off by default. Set to true if changing this code.
214 static constexpr bool kDebugParsingRead = false;
215 
216 #define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead "
217 
218 
219 
220 // Parse a strong type T from a buffer stream.
221 // If there's insufficient space left to parse the value, an empty ParseResult is returned.
222 template <typename T>
ParsingRead(char * stream,size_t stream_size)223 ParseResult<T> ParsingRead(char* stream, size_t stream_size) {
224   if (stream == nullptr) {
225     DEBUG_PREAD << "stream was null";
226     return {};
227   }
228 
229   if constexpr (std::is_same_v<T, std::string>) {
230     ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size);
231 
232     if (!length) {
233       DEBUG_PREAD << "could not find length";
234       // Not enough bytes left?
235       return {};
236     }
237 
238     ParseResult<std::string> string_result;
239     string_result.value.reserve(length);
240 
241     stream = length.next_token;
242     stream_size = length.stream_size;
243 
244     for (size_t i = 0; i < length.value; ++i) {
245       ParseResult<char> char_result = ParsingRead<char>(stream, stream_size);
246 
247       stream = char_result.next_token;
248       stream_size = char_result.stream_size;
249 
250       if (!char_result) {
251         DEBUG_PREAD << "too few chars in stream, expected length: " << length.value;
252         // Not enough bytes left?
253         return {};
254       }
255 
256       string_result.value += char_result.value;
257 
258       DEBUG_PREAD << "string preliminary is : " << string_result.value;
259     }
260 
261     DEBUG_PREAD << "parsed string to: " << string_result.value;
262     string_result.next_token = stream;
263     return string_result;
264   } else {
265     if (sizeof(T) > stream_size) {
266       return {};
267     }
268 
269     ParseResult<T> result;
270     result.next_token = stream + sizeof(T);
271     result.stream_size = stream_size - sizeof(T);
272 
273     memcpy(&result.value, stream, sizeof(T));
274 
275     return result;
276   }
277 }
278 
279 // Convenience overload to chain multiple ParsingRead together.
280 template <typename T, typename U>
ParsingRead(ParseResult<U> result)281 ParseResult<T> ParsingRead(ParseResult<U> result) {
282   return ParsingRead<T>(result.next_token, result.stream_size);
283 }
284 
285 class CommandParser {
286  public:
CommandParser(PrefetcherForkParameters params)287   CommandParser(PrefetcherForkParameters params) {
288     params_ = params;
289   }
290 
ParseSocketCommands(bool & eof)291   std::vector<Command> ParseSocketCommands(bool& eof) {
292     eof = false;
293 
294     std::vector<Command> commands_vec;
295 
296     std::vector<char> buf_vector;
297     buf_vector.resize(1024*1024);  // 1MB.
298     char* buf = &buf_vector[0];
299 
300     // Binary only parsing. The higher level code can parse text
301     // with ifstream if it really wants to.
302     char* stream = &buf[0];
303     size_t stream_size = buf_vector.size();
304 
305     while (true) {
306       if (stream_size == 0) {
307         // TODO: reply with an overflow command.
308         LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
309         stream = &buf[0];
310         stream_size = buf_vector.size();
311         memset(&buf[0], /*c*/0, buf_vector.size());
312       }
313 
314       if (LogVerboseIpc()) {
315         LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")";
316       }
317 
318       ssize_t count;
319       struct msghdr hdr;
320       memset(&hdr, 0, sizeof(hdr));
321 
322       {
323         union {
324           struct cmsghdr cmh;
325           char   control[CMSG_SPACE(sizeof(int))];
326         } control_un;
327         memset(&control_un, 0, sizeof(control_un));
328 
329         /* Set 'control_un' to describe ancillary data that we want to receive */
330         control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int));  /* fd is sizeof(int) */
331         control_un.cmh.cmsg_level = SOL_SOCKET;
332         control_un.cmh.cmsg_type = SCM_CREDENTIALS;
333 
334         // the regular message data will be read into stream
335         struct iovec iov;
336         memset(&iov, 0, sizeof(iov));
337         iov.iov_base = stream;
338         iov.iov_len = stream_size;
339 
340         /* Set hdr fields to describe 'control_un' */
341         hdr.msg_control = control_un.control;
342         hdr.msg_controllen = sizeof(control_un.control);
343         hdr.msg_iov = &iov;
344         hdr.msg_iovlen = 1;
345         hdr.msg_name = nullptr; /* no peer address */
346         hdr.msg_namelen = 0;
347 
348         count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0));
349       }
350 
351       if (LogVerboseIpc()) {
352         LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size;
353       }
354 
355       if (count < 0) {
356         PLOG(ERROR) << "failed to recvmsg from input fd";
357         break;
358         // TODO: let the daemon be restarted by higher level code?
359       } else if (count == 0) {
360         LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
361         eof = true;
362         break;
363         // TODO: let the daemon be restarted by higher level code?
364       }
365 
366       {
367         /* Extract fd from ancillary data if present */
368         struct cmsghdr* hp;
369         hp = CMSG_FIRSTHDR(&hdr);
370         if (hp                                              &&
371             // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing?
372             // (hp->cmsg_len == CMSG_LEN(sizeof(int)))          &&
373             (hp->cmsg_level == SOL_SOCKET)                   &&
374             (hp->cmsg_type == SCM_RIGHTS)) {
375 
376           int passed_fd = *(int*) CMSG_DATA(hp);
377           if (LogVerboseIpc()) {
378             LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd;
379           }
380 
381           // tack the FD into our dequeue.
382           // we assume the FDs are sent in-order same as the regular iov are sent in-order.
383           longbuf_fds_.insert(longbuf_fds_.end(), passed_fd);
384         } else if (hp != nullptr) {
385           if (LogVerboseIpc()) {
386             LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS,"
387                          << "cmsg_len=" << hp->cmsg_len << ","
388                          << "cmsg_level=" << hp->cmsg_level << ","
389                          << "cmsg_type=" << hp->cmsg_type;
390           }
391         }
392       }
393 
394       longbuf_.insert(longbuf_.end(), stream, stream + count);
395       if (LogVerboseIpc()) {
396         LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
397       }
398 
399       // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]*
400       {
401         if (longbuf_.size() == 0) {
402           break;
403         }
404 
405         std::vector<char> v(longbuf_.begin(),
406                             longbuf_.end());
407 
408         std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()};
409 
410         if (LogVerboseIpc()) {
411           LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
412           if (WOULD_LOG(VERBOSE)) {
413             std::stringstream dump;
414             dump << std::hex << std::setfill('0');
415             for (size_t i = 0; i < v.size(); ++i) {
416               dump << std::setw(2) << static_cast<unsigned>(v[i]);
417             }
418 
419             LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
420           }
421           LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size();
422           if (WOULD_LOG(VERBOSE)) {
423             std::stringstream dump;
424             for (size_t i = 0; i < v_fds.size(); ++i) {
425               dump << v_fds[i] << ", ";
426             }
427 
428             LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str();
429           }
430 
431         }
432 
433         size_t v_fds_off = 0;
434         size_t consumed_fds_total = 0;
435 
436         size_t v_off = 0;
437         size_t consumed_bytes = std::numeric_limits<size_t>::max();
438         size_t consumed_total = 0;
439 
440         while (true) {
441           std::optional<Command> maybe_command;
442           maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
443           consumed_total += consumed_bytes;
444           // Normal every time we get to the end of a buffer.
445           if (!maybe_command) {
446               if (LogVerboseIpc()) {
447                 LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
448               }
449               break;
450           }
451 
452           if (maybe_command->RequiresFd()) {
453             if (v_fds_off < v_fds.size()) {
454               maybe_command->fd = v_fds[v_fds_off++];
455               consumed_fds_total++;
456               if (LogVerboseIpc()) {
457                 LOG(VERBOSE) << "Append the FD to " << *maybe_command;
458               }
459             } else {
460               LOG(WARNING) << "Failed to acquire FD for " << *maybe_command;
461             }
462           }
463 
464           // in the next pass ignore what we already consumed.
465           v_off += consumed_bytes;
466 
467           // true as long we don't hit the 'break' above.
468           DCHECK_EQ(v_off, consumed_total);
469           if (LogVerboseIpc()) {
470             LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
471                          << "," << *maybe_command;
472 
473             // Pretty-print a single command for debugging/testing.
474             LOG(VERBOSE) << *maybe_command;
475           }
476 
477           // add to the commands we parsed.
478           commands_vec.push_back(*maybe_command);
479         }
480 
481         // erase however many were consumed
482         longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
483 
484         // erase however many FDs were consumed.
485         longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total);
486       }
487       break;
488     }
489 
490     return commands_vec;
491   }
492 
ParseCommands(bool & eof)493   std::vector<Command> ParseCommands(bool& eof) {
494     eof = false;
495 
496     std::vector<Command> commands_vec;
497 
498     std::vector<char> buf_vector;
499     buf_vector.resize(kPipeBufferSize);
500     char* buf = &buf_vector[0];
501 
502     // Binary only parsing. The higher level code can parse text
503     // with ifstream if it really wants to.
504     char* stream = &buf[0];
505     size_t stream_size = buf_vector.size();
506 
507     while (true) {
508       if (stream_size == 0) {
509         // TODO: reply with an overflow command.
510         LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands.";
511         stream = &buf[0];
512         stream_size = buf_vector.size();
513         memset(&buf[0], /*c*/0, buf_vector.size());
514       }
515 
516       if (LogVerboseIpc()) {
517         LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")";
518       }
519       ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size));
520       if (LogVerboseIpc()) {
521         LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size;
522       }
523 
524       if (count < 0) {
525         PLOG(ERROR) << "failed to read from input fd";
526         break;
527         // TODO: let the daemon be restarted by higher level code?
528       } else if (count == 0) {
529         LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating";
530         eof = true;
531         break;
532         // TODO: let the daemon be restarted by higher level code?
533       }
534 
535       longbuf_.insert(longbuf_.end(), stream, stream + count);
536       if (LogVerboseIpc()) {
537         LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size();
538       }
539 
540       std::optional<Command> maybe_command;
541       {
542         if (longbuf_.size() == 0) {
543           break;
544         }
545 
546         std::vector<char> v(longbuf_.begin(),
547                             longbuf_.end());
548 
549         if (LogVerboseIpc()) {
550           LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size();
551           if (WOULD_LOG(VERBOSE)) {
552             std::stringstream dump;
553             dump << std::hex << std::setfill('0');
554             for (size_t i = 0; i < v.size(); ++i) {
555               dump << std::setw(2) << static_cast<unsigned>(v[i]);
556             }
557 
558             LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str();
559           }
560         }
561 
562         size_t v_off = 0;
563         size_t consumed_bytes = std::numeric_limits<size_t>::max();
564         size_t consumed_total = 0;
565 
566         while (true) {
567           maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes);
568           consumed_total += consumed_bytes;
569           // Normal every time we get to the end of a buffer.
570           if (!maybe_command) {
571               if (LogVerboseIpc()) {
572                 LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size();
573               }
574               break;
575           }
576 
577           // in the next pass ignore what we already consumed.
578           v_off += consumed_bytes;
579 
580           // true as long we don't hit the 'break' above.
581           DCHECK_EQ(v_off, consumed_total);
582           if (LogVerboseIpc()) {
583             LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size()
584                          << "," << *maybe_command;
585 
586             // Pretty-print a single command for debugging/testing.
587             LOG(VERBOSE) << *maybe_command;
588           }
589 
590           // add to the commands we parsed.
591           commands_vec.push_back(*maybe_command);
592         }
593 
594         // erase however many were consumed
595         longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total);
596       }
597       break;
598     }
599 
600     return commands_vec;
601   }
602 
603  private:
IsTextMode() const604   bool IsTextMode() const {
605     return params_.format_text;
606   }
607 
608   PrefetcherForkParameters params_;
609 
610   // A buffer long enough to contain a lot of buffers.
611   // This handles reads that only contain a partial command.
612   std::deque<char> longbuf_;
613 
614   // File descriptor buffers.
615   std::deque<int> longbuf_fds_;
616 };
617 
// When true, Command::Read logs (at VERBOSE, prefixed "Command::Read ") the reason it
// rejected a buffer, including the routine case of a partial command at the buffer's end.
static constexpr bool kDebugCommandRead = true;

// Expands to an if/else so that an `else` following a DEBUG_READ statement at the
// use site cannot silently bind to the macro's hidden `if`.
#define DEBUG_READ if (!kDebugCommandRead) {} else LOG(VERBOSE) << "Command::Read "
621 
Read(char * buf,size_t buf_size,size_t * consumed_bytes)622 std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) {
623   *consumed_bytes = 0;
624   if (buf == nullptr) {
625     return std::nullopt;
626   }
627 
628   Command cmd{};  // zero-initialize any unused fields
629   ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size);
630   cmd.choice = parsed_choice.value;
631 
632   if (!parsed_choice) {
633     DEBUG_READ << "no choice";
634     return std::nullopt;
635   }
636 
637   switch (parsed_choice.value) {
638     case CommandChoice::kRegisterFilePath: {
639       ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
640       if (!parsed_session_id) {
641         DEBUG_READ << "no parsed session id";
642         return std::nullopt;
643       }
644 
645       ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
646       if (!parsed_id) {
647         DEBUG_READ << "no parsed id";
648         return std::nullopt;
649       }
650 
651       ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id);
652 
653       if (!parsed_file_path) {
654         DEBUG_READ << "no file path";
655         return std::nullopt;
656       }
657       *consumed_bytes = parsed_file_path.next_token - buf;
658 
659       cmd.session_id = parsed_session_id.value;
660       cmd.id = parsed_id.value;
661       cmd.file_path = parsed_file_path.value;
662 
663       break;
664     }
665     case CommandChoice::kUnregisterFilePath: {
666       ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
667       if (!parsed_session_id) {
668         DEBUG_READ << "no parsed session id";
669         return std::nullopt;
670       }
671 
672       ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
673       if (!parsed_id) {
674         DEBUG_READ << "no parsed id";
675         return std::nullopt;
676       }
677       *consumed_bytes = parsed_id.next_token - buf;
678 
679       cmd.session_id = parsed_session_id.value;
680       cmd.id = parsed_id.value;
681 
682       break;
683     }
684     case CommandChoice::kReadAhead: {
685       ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
686       if (!parsed_session_id) {
687         DEBUG_READ << "no parsed session id";
688         return std::nullopt;
689       }
690 
691       ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id);
692       if (!parsed_id) {
693         DEBUG_READ << "no parsed id";
694         return std::nullopt;
695       }
696 
697       ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id);
698       if (!parsed_kind) {
699         DEBUG_READ << "no parsed kind";
700         return std::nullopt;
701       }
702       ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind);
703       if (!parsed_length) {
704         DEBUG_READ << "no parsed length";
705         return std::nullopt;
706       }
707       ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length);
708       if (!parsed_offset) {
709         DEBUG_READ << "no parsed offset";
710         return std::nullopt;
711       }
712       *consumed_bytes = parsed_offset.next_token - buf;
713 
714       cmd.session_id = parsed_session_id.value;
715       cmd.id = parsed_id.value;
716       cmd.read_ahead_kind = parsed_kind.value;
717       cmd.length = parsed_length.value;
718       cmd.offset = parsed_offset.value;
719 
720       break;
721     }
722     case CommandChoice::kCreateSession:
723     case CommandChoice::kCreateFdSession: {
724       ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
725       if (!parsed_session_id) {
726         DEBUG_READ << "no parsed session id";
727         return std::nullopt;
728       }
729 
730       ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id);
731 
732       if (!parsed_description) {
733         DEBUG_READ << "no description";
734         return std::nullopt;
735       }
736       *consumed_bytes = parsed_description.next_token - buf;
737 
738       cmd.session_id = parsed_session_id.value;
739       cmd.file_path = parsed_description.value;
740 
741       break;
742     }
743     case CommandChoice::kDestroySession:
744     case CommandChoice::kDumpSession: {
745       ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice);
746       if (!parsed_session_id) {
747         DEBUG_READ << "no parsed session id";
748         return std::nullopt;
749       }
750 
751       *consumed_bytes = parsed_session_id.next_token - buf;
752 
753       cmd.session_id = parsed_session_id.value;
754 
755       break;
756     }
757     case CommandChoice::kExit:
758     case CommandChoice::kDumpEverything:
759       *consumed_bytes = parsed_choice.next_token - buf;
760       // Only need to parse the choice.
761       break;
762     default:
763       LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value);
764       break;
765   }
766 
767   return cmd;
768 }
769 
Write(char * buf,size_t buf_size,size_t * produced_bytes) const770 bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const {
771   *produced_bytes = 0;
772   if (buf == nullptr) {
773     LOG(WARNING) << "null buf, is this expected?";
774     return false;
775   }
776 
777   bool has_enough_space = false;
778   size_t space_requirement = std::numeric_limits<size_t>::max();
779 
780   space_requirement = sizeof(choice);
781 
782   switch (choice) {
783     case CommandChoice::kRegisterFilePath:
784       space_requirement += sizeof(session_id);
785       space_requirement += sizeof(id);
786       space_requirement += sizeof(uint32_t);    // string length
787 
788       if (!file_path) {
789         LOG(WARNING) << "Missing file path for kRegisterFilePath";
790         return false;
791       }
792 
793       space_requirement += file_path->size(); // string contents
794       break;
795     case CommandChoice::kUnregisterFilePath:
796       space_requirement += sizeof(session_id);
797       space_requirement += sizeof(id);
798       break;
799     case CommandChoice::kReadAhead:
800       space_requirement += sizeof(session_id);
801       space_requirement += sizeof(id);
802       space_requirement += sizeof(read_ahead_kind);
803       space_requirement += sizeof(length);
804       space_requirement += sizeof(offset);
805       break;
806     case CommandChoice::kCreateSession:
807     case CommandChoice::kCreateFdSession:
808       space_requirement += sizeof(session_id);
809       space_requirement += sizeof(uint32_t);    // string length
810 
811       if (!file_path) {
812         LOG(WARNING) << "Missing file path for kCreateSession";
813         return false;
814       }
815 
816       space_requirement += file_path->size(); // string contents
817       break;
818     case CommandChoice::kDestroySession:
819     case CommandChoice::kDumpSession:
820       space_requirement += sizeof(session_id);
821       break;
822     case CommandChoice::kExit:
823     case CommandChoice::kDumpEverything:
824       // Only need space for the choice.
825       break;
826     default:
827       LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice);
828       break;
829   }
830 
831   if (buf_size < space_requirement) {
832     return false;
833   }
834 
835   *produced_bytes = space_requirement;
836 
837   // Always write out the choice.
838   size_t buf_offset = 0;
839 
840   memcpy(&buf[buf_offset], &choice, sizeof(choice));
841   buf_offset += sizeof(choice);
842 
843   switch (choice) {
844     case CommandChoice::kRegisterFilePath:
845       memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
846       buf_offset += sizeof(session_id);
847       memcpy(&buf[buf_offset], &id, sizeof(id));
848       buf_offset += sizeof(id);
849 
850       {
851         uint32_t string_length = static_cast<uint32_t>(file_path->size());
852         memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
853         buf_offset += sizeof(string_length);
854       }
855 
856       DCHECK(file_path.has_value());
857 
858       memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
859       buf_offset += file_path->size();
860       break;
861     case CommandChoice::kUnregisterFilePath:
862       memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
863       buf_offset += sizeof(session_id);
864       memcpy(&buf[buf_offset], &id, sizeof(id));
865       buf_offset += sizeof(id);
866       break;
867     case CommandChoice::kReadAhead:
868       memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
869       buf_offset += sizeof(session_id);
870       memcpy(&buf[buf_offset], &id, sizeof(id));
871       buf_offset += sizeof(id);
872       memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind));
873       buf_offset += sizeof(read_ahead_kind);
874       memcpy(&buf[buf_offset], &length, sizeof(length));
875       buf_offset += sizeof(length);
876       memcpy(&buf[buf_offset], &offset, sizeof(offset));
877       buf_offset += sizeof(offset);
878       break;
879     case CommandChoice::kCreateSession:
880     case CommandChoice::kCreateFdSession:
881       memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
882       buf_offset += sizeof(session_id);
883 
884       {
885         uint32_t string_length = static_cast<uint32_t>(file_path->size());
886         memcpy(&buf[buf_offset], &string_length, sizeof(string_length));
887         buf_offset += sizeof(string_length);
888       }
889 
890       DCHECK(file_path.has_value());
891 
892       memcpy(&buf[buf_offset], file_path->c_str(), file_path->size());
893       buf_offset += file_path->size();
894 
895       DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size();
896       DCHECK_EQ(buf_offset, *produced_bytes) << *this;
897 
898       break;
899     case CommandChoice::kDestroySession:
900     case CommandChoice::kDumpSession:
901       memcpy(&buf[buf_offset], &session_id, sizeof(session_id));
902       buf_offset += sizeof(session_id);
903       break;
904     case CommandChoice::kExit:
905     case CommandChoice::kDumpEverything:
906       // Only need to write out the choice.
907       break;
908     default:
909       LOG(FATAL) << "should have fallen out in the above switch"
910                  << static_cast<uint32_t>(choice);
911       break;
912   }
913 
914   DCHECK_EQ(buf_offset, space_requirement) << *this;
915   DCHECK_EQ(buf_offset, *produced_bytes) << *this;
916 
917   return true;
918 }
919 
920 class PrefetcherDaemon::Impl {
921  public:
StartPipesViaFork()922   std::optional<PrefetcherForkParameters> StartPipesViaFork() {
923     int pipefds[2];
924     if (pipe(&pipefds[0]) != 0) {
925       PLOG(FATAL) << "Failed to create read/write pipes";
926     }
927 
928     if (WOULD_LOG(VERBOSE)) {
929       long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ));
930       if (pipe_size < 0) {
931         PLOG(ERROR) << "Failed to F_GETPIPE_SZ:";
932       }
933       LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size;
934     }
935 
936     for (int i = 0; i < 2; ++i) {
937       // Default pipe size is usually 64KB.
938       // Increase to 1MB so that iorapd has to rarely run during prefetching.
939       if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) {
940         PLOG(FATAL) << "Failed to increase pipe size to max";
941       }
942     }
943 
944     pipefd_read_ = pipefds[0];
945     pipefd_write_ = pipefds[1];
946 
947     PrefetcherForkParameters params;
948     params.input_fd = pipefd_read_;
949     params.output_fd = pipefd_write_;
950     params.format_text = false;
951     params.use_sockets = false;
952 
953     bool res = StartViaFork(params);
954     if (res) {
955       return params;
956     } else {
957       return std::nullopt;
958     }
959   }
960 
StartSocketViaFork()961 std::optional<PrefetcherForkParameters> StartSocketViaFork() {
962     int socket_fds[2];
963     if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) {
964       PLOG(FATAL) << "Failed to create read/write socketpair";
965     }
966 
967     pipefd_read_ = socket_fds[0];  // iorapd writer, iorap.prefetcherd reader
968     pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer
969 
970     PrefetcherForkParameters params;
971     params.input_fd = pipefd_read_;
972     params.output_fd = pipefd_write_;
973     params.format_text = false;
974     params.use_sockets = true;
975 
976     bool res = StartViaFork(params);
977     if (res) {
978       return params;
979     } else {
980       return std::nullopt;
981     }
982   }
983 
StartViaFork(PrefetcherForkParameters params)984   bool StartViaFork(PrefetcherForkParameters params) {
985     params_ = params;
986 
987     forked_ = true;
988     child_ = fork();
989 
990     if (child_ == -1) {
991       LOG(FATAL) << "Failed to fork PrefetcherDaemon";
992     } else if (child_ > 0) {  // we are the caller of this function
993       LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_;
994 
995       return true;
996     } else {
997       // we are the child that was forked.
998       std::stringstream argv;  // for logging
999       std::vector<std::string> argv_vec;
1000 
1001       {
1002         std::stringstream s;
1003         s << "--input-fd";
1004         argv_vec.push_back(s.str());
1005 
1006         std::stringstream s2;
1007         s2 << params.input_fd;
1008         argv_vec.push_back(s2.str());
1009 
1010         argv << " --input-fd" << " " << params.input_fd;
1011       }
1012 
1013       {
1014         std::stringstream s;
1015         s << "--output-fd";
1016         argv_vec.push_back(s.str());
1017 
1018         std::stringstream s2;
1019         s2 << params.output_fd;
1020         argv_vec.push_back(s2.str());
1021 
1022         argv << " --output-fd" << " " << params.output_fd;
1023       }
1024 
1025 
1026       if (params.use_sockets) {
1027         std::stringstream s;
1028         s << "--use-sockets";
1029         argv_vec.push_back(s.str());
1030 
1031         argv << " --use-sockets";
1032       }
1033 
1034       if (WOULD_LOG(VERBOSE)) {
1035         std::stringstream s;
1036         s << "--verbose";
1037         argv_vec.push_back(s.str());
1038 
1039         argv << " --verbose";
1040       }
1041 
1042       std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec);
1043 
1044       LOG(DEBUG) << "fork+exec: " << kCommandFileName << " "
1045                  << argv.str();
1046       execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr);
1047       // This should never return.
1048       _exit(EXIT_FAILURE);
1049     }
1050 
1051     DCHECK(false);
1052     return false;
1053   }
1054 
1055   // TODO: Not very useful since this can never return 'true'
1056   // -> in the child we would've already execd which loses all this code.
IsDaemon()1057   bool IsDaemon() {
1058     // In the child the pid is always 0.
1059     return child_ > 0;
1060   }
1061 
Main(PrefetcherForkParameters params)1062   bool Main(PrefetcherForkParameters params) {
1063     LOG(VERBOSE) << "PrefetcherDaemon::Main " << params;
1064 
1065     CommandParser command_parser{params};
1066 
1067     Command next_command{};
1068 
1069     std::vector<Command> many_commands;
1070 
1071     // Ensure alogd is pre-initialized before installing minijail.
1072     LOG(DEBUG) << "Installing minijail";
1073 
1074     // Install seccomp filter using libminijail.
1075     if (kInstallMiniJail) {
1076       MiniJail();
1077     }
1078 
1079     while (true) {
1080       bool eof = false;
1081 
1082       if (params.use_sockets) {
1083         // use recvmsg(2). supports receiving FDs.
1084         many_commands = command_parser.ParseSocketCommands(/*out*/eof);
1085       } else {
1086         // use read(2). does not support receiving FDs.
1087         many_commands = command_parser.ParseCommands(/*out*/eof);
1088       }
1089 
1090       if (eof) {
1091         LOG(WARNING) << "PrefetcherDaemon got EOF, terminating";
1092         return true;
1093       }
1094 
1095       for (auto& command : many_commands) {
1096         if (LogVerboseIpc()) {
1097           LOG(VERBOSE) << "PrefetcherDaemon got command: " << command;
1098         }
1099 
1100         if (command.choice == CommandChoice::kExit) {
1101           LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating";
1102           return true;
1103         }
1104 
1105         if (!ReceiveCommand(command)) {
1106           // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command;
1107         }
1108 
1109         // ReceiveCommand should dup to keep the FD. Avoid leaks.
1110         if (command.fd.has_value()) {
1111           close(*command.fd);
1112         }
1113       }
1114     }
1115 
1116     LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating";
1117 
1118     return true;
1119     // Terminate.
1120   }
1121 
Impl(PrefetcherDaemon * daemon)1122   Impl(PrefetcherDaemon* daemon) {
1123     session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect);
1124     DCHECK(session_manager_ != nullptr);
1125   };
1126 
~Impl()1127   ~Impl() {
1128     // Don't do anything if we never called 'StartViaFork'
1129     if (forked_) {
1130       if (!IsDaemon()) {
1131         int status;
1132         waitpid(child_, /*out*/&status, /*options*/0);
1133       } else {
1134         LOG(WARNING) << "execve should have avoided this path";
1135         // DCHECK(false) << "not possible because the execve would avoid this path";
1136       }
1137     }
1138   }
1139 
SendCommand(const Command & command)1140   bool SendCommand(const Command& command) {
1141     // Only parent is the sender.
1142     DCHECK(forked_);
1143     //DCHECK(!IsDaemon());
1144 
1145     char buf[1024];
1146     size_t stream_size;
1147     if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) {
1148       PLOG(ERROR) << "Failed to serialize command: " << command;
1149       return false;
1150     }
1151 
1152     if (LogVerboseIpc()) {
1153       LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf
1154                    << ", size=" << stream_size<< ")";
1155     }
1156 
1157     if (params_.use_sockets) {
1158       /* iov contains the normal message (Command) */
1159       struct iovec iov;
1160       memset(&iov, 0, sizeof(iov));
1161       iov.iov_base = &buf[0];
1162       iov.iov_len = stream_size;
1163 
1164       struct msghdr msg;
1165       memset(&msg, 0, sizeof(msg));
1166 
1167       /* point to iov to transmit */
1168       msg.msg_iov = &iov;
1169       msg.msg_iovlen = 1;
1170 
1171       /* no dest address; socket is connected */
1172       msg.msg_name = nullptr;
1173       msg.msg_namelen = 0;
1174 
1175       // append a CMSG with SCM_RIGHTS if we have an FD.
1176       if (command.fd.has_value()) {
1177           union {
1178             struct cmsghdr cmh;
1179             char   control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */
1180           } control_un;
1181           memset(&control_un, 0, sizeof(control_un));
1182 
1183           msg.msg_control = &control_un.control[0];
1184           msg.msg_controllen = sizeof(control_un.control);
1185 
1186           struct cmsghdr *hp;
1187           hp = CMSG_FIRSTHDR(&msg);
1188           hp->cmsg_len = CMSG_LEN(sizeof(int));
1189           hp->cmsg_level = SOL_SOCKET;
1190           hp->cmsg_type = SCM_RIGHTS;
1191           *((int *) CMSG_DATA(hp)) = *(command.fd);
1192 
1193           DCHECK(command.RequiresFd()) << command;
1194 
1195           if (LogVerboseIpc()) {
1196             LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd);
1197           }
1198       }
1199 
1200       // TODO: add CMSG for the FD passage.
1201 
1202       if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) {
1203         PLOG(ERROR) << "Failed to sendmsg command: " << command;
1204         return false;
1205       }
1206     } else {
1207       if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) {
1208         PLOG(ERROR) << "Failed to write command: " << command;
1209         return false;
1210       }
1211     }
1212 
1213     if (LogVerboseIpc()) {
1214       LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf
1215                    << ", size=" << stream_size<< ")";
1216     }
1217 
1218     // TODO: also read the reply?
1219     return true;
1220   }
1221 
ReceiveCommand(const Command & command)1222   bool ReceiveCommand(const Command& command) {
1223     // Only child is the command receiver.
1224     // DCHECK(IsDaemon());
1225 
1226     switch (command.choice) {
1227       case CommandChoice::kRegisterFilePath: {
1228         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1229 
1230         if (!session) {
1231           LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1232           return false;
1233         }
1234 
1235         CHECK(command.file_path.has_value()) << command;
1236         return session->RegisterFilePath(command.id, *command.file_path);
1237       }
1238       case CommandChoice::kUnregisterFilePath: {
1239         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1240 
1241         if (!session) {
1242           LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1243           return false;
1244         }
1245 
1246         return session->UnregisterFilePath(command.id);
1247       }
1248       case CommandChoice::kReadAhead: {
1249         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1250 
1251         if (!session) {
1252           LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1253           return false;
1254         }
1255 
1256         return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset);
1257       }
1258       // TODO: unreadahead
1259       case CommandChoice::kExit: {
1260         LOG(WARNING) << "kExit should be handled earlier.";
1261         return true;
1262       }
1263       case CommandChoice::kCreateSession: {
1264         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1265         if (session != nullptr) {
1266           LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
1267           return false;
1268         }
1269         CHECK(command.file_path.has_value()) << command;
1270         if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path)
1271                 == nullptr) {
1272           LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command;
1273           return false;
1274         }
1275         return true;
1276       }
1277       case CommandChoice::kDestroySession: {
1278         if (!session_manager_->DestroySession(command.session_id)) {
1279           LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command;
1280           return false;
1281         }
1282         return true;
1283       }
1284       case CommandChoice::kDumpSession: {
1285         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1286 
1287         if (!session) {
1288           LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command;
1289           return false;
1290         }
1291 
1292         // TODO: Consider doing dumpsys support somehow?
1293         session->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
1294         return true;
1295       }
1296       case CommandChoice::kDumpEverything: {
1297         session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true);
1298         break;
1299       }
1300       case CommandChoice::kCreateFdSession: {
1301         std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id);
1302         if (session != nullptr) {
1303           LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command;
1304           return false;
1305         }
1306         CHECK(command.file_path.has_value()) << command;
1307         CHECK(command.fd.has_value()) << command;
1308 
1309         LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd);
1310 
1311         // TODO: Maybe use CreateFdSession instead?
1312         session =
1313             session_manager_->CreateSession(command.session_id,
1314                                             /*description*/*command.file_path,
1315                                             command.fd.value());
1316         if (session == nullptr) {
1317           LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command;
1318           return false;
1319         }
1320 
1321         return session->ProcessFd(*command.fd);
1322       }
1323     }
1324 
1325     return true;
1326   }
1327 
1328   pid_t child_;
1329   bool forked_;
1330   int pipefd_read_;
1331   int pipefd_write_;
1332   PrefetcherForkParameters params_;
1333   // do not ever use an indirect session manager here, as it would cause a lifetime cycle.
1334   std::unique_ptr<SessionManager> session_manager_; // direct only.
1335 };
1336 
PrefetcherDaemon()1337 PrefetcherDaemon::PrefetcherDaemon()
1338   : impl_{new Impl{this}} {
1339   LOG(VERBOSE) << "PrefetcherDaemon() constructor";
1340 }
1341 
StartViaFork(PrefetcherForkParameters params)1342 bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) {
1343   return impl_->StartViaFork(std::move(params));
1344 }
1345 
1346 
StartPipesViaFork()1347 std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() {
1348   return impl_->StartPipesViaFork();
1349 }
1350 
StartSocketViaFork()1351 std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() {
1352   return impl_->StartSocketViaFork();
1353 }
1354 
Main(PrefetcherForkParameters params)1355 bool PrefetcherDaemon::Main(PrefetcherForkParameters params) {
1356   return impl_->Main(params);
1357 }
1358 
SendCommand(const Command & command)1359 bool PrefetcherDaemon::SendCommand(const Command& command) {
1360   return impl_->SendCommand(command);
1361 }
1362 
~PrefetcherDaemon()1363 PrefetcherDaemon::~PrefetcherDaemon() {
1364   // required for unique_ptr for incomplete types.
1365 }
1366 
1367 }  // namespace iorap::prefetcher
1368