1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
#include <regex>
#ifdef WIN32
#include <fcntl.h>
#else
#include <sys/types.h>
#include <unistd.h>
#endif
#include <sys/ioctl.h>
#include <sys/stat.h>
#include "common/log.h"
#include "osal/filesystem/file_system.h"
#include "file_fd_source_plugin.h"
#include "common/media_core.h"
34
namespace {
// File-local HiLog label built from LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER and the "FileFdSourcePlugin" tag.
// NOTE(review): presumably consumed by the MEDIA_LOG_* macros from common/log.h - confirm there.
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
}
38
39 namespace OHOS {
40 namespace Media {
41 namespace Plugins {
42 namespace FileFdSource {
43 namespace {
// Minimum number of regex sub-matches (whole match + fd) that a parsed fd uri must provide.
constexpr int32_t FDPOS = 2;
// Threshold in milliseconds; read/cache operations that take longer are logged.
constexpr int32_t READ_TIME = 3;
// Capacity in bytes of the ring buffer that caches cloud file data (40 MiB).
constexpr size_t CACHE_SIZE = 40 * 1024 * 1024;
// Upper bound in bytes for a single read() in CacheDataLoop (480 KiB).
constexpr size_t PER_CACHE_SIZE = 48 * 10 * 1024;
// Durations handed to usleep() / compared against accumulated usleep() time, in microseconds.
constexpr int32_t TEN_MILLISECOUNDS = 10 * 1000;
constexpr int32_t ONE_SECONDS = 1 * 1000 * 1000;
// Cache time returned by GetCacheTime for ratios in [0, 1) (multiplied by the byte rate to get a water line).
constexpr int32_t CACHE_TIME_DEFAULT = 5;
// Exclusive bounds, in milliseconds, of the interval checked by IsValidTime.
constexpr int32_t SEEK_TIME_LOWER = 20;
constexpr int32_t SEEK_TIME_UPPER = 1000;
// Minimum interval in milliseconds between two download speed samples.
constexpr int32_t RECORD_TIME_INTERVAL = 1 * 1000;
// Divisor converting milliseconds to seconds.
constexpr int32_t MILLISECOUND_TO_SECOND = 1 * 1000;
// Number of failed read() calls after which a read failure is reported.
constexpr int32_t RETRY_TIMES = 3;
// Divisor applied to the bit rate in SetCurrentBitRate (bits -> bytes).
constexpr int32_t TO_BYTE = 8;
// Upper clamp for the reported buffering percentage.
constexpr int32_t PERCENT_100 = 100;
// Rank assigned to this plugin on registration.
constexpr int32_t MAX_RANK = 100;
// Third argument passed to RingBuffer::ReadBuffer in ReadOnlineFile.
constexpr int32_t READ_RETRY = 2;
// Factor applied to the byte rate for the initial water line; also returned by GetCacheTime.
constexpr float CACHE_LEVEL_1 = 0.3;

// ioctl type byte used to build the HMDFS request codes below.
constexpr unsigned int HMDFS_IOC = 0xf2;
// Location value reported by HMDFS_IOC_GET_LOCATION for a cloud file.
#define IOCTL_CLOUD 2
#define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
#define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
#define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
#define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)
68
GetFileSize(int32_t fd)69 uint64_t GetFileSize(int32_t fd)
70 {
71 uint64_t fileSize = 0;
72 struct stat s {};
73 int ret = fstat(fd, &s);
74 if (ret == 0) {
75 fileSize = static_cast<uint64_t>(s.st_size);
76 FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
77 return fileSize;
78 } else {
79 MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
80 }
81 return fileSize;
82 }
// Returns true only when str is a non-empty sequence of ASCII decimal digits.
// The empty string is rejected: it is not a number, and accepting it would let callers
// hand "" to std::stoi, which fails with std::invalid_argument.
bool isNumber(const std::string& str)
{
    return !str.empty() && str.find_first_not_of("0123456789") == std::string::npos;
}
87 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)88 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
89 {
90 MEDIA_LOG_I("fileSourceRegister is started");
91 SourcePluginDef definition;
92 definition.name = "FileFdSource";
93 definition.description = "File Fd source";
94 definition.rank = MAX_RANK; // 100: max rank
95 Capability capability;
96 capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
97 definition.AddInCaps(capability);
98 auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
99 return std::make_shared<FileFdSourcePlugin>(name);
100 };
101 definition.SetCreator(func);
102 return reg->AddPlugin(definition);
103 }
104
// Declares the FileFdSource plugin: Apache-2.0 license type, FileFdSourceRegister as the register
// function and an empty lambda as the last argument.
// NOTE(review): the last argument is presumably the unregister hook - confirm against PLUGIN_DEFINITION.
PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
106
FileFdSourcePlugin(std::string name)107 FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
108 : SourcePlugin(std::move(name))
109 {
110 }
111
// Tears the plugin down: resets the clock, raises the interrupt state (which also deactivates the
// ring buffer and, for cloud files, cancels the pending read - see SetInterruptState) and then
// stops the download task when one was created.
FileFdSourcePlugin::~FileFdSourcePlugin()
{
    MEDIA_LOG_I("~FileFdSourcePlugin in.");
    steadyClock_.Reset();
    SetInterruptState(true);
    MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists for local files or when the fd cache is disabled.
    FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
    downloadTask_->Stop();
    MEDIA_LOG_I("~FileFdSourcePlugin out.");
}
122
SetCallback(Callback * cb)123 Status FileFdSourcePlugin::SetCallback(Callback* cb)
124 {
125 MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
126 callback_ = cb;
127 return Status::OK;
128 }
129
SetSource(std::shared_ptr<MediaSource> source)130 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
131 {
132 MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
133 auto err = ParseUriInfo(source->GetSourceUri());
134 if (err != Status::OK) {
135 MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
136 return err;
137 }
138 CheckFileType();
139 if (isCloudFile_ && isEnableFdCache_) {
140 ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
141 FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
142 Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
143 downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
144 FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
145 downloadTask_->RegisterJob([this] {
146 CacheDataLoop();
147 return 0;
148 });
149 downloadTask_->Start();
150 steadyClock_.Reset();
151 }
152 return Status::OK;
153 }
154
// Convenience overload: reads from stream id 0. See the four-argument Read for the contract.
Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
{
    return Read(0, buffer, offset, expectedLen);
}
159
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)160 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
161 {
162 FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
163 if (isCloudFile_) {
164 return ReadOnlineFile(0, buffer, offset, expectedLen);
165 } else {
166 return ReadOfflineFile(0, buffer, offset, expectedLen);
167 }
168 }
169
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)170 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
171 uint64_t offset, size_t expectedLen)
172 {
173 std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
174 FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
175 expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
176 expectedLen = std::min(bufData->GetCapacity(), expectedLen);
177 MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
178
179 int32_t offsetCur = lseek(fd_, 0, SEEK_CUR);
180 if (static_cast<uint64_t>(offsetCur) != position_) {
181 MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D32 ", offsetOld " PUBLIC_LOG_U64,
182 offsetCur, position_.load());
183 }
184 auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
185 if (size <= 0) {
186 HandleReadResult(expectedLen, size);
187 MEDIA_LOG_D("ReadLocal END_OF_STREAM");
188 return Status::END_OF_STREAM;
189 }
190 bufData->UpdateDataSize(size);
191 position_ += static_cast<uint64_t>(size);
192 if (buffer->GetMemory() != nullptr) {
193 MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
194 position_.load(), buffer->GetMemory()->GetSize());
195 }
196 return Status::OK;
197 }
198
ReadOnlineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)199 Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
200 uint64_t offset, size_t expectedLen)
201 {
202 if (isBuffering_) {
203 if (HandleBuffering()) {
204 FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
205 FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
206 MEDIA_LOG_I("is buffering, return error again.");
207 return Status::ERROR_AGAIN;
208 }
209 }
210
211 std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
212 FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
213 expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
214 expectedLen = std::min(bufData->GetCapacity(), expectedLen);
215
216 // ringbuffer 0 after seek in 20ms, don't notify buffering
217 curReadTime_ = steadyClock2_.ElapsedMilliseconds();
218 if (isReadFrame_ && ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
219 (GetLastSize(position_) > static_cast<int64_t>(expectedLen))) {
220 MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
221 " lastReadTime_ " PUBLIC_LOG_D64, ringBuffer_->GetSize(), curReadTime_, lastReadTime_);
222 CheckReadTime();
223 FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
224 FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
225 return Status::ERROR_AGAIN;
226 }
227
228 size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
229 if (size == 0) {
230 MEDIA_LOG_I("read size 0,fd " PUBLIC_LOG_D32 ",offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
231 PUBLIC_LOG_U64 ",readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
232 FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
233 bufData->UpdateDataSize(0);
234 return Status::OK;
235 }
236 bufData->UpdateDataSize(size);
237 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
238 if (ct > READ_TIME) {
239 MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
240 " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
241 }
242 position_ += static_cast<uint64_t>(size);
243 MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
244 return Status::OK;
245 }
246
SeekTo(uint64_t offset)247 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
248 {
249 FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
250 Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
251
252 MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
253 if (isCloudFile_) {
254 return SeekToOnlineFile(offset);
255 } else {
256 return SeekToOfflineFile(offset);
257 }
258 }
259
SeekToOfflineFile(uint64_t offset)260 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
261 {
262 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
263 if (ret == -1) {
264 MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
265 PUBLIC_LOG_S, fd_, offset, strerror(errno));
266 return Status::ERROR_UNKNOWN;
267 }
268 position_ = offset + static_cast<uint64_t>(offset_);
269 MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
270 return Status::OK;
271 }
272
SeekToOnlineFile(uint64_t offset)273 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
274 {
275 FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
276 MEDIA_LOG_D("SeekCloud,ringBuffer.size: " PUBLIC_LOG_ZU ",offset " PUBLIC_LOG_U64, ringBuffer_->GetSize(), offset);
277 if (ringBuffer_->Seek(offset)) {
278 position_ = offset + static_cast<uint64_t>(offset_);
279 MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
280 return Status::OK;
281 }
282 // First clear buffer, avoid no available buffer then task pause never exit.
283 ringBuffer_->SetActive(false);
284 inSeek_ = true;
285 if (downloadTask_ != nullptr) {
286 downloadTask_->Pause();
287 inSeek_ = false;
288 }
289 ringBuffer_->Clear();
290 ringBuffer_->SetMediaOffset(offset);
291 ringBuffer_->SetActive(true);
292
293 int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
294 if (ret == -1) {
295 MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
296 PUBLIC_LOG_S, fd_, offset, strerror(errno));
297 return Status::ERROR_UNKNOWN;
298 }
299 position_ = offset + static_cast<uint64_t>(offset_);
300 cachePosition_ = position_.load();
301
302 MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
303 ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
304 if (downloadTask_ != nullptr) {
305 downloadTask_->Start();
306 }
307 return Status::OK;
308 }
309
ParseUriInfo(const std::string & uri)310 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
311 {
312 if (uri.empty()) {
313 MEDIA_LOG_E("uri is empty");
314 return Status::ERROR_INVALID_PARAMETER;
315 }
316 std::smatch fdUriMatch;
317 FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
318 std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
319 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
320 FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
321 Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
322 fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
323 FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
324 Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
325 fileSize_ = GetFileSize(fd_);
326 if (fdUriMatch.size() == 4) { // 4:4 sub match
327 offset_ = std::stoll(fdUriMatch[2].str()); // 2: sub match offset subscript
328 if (static_cast<uint64_t>(offset_) > fileSize_) {
329 offset_ = fileSize_;
330 }
331 size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
332 uint64_t remainingSize = fileSize_ - offset_;
333 if (size_ > remainingSize) {
334 size_ = remainingSize;
335 }
336 } else {
337 size_ = fileSize_;
338 offset_ = 0;
339 }
340 position_ = offset_;
341 seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
342 if (seekable_ == Seekable::SEEKABLE) {
343 NOK_LOG(SeekTo(0));
344 }
345 MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
346 return Status::OK;
347 }
348
CacheDataLoop()349 void FileFdSourcePlugin::CacheDataLoop()
350 {
351 if (isInterrupted_) {
352 MEDIA_LOG_E("CacheData break");
353 usleep(TEN_MILLISECOUNDS);
354 return;
355 }
356
357 int64_t curTime = steadyClock_.ElapsedMilliseconds();
358 GetCurrentSpeed(curTime);
359
360 size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
361 if (bufferSize < 0) {
362 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
363 usleep(TEN_MILLISECOUNDS);
364 return;
365 }
366
367 char* cacheBuffer = new char[bufferSize];
368 if (cacheBuffer == nullptr) {
369 MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
370 usleep(TEN_MILLISECOUNDS);
371 return;
372 }
373 int size = read(fd_, cacheBuffer, bufferSize);
374 if (size <= 0) {
375 DeleteCacheBuffer(cacheBuffer, bufferSize);
376 HandleReadResult(bufferSize, size);
377 return;
378 }
379 MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 "cachePos_ " PUBLIC_LOG_U64 " ringBuffer_.size() " PUBLIC_LOG_ZU
380 ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
381 while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
382 MEDIA_LOG_I("CacheData ringbuffer write failed");
383 if (inSeek_ || isInterrupted_) {
384 DeleteCacheBuffer(cacheBuffer, bufferSize);
385 return;
386 }
387 usleep(TEN_MILLISECOUNDS);
388 }
389 cachePosition_ += static_cast<uint64_t>(size);
390 downloadSize_ += static_cast<uint64_t>(size);
391
392 int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
393 if (ct > READ_TIME) {
394 MEDIA_LOG_I("Cache fd: " PUBLIC_LOG_D32 "cachePos:" PUBLIC_LOG_U64 ",ringBuffer.size() " PUBLIC_LOG_ZU ", size "
395 PUBLIC_LOG_U64 " cTime: " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_, ct);
396 }
397
398 DeleteCacheBuffer(cacheBuffer, bufferSize);
399
400 if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
401 GetLastSize(cachePosition_.load()) == 0)) {
402 NotifyBufferingEnd();
403 }
404 }
405
HasCacheData(size_t bufferSize,uint64_t offset)406 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
407 {
408 HmdfsHasCache ioctlData;
409 ioctlData.offset = static_cast<int64_t>(offset);
410 ioctlData.readSize = static_cast<int64_t>(bufferSize);
411 int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0在 -1不在
412
413 ioctlData.offset = static_cast<int64_t>(cachePosition_);
414 ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
415 int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
416 // ioctl has cache
417 if (ioResult == 0 && ioCacheResult == 0) {
418 return true;
419 } else {
420 MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32, errno);
421 }
422 return false;
423 }
424
// Marks the plugin as interrupted and asynchronously stops the download task when one exists.
// Note that only the flag is set here; SetInterruptState is not invoked. Always returns Status::OK.
Status FileFdSourcePlugin::Stop()
{
    MEDIA_LOG_I("Stop enter.");
    isInterrupted_ = true;
    MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists for local files or when the fd cache is disabled.
    FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
    downloadTask_->StopAsync();
    return Status::OK;
}
434
// Same steps as Stop(): marks the plugin as interrupted and asynchronously stops the download
// task when one exists. No other member is reset here. Always returns Status::OK.
Status FileFdSourcePlugin::Reset()
{
    MEDIA_LOG_I("Reset enter.");
    isInterrupted_ = true;
    MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
    // No download task exists for local files or when the fd cache is disabled.
    FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
    downloadTask_->StopAsync();
    return Status::OK;
}
444
PauseDownloadTask(bool isAsync)445 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
446 {
447 FALSE_RETURN(downloadTask_ != nullptr);
448 if (isAsync) {
449 downloadTask_->PauseAsync();
450 } else {
451 downloadTask_->Pause();
452 }
453 }
454
HandleBuffering()455 bool FileFdSourcePlugin::HandleBuffering()
456 {
457 MEDIA_LOG_I("HandleBuffering in.");
458 int32_t sleepTime = 0;
459 // return error again 1 time 1s, avoid ffmpeg error
460 while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
461 NotifyBufferingPercent();
462 if (!isBuffering_) {
463 break;
464 }
465 MEDIA_LOG_I("isBuffering.");
466 usleep(TEN_MILLISECOUNDS);
467 sleepTime += TEN_MILLISECOUNDS;
468 }
469 MEDIA_LOG_I("HandleBuffering out.");
470 return isBuffering_;
471 }
472
HandleReadResult(size_t bufferSize,int size)473 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
474 {
475 MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
476 ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
477 PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
478 if (size < 0) {
479 // errno EIO 5
480 MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
481
482 // read fail with errno, retry 3 * 10ms
483 retryTimes_++;
484 if (retryTimes_ >= RETRY_TIMES) {
485 NotifyReadFail();
486 SetInterruptState(true);
487 }
488 usleep(TEN_MILLISECOUNDS);
489 } else {
490 cachePosition_ = 0;
491 PauseDownloadTask(false);
492 }
493 }
494
NotifyBufferingStart()495 void FileFdSourcePlugin::NotifyBufferingStart()
496 {
497 MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
498 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
499 isBuffering_ = true;
500 if (callback_ != nullptr && !isInterrupted_) {
501 MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
502 callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
503 } else {
504 MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
505 }
506 }
507
NotifyBufferingPercent()508 void FileFdSourcePlugin::NotifyBufferingPercent()
509 {
510 if (waterLineAbove_ != 0) {
511 int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
512 bp = bp > PERCENT_100 ? PERCENT_100 : bp;
513 if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
514 MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
515 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
516 callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
517 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
518 } else {
519 MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
520 is true or isBuffering_ is false");
521 }
522 }
523 }
524
NotifyBufferingEnd()525 void FileFdSourcePlugin::NotifyBufferingEnd()
526 {
527 NotifyBufferingPercent();
528 MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
529 ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
530 isBuffering_ = false;
531 lastReadTime_ = 0;
532 if (callback_ != nullptr && !isInterrupted_) {
533 MEDIA_LOG_I("NotifyBufferingEnd success .");
534 callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
535 } else {
536 MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
537 }
538 }
539
NotifyReadFail()540 void FileFdSourcePlugin::NotifyReadFail()
541 {
542 MEDIA_LOG_I("NotifyReadFail in.");
543 if (callback_ != nullptr && !isInterrupted_) {
544 MEDIA_LOG_I("Read OnEvent read fail");
545 callback_->OnEvent({PluginEventType::CLIENT_ERROR, {NetworkClientErrorCode::ERROR_TIME_OUT}, "read"});
546 } else {
547 MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
548 }
549 }
550
// Sets isReadFrame_, which ReadOnlineFile requires before it reports "buffering, retry".
// streamId is not used.
void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
{
    MEDIA_LOG_I("SetDemuxerState");
    isReadFrame_ = true;
}
556
// Stores bitRate divided by TO_BYTE as currentBitRate_ and derives the initial water line from it
// (CACHE_LEVEL_1 times the stored rate). streamID is not used. Always returns Status::OK.
Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
{
    currentBitRate_ = bitRate / TO_BYTE; // 8b
    MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
    // default cache 0.3s
    waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
    return Status::OK;
}
565
// Only logs the bundle name; the value is not stored.
void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
{
    MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
}
570
SetReadBlockingFlag(bool isAllowed)571 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
572 {
573 MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
574 if (ringBuffer_) {
575 ringBuffer_->SetReadBlocking(isAllowed);
576 }
577 isReadBlocking_ = isAllowed;
578 return Status::OK;
579 }
580
SetInterruptState(bool isInterruptNeeded)581 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
582 {
583 MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
584 isInterrupted_ = isInterruptNeeded;
585 if (ringBuffer_ != nullptr) {
586 if (isInterrupted_) {
587 ringBuffer_->SetActive(false);
588 } else {
589 ringBuffer_->SetActive(true);
590 }
591 }
592
593 if (isInterrupted_ && isCloudFile_) {
594 if (downloadTask_ != nullptr) {
595 downloadTask_->StopAsync();
596 }
597 int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
598 MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
599 }
600 }
601
// Writes size_ (the media window size set by ParseUriInfo) to size. Always returns Status::OK.
Status FileFdSourcePlugin::GetSize(uint64_t& size)
{
    size = size_;
    return Status::OK;
}
607
// Returns the seekability recorded by ParseUriInfo for the current fd.
Seekable FileFdSourcePlugin::GetSeekable()
{
    MEDIA_LOG_D("GetSeekable in");
    return seekable_;
}
613
CheckFileType()614 void FileFdSourcePlugin::CheckFileType()
615 {
616 int loc; // 1本地,2云端
617 int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
618 MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
619 PUBLIC_LOG_D32, ioResult, loc, errno);
620
621 if (!isEnableFdCache_) {
622 isCloudFile_ = false;
623 return;
624 }
625
626 if (ioResult == 0) {
627 if (loc == IOCTL_CLOUD) {
628 isCloudFile_ = true;
629 MEDIA_LOG_I("ioctl file is cloud");
630 int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
631 MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
632 return;
633 } else {
634 isCloudFile_ = false;
635 MEDIA_LOG_I("ioctl file is local");
636 }
637 } else {
638 isCloudFile_ = false;
639 MEDIA_LOG_I("ioctl get file type");
640 }
641 }
642
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)643 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
644 {
645 if (!buffer) {
646 buffer = std::make_shared<Buffer>();
647 }
648 std::shared_ptr<Memory> bufData;
649 if (buffer->IsEmpty()) {
650 bufData = buffer->AllocMemory(nullptr, expectedLen);
651 } else {
652 bufData = buffer->GetMemory();
653 }
654 return bufData;
655 }
656
GetLastSize(uint64_t position)657 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
658 {
659 int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
660 if (ret < 0) {
661 MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
662 PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
663 }
664 return ret;
665 }
666
GetCurrentSpeed(int64_t curTime)667 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
668 {
669 if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
670 MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
671 PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
672 float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
673 avgDownloadSpeed_ = downloadSize_ / duration; //b/s
674 MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F "avgDownloadSpeed_: " PUBLIC_LOG_F,
675 duration, avgDownloadSpeed_);
676 downloadSize_ = 0;
677 lastCheckTime_ = curTime;
678 FALSE_RETURN(currentBitRate_ > 0);
679 UpdateWaterLineAbove();
680 }
681 }
682
UpdateWaterLineAbove()683 void FileFdSourcePlugin::UpdateWaterLineAbove()
684 {
685 FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
686 float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
687 MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F "avgDownloadSpeed_: " PUBLIC_LOG_F
688 "currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
689 waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
690 GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
691 MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
692 }
693
GetCacheTime(float num)694 float FileFdSourcePlugin::GetCacheTime(float num)
695 {
696 MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
697 if (num < 0) {
698 return CACHE_LEVEL_1;
699 }
700 if (num > 0 && num < 0.5) { // (0, 0.5)
701 return CACHE_TIME_DEFAULT;
702 } else if (num >= 0.5 && num < 1) { //[0.5, 1)
703 return CACHE_TIME_DEFAULT;
704 } else if (num >= 1) { //[1, 2)
705 return CACHE_LEVEL_1;
706 }
707 return CACHE_TIME_DEFAULT;
708 }
709
DeleteCacheBuffer(char * buffer,size_t bufferSize)710 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
711 {
712 if (buffer != nullptr && bufferSize > 0) {
713 delete[] buffer;
714 }
715 }
716
CheckReadTime()717 void FileFdSourcePlugin::CheckReadTime()
718 {
719 if (IsValidTime(curReadTime_, lastReadTime_)) {
720 NotifyBufferingStart();
721 lastReadTime_ = 0;
722 } else {
723 if (lastReadTime_ == 0) {
724 lastReadTime_ = curReadTime_;
725 }
726 }
727 }
728
IsValidTime(int64_t curTime,int64_t lastTime)729 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
730 {
731 return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
732 curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
733 }
734
// Enables or disables the cloud fd cache. The flag is read by CheckFileType and SetSource, so it
// only affects sources set after this call.
void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
{
    isEnableFdCache_ = isEnableFdCache;
}
739 } // namespace FileFdSource
740 } // namespace Plugin
741 } // namespace Media
742 } // namespace OHOS