1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "ValueMetricProducer.h"
21
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "../guardrail/StatsdStats.h"
26 #include "../stats_log_util.h"
27 #include "metrics/parsing_utils/metrics_manager_util.h"
28
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BOOL;
31 using android::util::FIELD_TYPE_DOUBLE;
32 using android::util::FIELD_TYPE_INT32;
33 using android::util::FIELD_TYPE_INT64;
34 using android::util::FIELD_TYPE_MESSAGE;
35 using android::util::FIELD_TYPE_STRING;
36 using android::util::ProtoOutputStream;
37 using std::map;
38 using std::shared_ptr;
39 using std::unordered_map;
40
41 namespace android {
42 namespace os {
43 namespace statsd {
44
45 // for StatsLogReport
46 const int FIELD_ID_ID = 1;
47 const int FIELD_ID_VALUE_METRICS = 7;
48 const int FIELD_ID_TIME_BASE = 9;
49 const int FIELD_ID_BUCKET_SIZE = 10;
50 const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
51 const int FIELD_ID_IS_ACTIVE = 14;
52 // for ValueMetricDataWrapper
53 const int FIELD_ID_DATA = 1;
54 const int FIELD_ID_SKIPPED = 2;
55 // for SkippedBuckets
56 const int FIELD_ID_SKIPPED_START_MILLIS = 3;
57 const int FIELD_ID_SKIPPED_END_MILLIS = 4;
58 const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
59 // for DumpEvent Proto
60 const int FIELD_ID_BUCKET_DROP_REASON = 1;
61 const int FIELD_ID_DROP_TIME = 2;
62 // for ValueMetricData
63 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
64 const int FIELD_ID_BUCKET_INFO = 3;
65 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
66 const int FIELD_ID_SLICE_BY_STATE = 6;
67 // for ValueBucketInfo
68 const int FIELD_ID_VALUE_INDEX = 1;
69 const int FIELD_ID_VALUE_LONG = 2;
70 const int FIELD_ID_VALUE_DOUBLE = 3;
71 const int FIELD_ID_VALUES = 9;
72 const int FIELD_ID_BUCKET_NUM = 4;
73 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
74 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
75 const int FIELD_ID_CONDITION_TRUE_NS = 10;
76
77 const Value ZERO_LONG((int64_t)0);
78 const Value ZERO_DOUBLE((int64_t)0);
79
80 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
// Constructs a ValueMetricProducer from its config proto. Sets up bucketing,
// dimension/state/condition links, registers with the puller manager (for
// pulled metrics), and starts the condition timer for the first — possibly
// partial — bucket. All times are elapsed-realtime nanoseconds.
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const vector<ConditionState>& initialConditionCache,
        const sp<ConditionWizard>& conditionWizard, const uint64_t protoHash,
        const int whatMatcherIndex, const sp<EventMatcherWizard>& matcherWizard,
        const int pullTagId, const int64_t timeBaseNs, const int64_t startTimeNs,
        const sp<StatsPullerManager>& pullerManager,
        const unordered_map<int, shared_ptr<Activation>>& eventActivationMap,
        const unordered_map<int, vector<shared_ptr<Activation>>>& eventDeactivationMap,
        const vector<int>& slicedStateAtoms,
        const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, initialConditionCache,
                     conditionWizard, protoHash, eventActivationMap, eventDeactivationMap,
                     slicedStateAtoms, stateGroupMap),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      // pullTagId == -1 means this is a push-based metric.
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      // Use per-atom dimension guardrails when configured, global defaults otherwise.
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Pulled metrics default to diff-ing consecutive pulls unless overridden.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsSkipped(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        // Guardrailed: clamps the configured bucket to the allowed minimum per uid.
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    mBucketSizeNs = bucketSizeMills * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
        mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what());
    }

    // Metric-to-condition links make the condition sliced.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
        mConditionSliced = true;
    }

    // Links between the metric's dimensions and state atoms' primary fields.
    for (const auto& stateLink : metric.state_link()) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }

    // Fast-forward the bucket counter to the bucket containing startTimeNs.
    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);

    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
185
~ValueMetricProducer()186 ValueMetricProducer::~ValueMetricProducer() {
187 VLOG("~ValueMetricProducer() called");
188 if (mIsPulled) {
189 mPullerManager->UnRegisterReceiver(mPullTagId, mConfigKey, this);
190 }
191 }
192
// Updates this producer in place to match the corresponding ValueMetric in a
// new config: re-resolves the "what" matcher and condition indices against the
// new tracker lists and swaps in the new matcher wizard. Returns false if any
// lookup fails, in which case the caller should discard the config update.
bool ValueMetricProducer::onConfigUpdatedLocked(
        const StatsdConfig& config, const int configIndex, const int metricIndex,
        const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
        const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
        const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
        const sp<EventMatcherWizard>& matcherWizard,
        const vector<sp<ConditionTracker>>& allConditionTrackers,
        const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
        const unordered_map<int64_t, int>& metricToActivationMap,
        unordered_map<int, vector<int>>& trackerToMetricMap,
        unordered_map<int, vector<int>>& conditionToMetricMap,
        unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
        unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
        vector<int>& metricsWithActivation) {
    // Let the base class update condition/activation bookkeeping first.
    if (!MetricProducer::onConfigUpdatedLocked(
                config, configIndex, metricIndex, allAtomMatchingTrackers,
                oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
                allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
                trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
                deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
        return false;
    }

    const ValueMetric& metric = config.value_metric(configIndex);
    // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
    if (!handleMetricWithAtomMatchingTrackers(metric.what(), metricIndex, /*enforceOneAtom=*/false,
                                              allAtomMatchingTrackers, newAtomMatchingTrackerMap,
                                              trackerToMetricMap, mWhatMatcherIndex)) {
        return false;
    }

    if (metric.has_condition() &&
        !handleMetricWithConditions(metric.condition(), metricIndex, conditionTrackerMap,
                                    metric.links(), allConditionTrackers, mConditionTrackerIndex,
                                    conditionToMetricMap)) {
        return false;
    }
    // Keep the old wizard alive until the assignment completes.
    // NOTE(review): tmpEventWizard is otherwise unused — confirm it is only a
    // lifetime guard before removing.
    sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
    mEventMatcherWizard = matcherWizard;
    return true;
}
234
// Handles a state change for a state atom this metric slices by. If the
// change is meaningful (different state groups, condition true, metric
// active, event not late), pulls data attributed to the primary key so values
// accrued in the old state are separated from values in the new state.
void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
                                         const HashableDimensionKey& primaryKey,
                                         const FieldValue& oldState, const FieldValue& newState) {
    VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
         (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
         oldState.mValue.int_value, newState.mValue.int_value);

    // If old and new states are in the same StateGroup, then we do not need to
    // pull for this state change.
    FieldValue oldStateCopy = oldState;
    FieldValue newStateCopy = newState;
    mapStateValue(atomId, &oldStateCopy);
    mapStateValue(atomId, &newStateCopy);
    if (oldStateCopy == newStateCopy) {
        return;
    }

    // If condition is not true or metric is not active, we do not need to pull
    // for this state change.
    if (mCondition != ConditionState::kTrue || !mIsActive) {
        return;
    }

    // A state change that predates the current bucket means we are missing
    // data for this bucket; drop it rather than report partial values.
    bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }
    // Scope the pull to this state-change's primary key, then restore the
    // default so unrelated pulls are not filtered.
    mStateChangePrimaryKey.first = atomId;
    mStateChangePrimaryKey.second = primaryKey;
    if (mIsPulled) {
        pullAndMatchEventsLocked(eventTimeNs);
    }
    mStateChangePrimaryKey.first = 0;
    mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
    flushIfNeededLocked(eventTimeNs);
}
274
// Sliced-condition changes are intentionally a no-op for value metrics
// (condition slicing is resolved via links at pull/match time); only log.
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}
279
// Drops all buffered report data (e.g. when storage guardrails are hit).
// Flushes first so bucket boundaries stay consistent, then clears history.
void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);

    // The current partial bucket is not flushed and does not require a pull,
    // so the data is still valid.
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}
288
clearPastBucketsLocked(const int64_t dumpTimeNs)289 void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
290 mPastBuckets.clear();
291 mSkippedBuckets.clear();
292 }
293
// Serializes this metric's report (skipped buckets + per-dimension bucket
// data) into protoOutput as a StatsLogReport. Optionally flushes the current
// partial bucket first and/or erases the serialized data afterwards.
void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    // No time for a pull: drop the partial bucket instead.
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    // Nothing to report beyond id/active status.
    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    // First, all skipped buckets with their drop events (reason + time).
    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    // Then, one ValueMetricData entry per dimension key.
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG("  dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : dimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            // Full-size buckets are encoded compactly as a bucket number;
            // partial buckets carry explicit start/end timestamps.
            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            // Emit each aggregated value with its value_field index.
            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
                                   index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}
