diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index 78ba5b7088..e6ac3eaebd 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -30,9 +30,9 @@ import 'package:immich_mobile/services/upload.service.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; +import 'package:immich_mobile/wm_executor.dart'; import 'package:isar/isar.dart'; import 'package:logging/logging.dart'; -import 'package:worker_manager/worker_manager.dart'; class BackgroundWorkerFgService { final BackgroundWorkerFgHostApi _foregroundHostApi; @@ -94,7 +94,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { await Future.wait( [ loadTranslations(), - workerManager.init(dynamicSpawning: true), + workerManagerPatch.init(dynamicSpawning: true), _ref?.read(authServiceProvider).setOpenApiServiceEndpoint(), // Initialize the file downloader FileDownloader().configure( @@ -193,7 +193,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { _logger.info("Cleaning up background worker"); final cleanupFutures = [ nativeSyncApi?.cancelHashing(), - workerManager.dispose().catchError((_) async { + workerManagerPatch.dispose().catchError((_) async { // Discard any errors on the dispose call return; }), diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 263a5ef769..b1d87b36ab 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:io'; +import 'dart:math'; import 'package:auto_route/auto_route.dart'; import 'package:background_downloader/background_downloader.dart'; @@ -40,10 +41,10 @@ import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; import 'package:immich_mobile/utils/licenses.dart'; import 'package:immich_mobile/utils/migration.dart'; +import 'package:immich_mobile/wm_executor.dart'; import 'package:intl/date_symbol_data_local.dart'; import 'package:logging/logging.dart'; import 'package:timezone/data/latest.dart'; -import 'package:worker_manager/worker_manager.dart'; void main() async { ImmichWidgetsBinding(); @@ -52,7 +53,7 @@ void main() async { await Bootstrap.initDomain(isar, drift, logDb); await initApp(); // Warm-up isolate pool for worker manager - await workerManager.init(dynamicSpawning: true); + await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5)); await migrateDatabaseIfNeeded(isar, drift); HttpSSLOptions.apply(); diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 1ccf00d58b..491e1bf107 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -11,6 +11,7 @@ import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; +import 'package:immich_mobile/wm_executor.dart'; import 'package:logging/logging.dart'; import 'package:worker_manager/worker_manager.dart'; @@ -31,7 +32,7 @@ Cancelable runInIsolateGentle({ throw const InvalidIsolateUsageException(); } - return workerManager.executeGentle((cancelledChecker) async { + return workerManagerPatch.executeGentle((cancelledChecker) async { T? result; await runZonedGuarded( () async { diff --git a/mobile/lib/wm_executor.dart b/mobile/lib/wm_executor.dart new file mode 100644 index 0000000000..73e882e8e6 --- /dev/null +++ b/mobile/lib/wm_executor.dart @@ -0,0 +1,251 @@ +// part of 'package:worker_manager/worker_manager.dart'; +// ignore_for_file: implementation_imports, avoid_print + +import 'dart:async'; +import 'dart:math'; + +import 'package:collection/collection.dart'; +import 'package:flutter/foundation.dart'; +import 'package:worker_manager/src/number_of_processors/processors_io.dart'; +import 'package:worker_manager/src/worker/worker.dart'; +import 'package:worker_manager/worker_manager.dart'; + +final workerManagerPatch = _Executor(); + +// [-2^54; 2^53] is compatible with dart2js, see core.int doc +const _minId = -9007199254740992; +const _maxId = 9007199254740992; + +class Mixinable { + late final itSelf = this as T; +} + +mixin _ExecutorLogger on Mixinable<_Executor> { + var log = false; + + @mustCallSuper + void init() { + logMessage("${itSelf._isolatesCount} workers have been spawned and initialized"); + } + + void logTaskAdded(String uid) { + logMessage("added task with number $uid"); + } + + @mustCallSuper + void dispose() { + logMessage("worker_manager have been disposed"); + } + + @mustCallSuper + void _cancel(Task task) { + logMessage("Task ${task.id} have been canceled"); + } + + void logMessage(String message) { + if (log) print(message); + } +} + +class _Executor extends Mixinable<_Executor> with _ExecutorLogger { + final _queue = PriorityQueue(); + final _pool = []; + var _nextTaskId = _minId; + var _dynamicSpawning = false; + var _isolatesCount = numberOfProcessors; + + @override + Future init({int? isolatesCount, bool? dynamicSpawning}) async { + if (_pool.isNotEmpty) { + print("worker_manager already warmed up, init is ignored. Dispose before init"); + return; + } + if (isolatesCount != null) { + if (isolatesCount < 0) { + throw Exception("isolatesCount must be greater than 0"); + } + + _isolatesCount = isolatesCount; + } + _dynamicSpawning = dynamicSpawning ?? false; + await _ensureWorkersInitialized(); + super.init(); + } + + @override + Future dispose() async { + _queue.clear(); + for (final worker in _pool) { + worker.kill(); + } + _pool.clear(); + super.dispose(); + } + + Cancelable execute(Execute execution, {WorkPriority priority = WorkPriority.immediately}) { + return _createCancelable(execution: execution, priority: priority); + } + + Cancelable executeNow(ExecuteGentle execution) { + final task = TaskGentle( + id: "", + workPriority: WorkPriority.immediately, + execution: execution, + completer: Completer(), + ); + + Future run() async { + try { + final result = await execution(() => task.canceled); + task.complete(result, null, null); + } catch (error, st) { + task.complete(null, error, st); + } + } + + run(); + return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); + } + + Cancelable executeWithPort( + ExecuteWithPort execution, { + WorkPriority priority = WorkPriority.immediately, + required void Function(T value) onMessage, + }) { + return _createCancelable( + execution: execution, + priority: priority, + onMessage: (message) => onMessage(message as T), + ); + } + + Cancelable executeGentle(ExecuteGentle execution, {WorkPriority priority = WorkPriority.immediately}) { + return _createCancelable(execution: execution, priority: priority); + } + + Cancelable executeGentleWithPort( + ExecuteGentleWithPort execution, { + WorkPriority priority = WorkPriority.immediately, + required void Function(T value) onMessage, + }) { + return _createCancelable( + execution: execution, + priority: priority, + onMessage: (message) => onMessage(message as T), + ); + } + + void _createWorkers() { + for (var i = 0; i < _isolatesCount; i++) { + _pool.add(Worker()); + } + } + + Future _initializeWorkers() async { + await Future.wait(_pool.map((e) => e.initialize())); + } + + Cancelable _createCancelable({ + required Function execution, + WorkPriority priority = WorkPriority.immediately, + void Function(Object value)? onMessage, + }) { + if (_nextTaskId + 1 == _maxId) { + _nextTaskId = _minId; + } + final id = _nextTaskId.toString(); + _nextTaskId++; + late final Task task; + final completer = Completer(); + if (execution is Execute) { + task = TaskRegular(id: id, workPriority: priority, execution: execution, completer: completer); + } else if (execution is ExecuteWithPort) { + task = TaskWithPort( + id: id, + workPriority: priority, + execution: execution, + completer: completer, + onMessage: onMessage!, + ); + } else if (execution is ExecuteGentle) { + task = TaskGentle(id: id, workPriority: priority, execution: execution, completer: completer); + } else if (execution is ExecuteGentleWithPort) { + task = TaskGentleWithPort( + id: id, + workPriority: priority, + execution: execution, + completer: completer, + onMessage: onMessage!, + ); + } + _queue.add(task); + _schedule(); + logTaskAdded(task.id); + return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); + } + + Future _ensureWorkersInitialized() async { + if (_pool.isEmpty) { + _createWorkers(); + if (!_dynamicSpawning) { + await _initializeWorkers(); + final poolSize = _pool.length; + final queueSize = _queue.length; + for (int i = 0; i <= min(poolSize, queueSize); i++) { + _schedule(); + } + } + } + if (_pool.every((worker) => worker.taskId != null)) { + return; + } + if (_dynamicSpawning) { + final freeWorker = _pool.firstWhereOrNull( + (worker) => worker.taskId == null && !worker.initialized && !worker.initializing, + ); + await freeWorker?.initialize(); + _schedule(); + } + } + + void _schedule() { + final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized); + if (availableWorker == null) { + _ensureWorkersInitialized(); + return; + } + if (_queue.isEmpty) return; + final task = _queue.removeFirst(); + + availableWorker + .work(task) + .then( + (value) { + //could be completed already by cancel and it is normal. + //Assuming that worker finished with error and cleaned gracefully + task.complete(value, null, null); + }, + onError: (error, st) { + task.complete(null, error, st); + }, + ) + .whenComplete(() { + if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill(); + _schedule(); + }); + } + + @override + void _cancel(Task task) { + task.cancel(); + _queue.remove(task); + final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id); + if (task is Gentle) { + targetWorker?.cancelGentle(); + } else { + targetWorker?.kill(); + if (!_dynamicSpawning) targetWorker?.initialize(); + } + super._cancel(task); + } +}