mirror of
https://github.com/immich-app/immich
synced 2025-11-07 17:27:20 +00:00
feat: beta background sync (#21243)
* feat: ios background sync # Conflicts: # mobile/ios/Runner/Info.plist * feat: Android sync * add local sync worker and rename stuff * group upload notifications * uncomment onresume beta handling * rename methods --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com>
This commit is contained in:
parent
e78144ea31
commit
0df88fc22b
28 changed files with 1933 additions and 81 deletions
232
mobile/lib/domain/services/background_worker.service.dart
Normal file
232
mobile/lib/domain/services/background_worker.service.dart
Normal file
|
|
@ -0,0 +1,232 @@
|
|||
import 'dart:async';
|
||||
import 'dart:ui';
|
||||
|
||||
import 'package:background_downloader/background_downloader.dart';
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/constants/constants.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
|
||||
import 'package:immich_mobile/platform/background_worker_api.g.dart';
|
||||
import 'package:immich_mobile/providers/app_settings.provider.dart';
|
||||
import 'package:immich_mobile/providers/background_sync.provider.dart';
|
||||
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
|
||||
import 'package:immich_mobile/providers/db.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||
import 'package:immich_mobile/providers/user.provider.dart';
|
||||
import 'package:immich_mobile/services/app_settings.service.dart';
|
||||
import 'package:immich_mobile/services/auth.service.dart';
|
||||
import 'package:immich_mobile/services/localization.service.dart';
|
||||
import 'package:immich_mobile/services/upload.service.dart';
|
||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||
import 'package:isar/isar.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
|
||||
class BackgroundWorkerFgService {
|
||||
final BackgroundWorkerFgHostApi _foregroundHostApi;
|
||||
|
||||
const BackgroundWorkerFgService(this._foregroundHostApi);
|
||||
|
||||
// TODO: Move this call to native side once old timeline is removed
|
||||
Future<void> enableSyncService() => _foregroundHostApi.enableSyncWorker();
|
||||
|
||||
Future<void> enableUploadService() => _foregroundHostApi.enableUploadWorker(
|
||||
PluginUtilities.getCallbackHandle(_backgroundSyncNativeEntrypoint)!.toRawHandle(),
|
||||
);
|
||||
|
||||
Future<void> disableUploadService() => _foregroundHostApi.disableUploadWorker();
|
||||
}
|
||||
|
||||
class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
||||
late final ProviderContainer _ref;
|
||||
final Isar _isar;
|
||||
final Drift _drift;
|
||||
final DriftLogger _driftLogger;
|
||||
final BackgroundWorkerBgHostApi _backgroundHostApi;
|
||||
final Logger _logger = Logger('BackgroundUploadBgService');
|
||||
|
||||
bool _isCleanedUp = false;
|
||||
|
||||
BackgroundWorkerBgService({required Isar isar, required Drift drift, required DriftLogger driftLogger})
|
||||
: _isar = isar,
|
||||
_drift = drift,
|
||||
_driftLogger = driftLogger,
|
||||
_backgroundHostApi = BackgroundWorkerBgHostApi() {
|
||||
_ref = ProviderContainer(
|
||||
overrides: [
|
||||
dbProvider.overrideWithValue(isar),
|
||||
isarProvider.overrideWithValue(isar),
|
||||
driftProvider.overrideWith(driftOverride(drift)),
|
||||
],
|
||||
);
|
||||
BackgroundWorkerFlutterApi.setUp(this);
|
||||
}
|
||||
|
||||
bool get _isBackupEnabled => _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup);
|
||||
|
||||
Future<void> init() async {
|
||||
await loadTranslations();
|
||||
HttpSSLOptions.apply(applyNative: false);
|
||||
await _ref.read(authServiceProvider).setOpenApiServiceEndpoint();
|
||||
|
||||
// Initialize the file downloader
|
||||
await FileDownloader().configure(
|
||||
globalConfig: [
|
||||
// maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3
|
||||
(Config.holdingQueue, (6, 6, 3)),
|
||||
// On Android, if files are larger than 256MB, run in foreground service
|
||||
(Config.runInForegroundIfFileLargerThan, 256),
|
||||
],
|
||||
);
|
||||
await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false);
|
||||
await FileDownloader().trackTasks();
|
||||
configureFileDownloaderNotifications();
|
||||
|
||||
// Notify the host that the background upload service has been initialized and is ready to use
|
||||
await _backgroundHostApi.onInitialized();
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> onLocalSync(int? maxSeconds) async {
|
||||
_logger.info('Local background syncing started');
|
||||
final sw = Stopwatch()..start();
|
||||
|
||||
final timeout = maxSeconds != null ? Duration(seconds: maxSeconds) : null;
|
||||
await _syncAssets(hashTimeout: timeout, syncRemote: false);
|
||||
|
||||
sw.stop();
|
||||
_logger.info("Local sync completed in ${sw.elapsed.inSeconds}s");
|
||||
}
|
||||
|
||||
/* We do the following on Android upload
|
||||
* - Sync local assets
|
||||
* - Hash local assets 3 / 6 minutes
|
||||
* - Sync remote assets
|
||||
* - Check and requeue upload tasks
|
||||
*/
|
||||
@override
|
||||
Future<void> onAndroidUpload() async {
|
||||
_logger.info('Android background processing started');
|
||||
final sw = Stopwatch()..start();
|
||||
|
||||
await _syncAssets(hashTimeout: Duration(minutes: _isBackupEnabled ? 3 : 6));
|
||||
await _handleBackup(processBulk: false);
|
||||
|
||||
await _cleanup();
|
||||
|
||||
sw.stop();
|
||||
_logger.info("Android background processing completed in ${sw.elapsed.inSeconds}s");
|
||||
}
|
||||
|
||||
/* We do the following on background upload
|
||||
* - Sync local assets
|
||||
* - Hash local assets
|
||||
* - Sync remote assets
|
||||
* - Check and requeue upload tasks
|
||||
*
|
||||
* The native side will not send the maxSeconds value for processing tasks
|
||||
*/
|
||||
@override
|
||||
Future<void> onIosUpload(bool isRefresh, int? maxSeconds) async {
|
||||
_logger.info('iOS background upload started with maxSeconds: ${maxSeconds}s');
|
||||
final sw = Stopwatch()..start();
|
||||
|
||||
final timeout = isRefresh ? const Duration(seconds: 5) : Duration(minutes: _isBackupEnabled ? 3 : 6);
|
||||
await _syncAssets(hashTimeout: timeout);
|
||||
|
||||
final backupFuture = _handleBackup();
|
||||
if (maxSeconds != null) {
|
||||
await backupFuture.timeout(Duration(seconds: maxSeconds - 1), onTimeout: () {});
|
||||
} else {
|
||||
await backupFuture;
|
||||
}
|
||||
|
||||
await _cleanup();
|
||||
|
||||
sw.stop();
|
||||
_logger.info("iOS background upload completed in ${sw.elapsed.inSeconds}s");
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> cancel() async {
|
||||
_logger.warning("Background upload cancelled");
|
||||
await _cleanup();
|
||||
}
|
||||
|
||||
Future<void> _cleanup() async {
|
||||
if (_isCleanedUp) {
|
||||
return;
|
||||
}
|
||||
|
||||
_isCleanedUp = true;
|
||||
await _ref.read(backgroundSyncProvider).cancel();
|
||||
await _ref.read(backgroundSyncProvider).cancelLocal();
|
||||
await _isar.close();
|
||||
await _drift.close();
|
||||
await _driftLogger.close();
|
||||
_ref.dispose();
|
||||
}
|
||||
|
||||
Future<void> _handleBackup({bool processBulk = true}) async {
|
||||
if (!_isBackupEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
final currentUser = _ref.read(currentUserProvider);
|
||||
if (currentUser == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (processBulk) {
|
||||
return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id);
|
||||
}
|
||||
|
||||
final activeTask = await _ref.read(uploadServiceProvider).getActiveTasks(currentUser.id);
|
||||
if (activeTask.isNotEmpty) {
|
||||
await _ref.read(uploadServiceProvider).resumeBackup();
|
||||
} else {
|
||||
await _ref.read(uploadServiceProvider).startBackupSerial(currentUser.id);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _syncAssets({Duration? hashTimeout, bool syncRemote = true}) async {
|
||||
final futures = <Future<void>>[];
|
||||
|
||||
final localSyncFuture = _ref.read(backgroundSyncProvider).syncLocal().then((_) async {
|
||||
if (_isCleanedUp) {
|
||||
return;
|
||||
}
|
||||
|
||||
var hashFuture = _ref.read(backgroundSyncProvider).hashAssets();
|
||||
if (hashTimeout != null) {
|
||||
hashFuture = hashFuture.timeout(
|
||||
hashTimeout,
|
||||
onTimeout: () {
|
||||
// Consume cancellation errors as we want to continue processing
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
return hashFuture;
|
||||
});
|
||||
|
||||
futures.add(localSyncFuture);
|
||||
if (syncRemote) {
|
||||
final remoteSyncFuture = _ref.read(backgroundSyncProvider).syncRemote();
|
||||
futures.add(remoteSyncFuture);
|
||||
}
|
||||
|
||||
await Future.wait(futures);
|
||||
}
|
||||
}
|
||||
|
||||
@pragma('vm:entry-point')
|
||||
Future<void> _backgroundSyncNativeEntrypoint() async {
|
||||
WidgetsFlutterBinding.ensureInitialized();
|
||||
DartPluginRegistrant.ensureInitialized();
|
||||
|
||||
final (isar, drift, logDB) = await Bootstrap.initDB();
|
||||
await Bootstrap.initDomain(isar, drift, logDB, shouldBufferLogs: false);
|
||||
await BackgroundWorkerBgService(isar: isar, drift: drift, driftLogger: logDB).init();
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@ class HashService {
|
|||
final DriftLocalAssetRepository _localAssetRepository;
|
||||
final StorageRepository _storageRepository;
|
||||
final NativeSyncApi _nativeSyncApi;
|
||||
final bool Function()? _cancelChecker;
|
||||
final _log = Logger('HashService');
|
||||
|
||||
HashService({
|
||||
|
|
@ -22,13 +23,17 @@ class HashService {
|
|||
required DriftLocalAssetRepository localAssetRepository,
|
||||
required StorageRepository storageRepository,
|
||||
required NativeSyncApi nativeSyncApi,
|
||||
bool Function()? cancelChecker,
|
||||
this.batchSizeLimit = kBatchHashSizeLimit,
|
||||
this.batchFileLimit = kBatchHashFileLimit,
|
||||
}) : _localAlbumRepository = localAlbumRepository,
|
||||
_localAssetRepository = localAssetRepository,
|
||||
_storageRepository = storageRepository,
|
||||
_cancelChecker = cancelChecker,
|
||||
_nativeSyncApi = nativeSyncApi;
|
||||
|
||||
bool get isCancelled => _cancelChecker?.call() ?? false;
|
||||
|
||||
Future<void> hashAssets() async {
|
||||
final Stopwatch stopwatch = Stopwatch()..start();
|
||||
// Sorted by backupSelection followed by isCloud
|
||||
|
|
@ -37,6 +42,11 @@ class HashService {
|
|||
);
|
||||
|
||||
for (final album in localAlbums) {
|
||||
if (isCancelled) {
|
||||
_log.warning("Hashing cancelled. Stopped processing albums.");
|
||||
break;
|
||||
}
|
||||
|
||||
final assetsToHash = await _localAlbumRepository.getAssetsToHash(album.id);
|
||||
if (assetsToHash.isNotEmpty) {
|
||||
await _hashAssets(assetsToHash);
|
||||
|
|
@ -55,6 +65,11 @@ class HashService {
|
|||
final toHash = <_AssetToPath>[];
|
||||
|
||||
for (final asset in assetsToHash) {
|
||||
if (isCancelled) {
|
||||
_log.warning("Hashing cancelled. Stopped processing assets.");
|
||||
return;
|
||||
}
|
||||
|
||||
final file = await _storageRepository.getFileForAsset(asset.id);
|
||||
if (file == null) {
|
||||
continue;
|
||||
|
|
@ -89,6 +104,11 @@ class HashService {
|
|||
);
|
||||
|
||||
for (int i = 0; i < hashes.length; i++) {
|
||||
if (isCancelled) {
|
||||
_log.warning("Hashing cancelled. Stopped processing batch.");
|
||||
return;
|
||||
}
|
||||
|
||||
final hash = hashes[i];
|
||||
final asset = toHash[i].asset;
|
||||
if (hash?.length == 20) {
|
||||
|
|
|
|||
|
|
@ -123,6 +123,11 @@ class LogService {
|
|||
_flushTimer = null;
|
||||
final buffer = [..._msgBuffer];
|
||||
_msgBuffer.clear();
|
||||
|
||||
if (buffer.isEmpty) {
|
||||
return;
|
||||
}
|
||||
|
||||
await _logRepository.insertAll(buffer);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,6 +59,28 @@ class BackgroundSyncManager {
|
|||
}
|
||||
}
|
||||
|
||||
Future<void> cancelLocal() async {
|
||||
final futures = <Future>[];
|
||||
|
||||
if (_hashTask != null) {
|
||||
futures.add(_hashTask!.future);
|
||||
}
|
||||
_hashTask?.cancel();
|
||||
_hashTask = null;
|
||||
|
||||
if (_deviceAlbumSyncTask != null) {
|
||||
futures.add(_deviceAlbumSyncTask!.future);
|
||||
}
|
||||
_deviceAlbumSyncTask?.cancel();
|
||||
_deviceAlbumSyncTask = null;
|
||||
|
||||
try {
|
||||
await Future.wait(futures);
|
||||
} on CanceledError {
|
||||
// Ignore cancellation errors
|
||||
}
|
||||
}
|
||||
|
||||
// No need to cancel the task, as it can also be run when the user logs out
|
||||
Future<void> syncLocal({bool full = false}) {
|
||||
if (_deviceAlbumSyncTask != null) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue