mirror of
https://github.com/immich-app/immich
synced 2025-10-17 18:19:27 +00:00
Merge a8e47f7c31 into e7d6a066f8
This commit is contained in:
commit
a883eb0d4e
4 changed files with 259 additions and 6 deletions
|
|
@ -30,9 +30,9 @@ import 'package:immich_mobile/services/upload.service.dart';
|
||||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
import 'package:isar/isar.dart';
|
import 'package:isar/isar.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
|
||||||
|
|
||||||
class BackgroundWorkerFgService {
|
class BackgroundWorkerFgService {
|
||||||
final BackgroundWorkerFgHostApi _foregroundHostApi;
|
final BackgroundWorkerFgHostApi _foregroundHostApi;
|
||||||
|
|
@ -94,7 +94,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
||||||
await Future.wait(
|
await Future.wait(
|
||||||
[
|
[
|
||||||
loadTranslations(),
|
loadTranslations(),
|
||||||
workerManager.init(dynamicSpawning: true),
|
workerManagerPatch.init(dynamicSpawning: true),
|
||||||
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
|
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
|
||||||
// Initialize the file downloader
|
// Initialize the file downloader
|
||||||
FileDownloader().configure(
|
FileDownloader().configure(
|
||||||
|
|
@ -193,7 +193,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
||||||
_logger.info("Cleaning up background worker");
|
_logger.info("Cleaning up background worker");
|
||||||
final cleanupFutures = [
|
final cleanupFutures = [
|
||||||
nativeSyncApi?.cancelHashing(),
|
nativeSyncApi?.cancelHashing(),
|
||||||
workerManager.dispose().catchError((_) async {
|
workerManagerPatch.dispose().catchError((_) async {
|
||||||
// Discard any errors on the dispose call
|
// Discard any errors on the dispose call
|
||||||
return;
|
return;
|
||||||
}),
|
}),
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:io';
|
import 'dart:io';
|
||||||
|
import 'dart:math';
|
||||||
|
|
||||||
import 'package:auto_route/auto_route.dart';
|
import 'package:auto_route/auto_route.dart';
|
||||||
import 'package:background_downloader/background_downloader.dart';
|
import 'package:background_downloader/background_downloader.dart';
|
||||||
|
|
@ -40,10 +41,10 @@ import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
import 'package:immich_mobile/utils/licenses.dart';
|
import 'package:immich_mobile/utils/licenses.dart';
|
||||||
import 'package:immich_mobile/utils/migration.dart';
|
import 'package:immich_mobile/utils/migration.dart';
|
||||||
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
import 'package:intl/date_symbol_data_local.dart';
|
import 'package:intl/date_symbol_data_local.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:timezone/data/latest.dart';
|
import 'package:timezone/data/latest.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
|
||||||
|
|
||||||
void main() async {
|
void main() async {
|
||||||
ImmichWidgetsBinding();
|
ImmichWidgetsBinding();
|
||||||
|
|
@ -52,7 +53,7 @@ void main() async {
|
||||||
await Bootstrap.initDomain(isar, drift, logDb);
|
await Bootstrap.initDomain(isar, drift, logDb);
|
||||||
await initApp();
|
await initApp();
|
||||||
// Warm-up isolate pool for worker manager
|
// Warm-up isolate pool for worker manager
|
||||||
await workerManager.init(dynamicSpawning: true);
|
await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5));
|
||||||
await migrateDatabaseIfNeeded(isar, drift);
|
await migrateDatabaseIfNeeded(isar, drift);
|
||||||
HttpSSLOptions.apply();
|
HttpSSLOptions.apply();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
import 'package:worker_manager/worker_manager.dart';
|
||||||
|
|
||||||
|
|
@ -31,7 +32,7 @@ Cancelable<T?> runInIsolateGentle<T>({
|
||||||
throw const InvalidIsolateUsageException();
|
throw const InvalidIsolateUsageException();
|
||||||
}
|
}
|
||||||
|
|
||||||
return workerManager.executeGentle((cancelledChecker) async {
|
return workerManagerPatch.executeGentle((cancelledChecker) async {
|
||||||
T? result;
|
T? result;
|
||||||
await runZonedGuarded(
|
await runZonedGuarded(
|
||||||
() async {
|
() async {
|
||||||
|
|
|
||||||
251
mobile/lib/wm_executor.dart
Normal file
251
mobile/lib/wm_executor.dart
Normal file
|
|
@ -0,0 +1,251 @@
|
||||||
|
// part of 'package:worker_manager/worker_manager.dart';
|
||||||
|
// ignore_for_file: implementation_imports, avoid_print
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
import 'dart:math';
|
||||||
|
|
||||||
|
import 'package:collection/collection.dart';
|
||||||
|
import 'package:flutter/foundation.dart';
|
||||||
|
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
|
||||||
|
import 'package:worker_manager/src/worker/worker.dart';
|
||||||
|
import 'package:worker_manager/worker_manager.dart';
|
||||||
|
|
||||||
|
final workerManagerPatch = _Executor();
|
||||||
|
|
||||||
|
// [-2^54; 2^53] is compatible with dart2js, see core.int doc
|
||||||
|
const _minId = -9007199254740992;
|
||||||
|
const _maxId = 9007199254740992;
|
||||||
|
|
||||||
|
class Mixinable<T> {
|
||||||
|
late final itSelf = this as T;
|
||||||
|
}
|
||||||
|
|
||||||
|
mixin _ExecutorLogger on Mixinable<_Executor> {
|
||||||
|
var log = false;
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
void init() {
|
||||||
|
logMessage("${itSelf._isolatesCount} workers have been spawned and initialized");
|
||||||
|
}
|
||||||
|
|
||||||
|
void logTaskAdded<R>(String uid) {
|
||||||
|
logMessage("added task with number $uid");
|
||||||
|
}
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
void dispose() {
|
||||||
|
logMessage("worker_manager have been disposed");
|
||||||
|
}
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
void _cancel(Task task) {
|
||||||
|
logMessage("Task ${task.id} have been canceled");
|
||||||
|
}
|
||||||
|
|
||||||
|
void logMessage(String message) {
|
||||||
|
if (log) print(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
||||||
|
final _queue = PriorityQueue<Task>();
|
||||||
|
final _pool = <Worker>[];
|
||||||
|
var _nextTaskId = _minId;
|
||||||
|
var _dynamicSpawning = false;
|
||||||
|
var _isolatesCount = numberOfProcessors;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
|
||||||
|
if (_pool.isNotEmpty) {
|
||||||
|
print("worker_manager already warmed up, init is ignored. Dispose before init");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (isolatesCount != null) {
|
||||||
|
if (isolatesCount < 0) {
|
||||||
|
throw Exception("isolatesCount must be greater than 0");
|
||||||
|
}
|
||||||
|
|
||||||
|
_isolatesCount = isolatesCount;
|
||||||
|
}
|
||||||
|
_dynamicSpawning = dynamicSpawning ?? false;
|
||||||
|
await _ensureWorkersInitialized();
|
||||||
|
super.init();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> dispose() async {
|
||||||
|
_queue.clear();
|
||||||
|
for (final worker in _pool) {
|
||||||
|
worker.kill();
|
||||||
|
}
|
||||||
|
_pool.clear();
|
||||||
|
super.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
||||||
|
return _createCancelable<R>(execution: execution, priority: priority);
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
|
||||||
|
final task = TaskGentle<R>(
|
||||||
|
id: "",
|
||||||
|
workPriority: WorkPriority.immediately,
|
||||||
|
execution: execution,
|
||||||
|
completer: Completer<R>(),
|
||||||
|
);
|
||||||
|
|
||||||
|
Future<void> run() async {
|
||||||
|
try {
|
||||||
|
final result = await execution(() => task.canceled);
|
||||||
|
task.complete(result, null, null);
|
||||||
|
} catch (error, st) {
|
||||||
|
task.complete(null, error, st);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
run();
|
||||||
|
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> executeWithPort<R, T>(
|
||||||
|
ExecuteWithPort<R> execution, {
|
||||||
|
WorkPriority priority = WorkPriority.immediately,
|
||||||
|
required void Function(T value) onMessage,
|
||||||
|
}) {
|
||||||
|
return _createCancelable<R>(
|
||||||
|
execution: execution,
|
||||||
|
priority: priority,
|
||||||
|
onMessage: (message) => onMessage(message as T),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
||||||
|
return _createCancelable<R>(execution: execution, priority: priority);
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> executeGentleWithPort<R, T>(
|
||||||
|
ExecuteGentleWithPort<R> execution, {
|
||||||
|
WorkPriority priority = WorkPriority.immediately,
|
||||||
|
required void Function(T value) onMessage,
|
||||||
|
}) {
|
||||||
|
return _createCancelable<R>(
|
||||||
|
execution: execution,
|
||||||
|
priority: priority,
|
||||||
|
onMessage: (message) => onMessage(message as T),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
void _createWorkers() {
|
||||||
|
for (var i = 0; i < _isolatesCount; i++) {
|
||||||
|
_pool.add(Worker());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _initializeWorkers() async {
|
||||||
|
await Future.wait(_pool.map((e) => e.initialize()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Cancelable<R> _createCancelable<R>({
|
||||||
|
required Function execution,
|
||||||
|
WorkPriority priority = WorkPriority.immediately,
|
||||||
|
void Function(Object value)? onMessage,
|
||||||
|
}) {
|
||||||
|
if (_nextTaskId + 1 == _maxId) {
|
||||||
|
_nextTaskId = _minId;
|
||||||
|
}
|
||||||
|
final id = _nextTaskId.toString();
|
||||||
|
_nextTaskId++;
|
||||||
|
late final Task<R> task;
|
||||||
|
final completer = Completer<R>();
|
||||||
|
if (execution is Execute<R>) {
|
||||||
|
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
||||||
|
} else if (execution is ExecuteWithPort<R>) {
|
||||||
|
task = TaskWithPort<R>(
|
||||||
|
id: id,
|
||||||
|
workPriority: priority,
|
||||||
|
execution: execution,
|
||||||
|
completer: completer,
|
||||||
|
onMessage: onMessage!,
|
||||||
|
);
|
||||||
|
} else if (execution is ExecuteGentle<R>) {
|
||||||
|
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
||||||
|
} else if (execution is ExecuteGentleWithPort<R>) {
|
||||||
|
task = TaskGentleWithPort<R>(
|
||||||
|
id: id,
|
||||||
|
workPriority: priority,
|
||||||
|
execution: execution,
|
||||||
|
completer: completer,
|
||||||
|
onMessage: onMessage!,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_queue.add(task);
|
||||||
|
_schedule();
|
||||||
|
logTaskAdded(task.id);
|
||||||
|
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _ensureWorkersInitialized() async {
|
||||||
|
if (_pool.isEmpty) {
|
||||||
|
_createWorkers();
|
||||||
|
if (!_dynamicSpawning) {
|
||||||
|
await _initializeWorkers();
|
||||||
|
final poolSize = _pool.length;
|
||||||
|
final queueSize = _queue.length;
|
||||||
|
for (int i = 0; i <= min(poolSize, queueSize); i++) {
|
||||||
|
_schedule();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (_pool.every((worker) => worker.taskId != null)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (_dynamicSpawning) {
|
||||||
|
final freeWorker = _pool.firstWhereOrNull(
|
||||||
|
(worker) => worker.taskId == null && !worker.initialized && !worker.initializing,
|
||||||
|
);
|
||||||
|
await freeWorker?.initialize();
|
||||||
|
_schedule();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _schedule() {
|
||||||
|
final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized);
|
||||||
|
if (availableWorker == null) {
|
||||||
|
_ensureWorkersInitialized();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (_queue.isEmpty) return;
|
||||||
|
final task = _queue.removeFirst();
|
||||||
|
|
||||||
|
availableWorker
|
||||||
|
.work(task)
|
||||||
|
.then(
|
||||||
|
(value) {
|
||||||
|
//could be completed already by cancel and it is normal.
|
||||||
|
//Assuming that worker finished with error and cleaned gracefully
|
||||||
|
task.complete(value, null, null);
|
||||||
|
},
|
||||||
|
onError: (error, st) {
|
||||||
|
task.complete(null, error, st);
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.whenComplete(() {
|
||||||
|
if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill();
|
||||||
|
_schedule();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void _cancel(Task task) {
|
||||||
|
task.cancel();
|
||||||
|
_queue.remove(task);
|
||||||
|
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
|
||||||
|
if (task is Gentle) {
|
||||||
|
targetWorker?.cancelGentle();
|
||||||
|
} else {
|
||||||
|
targetWorker?.kill();
|
||||||
|
if (!_dynamicSpawning) targetWorker?.initialize();
|
||||||
|
}
|
||||||
|
super._cancel(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue