diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/background/BackgroundWorker.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/background/BackgroundWorker.kt index d24852b1fa..43124a957e 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/background/BackgroundWorker.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/background/BackgroundWorker.kt @@ -130,8 +130,10 @@ class BackgroundWorker(context: Context, params: WorkerParameters) : * - Parameter success: Indicates whether the background task completed successfully */ private fun complete(success: Result) { + Log.d(TAG, "About to complete BackupWorker with result: $success") isComplete = true engine?.destroy() + engine = null flutterApi = null completionHandler.set(success) } diff --git a/mobile/ios/Runner.xcodeproj/project.pbxproj b/mobile/ios/Runner.xcodeproj/project.pbxproj index 14c542b068..4e68390113 100644 --- a/mobile/ios/Runner.xcodeproj/project.pbxproj +++ b/mobile/ios/Runner.xcodeproj/project.pbxproj @@ -3,7 +3,7 @@ archiveVersion = 1; classes = { }; - objectVersion = 77; + objectVersion = 54; objects = { /* Begin PBXBuildFile section */ @@ -507,14 +507,10 @@ inputFileListPaths = ( "${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-input-files.xcfilelist", ); - inputPaths = ( - ); name = "[CP] Copy Pods Resources"; outputFileListPaths = ( "${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources-${CONFIGURATION}-output-files.xcfilelist", ); - outputPaths = ( - ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-resources.sh\"\n"; @@ -543,14 +539,10 @@ inputFileListPaths = ( "${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-input-files.xcfilelist", ); - inputPaths = ( - ); name = "[CP] Embed Pods Frameworks"; outputFileListPaths = ( "${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks-${CONFIGURATION}-output-files.xcfilelist", ); - outputPaths = ( - ); runOnlyForDeploymentPostprocessing = 0; shellPath = /bin/sh; shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-Runner/Pods-Runner-frameworks.sh\"\n"; diff --git a/mobile/ios/Runner/Background/BackgroundWorker.swift b/mobile/ios/Runner/Background/BackgroundWorker.swift index eeaa071653..835632a5d0 100644 --- a/mobile/ios/Runner/Background/BackgroundWorker.swift +++ b/mobile/ios/Runner/Background/BackgroundWorker.swift @@ -118,7 +118,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { self.handleHostResult(result: result) }) } - + /** * Cancels the currently running background task, either due to timeout or external request. * Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure @@ -140,6 +140,7 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { self.complete(success: false) } } + /** * Handles the result from Flutter API calls and determines the success/failure status. diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index cf8c6e7961..9f366ad30b 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -5,6 +5,7 @@ import 'package:background_downloader/background_downloader.dart'; import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; +import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart'; import 'package:immich_mobile/platform/background_worker_api.g.dart'; @@ -41,7 +42,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { final Drift _drift; final DriftLogger _driftLogger; final BackgroundWorkerBgHostApi _backgroundHostApi; - final Logger _logger = Logger('BackgroundWorkerBgService'); + final Logger _logger = Logger('BackgroundUploadBgService'); + late final IsolateLockManager _lockManager; bool _isCleanedUp = false; @@ -57,6 +59,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { driftProvider.overrideWith(driftOverride(drift)), ], ); + _lockManager = IsolateLockManager(onCloseRequest: _cleanup); BackgroundWorkerFlutterApi.setUp(this); } @@ -80,11 +83,25 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false); await FileDownloader().trackTasks(); configureFileDownloaderNotifications(); - await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess(); - // Notify the host that the background worker service has been initialized and is ready to use - _backgroundHostApi.onInitialized(); + // Notify the host that the background upload service has been initialized and is ready to use + debugPrint("Acquiring background worker lock"); + if (await _lockManager.acquireLock().timeout( + const Duration(seconds: 5), + onTimeout: () { + _lockManager.cancel(); + return false; + }, + )) { + _logger.info("Acquired background worker lock"); + await _backgroundHostApi.onInitialized(); + return; + } + + _logger.warning("Failed to acquire background worker lock"); + await _cleanup(); + await _backgroundHostApi.close(); } catch (error, stack) { _logger.severe("Failed to initialize background worker", error, stack); _backgroundHostApi.close(); @@ -160,7 +177,8 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { await _drift.close(); await _driftLogger.close(); _ref.dispose(); - debugPrint("Background worker cleaned up"); + _lockManager.releaseLock(); + _logger.info("Background worker resources cleaned up"); } catch (error, stack) { debugPrint('Failed to cleanup background worker: $error with stack: $stack'); } diff --git a/mobile/lib/domain/utils/isolate_lock_manager.dart b/mobile/lib/domain/utils/isolate_lock_manager.dart new file mode 100644 index 0000000000..37de649204 --- /dev/null +++ b/mobile/lib/domain/utils/isolate_lock_manager.dart @@ -0,0 +1,235 @@ +import 'dart:isolate'; +import 'dart:ui'; + +import 'package:flutter/foundation.dart'; +import 'package:logging/logging.dart'; + +const String kIsolateLockManagerPort = "immich://isolate_mutex"; + +enum _LockStatus { active, released } + +class _IsolateRequest { + const _IsolateRequest(); +} + +class _HeartbeatRequest extends _IsolateRequest { + // Port for the receiver to send replies back + final SendPort sendPort; + + const _HeartbeatRequest(this.sendPort); + + Map toJson() { + return {'type': 'heartbeat', 'sendPort': sendPort}; + } +} + +class _CloseRequest extends _IsolateRequest { + const _CloseRequest(); + + Map toJson() { + return {'type': 'close'}; + } +} + +class _IsolateResponse { + const _IsolateResponse(); +} + +class _HeartbeatResponse extends _IsolateResponse { + final _LockStatus status; + + const _HeartbeatResponse(this.status); + + Map toJson() { + return {'type': 'heartbeat', 'status': status.index}; + } +} + +typedef OnCloseLockHolderRequest = void Function(); + +class IsolateLockManager { + final String _portName; + bool _hasLock = false; + ReceivePort? _receivePort; + final OnCloseLockHolderRequest? _onCloseRequest; + final Set _waitingIsolates = {}; + // Token object - a new one is created for each acquisition attempt + Object? _currentAcquisitionToken; + + IsolateLockManager({String? portName, OnCloseLockHolderRequest? onCloseRequest}) + : _portName = portName ?? kIsolateLockManagerPort, + _onCloseRequest = onCloseRequest; + + Future acquireLock() async { + if (_hasLock) { + Logger('BackgroundWorkerLockManager').warning("WARNING: [acquireLock] called more than once"); + return true; + } + + // Create a new token - this invalidates any previous attempt + final token = _currentAcquisitionToken = Object(); + + final ReceivePort rp = _receivePort = ReceivePort(_portName); + final SendPort sp = rp.sendPort; + + while (!IsolateNameServer.registerPortWithName(sp, _portName)) { + // This attempt was superseded by a newer one in the same isolate + if (_currentAcquisitionToken != token) { + return false; + } + + await _lockReleasedByHolder(token); + } + + _hasLock = true; + rp.listen(_onRequest); + return true; + } + + Future _lockReleasedByHolder(Object token) async { + SendPort? holder = IsolateNameServer.lookupPortByName(_portName); + debugPrint("Found lock holder: $holder"); + if (holder == null) { + // No holder, try and acquire lock + return; + } + + final ReceivePort tempRp = ReceivePort(); + final SendPort tempSp = tempRp.sendPort; + final bs = tempRp.asBroadcastStream(); + + try { + while (true) { + // Send a heartbeat request with the send port to receive reply from the holder + + debugPrint("Sending heartbeat request to lock holder"); + holder.send(_HeartbeatRequest(tempSp).toJson()); + dynamic answer = await bs.first.timeout(const Duration(seconds: 3), onTimeout: () => null); + + debugPrint("Received heartbeat response from lock holder: $answer"); + // This attempt was superseded by a newer one in the same isolate + if (_currentAcquisitionToken != token) { + break; + } + + if (answer == null) { + // Holder failed, most likely killed without calling releaseLock + // Check if a different waiting isolate took the lock + if (holder == IsolateNameServer.lookupPortByName(_portName)) { + // No, remove the stale lock + IsolateNameServer.removePortNameMapping(_portName); + } + break; + } + + // Unknown message type received for heartbeat request. Try again + _IsolateResponse? response = _parseResponse(answer); + if (response == null || response is! _HeartbeatResponse) { + break; + } + + if (response.status == _LockStatus.released) { + // Holder has released the lock + break; + } + + // If the _LockStatus is active, we check again if the task completed + // by sending a released messaged again, if not, send a new heartbeat again + + // Check if the holder completed its task after the heartbeat + answer = await bs.first.timeout( + const Duration(seconds: 3), + onTimeout: () => const _HeartbeatResponse(_LockStatus.active).toJson(), + ); + + response = _parseResponse(answer); + if (response is _HeartbeatResponse && response.status == _LockStatus.released) { + break; + } + } + } catch (e) { + // Timeout or error + } finally { + tempRp.close(); + } + return; + } + + _IsolateRequest? _parseRequest(dynamic msg) { + if (msg is! Map) { + return null; + } + + return switch (msg['type']) { + 'heartbeat' => _HeartbeatRequest(msg['sendPort']), + 'close' => const _CloseRequest(), + _ => null, + }; + } + + _IsolateResponse? _parseResponse(dynamic msg) { + if (msg is! Map) { + return null; + } + + return switch (msg['type']) { + 'heartbeat' => _HeartbeatResponse(_LockStatus.values[msg['status']]), + _ => null, + }; + } + + // Executed in the isolate with the lock + void _onRequest(dynamic msg) { + final request = _parseRequest(msg); + if (request == null) { + return; + } + + if (request is _HeartbeatRequest) { + // Add the send port to the list of waiting isolates + _waitingIsolates.add(request.sendPort); + request.sendPort.send(const _HeartbeatResponse(_LockStatus.active).toJson()); + return; + } + + if (request is _CloseRequest) { + _onCloseRequest?.call(); + return; + } + } + + void releaseLock() { + if (_hasLock) { + IsolateNameServer.removePortNameMapping(_portName); + + // Notify waiting isolates + for (final port in _waitingIsolates) { + port.send(const _HeartbeatResponse(_LockStatus.released).toJson()); + } + _waitingIsolates.clear(); + + _hasLock = false; + } + + _receivePort?.close(); + _receivePort = null; + } + + void cancel() { + if (_hasLock) { + return; + } + + debugPrint("Cancelling ongoing acquire lock attempts"); + // Create a new token to invalidate ongoing acquire lock attempts + _currentAcquisitionToken = Object(); + } + + void requestHolderToClose() { + if (_hasLock) { + return; + } + + IsolateNameServer.lookupPortByName(_portName)?.send(const _CloseRequest().toJson()); + } +} diff --git a/mobile/lib/pages/common/splash_screen.page.dart b/mobile/lib/pages/common/splash_screen.page.dart index 87ea7849c6..64db7daee6 100644 --- a/mobile/lib/pages/common/splash_screen.page.dart +++ b/mobile/lib/pages/common/splash_screen.page.dart @@ -2,8 +2,10 @@ import 'package:auto_route/auto_route.dart'; import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/gallery_permission.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; @@ -21,14 +23,23 @@ class SplashScreenPage extends StatefulHookConsumerWidget { class SplashScreenPageState extends ConsumerState { final log = Logger("SplashScreenPage"); + @override void initState() { super.initState(); - ref - .read(authProvider.notifier) - .setOpenApiServiceEndpoint() - .then(logConnectionInfo) - .whenComplete(() => resumeSession()); + final lockManager = ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)); + + lockManager.requestHolderToClose(); + lockManager + .acquireLock() + .timeout(const Duration(seconds: 5)) + .whenComplete( + () => ref + .read(authProvider.notifier) + .setOpenApiServiceEndpoint() + .then(logConnectionInfo) + .whenComplete(() => resumeSession()), + ); } void logConnectionInfo(String? endpoint) { diff --git a/mobile/lib/providers/app_life_cycle.provider.dart b/mobile/lib/providers/app_life_cycle.provider.dart index 0696a8d7f1..ff5dda79c8 100644 --- a/mobile/lib/providers/app_life_cycle.provider.dart +++ b/mobile/lib/providers/app_life_cycle.provider.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:flutter/foundation.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/log.service.dart'; +import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/providers/album/album.provider.dart'; @@ -81,6 +82,12 @@ class AppLifeCycleNotifier extends StateNotifier { } } else { _ref.read(backupProvider.notifier).cancelBackup(); + final lockManager = _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)); + + lockManager.requestHolderToClose(); + debugPrint("Requested lock holder to close on resume"); + await lockManager.acquireLock(); + debugPrint("Lock acquired for background sync on resume"); final backgroundManager = _ref.read(backgroundSyncProvider); // Ensure proper cleanup before starting new background tasks @@ -130,7 +137,7 @@ class AppLifeCycleNotifier extends StateNotifier { // do not stop/clean up anything on inactivity: issued on every orientation change } - void handleAppPause() { + Future handleAppPause() async { state = AppLifeCycleEnum.paused; _wasPaused = true; @@ -140,6 +147,12 @@ class AppLifeCycleNotifier extends StateNotifier { if (_ref.read(backupProvider.notifier).backupProgress != BackUpProgressEnum.manualInProgress) { _ref.read(backupProvider.notifier).cancelBackup(); } + } else { + final backgroundManager = _ref.read(backgroundSyncProvider); + await backgroundManager.cancel(); + await backgroundManager.cancelLocal(); + _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); + debugPrint("Lock released on app pause"); } _ref.read(websocketProvider.notifier).disconnect(); @@ -173,6 +186,7 @@ class AppLifeCycleNotifier extends StateNotifier { } if (Store.isBetaTimelineEnabled) { + _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); return; } diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index e6e83b64df..1981c45fb1 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,5 +1,6 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart'; +import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/providers/sync_status.provider.dart'; final backgroundSyncProvider = Provider((ref) { @@ -18,3 +19,7 @@ final backgroundSyncProvider = Provider((ref) { ref.onDispose(manager.cancel); return manager; }); + +final isolateLockManagerProvider = Provider.family((ref, name) { + return IsolateLockManager(portName: name); +}); diff --git a/mobile/pigeon/background_worker_api.dart b/mobile/pigeon/background_worker_api.dart index 69684b82b1..193bbc5832 100644 --- a/mobile/pigeon/background_worker_api.dart +++ b/mobile/pigeon/background_worker_api.dart @@ -24,6 +24,7 @@ abstract class BackgroundWorkerBgHostApi { // required platform channels to notify the native side to start the background upload void onInitialized(); + // Called from the background flutter engine to request the native side to cleanup void close(); }