chore: robust isolation tasks coordination (#21605)

* chore: robust isolation tasks coordination

* give more time for database transaction to clean up

* chore: clean up logs

* chore: clean up logs

* fix: logs
This commit is contained in:
Alex 2025-09-05 14:39:38 -05:00 committed by GitHub
parent 3a29522df6
commit 88c0243a20
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 198 additions and 67 deletions

View file

@ -24,6 +24,7 @@ class BackgroundSyncManager {
Cancelable<void>? _syncTask; Cancelable<void>? _syncTask;
Cancelable<void>? _syncWebsocketTask; Cancelable<void>? _syncWebsocketTask;
Cancelable<void>? _deviceAlbumSyncTask; Cancelable<void>? _deviceAlbumSyncTask;
Cancelable<void>? _linkedAlbumSyncTask;
Cancelable<void>? _hashTask; Cancelable<void>? _hashTask;
BackgroundSyncManager({ BackgroundSyncManager({
@ -53,6 +54,12 @@ class BackgroundSyncManager {
_syncWebsocketTask?.cancel(); _syncWebsocketTask?.cancel();
_syncWebsocketTask = null; _syncWebsocketTask = null;
if (_linkedAlbumSyncTask != null) {
futures.add(_linkedAlbumSyncTask!.future);
}
_linkedAlbumSyncTask?.cancel();
_linkedAlbumSyncTask = null;
try { try {
await Future.wait(futures); await Future.wait(futures);
} on CanceledError { } on CanceledError {
@ -158,8 +165,14 @@ class BackgroundSyncManager {
} }
Future<void> syncLinkedAlbum() { Future<void> syncLinkedAlbum() {
final task = runInIsolateGentle(computation: syncLinkedAlbumsIsolated); if (_linkedAlbumSyncTask != null) {
return task.future; return _linkedAlbumSyncTask!.future;
}
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated);
return _linkedAlbumSyncTask!.whenComplete(() {
_linkedAlbumSyncTask = null;
});
} }
} }

View file

@ -1,6 +1,5 @@
import 'dart:async'; import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/services/log.service.dart'; import 'package:immich_mobile/domain/services/log.service.dart';
import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart';
@ -34,6 +33,12 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
final Ref _ref; final Ref _ref;
bool _wasPaused = false; bool _wasPaused = false;
// Add operation coordination
Completer<void>? _resumeOperation;
Completer<void>? _pauseOperation;
final _log = Logger("AppLifeCycleNotifier");
AppLifeCycleNotifier(this._ref) : super(AppLifeCycleEnum.active); AppLifeCycleNotifier(this._ref) : super(AppLifeCycleEnum.active);
AppLifeCycleEnum getAppState() { AppLifeCycleEnum getAppState() {
@ -43,6 +48,32 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
void handleAppResume() async { void handleAppResume() async {
state = AppLifeCycleEnum.resumed; state = AppLifeCycleEnum.resumed;
// Prevent overlapping resume operations
if (_resumeOperation != null && !_resumeOperation!.isCompleted) {
await _resumeOperation!.future;
return;
}
// Cancel any ongoing pause operation
if (_pauseOperation != null && !_pauseOperation!.isCompleted) {
_pauseOperation!.complete();
}
_resumeOperation = Completer<void>();
try {
await _performResume();
} catch (e, stackTrace) {
_log.severe("Error during app resume", e, stackTrace);
} finally {
if (!_resumeOperation!.isCompleted) {
_resumeOperation!.complete();
}
_resumeOperation = null;
}
}
Future<void> _performResume() async {
// no need to resume because app was never really paused // no need to resume because app was never really paused
if (!_wasPaused) return; if (!_wasPaused) return;
_wasPaused = false; _wasPaused = false;
@ -53,9 +84,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
if (isAuthenticated) { if (isAuthenticated) {
// switch endpoint if needed // switch endpoint if needed
final endpoint = await _ref.read(authProvider.notifier).setOpenApiServiceEndpoint(); final endpoint = await _ref.read(authProvider.notifier).setOpenApiServiceEndpoint();
if (kDebugMode) { _log.info("Using server URL: $endpoint");
debugPrint("Using server URL: $endpoint");
}
if (!Store.isBetaTimelineEnabled) { if (!Store.isBetaTimelineEnabled) {
final permission = _ref.watch(galleryPermissionNotifier); final permission = _ref.watch(galleryPermissionNotifier);
@ -81,52 +110,10 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
break; break;
} }
} else { } else {
_ref.read(backupProvider.notifier).cancelBackup(); _ref.read(websocketProvider.notifier).connect();
final lockManager = _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)); await _handleBetaTimelineResume();
lockManager.requestHolderToClose();
debugPrint("Requested lock holder to close on resume");
await lockManager.acquireLock();
debugPrint("Lock acquired for background sync on resume");
final backgroundManager = _ref.read(backgroundSyncProvider);
// Ensure proper cleanup before starting new background tasks
try {
await Future.wait([
Future(() async {
await backgroundManager.syncLocal();
Logger("AppLifeCycleNotifier").fine("Hashing assets after syncLocal");
// Check if app is still active before hashing
if ([AppLifeCycleEnum.resumed, AppLifeCycleEnum.active].contains(state)) {
await backgroundManager.hashAssets();
}
}),
backgroundManager.syncRemote(),
]).then((_) async {
final isEnableBackup = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
final isAlbumLinkedSyncEnable = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.syncAlbums);
if (isEnableBackup) {
final currentUser = _ref.read(currentUserProvider);
if (currentUser == null) {
return;
}
await _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
}
if (isAlbumLinkedSyncEnable) {
await backgroundManager.syncLinkedAlbum();
}
});
} catch (e, stackTrace) {
Logger("AppLifeCycleNotifier").severe("Error during background sync", e, stackTrace);
}
} }
_ref.read(websocketProvider.notifier).connect();
await _ref.read(notificationPermissionProvider.notifier).getNotificationPermission(); await _ref.read(notificationPermissionProvider.notifier).getNotificationPermission();
await _ref.read(galleryPermissionNotifier.notifier).getGalleryPermissionStatus(); await _ref.read(galleryPermissionNotifier.notifier).getGalleryPermissionStatus();
@ -138,6 +125,103 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
} }
} }
Future<void> _handleBetaTimelineResume() async {
_ref.read(backupProvider.notifier).cancelBackup();
final lockManager = _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort));
// Give isolates time to complete any ongoing database transactions
await Future.delayed(const Duration(milliseconds: 500));
lockManager.requestHolderToClose();
// Add timeout to prevent deadlock on lock acquisition
try {
await lockManager.acquireLock().timeout(
const Duration(seconds: 10),
onTimeout: () {
_log.warning("Lock acquisition timed out, proceeding without lock");
throw TimeoutException("Lock acquisition timed out", const Duration(seconds: 10));
},
);
} catch (e) {
_log.warning("Failed to acquire lock: $e");
return;
}
final backgroundManager = _ref.read(backgroundSyncProvider);
final isAlbumLinkedSyncEnable = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.syncAlbums);
try {
// Run operations sequentially with state checks and error handling for each
if (_shouldContinueOperation()) {
try {
await backgroundManager.syncLocal();
} catch (e, stackTrace) {
_log.warning("Failed syncLocal: $e", e, stackTrace);
}
}
// Check if app is still active before hashing
if (_shouldContinueOperation()) {
try {
await backgroundManager.hashAssets();
} catch (e, stackTrace) {
_log.warning("Failed hashAssets: $e", e, stackTrace);
}
}
// Check if app is still active before remote sync
if (_shouldContinueOperation()) {
try {
await backgroundManager.syncRemote();
} catch (e, stackTrace) {
_log.warning("Failed syncRemote: $e", e, stackTrace);
}
if (isAlbumLinkedSyncEnable && _shouldContinueOperation()) {
try {
await backgroundManager.syncLinkedAlbum();
} catch (e, stackTrace) {
_log.warning("Failed syncLinkedAlbum: $e", e, stackTrace);
}
}
}
// Handle backup resume only if still active
if (_shouldContinueOperation()) {
final isEnableBackup = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
if (isEnableBackup) {
final currentUser = _ref.read(currentUserProvider);
if (currentUser != null) {
try {
await _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
_log.fine("Completed backup resume");
} catch (e, stackTrace) {
_log.warning("Failed backup resume: $e", e, stackTrace);
}
}
}
}
} catch (e, stackTrace) {
_log.severe("Error during background sync", e, stackTrace);
} finally {
// Ensure lock is released even if operations fail
try {
lockManager.releaseLock();
_log.fine("Lock released after background sync operations");
} catch (lockError) {
_log.warning("Failed to release lock after error: $lockError");
}
}
}
// Helper method to check if operations should continue
bool _shouldContinueOperation() {
return [AppLifeCycleEnum.resumed, AppLifeCycleEnum.active].contains(state) &&
(_resumeOperation?.isCompleted == false || _resumeOperation == null);
}
void handleAppInactivity() { void handleAppInactivity() {
state = AppLifeCycleEnum.inactive; state = AppLifeCycleEnum.inactive;
// do not stop/clean up anything on inactivity: issued on every orientation change // do not stop/clean up anything on inactivity: issued on every orientation change
@ -147,6 +231,32 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
state = AppLifeCycleEnum.paused; state = AppLifeCycleEnum.paused;
_wasPaused = true; _wasPaused = true;
// Prevent overlapping pause operations
if (_pauseOperation != null && !_pauseOperation!.isCompleted) {
await _pauseOperation!.future;
return;
}
// Cancel any ongoing resume operation
if (_resumeOperation != null && !_resumeOperation!.isCompleted) {
_resumeOperation!.complete();
}
_pauseOperation = Completer<void>();
try {
await _performPause();
} catch (e, stackTrace) {
_log.severe("Error during app pause", e, stackTrace);
} finally {
if (!_pauseOperation!.isCompleted) {
_pauseOperation!.complete();
}
_pauseOperation = null;
}
}
Future<void> _performPause() async {
if (_ref.read(authProvider).isAuthenticated) { if (_ref.read(authProvider).isAuthenticated) {
if (!Store.isBetaTimelineEnabled) { if (!Store.isBetaTimelineEnabled) {
// Do not cancel backup if manual upload is in progress // Do not cancel backup if manual upload is in progress
@ -155,10 +265,26 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
} }
} else { } else {
final backgroundManager = _ref.read(backgroundSyncProvider); final backgroundManager = _ref.read(backgroundSyncProvider);
await backgroundManager.cancel();
await backgroundManager.cancelLocal(); // Cancel operations with extended timeout to allow database transactions to complete
_ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); try {
debugPrint("Lock released on app pause"); await Future.wait([
backgroundManager.cancel().timeout(const Duration(seconds: 10)),
backgroundManager.cancelLocal().timeout(const Duration(seconds: 10)),
]).timeout(const Duration(seconds: 15));
// Give additional time for isolates to clean up database connections
await Future.delayed(const Duration(milliseconds: 1000));
} catch (e) {
_log.warning("Timeout during background cancellation: $e");
}
// Always release the lock, even if cancellation failed
try {
_ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock();
} catch (e) {
_log.warning("Failed to release lock on pause: $e");
}
} }
_ref.read(websocketProvider.notifier).disconnect(); _ref.read(websocketProvider.notifier).disconnect();
@ -166,9 +292,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
try { try {
LogService.I.flush(); LogService.I.flush();
} catch (e) { } catch (_) {}
// Ignore flush errors during pause
}
} }
Future<void> handleAppDetached() async { Future<void> handleAppDetached() async {
@ -177,9 +301,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
// Flush logs before closing database // Flush logs before closing database
try { try {
LogService.I.flush(); LogService.I.flush();
} catch (e) { } catch (_) {}
// Ignore flush errors during shutdown
}
// Close Isar database safely // Close Isar database safely
try { try {
@ -187,9 +309,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
if (isar != null && isar.isOpen) { if (isar != null && isar.isOpen) {
await isar.close(); await isar.close();
} }
} catch (e) { } catch (_) {}
// Ignore close errors during shutdown
}
if (Store.isBetaTimelineEnabled) { if (Store.isBetaTimelineEnabled) {
_ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock();
@ -199,9 +319,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
// no guarantee this is called at all // no guarantee this is called at all
try { try {
_ref.read(manualUploadProvider.notifier).cancelBackup(); _ref.read(manualUploadProvider.notifier).cancelBackup();
} catch (e) { } catch (_) {}
// Ignore errors during shutdown
}
} }
void handleAppHidden() { void handleAppHidden() {

View file

@ -296,7 +296,7 @@ class _SyncStatusIndicatorState extends ConsumerState<_SyncStatusIndicator> with
@override @override
Widget build(BuildContext context) { Widget build(BuildContext context) {
final syncStatus = ref.watch(syncStatusProvider); final syncStatus = ref.watch(syncStatusProvider);
final isSyncing = syncStatus.isRemoteSyncing; final isSyncing = syncStatus.isRemoteSyncing || syncStatus.isLocalSyncing;
// Control animations based on sync status // Control animations based on sync status
if (isSyncing) { if (isSyncing) {