438
invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,const BucketDropReason reason)439 void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
440 const BucketDropReason reason) {
441 if (!mCurrentBucketIsSkipped) {
442 // Only report to StatsdStats once per invalid bucket.
443 StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
444 }
445
446 skipCurrentBucket(dropTimeNs, reason);
447 }
448
// Marks the current bucket invalid AND clears the diff base: the data we had
// is unusable, so the next pull must establish a fresh base.
void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                  const BucketDropReason reason) {
    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
    resetBase();
}
454
skipCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)455 void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs,
456 const BucketDropReason reason) {
457 if (!maxDropEventsReached()) {
458 mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
459 }
460 mCurrentBucketIsSkipped = true;
461 }
462
resetBase()463 void ValueMetricProducer::resetBase() {
464 for (auto& slice : mCurrentBaseInfo) {
465 for (auto& baseInfo : slice.second.baseInfos) {
466 baseInfo.hasBase = false;
467 }
468 }
469 mHasGlobalBase = false;
470 }
471
// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (isEventTooLate) {
        // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call parent method once we've verified the validity of current bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs);
        }
        // When active state changes from true to false, clear diff base but don't
        // reset other counters as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
}
510
// Handles a condition change. May invalidate the current bucket (late event
// or previously-unknown condition), pull data to establish/close a diff
// interval, and updates both the overall and per-slice condition timers.
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;

    // If the config is not active, skip the event.
    if (!mIsActive) {
        mCondition = isEventTooLate ? ConditionState::kUnknown : newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (isEventTooLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        // We no longer know the condition for the remainder of this bucket.
        mCondition = ConditionState::kUnknown;
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

        updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    //
    // We still want to pull to set the base.
    if (mCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don’t need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (mIsPulled &&
        (newCondition == ConditionState::kTrue || mCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);
    mConditionTimer.onConditionChanged(mCondition, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mCondition, eventTimeNs);
}
577
updateCurrentSlicedBucketConditionTimers(bool newCondition,int64_t eventTimeNs)578 void ValueMetricProducer::updateCurrentSlicedBucketConditionTimers(bool newCondition,
579 int64_t eventTimeNs) {
580 if (mSlicedStateAtoms.empty()) {
581 return;
582 }
583
584 // Utilize the current state key of each DimensionsInWhat key to determine
585 // which condition timers to update.
586 //
587 // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
588 bool inPulledData;
589 for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mCurrentBaseInfo) {
590 // If the new condition is true, turn ON the condition timer only if
591 // the DimensionInWhat key was present in the pulled data.
592 inPulledData = dimensionInWhatInfo.hasCurrentState;
593 mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
594 dimensionInWhatInfo.currentState)]
595 .conditionTimer.onConditionChanged(newCondition && inPulledData, eventTimeNs);
596 }
597 }
598
prepareFirstBucketLocked()599 void ValueMetricProducer::prepareFirstBucketLocked() {
600 // Kicks off the puller immediately if condition is true and diff based.
601 if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
602 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
603 }
604 }
605
pullAndMatchEventsLocked(const int64_t timestampNs)606 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
607 vector<std::shared_ptr<LogEvent>> allData;
608 if (!mPullerManager->Pull(mPullTagId, mConfigKey, timestampNs, &allData)) {
609 ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
610 invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
611 return;
612 }
613
614 accumulateEvents(allData, timestampNs, timestampNs);
615 }
616
// Returns the most recent bucket boundary at or before currentTimeNs,
// computed by snapping the elapsed time down to a whole number of buckets.
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    const int64_t elapsedNs = currentTimeNs - mTimeBaseNs;
    const int64_t wholeBuckets = elapsedNs / mBucketSizeNs;
    return mTimeBaseNs + wholeBuckets * mBucketSizeNs;
}
620
// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and close the bucket.
// Callback from the puller manager; takes mMutex (this is the only entry
// point in this file that locks explicitly — the *Locked methods assume it).
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
659
// Runs pulled events through the "what" matcher at the given effective event
// time, then reconciles tracked dimension keys against the pull: keys missing
// from the pull lose their diff base and their condition timer is turned off.
// Invalidates the bucket on late events, excessive pull delay, or guardrail.
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Guard against pulls that completed too long after they were requested.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    // Match each pulled event; matched events record their dimension key in
    // mMatchedMetricDimensionKeys via onMatchedLogEventLocked.
    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            // Rewrite the timestamp so the event lands in the intended bucket.
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we need to reset the base.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
            // NOTE(review): `it` is dereferenced without an end() check — this
            // assumes every key in mCurrentSlicedBucket also exists in
            // mCurrentBaseInfo; confirm that invariant holds.
            auto it = mCurrentBaseInfo.find(whatKey);
            for (auto& baseInfo : it->second.baseInfos) {
                baseInfo.hasBase = false;
            }
            // Set to false when DimensionInWhat key is not present in a pull.
            // Used in onMatchedLogEventInternalLocked() to ensure the condition
            // timer is turned on the next pull when data is present.
            it->second.hasCurrentState = false;
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A successful full pull establishes a global base for diffing.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
729
dumpStatesLocked(FILE * out,bool verbose) const730 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
731 if (mCurrentSlicedBucket.size() == 0) {
732 return;
733 }
734
735 fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
736 (unsigned long)mCurrentSlicedBucket.size());
737 if (verbose) {
738 for (const auto& it : mCurrentSlicedBucket) {
739 for (const auto& interval : it.second.intervals) {
740 fprintf(out, "\t(what)%s\t(states)%s (value)%s\n",
741 it.first.getDimensionKeyInWhat().toString().c_str(),
742 it.first.getStateValuesKey().toString().c_str(),
743 interval.value.toString().c_str());
744 }
745 }
746 }
747 }
748
hasReachedGuardRailLimit() const749 bool ValueMetricProducer::hasReachedGuardRailLimit() const {
750 return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
751 }
752
hitGuardRailLocked(const MetricDimensionKey & newKey)753 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
754 // ===========GuardRail==============
755 // 1. Report the tuple count if the tuple count > soft limit
756 if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
757 return false;
758 }
759 if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
760 size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
761 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
762 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
763 if (hasReachedGuardRailLimit()) {
764 ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
765 newKey.toString().c_str());
766 StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
767 return true;
768 }
769 }
770
771 return false;
772 }
773
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)774 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
775 // ===========GuardRail==============
776 // 1. Report the tuple count if the tuple count > soft limit
777 if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
778 return false;
779 }
780 if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
781 size_t newTupleCount = mCurrentFullBucket.size() + 1;
782 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
783 if (newTupleCount > mDimensionHardLimit) {
784 ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
785 (long long)mMetricId,
786 newKey.toString().c_str());
787 return true;
788 }
789 }
790
791 return false;
792 }
793
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)794 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
795 for (const FieldValue& value : event.getValues()) {
796 if (value.mField.matches(matcher)) {
797 switch (value.mValue.type) {
798 case INT:
799 ret.setLong(value.mValue.int_value);
800 break;
801 case LONG:
802 ret.setLong(value.mValue.long_value);
803 break;
804 case FLOAT:
805 ret.setDouble(value.mValue.float_value);
806 break;
807 case DOUBLE:
808 ret.setDouble(value.mValue.double_value);
809 break;
810 default:
811 return false;
812 break;
813 }
814 return true;
815 }
816 }
817 return false;
818 }
819
multipleBucketsSkipped(const int64_t numBucketsForward)820 bool ValueMetricProducer::multipleBucketsSkipped(const int64_t numBucketsForward) {
821 // Skip buckets if this is a pulled metric or a pushed metric that is diffed.
822 return numBucketsForward > 1 && (mIsPulled || mUseDiff);
823 }
824
// Core aggregation path for a single matched event. Filters the event by state primary
// key, timing, and condition; for diffed metrics computes the delta against the stored
// per-dimension base; then folds the (possibly diffed) value into the current bucket's
// intervals according to mAggregationType, updating state condition timers and feeding
// anomaly trackers.
void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    auto whatKey = eventKey.getDimensionKeyInWhat();
    auto stateKey = eventKey.getStateValuesKey();

    // Skip this event if a state changed occurred for a different primary key.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    // Late events cannot be attributed to the (already started) current bucket.
    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    // Record this dimension as seen; accumulateEvents() uses this set to reset bases for
    // keys that disappear from a later pull.
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!mIsPulled) {
        // We cannot flush without doing a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a
    // state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we als need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
            && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false and we are not using diff (for "
             "pulled metric)");
        return;
    }

    // Drop data for brand-new dimension keys once the hard guardrail is reached.
    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    // Look up (or lazily create) the per-dimension bookkeeping: stored diff bases plus
    // the last-seen state key. emplace is a no-op when the key already exists.
    const auto& returnVal =
            mCurrentBaseInfo.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    const HashableDimensionKey oldStateKey = dimensionsInWhatInfo.currentState;
    vector<BaseInfo>& baseInfos = dimensionsInWhatInfo.baseInfos;
    if (baseInfos.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        baseInfos.resize(mFieldMatchers.size());
    }

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    bool stateChange = oldStateKey != stateKey;
    if (!dimensionsInWhatInfo.hasCurrentState) {
        stateChange = true;
        dimensionsInWhatInfo.hasCurrentState = true;
    }

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals =
            mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)].intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    // NOTE(review): hasCurrentState was already set to true above when it was false,
    // so this assignment is redundant but harmless.
    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;
    // One Interval/BaseInfo per configured value field.
    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        BaseInfo& baseInfo = baseInfos[i];
        Interval& interval = intervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        // Marks the interval as live so initCurrentSlicedBucket() won't garbage-collect it.
        interval.seenNewData = true;

        if (mUseDiff) {
            if (!baseInfo.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    baseInfo.hasBase = true;
                } else {
                    // no base. just update base and return.
                    baseInfo.base = value;
                    baseInfo.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set baseInfo.base and
                    // baseInfo.hasBase for other baseInfos
                    continue;
                }
            }

            // Compute the diff per the configured direction; unexpected direction changes
            // are treated as a counter reset (data error) unless absolute-on-reset is set.
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= baseInfo.base) {
                        diff = value - baseInfo.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (baseInfo.base >= value) {
                        diff = baseInfo.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        baseInfo.base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - baseInfo.base;
                    break;
                default:
                    break;
            }
            // The raw value becomes the new base; the diff is what gets aggregated below.
            baseInfo.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                    case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            // First sample in this interval for the bucket.
            interval.value = value;
            interval.hasValue = true;
        }
        // Sample count is used by getFinalValue() to compute AVG at flush time.
        interval.sampleSize += 1;
    }

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)]
                .conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}
