mirror of
https://github.com/immich-app/immich
synced 2025-11-14 17:36:12 +00:00
feature(mobile): hash assets & sync via checksum (#2592)
* compare different sha1 implementations * remove openssl sha1 * sync via checksum * hash assets in batches * hash in background, show spinner in tab * undo tmp changes * migrate by clearing assets * ignore duplicate assets * error handling * trigger sync/merge after download and update view * review feedback improvements * hash in background isolate on iOS * rework linking assets with existing from DB * fine-grained errors on unique index violation * hash lenth validation * revert compute in background on iOS * ignore duplicate assets on device * fix bug with batching based on accumulated size --------- Co-authored-by: Fynn Petersen-Frey <zoodyy@users.noreply.github.com>
This commit is contained in:
parent
053a0482b4
commit
73075c64d1
28 changed files with 2315 additions and 507 deletions
|
|
@ -4,10 +4,12 @@ import 'package:collection/collection.dart';
|
|||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/shared/models/album.dart';
|
||||
import 'package:immich_mobile/shared/models/asset.dart';
|
||||
import 'package:immich_mobile/shared/models/etag.dart';
|
||||
import 'package:immich_mobile/shared/models/exif_info.dart';
|
||||
import 'package:immich_mobile/shared/models/store.dart';
|
||||
import 'package:immich_mobile/shared/models/user.dart';
|
||||
import 'package:immich_mobile/shared/providers/db.provider.dart';
|
||||
import 'package:immich_mobile/shared/services/hash.service.dart';
|
||||
import 'package:immich_mobile/utils/async_mutex.dart';
|
||||
import 'package:immich_mobile/utils/builtin_extensions.dart';
|
||||
import 'package:immich_mobile/utils/diff.dart';
|
||||
|
|
@ -16,15 +18,17 @@ import 'package:logging/logging.dart';
|
|||
import 'package:openapi/api.dart';
|
||||
import 'package:photo_manager/photo_manager.dart';
|
||||
|
||||
final syncServiceProvider =
|
||||
Provider((ref) => SyncService(ref.watch(dbProvider)));
|
||||
final syncServiceProvider = Provider(
|
||||
(ref) => SyncService(ref.watch(dbProvider), ref.watch(hashServiceProvider)),
|
||||
);
|
||||
|
||||
class SyncService {
|
||||
final Isar _db;
|
||||
final HashService _hashService;
|
||||
final AsyncMutex _lock = AsyncMutex();
|
||||
final Logger _log = Logger('SyncService');
|
||||
|
||||
SyncService(this._db);
|
||||
SyncService(this._db, this._hashService);
|
||||
|
||||
// public methods:
|
||||
|
||||
|
|
@ -33,6 +37,7 @@ class SyncService {
|
|||
Future<bool> syncUsersFromServer(List<User> users) async {
|
||||
users.sortBy((u) => u.id);
|
||||
final dbUsers = await _db.users.where().sortById().findAll();
|
||||
assert(dbUsers.isSortedBy((u) => u.id), "dbUsers not sorted!");
|
||||
final List<int> toDelete = [];
|
||||
final List<User> toUpsert = [];
|
||||
final changes = diffSortedListsSync(
|
||||
|
|
@ -108,40 +113,16 @@ class SyncService {
|
|||
// private methods:
|
||||
|
||||
/// Syncs a new asset to the db. Returns `true` if successful
|
||||
Future<bool> _syncNewAssetToDb(Asset newAsset) async {
|
||||
final List<Asset> inDb = await _db.assets
|
||||
.where()
|
||||
.localIdDeviceIdEqualTo(newAsset.localId, newAsset.deviceId)
|
||||
.findAll();
|
||||
Asset? match;
|
||||
if (inDb.length == 1) {
|
||||
// exactly one match: trivial case
|
||||
match = inDb.first;
|
||||
} else if (inDb.length > 1) {
|
||||
// TODO instead of this heuristics: match by checksum once available
|
||||
for (Asset a in inDb) {
|
||||
if (a.ownerId == newAsset.ownerId &&
|
||||
a.fileModifiedAt.isAtSameMomentAs(newAsset.fileModifiedAt)) {
|
||||
assert(match == null);
|
||||
match = a;
|
||||
}
|
||||
}
|
||||
if (match == null) {
|
||||
for (Asset a in inDb) {
|
||||
if (a.ownerId == newAsset.ownerId) {
|
||||
assert(match == null);
|
||||
match = a;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (match != null) {
|
||||
Future<bool> _syncNewAssetToDb(Asset a) async {
|
||||
final Asset? inDb =
|
||||
await _db.assets.getByChecksumOwnerId(a.checksum, a.ownerId);
|
||||
if (inDb != null) {
|
||||
// unify local/remote assets by replacing the
|
||||
// local-only asset in the DB with a local&remote asset
|
||||
newAsset = match.updatedCopy(newAsset);
|
||||
a = inDb.updatedCopy(a);
|
||||
}
|
||||
try {
|
||||
await _db.writeTxn(() => newAsset.put(_db));
|
||||
await _db.writeTxn(() => a.put(_db));
|
||||
} on IsarError catch (e) {
|
||||
_log.severe("Failed to put new asset into db: $e");
|
||||
return false;
|
||||
|
|
@ -162,11 +143,11 @@ class SyncService {
|
|||
final List<Asset> inDb = await _db.assets
|
||||
.filter()
|
||||
.ownerIdEqualTo(user.isarId)
|
||||
.sortByDeviceId()
|
||||
.thenByLocalId()
|
||||
.thenByFileModifiedAt()
|
||||
.sortByChecksum()
|
||||
.findAll();
|
||||
remote.sort(Asset.compareByOwnerDeviceLocalIdModified);
|
||||
assert(inDb.isSorted(Asset.compareByChecksum), "inDb not sorted!");
|
||||
|
||||
remote.sort(Asset.compareByChecksum);
|
||||
final (toAdd, toUpdate, toRemove) = _diffAssets(remote, inDb, remote: true);
|
||||
if (toAdd.isEmpty && toUpdate.isEmpty && toRemove.isEmpty) {
|
||||
return false;
|
||||
|
|
@ -199,6 +180,7 @@ class SyncService {
|
|||
query = baseQuery.owner((q) => q.isarIdEqualTo(me.isarId));
|
||||
}
|
||||
final List<Album> dbAlbums = await query.sortByRemoteId().findAll();
|
||||
assert(dbAlbums.isSortedBy((e) => e.remoteId!), "dbAlbums not sorted!");
|
||||
|
||||
final List<Asset> toDelete = [];
|
||||
final List<Asset> existing = [];
|
||||
|
|
@ -245,16 +227,16 @@ class SyncService {
|
|||
if (dto.assetCount != dto.assets.length) {
|
||||
return false;
|
||||
}
|
||||
final assetsInDb = await album.assets
|
||||
.filter()
|
||||
.sortByOwnerId()
|
||||
.thenByDeviceId()
|
||||
.thenByLocalId()
|
||||
.thenByFileModifiedAt()
|
||||
.findAll();
|
||||
final assetsInDb =
|
||||
await album.assets.filter().sortByOwnerId().thenByChecksum().findAll();
|
||||
assert(assetsInDb.isSorted(Asset.compareByOwnerChecksum), "inDb unsorted!");
|
||||
final List<Asset> assetsOnRemote = dto.getAssets();
|
||||
assetsOnRemote.sort(Asset.compareByOwnerDeviceLocalIdModified);
|
||||
final (toAdd, toUpdate, toUnlink) = _diffAssets(assetsOnRemote, assetsInDb);
|
||||
assetsOnRemote.sort(Asset.compareByOwnerChecksum);
|
||||
final (toAdd, toUpdate, toUnlink) = _diffAssets(
|
||||
assetsOnRemote,
|
||||
assetsInDb,
|
||||
compare: Asset.compareByOwnerChecksum,
|
||||
);
|
||||
|
||||
// update shared users
|
||||
final List<User> sharedUsers = album.sharedUsers.toList(growable: false);
|
||||
|
|
@ -297,6 +279,7 @@ class SyncService {
|
|||
await album.assets.update(link: assetsToLink, unlink: toUnlink.cast());
|
||||
await _db.albums.put(album);
|
||||
});
|
||||
_log.info("Synced changes of remote album ${album.name} to DB");
|
||||
} on IsarError catch (e) {
|
||||
_log.severe("Failed to sync remote album to database $e");
|
||||
}
|
||||
|
|
@ -382,10 +365,11 @@ class SyncService {
|
|||
Set<String>? excludedAssets,
|
||||
]) async {
|
||||
onDevice.sort((a, b) => a.id.compareTo(b.id));
|
||||
final List<Album> inDb =
|
||||
final inDb =
|
||||
await _db.albums.where().localIdIsNotNull().sortByLocalId().findAll();
|
||||
final List<Asset> deleteCandidates = [];
|
||||
final List<Asset> existing = [];
|
||||
assert(inDb.isSorted((a, b) => a.localId!.compareTo(b.localId!)), "sort!");
|
||||
final bool anyChanges = await diffSortedLists(
|
||||
onDevice,
|
||||
inDb,
|
||||
|
|
@ -447,14 +431,15 @@ class SyncService {
|
|||
final inDb = await album.assets
|
||||
.filter()
|
||||
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
|
||||
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
|
||||
.sortByLocalId()
|
||||
.sortByChecksum()
|
||||
.findAll();
|
||||
assert(inDb.isSorted(Asset.compareByChecksum), "inDb not sorted!");
|
||||
final int assetCountOnDevice = await ape.assetCountAsync;
|
||||
final List<Asset> onDevice =
|
||||
await ape.getAssets(excludedAssets: excludedAssets);
|
||||
onDevice.sort(Asset.compareByLocalId);
|
||||
final (toAdd, toUpdate, toDelete) =
|
||||
_diffAssets(onDevice, inDb, compare: Asset.compareByLocalId);
|
||||
await _hashService.getHashedAssets(ape, excludedAssets: excludedAssets);
|
||||
_removeDuplicates(onDevice);
|
||||
// _removeDuplicates sorts `onDevice` by checksum
|
||||
final (toAdd, toUpdate, toDelete) = _diffAssets(onDevice, inDb);
|
||||
if (toAdd.isEmpty &&
|
||||
toUpdate.isEmpty &&
|
||||
toDelete.isEmpty &&
|
||||
|
|
@ -491,6 +476,9 @@ class SyncService {
|
|||
await _db.albums.put(album);
|
||||
album.thumbnail.value ??= await album.assets.filter().findFirst();
|
||||
await album.thumbnail.save();
|
||||
await _db.eTags.put(
|
||||
ETag(id: ape.eTagKeyAssetCount, value: assetCountOnDevice.toString()),
|
||||
);
|
||||
});
|
||||
_log.info("Synced changes of local album ${ape.name} to DB");
|
||||
} on IsarError catch (e) {
|
||||
|
|
@ -503,8 +491,13 @@ class SyncService {
|
|||
/// fast path for common case: only new assets were added to device album
|
||||
/// returns `true` if successfull, else `false`
|
||||
Future<bool> _syncDeviceAlbumFast(AssetPathEntity ape, Album album) async {
|
||||
if (!(ape.lastModified ?? DateTime.now()).isAfter(album.modifiedAt)) {
|
||||
return false;
|
||||
}
|
||||
final int totalOnDevice = await ape.assetCountAsync;
|
||||
final AssetPathEntity? modified = totalOnDevice > album.assetCount
|
||||
final int lastKnownTotal =
|
||||
(await _db.eTags.getById(ape.eTagKeyAssetCount))?.value?.toInt() ?? 0;
|
||||
final AssetPathEntity? modified = totalOnDevice > lastKnownTotal
|
||||
? await ape.fetchPathProperties(
|
||||
filterOptionGroup: FilterOptionGroup(
|
||||
updateTimeCond: DateTimeCond(
|
||||
|
|
@ -517,17 +510,22 @@ class SyncService {
|
|||
if (modified == null) {
|
||||
return false;
|
||||
}
|
||||
final List<Asset> newAssets = await modified.getAssets();
|
||||
if (totalOnDevice != album.assets.length + newAssets.length) {
|
||||
final List<Asset> newAssets = await _hashService.getHashedAssets(modified);
|
||||
|
||||
if (totalOnDevice != lastKnownTotal + newAssets.length) {
|
||||
return false;
|
||||
}
|
||||
album.modifiedAt = ape.lastModified ?? DateTime.now();
|
||||
_removeDuplicates(newAssets);
|
||||
final (existingInDb, updated) = await _linkWithExistingFromDb(newAssets);
|
||||
try {
|
||||
await _db.writeTxn(() async {
|
||||
await _db.assets.putAll(updated);
|
||||
await album.assets.update(link: existingInDb + updated);
|
||||
await _db.albums.put(album);
|
||||
await _db.eTags.put(
|
||||
ETag(id: ape.eTagKeyAssetCount, value: totalOnDevice.toString()),
|
||||
);
|
||||
});
|
||||
_log.info("Fast synced local album ${ape.name} to DB");
|
||||
} on IsarError catch (e) {
|
||||
|
|
@ -547,7 +545,9 @@ class SyncService {
|
|||
]) async {
|
||||
_log.info("Syncing a new local album to DB: ${ape.name}");
|
||||
final Album a = Album.local(ape);
|
||||
final assets = await ape.getAssets(excludedAssets: excludedAssets);
|
||||
final assets =
|
||||
await _hashService.getHashedAssets(ape, excludedAssets: excludedAssets);
|
||||
_removeDuplicates(assets);
|
||||
final (existingInDb, updated) = await _linkWithExistingFromDb(assets);
|
||||
_log.info(
|
||||
"${existingInDb.length} assets already existed in DB, to upsert ${updated.length}",
|
||||
|
|
@ -570,44 +570,29 @@ class SyncService {
|
|||
Future<(List<Asset> existing, List<Asset> updated)> _linkWithExistingFromDb(
|
||||
List<Asset> assets,
|
||||
) async {
|
||||
if (assets.isEmpty) {
|
||||
return ([].cast<Asset>(), [].cast<Asset>());
|
||||
}
|
||||
final List<Asset> inDb = await _db.assets
|
||||
.where()
|
||||
.anyOf(
|
||||
assets,
|
||||
(q, Asset e) => q.localIdDeviceIdEqualTo(e.localId, e.deviceId),
|
||||
)
|
||||
.sortByOwnerId()
|
||||
.thenByDeviceId()
|
||||
.thenByLocalId()
|
||||
.thenByFileModifiedAt()
|
||||
.findAll();
|
||||
assets.sort(Asset.compareByOwnerDeviceLocalIdModified);
|
||||
final List<Asset> existing = [], toUpsert = [];
|
||||
diffSortedListsSync(
|
||||
inDb,
|
||||
assets,
|
||||
// do not compare by modified date because for some assets dates differ on
|
||||
// client and server, thus never reaching "both" case below
|
||||
compare: Asset.compareByOwnerDeviceLocalId,
|
||||
both: (Asset a, Asset b) {
|
||||
if (a.canUpdate(b)) {
|
||||
toUpsert.add(a.updatedCopy(b));
|
||||
return true;
|
||||
} else {
|
||||
existing.add(a);
|
||||
return false;
|
||||
}
|
||||
},
|
||||
onlyFirst: (Asset a) => _log.finer(
|
||||
"_linkWithExistingFromDb encountered asset only in DB: $a",
|
||||
null,
|
||||
StackTrace.current,
|
||||
),
|
||||
onlySecond: (Asset b) => toUpsert.add(b),
|
||||
if (assets.isEmpty) return ([].cast<Asset>(), [].cast<Asset>());
|
||||
|
||||
final List<Asset?> inDb = await _db.assets.getAllByChecksumOwnerId(
|
||||
assets.map((a) => a.checksum).toList(growable: false),
|
||||
assets.map((a) => a.ownerId).toInt64List(),
|
||||
);
|
||||
assert(inDb.length == assets.length);
|
||||
final List<Asset> existing = [], toUpsert = [];
|
||||
for (int i = 0; i < assets.length; i++) {
|
||||
final Asset? b = inDb[i];
|
||||
if (b == null) {
|
||||
toUpsert.add(assets[i]);
|
||||
continue;
|
||||
}
|
||||
if (b.canUpdate(assets[i])) {
|
||||
final updated = b.updatedCopy(assets[i]);
|
||||
assert(updated.id != Isar.autoIncrement);
|
||||
toUpsert.add(updated);
|
||||
} else {
|
||||
existing.add(b);
|
||||
}
|
||||
}
|
||||
assert(existing.length + toUpsert.length == assets.length);
|
||||
return (existing, toUpsert);
|
||||
}
|
||||
|
||||
|
|
@ -627,11 +612,63 @@ class SyncService {
|
|||
});
|
||||
_log.info("Upserted ${assets.length} assets into the DB");
|
||||
} on IsarError catch (e) {
|
||||
_log.warning(
|
||||
_log.severe(
|
||||
"Failed to upsert ${assets.length} assets into the DB: ${e.toString()}",
|
||||
);
|
||||
// give details on the errors
|
||||
assets.sort(Asset.compareByOwnerChecksum);
|
||||
final inDb = await _db.assets.getAllByChecksumOwnerId(
|
||||
assets.map((e) => e.checksum).toList(growable: false),
|
||||
assets.map((e) => e.ownerId).toInt64List(),
|
||||
);
|
||||
for (int i = 0; i < assets.length; i++) {
|
||||
final Asset a = assets[i];
|
||||
final Asset? b = inDb[i];
|
||||
if (b == null) {
|
||||
if (a.id != Isar.autoIncrement) {
|
||||
_log.warning(
|
||||
"Trying to update an asset that does not exist in DB:\n$a",
|
||||
);
|
||||
}
|
||||
} else if (a.id != b.id) {
|
||||
_log.warning(
|
||||
"Trying to insert another asset with the same checksum+owner. In DB:\n$b\nTo insert:\n$a",
|
||||
);
|
||||
}
|
||||
}
|
||||
for (int i = 1; i < assets.length; i++) {
|
||||
if (Asset.compareByOwnerChecksum(assets[i - 1], assets[i]) == 0) {
|
||||
_log.warning(
|
||||
"Trying to insert duplicate assets:\n${assets[i - 1]}\n${assets[i]}",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<Asset> _removeDuplicates(List<Asset> assets) {
|
||||
final int before = assets.length;
|
||||
assets.sort(Asset.compareByOwnerChecksumCreatedModified);
|
||||
assets.uniqueConsecutive(
|
||||
compare: Asset.compareByOwnerChecksum,
|
||||
onDuplicate: (a, b) =>
|
||||
_log.info("Ignoring duplicate assets on device:\n$a\n$b"),
|
||||
);
|
||||
final int duplicates = before - assets.length;
|
||||
if (duplicates > 0) {
|
||||
_log.warning("Ignored $duplicates duplicate assets on device");
|
||||
}
|
||||
return assets;
|
||||
}
|
||||
|
||||
/// returns `true` if the albums differ on the surface
|
||||
Future<bool> _hasAssetPathEntityChanged(AssetPathEntity a, Album b) async {
|
||||
return a.name != b.name ||
|
||||
a.lastModified == null ||
|
||||
!a.lastModified!.isAtSameMomentAs(b.modifiedAt) ||
|
||||
await a.assetCountAsync !=
|
||||
(await _db.eTags.getById(a.eTagKeyAssetCount))?.value?.toInt();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a triple(toAdd, toUpdate, toRemove)
|
||||
|
|
@ -639,7 +676,7 @@ class SyncService {
|
|||
List<Asset> assets,
|
||||
List<Asset> inDb, {
|
||||
bool? remote,
|
||||
int Function(Asset, Asset) compare = Asset.compareByOwnerDeviceLocalId,
|
||||
int Function(Asset, Asset) compare = Asset.compareByChecksum,
|
||||
}) {
|
||||
final List<Asset> toAdd = [];
|
||||
final List<Asset> toUpdate = [];
|
||||
|
|
@ -663,7 +700,7 @@ class SyncService {
|
|||
}
|
||||
} else if (remote == false && a.isRemote) {
|
||||
if (a.isLocal) {
|
||||
a.isLocal = false;
|
||||
a.localId = null;
|
||||
toUpdate.add(a);
|
||||
}
|
||||
} else {
|
||||
|
|
@ -685,9 +722,9 @@ class SyncService {
|
|||
return const ([], []);
|
||||
}
|
||||
deleteCandidates.sort(Asset.compareById);
|
||||
deleteCandidates.uniqueConsecutive((a) => a.id);
|
||||
deleteCandidates.uniqueConsecutive(compare: Asset.compareById);
|
||||
existing.sort(Asset.compareById);
|
||||
existing.uniqueConsecutive((a) => a.id);
|
||||
existing.uniqueConsecutive(compare: Asset.compareById);
|
||||
final (tooAdd, toUpdate, toRemove) = _diffAssets(
|
||||
existing,
|
||||
deleteCandidates,
|
||||
|
|
@ -698,14 +735,6 @@ class SyncService {
|
|||
return (toRemove.map((e) => e.id).toList(), toUpdate);
|
||||
}
|
||||
|
||||
/// returns `true` if the albums differ on the surface
|
||||
Future<bool> _hasAssetPathEntityChanged(AssetPathEntity a, Album b) async {
|
||||
return a.name != b.name ||
|
||||
a.lastModified == null ||
|
||||
!a.lastModified!.isAtSameMomentAs(b.modifiedAt) ||
|
||||
await a.assetCountAsync != b.assetCount;
|
||||
}
|
||||
|
||||
/// returns `true` if the albums differ on the surface
|
||||
bool _hasAlbumResponseDtoChanged(AlbumResponseDto dto, Album a) {
|
||||
return dto.assetCount != a.assetCount ||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue