1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #define HST_LOG_TAG "FileFdSourcePlugin"
17 
18 #include <cerrno>
19 #include <cstring>
20 #include <regex>
21 #include <memory>
22 #ifdef WIN32
23 #include <fcntl.h>
24 #else
25 #include <sys/types.h>
26 #include <unistd.h>
27 #endif
28 #include <sys/ioctl.h>
29 #include <sys/stat.h>
30 #include "common/log.h"
31 #include "osal/filesystem/file_system.h"
32 #include "file_fd_source_plugin.h"
33 #include "common/media_core.h"
34 
35 namespace {
36 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_SYSTEM_PLAYER, "FileFdSourcePlugin" };
37 }
38 
39 namespace OHOS {
40 namespace Media {
41 namespace Plugins {
42 namespace FileFdSource {
43 namespace {
44 constexpr int32_t FDPOS                         = 2;
45 constexpr int32_t READ_TIME                     = 3;
46 constexpr size_t CACHE_SIZE                     = 40 * 1024 * 1024;
47 constexpr size_t PER_CACHE_SIZE                 = 48 * 10 * 1024;;
48 constexpr int32_t TEN_MILLISECOUNDS             = 10 * 1000;
49 constexpr int32_t ONE_SECONDS                   = 1 * 1000 * 1000;
50 constexpr int32_t CACHE_TIME_DEFAULT            = 5;
51 constexpr int32_t SEEK_TIME_LOWER               = 20;
52 constexpr int32_t SEEK_TIME_UPPER               = 1000;
53 constexpr int32_t RECORD_TIME_INTERVAL          = 1 * 1000;
54 constexpr int32_t MILLISECOUND_TO_SECOND        = 1 * 1000;
55 constexpr int32_t RETRY_TIMES                   = 3;
56 constexpr int32_t TO_BYTE                       = 8;
57 constexpr int32_t PERCENT_100                   = 100;
58 constexpr int32_t MAX_RANK                      = 100;
59 constexpr int32_t READ_RETRY                    = 2;
60 constexpr float CACHE_LEVEL_1                   = 0.3;
61 
62 constexpr unsigned int HMDFS_IOC = 0xf2;
63 #define IOCTL_CLOUD 2
64 #define HMDFS_IOC_HAS_CACHE _IOW(HMDFS_IOC, 6, struct HmdfsHasCache)
65 #define HMDFS_IOC_GET_LOCATION _IOR(HMDFS_IOC, 7, __u32)
66 #define HMDFS_IOC_CANCEL_READ _IO(HMDFS_IOC, 8)
67 #define HMDFS_IOC_RESTORE_READ _IO(HMDFS_IOC, 9)
68 
GetFileSize(int32_t fd)69 uint64_t GetFileSize(int32_t fd)
70 {
71     uint64_t fileSize = 0;
72     struct stat s {};
73     int ret = fstat(fd, &s);
74     if (ret == 0) {
75         fileSize = static_cast<uint64_t>(s.st_size);
76         FALSE_RETURN_V_MSG_E(fileSize != 0, fileSize, "fileSize 0, fstat ret 0");
77         return fileSize;
78     } else {
79         MEDIA_LOG_W("GetFileSize error ret " PUBLIC_LOG_D32 ", errno " PUBLIC_LOG_D32, ret, errno);
80     }
81     return fileSize;
82 }
isNumber(const std::string & str)83 bool isNumber(const std::string& str)
84 {
85     return str.find_first_not_of("0123456789") == std::string::npos;
86 }
87 }
FileFdSourceRegister(const std::shared_ptr<Register> & reg)88 Status FileFdSourceRegister(const std::shared_ptr<Register>& reg)
89 {
90     MEDIA_LOG_I("fileSourceRegister is started");
91     SourcePluginDef definition;
92     definition.name = "FileFdSource";
93     definition.description = "File Fd source";
94     definition.rank = MAX_RANK; // 100: max rank
95     Capability capability;
96     capability.AppendFixedKey<std::vector<ProtocolType>>(Tag::MEDIA_PROTOCOL_TYPE, {ProtocolType::FD});
97     definition.AddInCaps(capability);
98     auto func = [](const std::string& name) -> std::shared_ptr<SourcePlugin> {
99         return std::make_shared<FileFdSourcePlugin>(name);
100     };
101     definition.SetCreator(func);
102     return reg->AddPlugin(definition);
103 }
104 
__anon06d37a240402null105 PLUGIN_DEFINITION(FileFdSource, LicenseType::APACHE_V2, FileFdSourceRegister, [] {});
106 
FileFdSourcePlugin(std::string name)107 FileFdSourcePlugin::FileFdSourcePlugin(std::string name)
108     : SourcePlugin(std::move(name))
109 {
110 }
111 
~FileFdSourcePlugin()112 FileFdSourcePlugin::~FileFdSourcePlugin()
113 {
114     MEDIA_LOG_I("~FileFdSourcePlugin in.");
115     steadyClock_.Reset();
116     SetInterruptState(true);
117     MEDIA_LOG_I("~FileFdSourcePlugin isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
118     FALSE_RETURN_MSG(downloadTask_ != nullptr, "~FileFdSourcePlugin out.");
119     downloadTask_->Stop();
120     MEDIA_LOG_I("~FileFdSourcePlugin out.");
121 }
122 
SetCallback(Callback * cb)123 Status FileFdSourcePlugin::SetCallback(Callback* cb)
124 {
125     MEDIA_LOG_D("SetCallback in " PUBLIC_LOG_D32, cb != nullptr);
126     callback_ = cb;
127     return Status::OK;
128 }
129 
SetSource(std::shared_ptr<MediaSource> source)130 Status FileFdSourcePlugin::SetSource(std::shared_ptr<MediaSource> source)
131 {
132     MEDIA_LOG_I("SetSource in. %{private}s", source->GetSourceUri().c_str());
133     auto err = ParseUriInfo(source->GetSourceUri());
134     if (err != Status::OK) {
135         MEDIA_LOG_E("Parse file name from uri fail, uri %{private}s", source->GetSourceUri().c_str());
136         return err;
137     }
138     CheckFileType();
139     if (isCloudFile_ && isEnableFdCache_) {
140         ringBuffer_ = std::make_shared<RingBuffer>(CACHE_SIZE);
141         FALSE_RETURN_V_MSG_E(!(ringBuffer_ == nullptr || !ringBuffer_->Init()),
142             Status::ERROR_NO_MEMORY, "memory is not enough ringBuffer_");
143         downloadTask_ = std::make_shared<Task>(std::string("downloadTaskFD"));
144         FALSE_RETURN_V_MSG_E(downloadTask_ != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
145         downloadTask_->RegisterJob([this] {
146             CacheDataLoop();
147             return 0;
148         });
149         downloadTask_->Start();
150         steadyClock_.Reset();
151     }
152     return Status::OK;
153 }
154 
Read(std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)155 Status FileFdSourcePlugin::Read(std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
156 {
157     return Read(0, buffer, offset, expectedLen);
158 }
159 
Read(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)160 Status FileFdSourcePlugin::Read(int32_t streamId, std::shared_ptr<Buffer>& buffer, uint64_t offset, size_t expectedLen)
161 {
162     FALSE_RETURN_V_MSG_E(fd_ != -1, Status::ERROR_WRONG_STATE, "no valid fd");
163     if (isCloudFile_) {
164         return ReadOnlineFile(0, buffer, offset, expectedLen);
165     } else {
166         return ReadOfflineFile(0, buffer, offset, expectedLen);
167     }
168 }
169 
ReadOfflineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)170 Status FileFdSourcePlugin::ReadOfflineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
171     uint64_t offset, size_t expectedLen)
172 {
173     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
174     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
175     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
176     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
177     MEDIA_LOG_D("ReadLocal buffer pos: " PUBLIC_LOG_U64 " , len:" PUBLIC_LOG_ZU, position_.load(), expectedLen);
178 
179     int32_t offsetCur = lseek(fd_, 0, SEEK_CUR);
180     if (static_cast<uint64_t>(offsetCur) != position_) {
181         MEDIA_LOG_E("Fd offsetCur has changed. offsetCur " PUBLIC_LOG_D32 ", offsetOld " PUBLIC_LOG_U64,
182             offsetCur, position_.load());
183     }
184     auto size = read(fd_, bufData->GetWritableAddr(expectedLen), expectedLen);
185     if (size <= 0) {
186         HandleReadResult(expectedLen, size);
187         MEDIA_LOG_D("ReadLocal END_OF_STREAM");
188         return Status::END_OF_STREAM;
189     }
190     bufData->UpdateDataSize(size);
191     position_ += static_cast<uint64_t>(size);
192     if (buffer->GetMemory() != nullptr) {
193         MEDIA_LOG_D("ReadLocal position_ " PUBLIC_LOG_U64 ", readSize " PUBLIC_LOG_ZU,
194             position_.load(), buffer->GetMemory()->GetSize());
195     }
196     return Status::OK;
197 }
198 
ReadOnlineFile(int32_t streamId,std::shared_ptr<Buffer> & buffer,uint64_t offset,size_t expectedLen)199 Status FileFdSourcePlugin::ReadOnlineFile(int32_t streamId, std::shared_ptr<Buffer>& buffer,
200     uint64_t offset, size_t expectedLen)
201 {
202     if (isBuffering_) {
203         if (HandleBuffering()) {
204             FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
205             FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
206             MEDIA_LOG_I("is buffering, return error again.");
207             return Status::ERROR_AGAIN;
208         }
209     }
210 
211     std::shared_ptr<Memory> bufData = GetBufferPtr(buffer, expectedLen);
212     FALSE_RETURN_V_MSG_E(bufData != nullptr, Status::ERROR_NO_MEMORY, "memory is not enough");
213     expectedLen = std::min(static_cast<size_t>(GetLastSize(position_)), expectedLen);
214     expectedLen = std::min(bufData->GetCapacity(), expectedLen);
215 
216     // ringbuffer 0 after seek in 20ms, don't notify buffering
217     curReadTime_ = steadyClock2_.ElapsedMilliseconds();
218     if (isReadFrame_ &&  ringBuffer_->GetSize() < expectedLen && !HasCacheData(expectedLen, offset) &&
219          (GetLastSize(position_) > static_cast<int64_t>(expectedLen))) {
220         MEDIA_LOG_I("ringBuffer.size() " PUBLIC_LOG_ZU " curReadTime_ " PUBLIC_LOG_D64
221             " lastReadTime_ " PUBLIC_LOG_D64, ringBuffer_->GetSize(), curReadTime_, lastReadTime_);
222         CheckReadTime();
223         FALSE_RETURN_V_MSG_E(!isInterrupted_, Status::OK, "please not retry read, isInterrupted true");
224         FALSE_RETURN_V_MSG_E(isReadBlocking_, Status::OK, "please not retry read, isReadBlocking false");
225         return Status::ERROR_AGAIN;
226     }
227 
228     size_t size = ringBuffer_->ReadBuffer(bufData->GetWritableAddr(expectedLen), expectedLen, READ_RETRY);
229     if (size == 0) {
230         MEDIA_LOG_I("read size 0,fd " PUBLIC_LOG_D32 ",offset " PUBLIC_LOG_D64 ", size:" PUBLIC_LOG_U64 ", pos:"
231             PUBLIC_LOG_U64 ",readBlock:" PUBLIC_LOG_D32, fd_, offset, size_, position_.load(), isReadBlocking_.load());
232         FALSE_RETURN_V_MSG_E(GetLastSize(position_) != 0, Status::END_OF_STREAM, "ReadCloud END_OF_STREAM");
233         bufData->UpdateDataSize(0);
234         return Status::OK;
235     }
236     bufData->UpdateDataSize(size);
237     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curReadTime_;
238     if (ct > READ_TIME) {
239         MEDIA_LOG_I("ReadCloud buffer position " PUBLIC_LOG_U64 ", expectedLen " PUBLIC_LOG_ZU
240         " costTime: " PUBLIC_LOG_U64, position_.load(), expectedLen, ct);
241     }
242     position_ += static_cast<uint64_t>(size);
243     MEDIA_LOG_D("ringBuffer.size() " PUBLIC_LOG_ZU, ringBuffer_->GetSize());
244     return Status::OK;
245 }
246 
SeekTo(uint64_t offset)247 Status FileFdSourcePlugin::SeekTo(uint64_t offset)
248 {
249     FALSE_RETURN_V_MSG_E(fd_ != -1 && seekable_ == Seekable::SEEKABLE,
250         Status::ERROR_WRONG_STATE, "no valid fd or no seekable.");
251 
252     MEDIA_LOG_D("SeekTo offset: " PUBLIC_LOG_U64, offset);
253     if (isCloudFile_) {
254         return SeekToOnlineFile(offset);
255     } else {
256         return SeekToOfflineFile(offset);
257     }
258 }
259 
SeekToOfflineFile(uint64_t offset)260 Status FileFdSourcePlugin::SeekToOfflineFile(uint64_t offset)
261 {
262     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
263     if (ret == -1) {
264         MEDIA_LOG_E("SeekLocal failed, fd " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
265             PUBLIC_LOG_S, fd_, offset, strerror(errno));
266         return Status::ERROR_UNKNOWN;
267     }
268     position_ = offset + static_cast<uint64_t>(offset_);
269     MEDIA_LOG_D("SeekLocal end ret " PUBLIC_LOG_D32 ", position_ " PUBLIC_LOG_U64, ret, position_.load());
270     return Status::OK;
271 }
272 
SeekToOnlineFile(uint64_t offset)273 Status FileFdSourcePlugin::SeekToOnlineFile(uint64_t offset)
274 {
275     FALSE_RETURN_V_MSG_E(ringBuffer_ != nullptr, Status::ERROR_WRONG_STATE, "SeekCloud ringBuffer_ is nullptr");
276     MEDIA_LOG_D("SeekCloud,ringBuffer.size: " PUBLIC_LOG_ZU ",offset " PUBLIC_LOG_U64, ringBuffer_->GetSize(), offset);
277     if (ringBuffer_->Seek(offset)) {
278         position_ = offset + static_cast<uint64_t>(offset_);
279         MEDIA_LOG_I("SeekCloud ringBuffer_ seek hit, offset " PUBLIC_LOG_U64, offset);
280         return Status::OK;
281     }
282     // First clear buffer, avoid no available buffer then task pause never exit.
283     ringBuffer_->SetActive(false);
284     inSeek_ = true;
285     if (downloadTask_ != nullptr) {
286         downloadTask_->Pause();
287         inSeek_ = false;
288     }
289     ringBuffer_->Clear();
290     ringBuffer_->SetMediaOffset(offset);
291     ringBuffer_->SetActive(true);
292 
293     int32_t ret = lseek(fd_, offset + static_cast<uint64_t>(offset_), SEEK_SET);
294     if (ret == -1) {
295         MEDIA_LOG_E("SeekCloud failed, fd_ " PUBLIC_LOG_D32 ", offset " PUBLIC_LOG_U64 ", errStr "
296             PUBLIC_LOG_S, fd_, offset, strerror(errno));
297         return Status::ERROR_UNKNOWN;
298     }
299     position_ = offset + static_cast<uint64_t>(offset_);
300     cachePosition_ = position_.load();
301 
302     MEDIA_LOG_D("SeekCloud end, fd_ " PUBLIC_LOG_D32 ", size_ " PUBLIC_LOG_U64 ", offset_ " PUBLIC_LOG_D64
303         ", position_ " PUBLIC_LOG_U64, fd_, size_, offset_, position_.load());
304     if (downloadTask_ != nullptr) {
305         downloadTask_->Start();
306     }
307     return Status::OK;
308 }
309 
ParseUriInfo(const std::string & uri)310 Status FileFdSourcePlugin::ParseUriInfo(const std::string& uri)
311 {
312     if (uri.empty()) {
313         MEDIA_LOG_E("uri is empty");
314         return Status::ERROR_INVALID_PARAMETER;
315     }
316     std::smatch fdUriMatch;
317     FALSE_RETURN_V_MSG_E(std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)\\?offset=(.*)&size=(.*)")) ||
318         std::regex_match(uri, fdUriMatch, std::regex("^fd://(.*)")),
319         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
320     FALSE_RETURN_V_MSG_E(fdUriMatch.size() >= FDPOS && isNumber(fdUriMatch[1].str()),
321         Status::ERROR_INVALID_PARAMETER, "Invalid fd uri format");
322     fd_ = std::stoi(fdUriMatch[1].str()); // 1: sub match fd subscript
323     FALSE_RETURN_V_MSG_E(fd_ != -1 && FileSystem::IsRegularFile(fd_),
324         Status::ERROR_INVALID_PARAMETER, "Invalid fd: " PUBLIC_LOG_D32, fd_);
325     fileSize_ = GetFileSize(fd_);
326     if (fdUriMatch.size() == 4) { // 4:4 sub match
327         offset_ = std::stoll(fdUriMatch[2].str()); // 2: sub match offset subscript
328         if (static_cast<uint64_t>(offset_) > fileSize_) {
329             offset_ = fileSize_;
330         }
331         size_ = static_cast<uint64_t>(std::stoll(fdUriMatch[3].str())); // 3: sub match size subscript
332         uint64_t remainingSize = fileSize_ - offset_;
333         if (size_ > remainingSize) {
334             size_ = remainingSize;
335         }
336     } else {
337         size_ = fileSize_;
338         offset_ = 0;
339     }
340     position_ = offset_;
341     seekable_ = FileSystem::IsSeekable(fd_) ? Seekable::SEEKABLE : Seekable::UNSEEKABLE;
342     if (seekable_ == Seekable::SEEKABLE) {
343         NOK_LOG(SeekTo(0));
344     }
345     MEDIA_LOG_I("Fd: " PUBLIC_LOG_D32 ", offset: " PUBLIC_LOG_D64 ", size: " PUBLIC_LOG_U64, fd_, offset_, size_);
346     return Status::OK;
347 }
348 
CacheDataLoop()349 void FileFdSourcePlugin::CacheDataLoop()
350 {
351     if (isInterrupted_) {
352         MEDIA_LOG_E("CacheData break");
353         usleep(TEN_MILLISECOUNDS);
354         return;
355     }
356 
357     int64_t curTime = steadyClock_.ElapsedMilliseconds();
358     GetCurrentSpeed(curTime);
359 
360     size_t bufferSize = std::min(PER_CACHE_SIZE, static_cast<size_t>(GetLastSize(cachePosition_.load())));
361     if (bufferSize < 0) {
362         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
363         usleep(TEN_MILLISECOUNDS);
364         return;
365     }
366 
367     char* cacheBuffer = new char[bufferSize];
368     if (cacheBuffer == nullptr) {
369         MEDIA_LOG_E("CacheData memory is not enough bufferSize " PUBLIC_LOG_ZU, bufferSize);
370         usleep(TEN_MILLISECOUNDS);
371         return;
372     }
373     int size = read(fd_, cacheBuffer, bufferSize);
374     if (size <= 0) {
375         DeleteCacheBuffer(cacheBuffer, bufferSize);
376         HandleReadResult(bufferSize, size);
377         return;
378     }
379     MEDIA_LOG_D("Cache fd: " PUBLIC_LOG_D32 "cachePos_ " PUBLIC_LOG_U64 " ringBuffer_.size() " PUBLIC_LOG_ZU
380         ", size_ " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_);
381     while (!ringBuffer_->WriteBuffer(cacheBuffer, size)) {
382         MEDIA_LOG_I("CacheData ringbuffer write failed");
383         if (inSeek_ || isInterrupted_) {
384             DeleteCacheBuffer(cacheBuffer, bufferSize);
385             return;
386         }
387         usleep(TEN_MILLISECOUNDS);
388     }
389     cachePosition_ += static_cast<uint64_t>(size);
390     downloadSize_ += static_cast<uint64_t>(size);
391 
392     int64_t ct = steadyClock2_.ElapsedMilliseconds() - curTime;
393     if (ct > READ_TIME) {
394         MEDIA_LOG_I("Cache fd: " PUBLIC_LOG_D32 "cachePos:" PUBLIC_LOG_U64 ",ringBuffer.size() " PUBLIC_LOG_ZU ", size "
395             PUBLIC_LOG_U64 " cTime: " PUBLIC_LOG_U64, fd_, cachePosition_.load(), ringBuffer_->GetSize(), size_, ct);
396     }
397 
398     DeleteCacheBuffer(cacheBuffer, bufferSize);
399 
400     if (isBuffering_ && (static_cast<int64_t>(ringBuffer_->GetSize()) > waterLineAbove_ ||
401         GetLastSize(cachePosition_.load()) == 0)) {
402         NotifyBufferingEnd();
403     }
404 }
405 
HasCacheData(size_t bufferSize,uint64_t offset)406 bool FileFdSourcePlugin::HasCacheData(size_t bufferSize, uint64_t offset)
407 {
408     HmdfsHasCache ioctlData;
409     ioctlData.offset = static_cast<int64_t>(offset);
410     ioctlData.readSize = static_cast<int64_t>(bufferSize);
411     int32_t ioResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData); // 0在 -1不在
412 
413     ioctlData.offset = static_cast<int64_t>(cachePosition_);
414     ioctlData.readSize = static_cast<int64_t>(PER_CACHE_SIZE);
415     int32_t ioCacheResult = ioctl(fd_, HMDFS_IOC_HAS_CACHE, &ioctlData);
416     // ioctl has cache
417     if (ioResult == 0 && ioCacheResult == 0) {
418         return true;
419     } else {
420         MEDIA_LOG_I("ioctl has no cache with errno " PUBLIC_LOG_D32, errno);
421     }
422     return false;
423 }
424 
Stop()425 Status FileFdSourcePlugin::Stop()
426 {
427     MEDIA_LOG_I("Stop enter.");
428     isInterrupted_ = true;
429     MEDIA_LOG_I("Stop isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
430     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
431     downloadTask_->StopAsync();
432     return Status::OK;
433 }
434 
Reset()435 Status FileFdSourcePlugin::Reset()
436 {
437     MEDIA_LOG_I("Reset enter.");
438     isInterrupted_ = true;
439     MEDIA_LOG_I("Reset isInterrupted_ " PUBLIC_LOG_D32, isInterrupted_.load());
440     FALSE_RETURN_V(downloadTask_ != nullptr, Status::OK);
441     downloadTask_->StopAsync();
442     return Status::OK;
443 }
444 
PauseDownloadTask(bool isAsync)445 void FileFdSourcePlugin::PauseDownloadTask(bool isAsync)
446 {
447     FALSE_RETURN(downloadTask_ != nullptr);
448     if (isAsync) {
449         downloadTask_->PauseAsync();
450     } else {
451         downloadTask_->Pause();
452     }
453 }
454 
HandleBuffering()455 bool FileFdSourcePlugin::HandleBuffering()
456 {
457     MEDIA_LOG_I("HandleBuffering in.");
458     int32_t sleepTime = 0;
459     // return error again 1 time 1s, avoid ffmpeg error
460     while (sleepTime < ONE_SECONDS && !isInterrupted_ && isReadBlocking_) {
461         NotifyBufferingPercent();
462         if (!isBuffering_) {
463             break;
464         }
465         MEDIA_LOG_I("isBuffering.");
466         usleep(TEN_MILLISECOUNDS);
467         sleepTime += TEN_MILLISECOUNDS;
468     }
469     MEDIA_LOG_I("HandleBuffering out.");
470     return isBuffering_;
471 }
472 
HandleReadResult(size_t bufferSize,int size)473 void FileFdSourcePlugin::HandleReadResult(size_t bufferSize, int size)
474 {
475     MEDIA_LOG_I("HandleReadResult size " PUBLIC_LOG_D32 ", fd " PUBLIC_LOG_D32 ", cachePosition_" PUBLIC_LOG_U64
476         ", position_ " PUBLIC_LOG_U64 ", bufferSize " PUBLIC_LOG_ZU ", size_ " PUBLIC_LOG_U64 ", offset_ "
477         PUBLIC_LOG_D64, size, fd_, cachePosition_.load(), position_.load(), bufferSize, size_, offset_);
478     if (size < 0) {
479         // errno EIO  5
480         MEDIA_LOG_E("read fail, errno " PUBLIC_LOG_D32, errno);
481 
482         // read fail with errno, retry 3 * 10ms
483         retryTimes_++;
484         if (retryTimes_ >= RETRY_TIMES) {
485             NotifyReadFail();
486             SetInterruptState(true);
487         }
488         usleep(TEN_MILLISECOUNDS);
489     } else {
490         cachePosition_ = 0;
491         PauseDownloadTask(false);
492     }
493 }
494 
NotifyBufferingStart()495 void FileFdSourcePlugin::NotifyBufferingStart()
496 {
497     MEDIA_LOG_I("NotifyBufferingStart, ringBuffer.size() " PUBLIC_LOG_ZU
498         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
499     isBuffering_ = true;
500     if (callback_ != nullptr && !isInterrupted_) {
501         MEDIA_LOG_I("Read OnEvent BUFFERING_START.");
502         callback_->OnEvent({PluginEventType::BUFFERING_START, {BufferingInfoType::BUFFERING_START}, "start"});
503     } else {
504         MEDIA_LOG_E("BUFFERING_START callback_ is nullptr or isInterrupted_ is true");
505     }
506 }
507 
NotifyBufferingPercent()508 void FileFdSourcePlugin::NotifyBufferingPercent()
509 {
510     if (waterLineAbove_ != 0) {
511         int64_t bp = static_cast<float>(ringBuffer_->GetSize()) / waterLineAbove_ * PERCENT_100;
512         bp = bp > PERCENT_100 ? PERCENT_100 : bp;
513         if (isBuffering_ && callback_ != nullptr && !isInterrupted_) {
514             MEDIA_LOG_I("NotifyBufferingPercent, ringBuffer.size() " PUBLIC_LOG_ZU ", waterLineAbove_ " PUBLIC_LOG_U64
515                 ", PERCENT " PUBLIC_LOG_D32, ringBuffer_->GetSize(), waterLineAbove_, static_cast<int32_t>(bp));
516             callback_->OnEvent({PluginEventType::EVENT_BUFFER_PROGRESS,
517                 {BufferingInfoType::BUFFERING_PERCENT}, std::to_string(bp)});
518         } else {
519             MEDIA_LOG_E("EVENT_BUFFER_PROGRESS callback_ is nullptr or isInterrupted_ \
520                 is true or isBuffering_ is false");
521         }
522     }
523 }
524 
NotifyBufferingEnd()525 void FileFdSourcePlugin::NotifyBufferingEnd()
526 {
527     NotifyBufferingPercent();
528     MEDIA_LOG_I("NotifyBufferingEnd, ringBuffer.size() " PUBLIC_LOG_ZU
529         ", waterLineAbove_ " PUBLIC_LOG_U64, ringBuffer_->GetSize(), waterLineAbove_);
530     isBuffering_ = false;
531     lastReadTime_ = 0;
532     if (callback_ != nullptr && !isInterrupted_) {
533         MEDIA_LOG_I("NotifyBufferingEnd success .");
534         callback_->OnEvent({PluginEventType::BUFFERING_END, {BufferingInfoType::BUFFERING_END}, "end"});
535     } else {
536         MEDIA_LOG_E("BUFFERING_END callback_ is nullptr or isInterrupted_ is true");
537     }
538 }
539 
NotifyReadFail()540 void FileFdSourcePlugin::NotifyReadFail()
541 {
542     MEDIA_LOG_I("NotifyReadFail in.");
543     if (callback_ != nullptr && !isInterrupted_) {
544         MEDIA_LOG_I("Read OnEvent read fail");
545         callback_->OnEvent({PluginEventType::CLIENT_ERROR, {NetworkClientErrorCode::ERROR_TIME_OUT}, "read"});
546     } else {
547         MEDIA_LOG_E("CLIENT_ERROR callback_ is nullptr or isInterrupted_ is true");
548     }
549 }
550 
SetDemuxerState(int32_t streamId)551 void FileFdSourcePlugin::SetDemuxerState(int32_t streamId)
552 {
553     MEDIA_LOG_I("SetDemuxerState");
554     isReadFrame_ = true;
555 }
556 
SetCurrentBitRate(int32_t bitRate,int32_t streamID)557 Status FileFdSourcePlugin::SetCurrentBitRate(int32_t bitRate, int32_t streamID)
558 {
559     currentBitRate_ = bitRate / TO_BYTE; // 8b
560     MEDIA_LOG_I("currentBitRate: " PUBLIC_LOG_D32, currentBitRate_);
561     // default cache 0.3s
562     waterLineAbove_ = CACHE_LEVEL_1 * currentBitRate_;
563     return Status::OK;
564 }
565 
SetBundleName(const std::string & bundleName)566 void FileFdSourcePlugin::SetBundleName(const std::string& bundleName)
567 {
568     MEDIA_LOG_I("SetBundleName bundleName: " PUBLIC_LOG_S, bundleName.c_str());
569 }
570 
SetReadBlockingFlag(bool isAllowed)571 Status FileFdSourcePlugin::SetReadBlockingFlag(bool isAllowed)
572 {
573     MEDIA_LOG_I("SetReadBlockingFlag entered, IsReadBlockingAllowed %{public}d", isAllowed);
574     if (ringBuffer_) {
575         ringBuffer_->SetReadBlocking(isAllowed);
576     }
577     isReadBlocking_ = isAllowed;
578     return Status::OK;
579 }
580 
SetInterruptState(bool isInterruptNeeded)581 void FileFdSourcePlugin::SetInterruptState(bool isInterruptNeeded)
582 {
583     MEDIA_LOG_I("SetInterruptState isInterrupted_" PUBLIC_LOG_D32, isInterruptNeeded);
584     isInterrupted_ = isInterruptNeeded;
585     if (ringBuffer_ != nullptr) {
586         if (isInterrupted_) {
587             ringBuffer_->SetActive(false);
588         } else {
589             ringBuffer_->SetActive(true);
590         }
591     }
592 
593     if (isInterrupted_ && isCloudFile_) {
594         if (downloadTask_ != nullptr) {
595             downloadTask_->StopAsync();
596         }
597         int ret = ioctl(fd_, HMDFS_IOC_CANCEL_READ);
598         MEDIA_LOG_I("ioctl break read, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
599     }
600 }
601 
GetSize(uint64_t & size)602 Status FileFdSourcePlugin::GetSize(uint64_t& size)
603 {
604     size = size_;
605     return Status::OK;
606 }
607 
GetSeekable()608 Seekable FileFdSourcePlugin::GetSeekable()
609 {
610     MEDIA_LOG_D("GetSeekable in");
611     return seekable_;
612 }
613 
CheckFileType()614 void FileFdSourcePlugin::CheckFileType()
615 {
616     int loc; // 1本地,2云端
617     int ioResult = ioctl(fd_, HMDFS_IOC_GET_LOCATION, &loc);
618     MEDIA_LOG_I("SetSource ioctl loc, ret " PUBLIC_LOG_D32 ", loc " PUBLIC_LOG_D32 ", errno"
619         PUBLIC_LOG_D32, ioResult, loc, errno);
620 
621     if (!isEnableFdCache_) {
622         isCloudFile_ = false;
623         return;
624     }
625 
626     if (ioResult == 0) {
627         if (loc == IOCTL_CLOUD) {
628             isCloudFile_ = true;
629             MEDIA_LOG_I("ioctl file is cloud");
630             int ret = ioctl(fd_, HMDFS_IOC_RESTORE_READ);
631             MEDIA_LOG_I("ioctl restore fd, fd %{public}d, ret %{public}d, errno %{public}d", fd_, ret, errno);
632             return;
633         } else {
634             isCloudFile_ = false;
635             MEDIA_LOG_I("ioctl file is local");
636         }
637     } else {
638         isCloudFile_ = false;
639         MEDIA_LOG_I("ioctl get file type");
640     }
641 }
642 
GetBufferPtr(std::shared_ptr<Buffer> & buffer,size_t expectedLen)643 std::shared_ptr<Memory> FileFdSourcePlugin::GetBufferPtr(std::shared_ptr<Buffer>& buffer, size_t expectedLen)
644 {
645     if (!buffer) {
646         buffer = std::make_shared<Buffer>();
647     }
648     std::shared_ptr<Memory> bufData;
649     if (buffer->IsEmpty()) {
650         bufData = buffer->AllocMemory(nullptr, expectedLen);
651     } else {
652         bufData = buffer->GetMemory();
653     }
654     return bufData;
655 }
656 
GetLastSize(uint64_t position)657 int64_t FileFdSourcePlugin::GetLastSize(uint64_t position)
658 {
659     int64_t ret = static_cast<int64_t>(size_) + offset_ - static_cast<int64_t>(position);
660     if (ret < 0) {
661         MEDIA_LOG_E("GetLastSize error, fd_ " PUBLIC_LOG_D32 ", offset_ " PUBLIC_LOG_D64 ", size_ "
662             PUBLIC_LOG_U64 ", position " PUBLIC_LOG_U64, fd_, offset_, size_, position);
663     }
664     return ret;
665 }
666 
GetCurrentSpeed(int64_t curTime)667 void FileFdSourcePlugin::GetCurrentSpeed(int64_t curTime)
668 {
669     if ((curTime - lastCheckTime_) > RECORD_TIME_INTERVAL) {
670         MEDIA_LOG_I("CacheDataLoop curTime_: " PUBLIC_LOG_U64 " lastCheckTime_: "
671         PUBLIC_LOG_U64 " downloadSize_: " PUBLIC_LOG_U64, curTime, lastCheckTime_, downloadSize_);
672         float duration = static_cast<double>(curTime - lastCheckTime_) / MILLISECOUND_TO_SECOND;
673         avgDownloadSpeed_ = downloadSize_ / duration; //b/s
674         MEDIA_LOG_I("downloadDuration: " PUBLIC_LOG_F "avgDownloadSpeed_: " PUBLIC_LOG_F,
675             duration, avgDownloadSpeed_);
676         downloadSize_ = 0;
677         lastCheckTime_ = curTime;
678         FALSE_RETURN(currentBitRate_ > 0);
679         UpdateWaterLineAbove();
680     }
681 }
682 
UpdateWaterLineAbove()683 void FileFdSourcePlugin::UpdateWaterLineAbove()
684 {
685     FALSE_RETURN_MSG(currentBitRate_ > 0, "currentBitRate_ <= 0");
686     float cacheTime = GetCacheTime(avgDownloadSpeed_ / currentBitRate_);
687     MEDIA_LOG_I("cacheTime: " PUBLIC_LOG_F "avgDownloadSpeed_: " PUBLIC_LOG_F
688         "currentBitRate: " PUBLIC_LOG_D32, cacheTime, avgDownloadSpeed_, currentBitRate_);
689     waterLineAbove_ = cacheTime * currentBitRate_ > GetLastSize(cachePosition_.load()) ?
690         GetLastSize(cachePosition_.load()) : cacheTime * currentBitRate_;
691     MEDIA_LOG_I("waterLineAbove_: " PUBLIC_LOG_U64, waterLineAbove_);
692 }
693 
GetCacheTime(float num)694 float FileFdSourcePlugin::GetCacheTime(float num)
695 {
696     MEDIA_LOG_I("GetCacheTime with num: " PUBLIC_LOG_F, num);
697     if (num < 0) {
698         return CACHE_LEVEL_1;
699     }
700     if (num > 0 && num < 0.5) { // (0, 0.5)
701         return CACHE_TIME_DEFAULT;
702     } else if (num >= 0.5 && num < 1) { //[0.5, 1)
703         return CACHE_TIME_DEFAULT;
704     } else if (num >= 1) { //[1, 2)
705         return CACHE_LEVEL_1;
706     }
707     return CACHE_TIME_DEFAULT;
708 }
709 
DeleteCacheBuffer(char * buffer,size_t bufferSize)710 void FileFdSourcePlugin::DeleteCacheBuffer(char* buffer, size_t bufferSize)
711 {
712     if (buffer != nullptr && bufferSize > 0) {
713         delete[] buffer;
714     }
715 }
716 
CheckReadTime()717 void FileFdSourcePlugin::CheckReadTime()
718 {
719     if (IsValidTime(curReadTime_, lastReadTime_)) {
720         NotifyBufferingStart();
721         lastReadTime_ = 0;
722     } else {
723         if (lastReadTime_ == 0) {
724             lastReadTime_ = curReadTime_;
725         }
726     }
727 }
728 
IsValidTime(int64_t curTime,int64_t lastTime)729 bool FileFdSourcePlugin::IsValidTime(int64_t curTime, int64_t lastTime)
730 {
731     return lastReadTime_ != 0 && curReadTime_ - lastReadTime_ < SEEK_TIME_UPPER &&
732         curReadTime_ - lastReadTime_ > SEEK_TIME_LOWER;
733 }
734 
SetEnableOnlineFdCache(bool isEnableFdCache)735 void FileFdSourcePlugin::SetEnableOnlineFdCache(bool isEnableFdCache)
736 {
737     isEnableFdCache_ = isEnableFdCache;
738 }
739 } // namespace FileFdSource
740 } // namespace Plugin
741 } // namespace Media
742 } // namespace OHOS