fix: concurrency issue (#21830)

This commit is contained in:
Alex 2025-09-11 14:02:03 -05:00 committed by GitHub
parent 722a464e23
commit 42a03f2556
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 90 additions and 73 deletions

View file

@ -169,7 +169,10 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
} }
try { try {
final backgroundSyncManager = _ref.read(backgroundSyncProvider);
_isCleanedUp = true; _isCleanedUp = true;
_ref.dispose();
_cancellationToken.cancel(); _cancellationToken.cancel();
_logger.info("Cleaning up background worker"); _logger.info("Cleaning up background worker");
final cleanupFutures = [ final cleanupFutures = [
@ -179,14 +182,13 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
}), }),
_drift.close(), _drift.close(),
_driftLogger.close(), _driftLogger.close(),
_ref.read(backgroundSyncProvider).cancel(), backgroundSyncManager.cancel(),
_ref.read(backgroundSyncProvider).cancelLocal(), backgroundSyncManager.cancelLocal(),
]; ];
if (_isar.isOpen) { if (_isar.isOpen) {
cleanupFutures.add(_isar.close()); cleanupFutures.add(_isar.close());
} }
_ref.dispose();
await Future.wait(cleanupFutures); await Future.wait(cleanupFutures);
_logger.info("Background worker resources cleaned up"); _logger.info("Background worker resources cleaned up");
} catch (error, stack) { } catch (error, stack) {
@ -195,35 +197,42 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
} }
Future<void> _handleBackup() async { Future<void> _handleBackup() async {
if (!_isBackupEnabled || _isCleanedUp) { await runZonedGuarded(
_logger.info("[_handleBackup 1] Backup is disabled. Skipping backup routine"); () async {
return; if (!_isBackupEnabled || _isCleanedUp) {
} _logger.info("[_handleBackup 1] Backup is disabled. Skipping backup routine");
return;
}
_logger.info("[_handleBackup 2] Enqueuing assets for backup from the background service"); _logger.info("[_handleBackup 2] Enqueuing assets for backup from the background service");
final currentUser = _ref.read(currentUserProvider); final currentUser = _ref.read(currentUserProvider);
if (currentUser == null) { if (currentUser == null) {
_logger.warning("[_handleBackup 3] No current user found. Skipping backup from background"); _logger.warning("[_handleBackup 3] No current user found. Skipping backup from background");
return; return;
} }
_logger.info("[_handleBackup 4] Resume backup from background"); _logger.info("[_handleBackup 4] Resume backup from background");
if (Platform.isIOS) { if (Platform.isIOS) {
return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id); return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
} }
final canPing = await _ref.read(serverInfoServiceProvider).ping(); final canPing = await _ref.read(serverInfoServiceProvider).ping();
if (!canPing) { if (!canPing) {
_logger.warning("[_handleBackup 5] Server is not reachable. Skipping backup from background"); _logger.warning("[_handleBackup 5] Server is not reachable. Skipping backup from background");
return; return;
} }
final networkCapabilities = await _ref.read(connectivityApiProvider).getCapabilities(); final networkCapabilities = await _ref.read(connectivityApiProvider).getCapabilities();
return _ref return _ref
.read(uploadServiceProvider) .read(uploadServiceProvider)
.startBackupWithHttpClient(currentUser.id, networkCapabilities.hasWifi, _cancellationToken); .startBackupWithHttpClient(currentUser.id, networkCapabilities.hasWifi, _cancellationToken);
},
(error, stack) {
debugPrint("Error in backup zone $error, $stack");
},
);
} }
Future<void> _syncAssets({Duration? hashTimeout}) async { Future<void> _syncAssets({Duration? hashTimeout}) async {

View file

@ -1,9 +1,10 @@
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/domain/services/sync_linked_album.service.dart'; import 'package:immich_mobile/domain/services/sync_linked_album.service.dart';
import 'package:immich_mobile/providers/user.provider.dart'; import 'package:immich_mobile/entities/store.entity.dart';
Future<void> syncLinkedAlbumsIsolated(ProviderContainer ref) { Future<void> syncLinkedAlbumsIsolated(ProviderContainer ref) {
final user = ref.read(currentUserProvider); final user = Store.tryGet(StoreKey.currentUser);
if (user == null) { if (user == null) {
return Future.value(); return Future.value();
} }

View file

@ -46,7 +46,7 @@ void main() async {
await Bootstrap.initDomain(isar, drift, logDb); await Bootstrap.initDomain(isar, drift, logDb);
await initApp(); await initApp();
// Warm-up isolate pool for worker manager // Warm-up isolate pool for worker manager
await workerManager.init(dynamicSpawning: false); await workerManager.init(dynamicSpawning: true);
await migrateDatabaseIfNeeded(isar, drift); await migrateDatabaseIfNeeded(isar, drift);
HttpSSLOptions.apply(); HttpSSLOptions.apply();

View file

@ -31,55 +31,62 @@ Cancelable<T?> runInIsolateGentle<T>({
} }
return workerManager.executeGentle((cancelledChecker) async { return workerManager.executeGentle((cancelledChecker) async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token); await runZonedGuarded(
DartPluginRegistrant.ensureInitialized(); () async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
final (isar, drift, logDb) = await Bootstrap.initDB(); final (isar, drift, logDb) = await Bootstrap.initDB();
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false); await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false);
final ref = ProviderContainer( final ref = ProviderContainer(
overrides: [ overrides: [
// TODO: Remove once isar is removed // TODO: Remove once isar is removed
dbProvider.overrideWithValue(isar), dbProvider.overrideWithValue(isar),
isarProvider.overrideWithValue(isar), isarProvider.overrideWithValue(isar),
cancellationProvider.overrideWithValue(cancelledChecker), cancellationProvider.overrideWithValue(cancelledChecker),
driftProvider.overrideWith(driftOverride(drift)), driftProvider.overrideWith(driftOverride(drift)),
], ],
); );
Logger log = Logger("IsolateLogger"); Logger log = Logger("IsolateLogger");
try {
HttpSSLOptions.apply(applyNative: false);
return await computation(ref);
} on CanceledError {
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
} catch (error, stack) {
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
} finally {
try {
await LogService.I.dispose();
await logDb.close();
await ref.read(driftProvider).close();
// Close Isar safely
try { try {
final isar = ref.read(isarProvider); HttpSSLOptions.apply(applyNative: false);
if (isar.isOpen) { return await computation(ref);
await isar.close(); } on CanceledError {
} log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
} catch (e) { } catch (error, stack) {
debugPrint("Error closing Isar: $e"); log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
} } finally {
try {
ref.dispose();
ref.dispose(); await LogService.I.dispose();
} catch (error, stack) { await logDb.close();
debugPrint("Error closing resources in isolate: $error, $stack"); await drift.close();
} finally {
ref.dispose(); // Close Isar safely
// Delay to ensure all resources are released try {
await Future.delayed(const Duration(seconds: 2)); if (isar.isOpen) {
} await isar.close();
} }
} catch (e) {
debugPrint("Error closing Isar: $e");
}
} catch (error, stack) {
debugPrint("Error closing resources in isolate: $error, $stack");
} finally {
ref.dispose();
// Delay to ensure all resources are released
await Future.delayed(const Duration(seconds: 2));
}
}
return null;
},
(error, stack) {
debugPrint("Error in isolate zone: $error, $stack");
},
);
return null; return null;
}); });
} }