refactor(mobile): DB repository for asset, backup, sync service (#12953)

* refactor(mobile): DB repository for asset, backup, sync service

* review feedback

* fix bug found by Alex

---------

Co-authored-by: Alex <alex.tran1502@gmail.com>
This commit is contained in:
Fynn Petersen-Frey 2024-09-30 16:37:30 +02:00 committed by GitHub
parent a2d457b01d
commit 15c04d3056
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 873 additions and 450 deletions

View file

@ -5,48 +5,66 @@ import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/entities/album.entity.dart';
import 'package:immich_mobile/entities/asset.entity.dart';
import 'package:immich_mobile/entities/etag.entity.dart';
import 'package:immich_mobile/entities/exif_info.entity.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/entities/user.entity.dart';
import 'package:immich_mobile/interfaces/album.interface.dart';
import 'package:immich_mobile/interfaces/album_api.interface.dart';
import 'package:immich_mobile/interfaces/album_media.interface.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/interfaces/asset.interface.dart';
import 'package:immich_mobile/interfaces/etag.interface.dart';
import 'package:immich_mobile/interfaces/exif_info.interface.dart';
import 'package:immich_mobile/interfaces/user.interface.dart';
import 'package:immich_mobile/repositories/album.repository.dart';
import 'package:immich_mobile/repositories/album_api.repository.dart';
import 'package:immich_mobile/repositories/album_media.repository.dart';
import 'package:immich_mobile/repositories/asset.repository.dart';
import 'package:immich_mobile/repositories/etag.repository.dart';
import 'package:immich_mobile/repositories/exif_info.repository.dart';
import 'package:immich_mobile/repositories/user.repository.dart';
import 'package:immich_mobile/services/entity.service.dart';
import 'package:immich_mobile/services/hash.service.dart';
import 'package:immich_mobile/utils/async_mutex.dart';
import 'package:immich_mobile/extensions/collection_extensions.dart';
import 'package:immich_mobile/utils/datetime_comparison.dart';
import 'package:immich_mobile/utils/diff.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
final syncServiceProvider = Provider(
(ref) => SyncService(
ref.watch(dbProvider),
ref.watch(hashServiceProvider),
ref.watch(entityServiceProvider),
ref.watch(albumMediaRepositoryProvider),
ref.watch(albumApiRepositoryProvider),
ref.watch(albumRepositoryProvider),
ref.watch(assetRepositoryProvider),
ref.watch(exifInfoRepositoryProvider),
ref.watch(userRepositoryProvider),
ref.watch(etagRepositoryProvider),
),
);
class SyncService {
final Isar _db;
final HashService _hashService;
final EntityService _entityService;
final IAlbumMediaRepository _albumMediaRepository;
final IAlbumApiRepository _albumApiRepository;
final IAlbumRepository _albumRepository;
final IAssetRepository _assetRepository;
final IExifInfoRepository _exifInfoRepository;
final IUserRepository _userRepository;
final IETagRepository _eTagRepository;
final AsyncMutex _lock = AsyncMutex();
final Logger _log = Logger('SyncService');
SyncService(
this._db,
this._hashService,
this._entityService,
this._albumMediaRepository,
this._albumApiRepository,
this._albumRepository,
this._assetRepository,
this._exifInfoRepository,
this._userRepository,
this._eTagRepository,
);
// public methods:
@ -119,7 +137,7 @@ class SyncService {
/// Returns `true`if there were any changes
Future<bool> _syncUsersFromServer(List<User> users) async {
users.sortBy((u) => u.id);
final dbUsers = await _db.users.where().sortById().findAll();
final dbUsers = await _userRepository.getAll(sortBy: UserSort.id);
assert(dbUsers.isSortedBy((u) => u.id), "dbUsers not sorted!");
final List<int> toDelete = [];
final List<User> toUpsert = [];
@ -141,9 +159,9 @@ class SyncService {
onlySecond: (User b) => toDelete.add(b.isarId),
);
if (changes) {
await _db.writeTxn(() async {
await _db.users.deleteAll(toDelete);
await _db.users.putAll(toUpsert);
await _userRepository.transaction(() async {
await _userRepository.deleteById(toDelete);
await _userRepository.upsertAll(toUpsert);
});
}
return changes;
@ -152,15 +170,15 @@ class SyncService {
/// Syncs a new asset to the db. Returns `true` if successful
Future<bool> _syncNewAssetToDb(Asset a) async {
final Asset? inDb =
await _db.assets.getByOwnerIdChecksum(a.ownerId, a.checksum);
await _assetRepository.getByOwnerIdChecksum(a.ownerId, a.checksum);
if (inDb != null) {
// unify local/remote assets by replacing the
// local-only asset in the DB with a local&remote asset
a = inDb.updatedCopy(a);
}
try {
await _db.writeTxn(() => a.put(_db));
} on IsarError catch (e) {
await _assetRepository.update(a);
} catch (e) {
_log.severe("Failed to put new asset into db", e);
return false;
}
@ -175,9 +193,9 @@ class SyncService {
DateTime since,
) getChangedAssets,
) async {
final currentUser = Store.get(StoreKey.currentUser);
final currentUser = await _userRepository.me();
final DateTime? since =
_db.eTags.getSync(currentUser.isarId)?.time?.toUtc();
(await _eTagRepository.get(currentUser.isarId))?.time?.toUtc();
if (since == null) return null;
final DateTime now = DateTime.now();
final (toUpsert, toDelete) = await getChangedAssets(users, since);
@ -198,7 +216,7 @@ class SyncService {
return true;
}
return false;
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to sync remote assets to db", e);
}
return null;
@ -206,23 +224,21 @@ class SyncService {
/// Deletes remote-only assets, updates merged assets to be local-only
Future<void> handleRemoteAssetRemoval(List<String> idsToDelete) {
return _db.writeTxn(() async {
final idsToRemove = await _db.assets
.remote(idsToDelete)
.filter()
.localIdIsNull()
.idProperty()
.findAll();
await _db.assets.deleteAll(idsToRemove);
await _db.exifInfos.deleteAll(idsToRemove);
final onlyLocal = await _db.assets.remote(idsToDelete).findAll();
if (onlyLocal.isNotEmpty) {
for (final Asset a in onlyLocal) {
a.remoteId = null;
a.isTrashed = false;
}
await _db.assets.putAll(onlyLocal);
return _assetRepository.transaction(() async {
await _assetRepository.deleteAllByRemoteId(
idsToDelete,
state: AssetState.remote,
);
final merged = await _assetRepository.getAllByRemoteId(
idsToDelete,
state: AssetState.merged,
);
if (merged.isEmpty) return;
for (final Asset asset in merged) {
asset.remoteId = null;
asset.isTrashed = false;
}
await _assetRepository.updateAll(merged);
});
}
@ -237,12 +253,7 @@ class SyncService {
return false;
}
await _syncUsersFromServer(serverUsers);
final List<User> users = await _db.users
.filter()
.isPartnerSharedWithEqualTo(true)
.or()
.isarIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.findAll();
final List<User> users = await _userRepository.getAllAccessible();
bool changes = false;
for (User u in users) {
changes |= await _syncRemoteAssetsForUser(u, loadAssets);
@ -259,11 +270,10 @@ class SyncService {
if (remote == null) {
return false;
}
final List<Asset> inDb = await _db.assets
.where()
.ownerIdEqualToAnyChecksum(user.isarId)
.sortByChecksum()
.findAll();
final List<Asset> inDb = await _assetRepository.getAll(
ownerId: user.isarId,
sortBy: AssetSort.checksum,
);
assert(inDb.isSorted(Asset.compareByChecksum), "inDb not sorted!");
remote.sort(Asset.compareByChecksum);
@ -278,9 +288,9 @@ class SyncService {
}
final idsToDelete = toRemove.map((e) => e.id).toList();
try {
await _db.writeTxn(() => _db.assets.deleteAll(idsToDelete));
await _assetRepository.deleteById(idsToDelete);
await upsertAssetsWithExif(toAdd + toUpdate);
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to sync remote assets to db", e);
}
await _updateUserAssetsETag([user], now);
@ -289,12 +299,12 @@ class SyncService {
Future<void> _updateUserAssetsETag(List<User> users, DateTime time) {
final etags = users.map((u) => ETag(id: u.id, time: time)).toList();
return _db.writeTxn(() => _db.eTags.putAll(etags));
return _eTagRepository.upsertAll(etags);
}
Future<void> _clearUserAssetsETag(List<User> users) {
final ids = users.map((u) => u.id).toList();
return _db.writeTxn(() => _db.eTags.deleteAllById(ids));
return _eTagRepository.deleteByIds(ids);
}
/// Syncs remote albums to the database
@ -305,15 +315,13 @@ class SyncService {
) async {
remoteAlbums.sortBy((e) => e.remoteId!);
final baseQuery = _db.albums.where().remoteIdIsNotNull().filter();
final QueryBuilder<Album, Album, QAfterFilterCondition> query;
if (isShared) {
query = baseQuery.sharedEqualTo(true);
} else {
final User me = Store.get(StoreKey.currentUser);
query = baseQuery.owner((q) => q.isarIdEqualTo(me.isarId));
}
final List<Album> dbAlbums = await query.sortByRemoteId().findAll();
final User me = await _userRepository.me();
final List<Album> dbAlbums = await _albumRepository.getAll(
remote: true,
shared: isShared ? true : null,
ownerId: isShared ? null : me.isarId,
sortBy: AlbumSort.remoteId,
);
assert(dbAlbums.isSortedBy((e) => e.remoteId!), "dbAlbums not sorted!");
final List<Asset> toDelete = [];
@ -333,10 +341,7 @@ class SyncService {
if (isShared && toDelete.isNotEmpty) {
final List<int> idsToRemove = sharedAssetsToRemove(toDelete, existing);
if (idsToRemove.isNotEmpty) {
await _db.writeTxn(() async {
await _db.assets.deleteAll(idsToRemove);
await _db.exifInfos.deleteAll(idsToRemove);
});
await _assetRepository.deleteById(idsToRemove);
}
} else {
assert(toDelete.isEmpty);
@ -360,8 +365,11 @@ class SyncService {
// i.e. it will always be null. Save it here.
final originalDto = dto;
dto = await _albumApiRepository.get(dto.remoteId!);
final assetsInDb =
await album.assets.filter().sortByOwnerId().thenByChecksum().findAll();
final assetsInDb = await _assetRepository.getByAlbum(
album,
sortBy: AssetSort.ownerIdChecksum,
);
assert(assetsInDb.isSorted(Asset.compareByOwnerChecksum), "inDb unsorted!");
final List<Asset> assetsOnRemote = dto.remoteAssets.toList();
assetsOnRemote.sort(Asset.compareByOwnerChecksum);
@ -391,7 +399,7 @@ class SyncService {
final (existingInDb, updated) = await _linkWithExistingFromDb(toAdd);
await upsertAssetsWithExif(updated);
final assetsToLink = existingInDb + updated;
final usersToLink = (await _db.users.getAllById(userIdsToAdd)).cast<User>();
final usersToLink = await _userRepository.getByIds(userIdsToAdd);
album.name = dto.name;
album.shared = dto.shared;
@ -402,32 +410,33 @@ class SyncService {
album.lastModifiedAssetTimestamp = originalDto.lastModifiedAssetTimestamp;
album.shared = dto.shared;
album.activityEnabled = dto.activityEnabled;
if (album.thumbnail.value?.remoteId != dto.remoteThumbnailAssetId) {
album.thumbnail.value = await _db.assets
.where()
.remoteIdEqualTo(dto.remoteThumbnailAssetId)
.findFirst();
final remoteThumbnailAssetId = dto.remoteThumbnailAssetId;
if (remoteThumbnailAssetId != null &&
album.thumbnail.value?.remoteId != remoteThumbnailAssetId) {
album.thumbnail.value =
await _assetRepository.getByRemoteId(remoteThumbnailAssetId);
}
// write & commit all changes to DB
try {
await _db.writeTxn(() async {
await _db.assets.putAll(toUpdate);
await album.thumbnail.save();
await album.sharedUsers
.update(link: usersToLink, unlink: usersToUnlink);
await album.assets.update(link: assetsToLink, unlink: toUnlink.cast());
await _db.albums.put(album);
await _assetRepository.transaction(() async {
await _assetRepository.updateAll(toUpdate);
await _albumRepository.addUsers(album, usersToLink);
await _albumRepository.removeUsers(album, usersToUnlink);
await _albumRepository.addAssets(album, assetsToLink);
await _albumRepository.removeAssets(album, toUnlink);
await _albumRepository.recalculateMetadata(album);
await _albumRepository.update(album);
});
_log.info("Synced changes of remote album ${album.name} to DB");
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to sync remote album to database", e);
}
if (album.shared || dto.shared) {
final userId = Store.get(StoreKey.currentUser).isarId;
final userId = (await _userRepository.me()).isarId;
final foreign =
await album.assets.filter().not().ownerIdEqualTo(userId).findAll();
await _assetRepository.getByAlbum(album, notOwnedBy: [userId]);
existing.addAll(foreign);
// delete assets in DB unless they belong to this user or part of some other shared album
@ -456,7 +465,7 @@ class SyncService {
await upsertAssetsWithExif(updated);
await _entityService.fillAlbumWithDatabaseEntities(album);
await _db.writeTxn(() => _db.albums.store(album));
await _albumRepository.create(album);
} else {
_log.warning(
"Failed to add album from server: assetCount ${album.remoteAssetCount} != "
@ -474,27 +483,18 @@ class SyncService {
_log.info("Removing local album $album from DB");
// delete assets in DB unless they are remote or part of some other album
deleteCandidates.addAll(
await album.assets.filter().remoteIdIsNull().findAll(),
await _assetRepository.getByAlbum(album, state: AssetState.local),
);
} else if (album.shared) {
final User user = Store.get(StoreKey.currentUser);
// delete assets in DB unless they belong to this user or are part of some other shared album or belong to a partner
final userIds = await _db.users
.filter()
.isPartnerSharedWithEqualTo(true)
.isarIdProperty()
.findAll();
userIds.add(user.isarId);
final orphanedAssets = await album.assets
.filter()
.not()
.anyOf(userIds, (q, int id) => q.ownerIdEqualTo(id))
.findAll();
final userIds =
(await _userRepository.getAllAccessible()).map((user) => user.isarId);
final orphanedAssets =
await _assetRepository.getByAlbum(album, notOwnedBy: userIds);
deleteCandidates.addAll(orphanedAssets);
}
try {
final bool ok = await _db.writeTxn(() => _db.albums.delete(album.id));
assert(ok);
await _albumRepository.delete(album.id);
_log.info("Removed local album $album from DB");
} catch (e) {
_log.severe("Failed to remove local album $album from DB", e);
@ -509,7 +509,7 @@ class SyncService {
]) async {
onDevice.sort((a, b) => a.id.compareTo(b.id));
final inDb =
await _db.albums.where().localIdIsNotNull().sortByLocalId().findAll();
await _albumRepository.getAll(remote: false, sortBy: AlbumSort.localId);
final List<Asset> deleteCandidates = [];
final List<Asset> existing = [];
assert(inDb.isSorted((a, b) => a.localId!.compareTo(b.localId!)), "sort!");
@ -536,10 +536,9 @@ class SyncService {
"${toDelete.length} assets to delete, ${toUpdate.length} to update",
);
if (toDelete.isNotEmpty || toUpdate.isNotEmpty) {
await _db.writeTxn(() async {
await _db.assets.deleteAll(toDelete);
await _db.exifInfos.deleteAll(toDelete);
await _db.assets.putAll(toUpdate);
await _assetRepository.transaction(() async {
await _assetRepository.deleteById(toDelete);
await _assetRepository.updateAll(toUpdate);
});
_log.info(
"Removed ${toDelete.length} and updated ${toUpdate.length} local assets from DB",
@ -570,13 +569,13 @@ class SyncService {
await _syncDeviceAlbumFast(deviceAlbum, dbAlbum)) {
return true;
}
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
final inDb = await dbAlbum.assets
.filter()
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.sortByChecksum()
.findAll();
final inDb = await _assetRepository.getByAlbum(
dbAlbum,
ownerId: (await _userRepository.me()).isarId,
sortBy: AssetSort.checksum,
);
assert(inDb.isSorted(Asset.compareByChecksum), "inDb not sorted!");
final int assetCountOnDevice =
await _albumMediaRepository.getAssetCount(deviceAlbum.localId!);
@ -597,15 +596,14 @@ class SyncService {
"Only excluded assets in local album ${deviceAlbum.name} changed. Stopping sync.",
);
if (assetCountOnDevice !=
_db.eTags.getByIdSync(deviceAlbum.eTagKeyAssetCount)?.assetCount) {
await _db.writeTxn(
() => _db.eTags.put(
ETag(
id: deviceAlbum.eTagKeyAssetCount,
assetCount: assetCountOnDevice,
),
(await _eTagRepository.getById(deviceAlbum.eTagKeyAssetCount))
?.assetCount) {
await _eTagRepository.upsertAll([
ETag(
id: deviceAlbum.eTagKeyAssetCount,
assetCount: assetCountOnDevice,
),
);
]);
}
return false;
}
@ -625,23 +623,21 @@ class SyncService {
dbAlbum.thumbnail.value = null;
}
try {
await _db.writeTxn(() async {
await _db.assets.putAll(updated);
await _db.assets.putAll(toUpdate);
await dbAlbum.assets
.update(link: existingInDb + updated, unlink: toDelete);
await _db.albums.put(dbAlbum);
dbAlbum.thumbnail.value ??= await dbAlbum.assets.filter().findFirst();
await dbAlbum.thumbnail.save();
await _db.eTags.put(
await _assetRepository.transaction(() async {
await _assetRepository.updateAll(updated + toUpdate);
await _albumRepository.addAssets(dbAlbum, existingInDb + updated);
await _albumRepository.removeAssets(dbAlbum, toDelete);
await _albumRepository.recalculateMetadata(dbAlbum);
await _albumRepository.update(dbAlbum);
await _eTagRepository.upsertAll([
ETag(
id: deviceAlbum.eTagKeyAssetCount,
assetCount: assetCountOnDevice,
),
);
]);
});
_log.info("Synced changes of local album ${deviceAlbum.name} to DB");
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to update synced album ${deviceAlbum.name} in DB", e);
}
@ -657,7 +653,8 @@ class SyncService {
final int totalOnDevice =
await _albumMediaRepository.getAssetCount(deviceAlbum.localId!);
final int lastKnownTotal =
(await _db.eTags.getById(deviceAlbum.eTagKeyAssetCount))?.assetCount ??
(await _eTagRepository.getById(deviceAlbum.eTagKeyAssetCount))
?.assetCount ??
0;
if (totalOnDevice <= lastKnownTotal) {
return false;
@ -675,16 +672,17 @@ class SyncService {
_removeDuplicates(newAssets);
final (existingInDb, updated) = await _linkWithExistingFromDb(newAssets);
try {
await _db.writeTxn(() async {
await _db.assets.putAll(updated);
await dbAlbum.assets.update(link: existingInDb + updated);
await _db.albums.put(dbAlbum);
await _db.eTags.put(
ETag(id: deviceAlbum.eTagKeyAssetCount, assetCount: totalOnDevice),
await _assetRepository.transaction(() async {
await _assetRepository.updateAll(updated);
await _albumRepository.addAssets(dbAlbum, existingInDb + updated);
await _albumRepository.recalculateMetadata(dbAlbum);
await _albumRepository.update(dbAlbum);
await _eTagRepository.upsertAll(
[ETag(id: deviceAlbum.eTagKeyAssetCount, assetCount: totalOnDevice)],
);
});
_log.info("Fast synced local album ${deviceAlbum.name} to DB");
} on IsarError catch (e) {
} catch (e) {
_log.severe(
"Failed to fast sync local album ${deviceAlbum.name} to DB",
e,
@ -719,9 +717,9 @@ class SyncService {
final thumb = existingInDb.firstOrNull ?? updated.firstOrNull;
album.thumbnail.value = thumb;
try {
await _db.writeTxn(() => _db.albums.store(album));
await _albumRepository.create(album);
_log.info("Added a new local album to DB: ${album.name}");
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to add new local album ${album.name} to DB", e);
}
}
@ -732,7 +730,7 @@ class SyncService {
) async {
if (assets.isEmpty) return ([].cast<Asset>(), [].cast<Asset>());
final List<Asset?> inDb = await _db.assets.getAllByOwnerIdChecksum(
final List<Asset?> inDb = await _assetRepository.getAllByOwnerIdChecksum(
assets.map((a) => a.ownerId).toInt64List(),
assets.map((a) => a.checksum).toList(growable: false),
);
@ -746,7 +744,7 @@ class SyncService {
}
if (b.canUpdate(assets[i])) {
final updated = b.updatedCopy(assets[i]);
assert(updated.id != Isar.autoIncrement);
assert(updated.isInDb);
toUpsert.add(updated);
} else {
existing.add(b);
@ -758,24 +756,22 @@ class SyncService {
/// Inserts or updates the assets in the database with their ExifInfo (if any)
Future<void> upsertAssetsWithExif(List<Asset> assets) async {
if (assets.isEmpty) {
return;
}
final exifInfos = assets.map((e) => e.exifInfo).whereNotNull().toList();
if (assets.isEmpty) return;
final exifInfos = assets.map((e) => e.exifInfo).nonNulls.toList();
try {
await _db.writeTxn(() async {
await _db.assets.putAll(assets);
await _assetRepository.transaction(() async {
await _assetRepository.updateAll(assets);
for (final Asset added in assets) {
added.exifInfo?.id = added.id;
}
await _db.exifInfos.putAll(exifInfos);
await _exifInfoRepository.updateAll(exifInfos);
});
_log.info("Upserted ${assets.length} assets into the DB");
} on IsarError catch (e) {
} catch (e) {
_log.severe("Failed to upsert ${assets.length} assets into the DB", e);
// give details on the errors
assets.sort(Asset.compareByOwnerChecksum);
final inDb = await _db.assets.getAllByOwnerIdChecksum(
final inDb = await _assetRepository.getAllByOwnerIdChecksum(
assets.map((e) => e.ownerId).toInt64List(),
assets.map((e) => e.checksum).toList(growable: false),
);
@ -783,7 +779,7 @@ class SyncService {
final Asset a = assets[i];
final Asset? b = inDb[i];
if (b == null) {
if (a.id != Isar.autoIncrement) {
if (!a.isInDb) {
_log.warning(
"Trying to update an asset that does not exist in DB:\n$a",
);
@ -827,19 +823,19 @@ class SyncService {
return deviceAlbum.name != dbAlbum.name ||
!deviceAlbum.modifiedAt.isAtSameMomentAs(dbAlbum.modifiedAt) ||
await _albumMediaRepository.getAssetCount(deviceAlbum.localId!) !=
(await _db.eTags.getById(deviceAlbum.eTagKeyAssetCount))
(await _eTagRepository.getById(deviceAlbum.eTagKeyAssetCount))
?.assetCount;
}
Future<bool> _removeAllLocalAlbumsAndAssets() async {
try {
final assets = await _db.assets.where().localIdIsNotNull().findAll();
final assets = await _assetRepository.getAllLocal();
final (toDelete, toUpdate) =
_handleAssetRemoval(assets, [], remote: false);
await _db.writeTxn(() async {
await _db.assets.deleteAll(toDelete);
await _db.assets.putAll(toUpdate);
await _db.albums.where().localIdIsNotNull().deleteAll();
await _assetRepository.transaction(() async {
await _assetRepository.deleteById(toDelete);
await _assetRepository.updateAll(toUpdate);
await _albumRepository.deleteAllLocal();
});
return true;
} catch (e) {