1033
1034 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
1035 // if mCondition is true!
flushIfNeededLocked(const int64_t & eventTimeNs)1036 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
1037 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
1038 if (eventTimeNs < currentBucketEndTimeNs) {
1039 VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
1040 (long long)(currentBucketEndTimeNs));
1041 return;
1042 }
1043 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
1044 int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
1045 flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
1046 }
1047
calcBucketsForwardCount(const int64_t & eventTimeNs) const1048 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
1049 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
1050 if (eventTimeNs < currentBucketEndTimeNs) {
1051 return 0;
1052 }
1053 return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
1054 }
1055
// Closes the current bucket at |eventTimeNs| and starts a new one at
// |nextBucketStartTimeNs|. Handles the skip/invalidate cases (unknown condition,
// multiple skipped buckets, too-small bucket, no data), persists valid per-dimension
// buckets to mPastBuckets, records skipped intervals, feeds the anomaly full-bucket
// accumulator, and resets all per-bucket state and condition timers.
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    // Unknown condition means we cannot trust anything accumulated; drop the bucket
    // (but keep the diff bases, which are still valid).
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
        invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful through.
        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        // Forced partial-bucket split (e.g. app upgrade): close at the event time.
        bucketEndTime = eventTimeNs;
    }

    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            PastValueBucket bucket =
                    buildPartialBucket(bucketEndTime, currentValueBucket.intervals);
            // Per-state condition timers only apply when slicing by state; otherwise the
            // single global timer's duration is used for every dimension.
            if (!mSlicedStateAtoms.empty()) {
                bucket.mConditionTrueNs =
                        currentValueBucket.conditionTimer.newBucketStart(bucketEndTime);
            } else {
                bucket.mConditionTrueNs = conditionTrueDuration;
            }
            // it will auto create new vector of ValuebucketInfo if the key is not found.
            if (bucket.valueIndex.size() > 0) {
                auto& bucketList = mPastBuckets[metricDimensionKey];
                bucketList.push_back(bucket);
                bucketHasData = true;
            }
        }
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    // Record the skipped interval (with its drop reasons) so it appears in the report.
    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with include_current_partial_bucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
    if (bucketEndTime < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTime;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(
                buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }
    // True only when we crossed the natural (full) bucket boundary, not a forced split.
    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);

    // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
    // by state. Otherwise, the "global" condition timer will be used.
    if (!mSlicedStateAtoms.empty()) {
        for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
            currentValueBucket.conditionTimer.newBucketStart(nextBucketStartTimeNs);
        }
    }
    mCurrentBucketNum += numBucketsForward;
}
1145
buildPartialBucket(int64_t bucketEndTime,const std::vector<Interval> & intervals)1146 PastValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
1147 const std::vector<Interval>& intervals) {
1148 PastValueBucket bucket;
1149 bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
1150 bucket.mBucketEndNs = bucketEndTime;
1151
1152 // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
1153 // then all interval values are discarded for this bucket.
1154 if ((intervals.size() <= 0) || (intervals[0].hasValue && !valuePassesThreshold(intervals[0]))) {
1155 return bucket;
1156 }
1157
1158 for (const Interval& interval : intervals) {
1159 // Skip the output if the diff is zero
1160 if (!interval.hasValue || (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero())) {
1161 continue;
1162 }
1163
1164 bucket.valueIndex.push_back(interval.valueIndex);
1165 bucket.values.push_back(getFinalValue(interval));
1166 }
1167 return bucket;
1168 }
1169
// Resets per-bucket state for the bucket starting at |nextBucketStartTimeNs|: clears
// interval values, garbage-collects dimension keys that received no data (keeping the
// current-state key when slicing by state), and resets the skipped-bucket bookkeeping.
void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second.intervals) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            // seenNewData is set by onMatchedLogEventInternalLocked(); any live interval
            // keeps the whole key alive.
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete && !mSlicedStateAtoms.empty()) {
            // When slicing by state, only delete the MetricDimensionKey when the
            // state key in the MetricDimensionKey is not the current state key.
            const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
            const auto& currentBaseInfoItr = mCurrentBaseInfo.find(dimensionInWhatKey);

            if ((currentBaseInfoItr != mCurrentBaseInfo.end()) &&
                (it->first.getStateValuesKey() == currentBaseInfoItr->second.currentState)) {
                obsolete = false;
            }
        }
        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
        // TODO(b/157655103): remove mCurrentBaseInfo entries when obsolete
    }

    mCurrentBucketIsSkipped = false;
    mCurrentSkippedBucket.reset();

    // If we do not have a global base when the condition is true,
    // we will have incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // NOTE(review): this assignment is a no-op — mCurrentBucketIsSkipped was already
        // set to false just above. Given the comment, the intent may have been to mark the
        // next bucket as skipped (true); confirm against the change that introduced this.
        mCurrentBucketIsSkipped = false;
    }
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}
1215
// Feeds the just-closed partial bucket into the anomaly-detection pipeline.
// mCurrentFullBucket accumulates partial buckets (e.g. across app-upgrade splits)
// until a full bucket boundary is reached, at which point the summed values are
// handed to the anomaly trackers.
void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                // Guardrail may refuse new keys; also skip keys with no intervals.
                if (hitFullBucketGuardRailLocked(slice.first) || slice.second.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !slice.second.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second.intervals[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            if (!slice.second.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second.intervals[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
        }
    }
}
1275
byteSizeLocked() const1276 size_t ValueMetricProducer::byteSizeLocked() const {
1277 size_t totalSize = 0;
1278 for (const auto& pair : mPastBuckets) {
1279 totalSize += pair.second.size() * kBucketSize;
1280 }
1281 return totalSize;
1282 }
1283
valuePassesThreshold(const Interval & interval)1284 bool ValueMetricProducer::valuePassesThreshold(const Interval& interval) {
1285 if (mUploadThreshold == nullopt) {
1286 return true;
1287 }
1288
1289 Value finalValue = getFinalValue(interval);
1290
1291 double doubleValue =
1292 finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
1293 switch (mUploadThreshold->value_comparison_case()) {
1294 case UploadThreshold::kLtInt:
1295 return doubleValue < (double)mUploadThreshold->lt_int();
1296 case UploadThreshold::kGtInt:
1297 return doubleValue > (double)mUploadThreshold->gt_int();
1298 case UploadThreshold::kLteInt:
1299 return doubleValue <= (double)mUploadThreshold->lte_int();
1300 case UploadThreshold::kGteInt:
1301 return doubleValue >= (double)mUploadThreshold->gte_int();
1302 case UploadThreshold::kLtFloat:
1303 return doubleValue <= (double)mUploadThreshold->lt_float();
1304 case UploadThreshold::kGtFloat:
1305 return doubleValue >= (double)mUploadThreshold->gt_float();
1306 default:
1307 ALOGE("Value metric no upload threshold type used");
1308 return false;
1309 }
1310 }
1311
getFinalValue(const Interval & interval)1312 Value ValueMetricProducer::getFinalValue(const Interval& interval) {
1313 if (mAggregationType != ValueMetric::AVG) {
1314 return interval.value;
1315 } else {
1316 double sum = interval.value.type == LONG ? (double)interval.value.long_value
1317 : interval.value.double_value;
1318 return Value((double)sum / interval.sampleSize);
1319 }
1320 }
1321
1322 } // namespace statsd
1323 } // namespace os
1324 } // namespace android
1325