import 'dart:convert';
import 'package:gcloud/storage.dart';
import 'package:googleapis/storage/v1.dart'
show DetailedApiRequestError, StorageApi;
import 'package:googleapis_auth/auth_io.dart';
import 'common.dart';
import 'constants.dart';
import 'gcs_lock.dart';
class SkiaPerfPoint extends MetricPoint {
SkiaPerfPoint._(this.githubRepo, this.gitHash, this.testName, this.subResult,
double? value, this._options, this.jsonUrl)
: assert(_options[kGithubRepoKey] == null),
assert(_options[kGitRevisionKey] == null),
assert(_options[kNameKey] == null),
super(
value,
<String, String?>{}
..addAll(_options)
..addAll(<String, String?>{
kGithubRepoKey: githubRepo,
kGitRevisionKey: gitHash,
kNameKey: testName,
kSubResultKey: subResult,
}),
) {
assert(tags[kGithubRepoKey] != null);
assert(tags[kGitRevisionKey] != null);
assert(tags[kNameKey] != null);
}
factory SkiaPerfPoint.fromPoint(MetricPoint p) {
final String? githubRepo = p.tags[kGithubRepoKey];
final String? gitHash = p.tags[kGitRevisionKey];
final String? name = p.tags[kNameKey];
if (githubRepo == null || gitHash == null || name == null) {
throw StateError(
'$kGithubRepoKey, $kGitRevisionKey, $kNameKey must be set in'
' the tags of $p.');
}
final String subResult = p.tags[kSubResultKey] ?? kSkiaPerfValueKey;
final Map<String, String> options = <String, String>{}..addEntries(
p.tags.entries.where(
(MapEntry<String, dynamic> entry) =>
entry.key != kGithubRepoKey &&
entry.key != kGitRevisionKey &&
entry.key != kNameKey &&
entry.key != kSubResultKey &&
entry.key != 'date',
),
);
return SkiaPerfPoint._(
githubRepo, gitHash, name, subResult, p.value, options, null);
}
final String githubRepo;
final String? gitHash;
final String testName;
final String subResult;
final String? jsonUrl;
Map<String, dynamic> _toSubResultJson() {
return <String, dynamic>{
subResult: value,
kSkiaPerfOptionsKey: _options,
};
}
static Map<String, dynamic> toSkiaPerfJson(List<SkiaPerfPoint> points) {
assert(points.isNotEmpty);
assert(() {
for (final SkiaPerfPoint p in points) {
if (p.githubRepo != points[0].githubRepo ||
p.gitHash != points[0].gitHash) {
return false;
}
}
return true;
}(), 'All points must have same githubRepo and gitHash');
final Map<String, dynamic> results = <String, dynamic>{};
for (final SkiaPerfPoint p in points) {
final Map<String, dynamic> subResultJson = p._toSubResultJson();
if (results[p.testName] == null) {
results[p.testName] = <String, dynamic>{
kSkiaPerfDefaultConfig: subResultJson,
};
} else {
assert(results[p.testName][kSkiaPerfDefaultConfig][kSkiaPerfOptionsKey]
.toString() ==
subResultJson[kSkiaPerfOptionsKey].toString());
assert(
results[p.testName][kSkiaPerfDefaultConfig][p.subResult] == null);
results[p.testName][kSkiaPerfDefaultConfig][p.subResult] = p.value;
}
}
return <String, dynamic>{
kSkiaPerfGitHashKey: points[0].gitHash,
kSkiaPerfResultsKey: results,
};
}
final Map<String, String> _options;
}
class SkiaPerfGcsAdaptor {
SkiaPerfGcsAdaptor(this._gcsBucket);
static const int version = 1;
Future<void> writePoints(
String objectName, List<SkiaPerfPoint> points) async {
final String jsonString = jsonEncode(SkiaPerfPoint.toSkiaPerfJson(points));
final List<int> content = utf8.encode(jsonString);
for (int retry = 0; retry < 5; retry += 1) {
try {
await _gcsBucket.writeBytes(objectName, content);
return;
} catch (e) {
if (e is DetailedApiRequestError && e.status == 504) {
continue;
}
rethrow;
}
}
await _gcsBucket.writeBytes(objectName, content);
}
Future<List<SkiaPerfPoint>> readPoints(String objectName) async {
for (int retry = 0; retry < 5; retry += 1) {
try {
return await _readPointsWithoutRetry(objectName);
} catch (e) {
if (e is DetailedApiRequestError && e.status == 504) {
continue;
}
rethrow;
}
}
return _readPointsWithoutRetry(objectName);
}
Future<List<SkiaPerfPoint>> _readPointsWithoutRetry(String objectName) async {
ObjectInfo? info;
try {
info = await _gcsBucket.info(objectName);
} catch (e) {
if (e.toString().contains('No such object')) {
return <SkiaPerfPoint>[];
} else {
rethrow;
}
}
final Stream<List<int>> stream = _gcsBucket.read(objectName);
final Stream<int> byteStream = stream.expand((List<int> x) => x);
final Map<String, dynamic> decodedJson =
jsonDecode(utf8.decode(await byteStream.toList()))
as Map<String, dynamic>;
final List<SkiaPerfPoint> points = <SkiaPerfPoint>[];
final String firstGcsNameComponent = objectName.split('/')[0];
_populateGcsNameToGithubRepoMapIfNeeded();
final String githubRepo = _gcsNameToGithubRepo[firstGcsNameComponent]!;
final String? gitHash = decodedJson[kSkiaPerfGitHashKey] as String?;
final Map<String, dynamic> results =
decodedJson[kSkiaPerfResultsKey] as Map<String, dynamic>;
for (final String name in results.keys) {
final Map<String, dynamic> subResultMap =
results[name][kSkiaPerfDefaultConfig] as Map<String, dynamic>;
for (final String subResult
in subResultMap.keys.where((String s) => s != kSkiaPerfOptionsKey)) {
points.add(SkiaPerfPoint._(
githubRepo,
gitHash,
name,
subResult,
subResultMap[subResult] as double?,
(subResultMap[kSkiaPerfOptionsKey] as Map<String, dynamic>)
.cast<String, String>(),
info.downloadLink.toString(),
));
}
}
return points;
}
static Future<String> computeObjectName(String githubRepo, String? revision,
DateTime commitTime, String taskName) async {
assert(_githubRepoToGcsName[githubRepo] != null);
final String? topComponent = _githubRepoToGcsName[githubRepo];
final DateTime commitUtcTime = commitTime.toUtc();
final String month = commitUtcTime.month.toString().padLeft(2, '0');
final String day = commitUtcTime.day.toString().padLeft(2, '0');
final String hour = commitUtcTime.hour.toString().padLeft(2, '0');
final String dateComponents = '${commitUtcTime.year}/$month/$day/$hour';
return '$topComponent/$dateComponents/$revision/${taskName}_values.json';
}
static final Map<String, String> _githubRepoToGcsName = <String, String>{
kFlutterFrameworkRepo: 'flutter-flutter',
kFlutterEngineRepo: 'flutter-engine',
};
static final Map<String?, String> _gcsNameToGithubRepo = <String?, String>{};
static void _populateGcsNameToGithubRepoMapIfNeeded() {
if (_gcsNameToGithubRepo.isEmpty) {
for (final String repo in _githubRepoToGcsName.keys) {
final String? gcsName = _githubRepoToGcsName[repo];
assert(_gcsNameToGithubRepo[gcsName] == null);
_gcsNameToGithubRepo[gcsName] = repo;
}
}
}
final Bucket _gcsBucket;
}
class SkiaPerfDestination extends MetricDestination {
SkiaPerfDestination(this._gcs, this._lock);
static Future<SkiaPerfDestination> makeFromGcpCredentials(
Map<String, dynamic> credentialsJson,
{bool isTesting = false}) async {
final AutoRefreshingAuthClient client = await clientViaServiceAccount(
ServiceAccountCredentials.fromJson(credentialsJson), Storage.SCOPES);
return make(
client,
credentialsJson[kProjectId] as String,
isTesting: isTesting,
);
}
static Future<SkiaPerfDestination> makeFromAccessToken(
String token, String projectId,
{bool isTesting = false}) async {
final AuthClient client = authClientFromAccessToken(token, Storage.SCOPES);
return make(client, projectId, isTesting: isTesting);
}
static Future<SkiaPerfDestination> make(AuthClient client, String projectId,
{bool isTesting = false}) async {
final Storage storage = Storage(client, projectId);
final String bucketName = isTesting ? kTestBucketName : kBucketName;
if (!await storage.bucketExists(bucketName)) {
throw StateError('Bucket $bucketName does not exist.');
}
final SkiaPerfGcsAdaptor adaptor =
SkiaPerfGcsAdaptor(storage.bucket(bucketName));
final GcsLock lock = GcsLock(StorageApi(client), bucketName);
return SkiaPerfDestination(adaptor, lock);
}
@override
Future<void> update(
List<MetricPoint> points, DateTime commitTime, String taskName) async {
final Map<String, Map<String?, Map<String, SkiaPerfPoint>>> pointMap =
<String, Map<String, Map<String, SkiaPerfPoint>>>{};
for (final SkiaPerfPoint p
in points.map((MetricPoint x) => SkiaPerfPoint.fromPoint(x))) {
pointMap[p.githubRepo] ??= <String, Map<String, SkiaPerfPoint>>{};
pointMap[p.githubRepo]![p.gitHash] ??= <String, SkiaPerfPoint>{};
pointMap[p.githubRepo]![p.gitHash]![p.id] = p;
}
final List<Future<void>> lockFutures = <Future<void>>[];
for (final String repo in pointMap.keys) {
for (final String? revision in pointMap[repo]!.keys) {
final String objectName = await SkiaPerfGcsAdaptor.computeObjectName(
repo, revision, commitTime, taskName);
final Map<String, SkiaPerfPoint>? newPoints = pointMap[repo]![revision];
lockFutures.add(
_lock!.protectedRun('$objectName.lock', () async {
final List<SkiaPerfPoint> oldPoints =
await _gcs.readPoints(objectName);
for (final SkiaPerfPoint p in oldPoints) {
if (newPoints![p.id] == null) {
newPoints[p.id] = p;
}
}
await _gcs.writePoints(objectName, newPoints!.values.toList());
}),
);
}
}
await Future.wait(lockFutures);
}
final SkiaPerfGcsAdaptor _gcs;
late final GcsLock? _lock;
}