mirror of
https://github.com/immich-app/immich
synced 2025-11-14 17:36:12 +00:00
fix: android crash on app pause (#21768)
* revert service locks * rename backgroundWorkerFgServiceProvider * refactor: parallel background worker init (#21769) * refactor: parallel background worker init * fix: hashing not running from the background engine (#21773) * init and dispose workmanager from background engine * log message contend --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com> --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com> --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com>
This commit is contained in:
parent
2c7b980eed
commit
e239b8d2fa
9 changed files with 44 additions and 343 deletions
|
|
@ -5,7 +5,6 @@ import 'package:background_downloader/background_downloader.dart';
|
|||
import 'package:flutter/material.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/constants/constants.dart';
|
||||
import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
|
||||
import 'package:immich_mobile/platform/background_worker_api.g.dart';
|
||||
|
|
@ -24,6 +23,7 @@ import 'package:immich_mobile/utils/bootstrap.dart';
|
|||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||
import 'package:isar/isar.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:worker_manager/worker_manager.dart';
|
||||
|
||||
class BackgroundWorkerFgService {
|
||||
final BackgroundWorkerFgHostApi _foregroundHostApi;
|
||||
|
|
@ -42,8 +42,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
final Drift _drift;
|
||||
final DriftLogger _driftLogger;
|
||||
final BackgroundWorkerBgHostApi _backgroundHostApi;
|
||||
final Logger _logger = Logger('BackgroundUploadBgService');
|
||||
late final IsolateLockManager _lockManager;
|
||||
final Logger _logger = Logger('BackgroundWorkerBgService');
|
||||
|
||||
bool _isCleanedUp = false;
|
||||
|
||||
|
|
@ -59,7 +58,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
driftProvider.overrideWith(driftOverride(drift)),
|
||||
],
|
||||
);
|
||||
_lockManager = IsolateLockManager(onCloseRequest: _cleanup);
|
||||
BackgroundWorkerFlutterApi.setUp(this);
|
||||
}
|
||||
|
||||
|
|
@ -67,41 +65,30 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
|
||||
Future<void> init() async {
|
||||
try {
|
||||
await loadTranslations();
|
||||
HttpSSLOptions.apply(applyNative: false);
|
||||
await _ref.read(authServiceProvider).setOpenApiServiceEndpoint();
|
||||
|
||||
// Initialize the file downloader
|
||||
await FileDownloader().configure(
|
||||
globalConfig: [
|
||||
// maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3
|
||||
(Config.holdingQueue, (6, 6, 3)),
|
||||
// On Android, if files are larger than 256MB, run in foreground service
|
||||
(Config.runInForegroundIfFileLargerThan, 256),
|
||||
],
|
||||
);
|
||||
await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false);
|
||||
await FileDownloader().trackTasks();
|
||||
await Future.wait([
|
||||
loadTranslations(),
|
||||
workerManager.init(dynamicSpawning: true),
|
||||
_ref.read(authServiceProvider).setOpenApiServiceEndpoint(),
|
||||
// Initialize the file downloader
|
||||
FileDownloader().configure(
|
||||
globalConfig: [
|
||||
// maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3
|
||||
(Config.holdingQueue, (6, 6, 3)),
|
||||
// On Android, if files are larger than 256MB, run in foreground service
|
||||
(Config.runInForegroundIfFileLargerThan, 256),
|
||||
],
|
||||
),
|
||||
FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false),
|
||||
FileDownloader().trackTasks(),
|
||||
_ref.read(fileMediaRepositoryProvider).enableBackgroundAccess(),
|
||||
]);
|
||||
|
||||
configureFileDownloaderNotifications();
|
||||
await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess();
|
||||
|
||||
// Notify the host that the background upload service has been initialized and is ready to use
|
||||
debugPrint("Acquiring background worker lock");
|
||||
if (await _lockManager.acquireLock().timeout(
|
||||
const Duration(seconds: 5),
|
||||
onTimeout: () {
|
||||
_lockManager.cancel();
|
||||
return false;
|
||||
},
|
||||
)) {
|
||||
_logger.info("Acquired background worker lock");
|
||||
await _backgroundHostApi.onInitialized();
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.warning("Failed to acquire background worker lock");
|
||||
await _cleanup();
|
||||
await _backgroundHostApi.close();
|
||||
// Notify the host that the background worker service has been initialized and is ready to use
|
||||
_backgroundHostApi.onInitialized();
|
||||
} catch (error, stack) {
|
||||
_logger.severe("Failed to initialize background worker", error, stack);
|
||||
_backgroundHostApi.close();
|
||||
|
|
@ -170,6 +157,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
_isCleanedUp = true;
|
||||
_logger.info("Cleaning up background worker");
|
||||
final cleanupFutures = [
|
||||
workerManager.dispose(),
|
||||
_drift.close(),
|
||||
_driftLogger.close(),
|
||||
_ref.read(backgroundSyncProvider).cancel(),
|
||||
|
|
@ -180,8 +168,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
cleanupFutures.add(_isar.close());
|
||||
}
|
||||
_ref.dispose();
|
||||
_lockManager.releaseLock();
|
||||
|
||||
await Future.wait(cleanupFutures);
|
||||
_logger.info("Background worker resources cleaned up");
|
||||
} catch (error, stack) {
|
||||
|
|
@ -191,22 +177,29 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||
|
||||
Future<void> _handleBackup({bool processBulk = true}) async {
|
||||
if (!_isBackupEnabled) {
|
||||
_logger.info("[_handleBackup 1] Backup is disabled. Skipping backup routine");
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.info("[_handleBackup 2] Enqueuing assets for backup from the background service");
|
||||
|
||||
final currentUser = _ref.read(currentUserProvider);
|
||||
if (currentUser == null) {
|
||||
_logger.warning("[_handleBackup 3] No current user found. Skipping backup from background");
|
||||
return;
|
||||
}
|
||||
|
||||
if (processBulk) {
|
||||
_logger.info("[_handleBackup 4] Resume backup from background");
|
||||
return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
|
||||
}
|
||||
|
||||
final activeTask = await _ref.read(uploadServiceProvider).getActiveTasks(currentUser.id);
|
||||
if (activeTask.isNotEmpty) {
|
||||
_logger.info("[_handleBackup 5] Resuming backup for active tasks from background");
|
||||
await _ref.read(uploadServiceProvider).resumeBackup();
|
||||
} else {
|
||||
_logger.info("[_handleBackup 6] Starting serial backup for new tasks from background");
|
||||
await _ref.read(uploadServiceProvider).startBackupSerial(currentUser.id);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,235 +0,0 @@
|
|||
import 'dart:isolate';
|
||||
import 'dart:ui';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
|
||||
const String kIsolateLockManagerPort = "immich://isolate_mutex";
|
||||
|
||||
enum _LockStatus { active, released }
|
||||
|
||||
class _IsolateRequest {
|
||||
const _IsolateRequest();
|
||||
}
|
||||
|
||||
class _HeartbeatRequest extends _IsolateRequest {
|
||||
// Port for the receiver to send replies back
|
||||
final SendPort sendPort;
|
||||
|
||||
const _HeartbeatRequest(this.sendPort);
|
||||
|
||||
Map<String, dynamic> toJson() {
|
||||
return {'type': 'heartbeat', 'sendPort': sendPort};
|
||||
}
|
||||
}
|
||||
|
||||
class _CloseRequest extends _IsolateRequest {
|
||||
const _CloseRequest();
|
||||
|
||||
Map<String, dynamic> toJson() {
|
||||
return {'type': 'close'};
|
||||
}
|
||||
}
|
||||
|
||||
class _IsolateResponse {
|
||||
const _IsolateResponse();
|
||||
}
|
||||
|
||||
class _HeartbeatResponse extends _IsolateResponse {
|
||||
final _LockStatus status;
|
||||
|
||||
const _HeartbeatResponse(this.status);
|
||||
|
||||
Map<String, dynamic> toJson() {
|
||||
return {'type': 'heartbeat', 'status': status.index};
|
||||
}
|
||||
}
|
||||
|
||||
typedef OnCloseLockHolderRequest = void Function();
|
||||
|
||||
class IsolateLockManager {
|
||||
final String _portName;
|
||||
bool _hasLock = false;
|
||||
ReceivePort? _receivePort;
|
||||
final OnCloseLockHolderRequest? _onCloseRequest;
|
||||
final Set<SendPort> _waitingIsolates = {};
|
||||
// Token object - a new one is created for each acquisition attempt
|
||||
Object? _currentAcquisitionToken;
|
||||
|
||||
IsolateLockManager({String? portName, OnCloseLockHolderRequest? onCloseRequest})
|
||||
: _portName = portName ?? kIsolateLockManagerPort,
|
||||
_onCloseRequest = onCloseRequest;
|
||||
|
||||
Future<bool> acquireLock() async {
|
||||
if (_hasLock) {
|
||||
Logger('BackgroundWorkerLockManager').warning("WARNING: [acquireLock] called more than once");
|
||||
return true;
|
||||
}
|
||||
|
||||
// Create a new token - this invalidates any previous attempt
|
||||
final token = _currentAcquisitionToken = Object();
|
||||
|
||||
final ReceivePort rp = _receivePort = ReceivePort(_portName);
|
||||
final SendPort sp = rp.sendPort;
|
||||
|
||||
while (!IsolateNameServer.registerPortWithName(sp, _portName)) {
|
||||
// This attempt was superseded by a newer one in the same isolate
|
||||
if (_currentAcquisitionToken != token) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await _lockReleasedByHolder(token);
|
||||
}
|
||||
|
||||
_hasLock = true;
|
||||
rp.listen(_onRequest);
|
||||
return true;
|
||||
}
|
||||
|
||||
Future<void> _lockReleasedByHolder(Object token) async {
|
||||
SendPort? holder = IsolateNameServer.lookupPortByName(_portName);
|
||||
debugPrint("Found lock holder: $holder");
|
||||
if (holder == null) {
|
||||
// No holder, try and acquire lock
|
||||
return;
|
||||
}
|
||||
|
||||
final ReceivePort tempRp = ReceivePort();
|
||||
final SendPort tempSp = tempRp.sendPort;
|
||||
final bs = tempRp.asBroadcastStream();
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
// Send a heartbeat request with the send port to receive reply from the holder
|
||||
|
||||
debugPrint("Sending heartbeat request to lock holder");
|
||||
holder.send(_HeartbeatRequest(tempSp).toJson());
|
||||
dynamic answer = await bs.first.timeout(const Duration(seconds: 3), onTimeout: () => null);
|
||||
|
||||
debugPrint("Received heartbeat response from lock holder: $answer");
|
||||
// This attempt was superseded by a newer one in the same isolate
|
||||
if (_currentAcquisitionToken != token) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (answer == null) {
|
||||
// Holder failed, most likely killed without calling releaseLock
|
||||
// Check if a different waiting isolate took the lock
|
||||
if (holder == IsolateNameServer.lookupPortByName(_portName)) {
|
||||
// No, remove the stale lock
|
||||
IsolateNameServer.removePortNameMapping(_portName);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Unknown message type received for heartbeat request. Try again
|
||||
_IsolateResponse? response = _parseResponse(answer);
|
||||
if (response == null || response is! _HeartbeatResponse) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (response.status == _LockStatus.released) {
|
||||
// Holder has released the lock
|
||||
break;
|
||||
}
|
||||
|
||||
// If the _LockStatus is active, we check again if the task completed
|
||||
// by sending a released messaged again, if not, send a new heartbeat again
|
||||
|
||||
// Check if the holder completed its task after the heartbeat
|
||||
answer = await bs.first.timeout(
|
||||
const Duration(seconds: 3),
|
||||
onTimeout: () => const _HeartbeatResponse(_LockStatus.active).toJson(),
|
||||
);
|
||||
|
||||
response = _parseResponse(answer);
|
||||
if (response is _HeartbeatResponse && response.status == _LockStatus.released) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// Timeout or error
|
||||
} finally {
|
||||
tempRp.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
_IsolateRequest? _parseRequest(dynamic msg) {
|
||||
if (msg is! Map<String, dynamic>) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return switch (msg['type']) {
|
||||
'heartbeat' => _HeartbeatRequest(msg['sendPort']),
|
||||
'close' => const _CloseRequest(),
|
||||
_ => null,
|
||||
};
|
||||
}
|
||||
|
||||
_IsolateResponse? _parseResponse(dynamic msg) {
|
||||
if (msg is! Map<String, dynamic>) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return switch (msg['type']) {
|
||||
'heartbeat' => _HeartbeatResponse(_LockStatus.values[msg['status']]),
|
||||
_ => null,
|
||||
};
|
||||
}
|
||||
|
||||
// Executed in the isolate with the lock
|
||||
void _onRequest(dynamic msg) {
|
||||
final request = _parseRequest(msg);
|
||||
if (request == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (request is _HeartbeatRequest) {
|
||||
// Add the send port to the list of waiting isolates
|
||||
_waitingIsolates.add(request.sendPort);
|
||||
request.sendPort.send(const _HeartbeatResponse(_LockStatus.active).toJson());
|
||||
return;
|
||||
}
|
||||
|
||||
if (request is _CloseRequest) {
|
||||
_onCloseRequest?.call();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void releaseLock() {
|
||||
if (_hasLock) {
|
||||
IsolateNameServer.removePortNameMapping(_portName);
|
||||
|
||||
// Notify waiting isolates
|
||||
for (final port in _waitingIsolates) {
|
||||
port.send(const _HeartbeatResponse(_LockStatus.released).toJson());
|
||||
}
|
||||
_waitingIsolates.clear();
|
||||
|
||||
_hasLock = false;
|
||||
}
|
||||
|
||||
_receivePort?.close();
|
||||
_receivePort = null;
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
if (_hasLock) {
|
||||
return;
|
||||
}
|
||||
|
||||
debugPrint("Cancelling ongoing acquire lock attempts");
|
||||
// Create a new token to invalidate ongoing acquire lock attempts
|
||||
_currentAcquisitionToken = Object();
|
||||
}
|
||||
|
||||
void requestHolderToClose() {
|
||||
if (_hasLock) {
|
||||
return;
|
||||
}
|
||||
|
||||
IsolateNameServer.lookupPortByName(_portName)?.send(const _CloseRequest().toJson());
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue