mirror of
https://github.com/immich-app/immich
synced 2025-10-17 18:19:27 +00:00
chore: refactor upload service (#20130)
* chore: refactor upload service * fix: cancel upload queue on logout (#20131) * fix: cancel upload on logout * fix: test --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com> --------- Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com> Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
This commit is contained in:
parent
e5ee1c8db6
commit
03a13828e1
13 changed files with 438 additions and 395 deletions
|
|
@ -1,24 +1,51 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:background_downloader/background_downloader.dart';
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/constants/constants.dart';
|
||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
||||
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||
import 'package:immich_mobile/entities/store.entity.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/backup.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart';
|
||||
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/asset.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/storage.provider.dart';
|
||||
import 'package:immich_mobile/repositories/upload.repository.dart';
|
||||
import 'package:immich_mobile/services/api.service.dart';
|
||||
import 'package:path/path.dart';
|
||||
import 'package:path/path.dart' as p;
|
||||
|
||||
final uploadServiceProvider = Provider((ref) {
|
||||
final service = UploadService(ref.watch(uploadRepositoryProvider));
|
||||
final service = UploadService(
|
||||
ref.watch(uploadRepositoryProvider),
|
||||
ref.watch(backupRepositoryProvider),
|
||||
ref.watch(storageRepositoryProvider),
|
||||
ref.watch(localAssetRepository),
|
||||
);
|
||||
|
||||
ref.onDispose(service.dispose);
|
||||
return service;
|
||||
});
|
||||
|
||||
class UploadService {
|
||||
UploadService(
|
||||
this._uploadRepository,
|
||||
this._backupRepository,
|
||||
this._storageRepository,
|
||||
this._localAssetRepository,
|
||||
) {
|
||||
_uploadRepository.onUploadStatus = _onUploadCallback;
|
||||
_uploadRepository.onTaskProgress = _onTaskProgressCallback;
|
||||
}
|
||||
|
||||
final UploadRepository _uploadRepository;
|
||||
void Function(TaskStatusUpdate)? onUploadStatus;
|
||||
void Function(TaskProgressUpdate)? onTaskProgress;
|
||||
final DriftBackupRepository _backupRepository;
|
||||
final StorageRepository _storageRepository;
|
||||
final DriftLocalAssetRepository _localAssetRepository;
|
||||
|
||||
final StreamController<TaskStatusUpdate> _taskStatusController = StreamController<TaskStatusUpdate>.broadcast();
|
||||
final StreamController<TaskProgressUpdate> _taskProgressController = StreamController<TaskProgressUpdate>.broadcast();
|
||||
|
|
@ -26,25 +53,19 @@ class UploadService {
|
|||
Stream<TaskStatusUpdate> get taskStatusStream => _taskStatusController.stream;
|
||||
Stream<TaskProgressUpdate> get taskProgressStream => _taskProgressController.stream;
|
||||
|
||||
UploadService(
|
||||
this._uploadRepository,
|
||||
) {
|
||||
_uploadRepository.onUploadStatus = _onUploadCallback;
|
||||
_uploadRepository.onTaskProgress = _onTaskProgressCallback;
|
||||
}
|
||||
bool shouldAbortQueuingTasks = false;
|
||||
|
||||
void _onTaskProgressCallback(TaskProgressUpdate update) {
|
||||
onTaskProgress?.call(update);
|
||||
if (!_taskProgressController.isClosed) {
|
||||
_taskProgressController.add(update);
|
||||
}
|
||||
}
|
||||
|
||||
void _onUploadCallback(TaskStatusUpdate update) {
|
||||
onUploadStatus?.call(update);
|
||||
if (!_taskStatusController.isClosed) {
|
||||
_taskStatusController.add(update);
|
||||
}
|
||||
_handleTaskStatusUpdate(update);
|
||||
}
|
||||
|
||||
void dispose() {
|
||||
|
|
@ -52,18 +73,234 @@ class UploadService {
|
|||
_taskProgressController.close();
|
||||
}
|
||||
|
||||
Future<bool> cancelUpload(String id) {
|
||||
return FileDownloader().cancelTaskWithId(id);
|
||||
}
|
||||
|
||||
Future<void> cancelAllForGroup(String group) async {
|
||||
await _uploadRepository.cancelAll(group);
|
||||
await _uploadRepository.reset(group);
|
||||
await _uploadRepository.deleteAllTrackingRecords(group);
|
||||
}
|
||||
|
||||
void enqueueTasks(List<UploadTask> tasks) {
|
||||
_uploadRepository.enqueueAll(tasks);
|
||||
_uploadRepository.enqueueBackgroundAll(tasks);
|
||||
}
|
||||
|
||||
Future<List<Task>> getActiveTasks(String group) {
|
||||
return _uploadRepository.getActiveTasks(group);
|
||||
}
|
||||
|
||||
Future<int> getBackupTotalCount() {
|
||||
return _backupRepository.getTotalCount();
|
||||
}
|
||||
|
||||
Future<int> getBackupRemainderCount(String userId) {
|
||||
return _backupRepository.getRemainderCount(userId);
|
||||
}
|
||||
|
||||
Future<int> getBackupFinishedCount(String userId) {
|
||||
return _backupRepository.getBackupCount(userId);
|
||||
}
|
||||
|
||||
Future<void> manualBackup(List<LocalAsset> localAssets) async {
|
||||
List<UploadTask> tasks = [];
|
||||
for (final asset in localAssets) {
|
||||
final task = await _getUploadTask(
|
||||
asset,
|
||||
group: kManualUploadGroup,
|
||||
priority: 1, // High priority after upload motion photo part
|
||||
);
|
||||
if (task != null) {
|
||||
tasks.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
if (tasks.isNotEmpty) {
|
||||
enqueueTasks(tasks);
|
||||
}
|
||||
}
|
||||
|
||||
/// Find backup candidates
|
||||
/// Build the upload tasks
|
||||
/// Enqueue the tasks
|
||||
Future<void> startBackup(
|
||||
String userId,
|
||||
void Function(EnqueueStatus status) onEnqueueTasks,
|
||||
) async {
|
||||
shouldAbortQueuingTasks = false;
|
||||
|
||||
final candidates = await _backupRepository.getCandidates(userId);
|
||||
if (candidates.isEmpty) {
|
||||
return;
|
||||
}
|
||||
|
||||
const batchSize = 100;
|
||||
int count = 0;
|
||||
for (int i = 0; i < candidates.length; i += batchSize) {
|
||||
if (shouldAbortQueuingTasks) {
|
||||
break;
|
||||
}
|
||||
|
||||
final batch = candidates.skip(i).take(batchSize).toList();
|
||||
|
||||
List<UploadTask> tasks = [];
|
||||
for (final asset in batch) {
|
||||
final task = await _getUploadTask(asset);
|
||||
if (task != null) {
|
||||
tasks.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
if (tasks.isNotEmpty && !shouldAbortQueuingTasks) {
|
||||
count += tasks.length;
|
||||
enqueueTasks(tasks);
|
||||
|
||||
onEnqueueTasks(
|
||||
EnqueueStatus(
|
||||
enqueueCount: count,
|
||||
totalCount: candidates.length,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancel all ongoing uploads and reset the upload queue
|
||||
///
|
||||
/// Return the number of left over tasks in the queue
|
||||
Future<int> cancelBackup() async {
|
||||
shouldAbortQueuingTasks = true;
|
||||
|
||||
await _uploadRepository.reset(kBackupGroup);
|
||||
await _uploadRepository.deleteDatabaseRecords(kBackupGroup);
|
||||
|
||||
final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup);
|
||||
return activeTasks.length;
|
||||
}
|
||||
|
||||
Future<void> resumeBackup() {
|
||||
return _uploadRepository.start();
|
||||
}
|
||||
|
||||
void _handleTaskStatusUpdate(TaskStatusUpdate update) {
|
||||
switch (update.status) {
|
||||
case TaskStatus.complete:
|
||||
_handleLivePhoto(update);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _handleLivePhoto(TaskStatusUpdate update) async {
|
||||
try {
|
||||
if (update.task.metaData.isEmpty || update.task.metaData == '') {
|
||||
return;
|
||||
}
|
||||
|
||||
final metadata = UploadTaskMetadata.fromJson(update.task.metaData);
|
||||
if (!metadata.isLivePhotos) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (update.responseBody == null || update.responseBody!.isEmpty) {
|
||||
return;
|
||||
}
|
||||
final response = jsonDecode(update.responseBody!);
|
||||
|
||||
final localAsset = await _localAssetRepository.getById(metadata.localAssetId);
|
||||
if (localAsset == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final uploadTask = await _getLivePhotoUploadTask(
|
||||
localAsset,
|
||||
response['id'] as String,
|
||||
);
|
||||
|
||||
if (uploadTask == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
enqueueTasks([uploadTask]);
|
||||
} catch (error, stackTrace) {
|
||||
debugPrint("Error handling live photo upload task: $error $stackTrace");
|
||||
}
|
||||
}
|
||||
|
||||
Future<UploadTask?> _getUploadTask(
|
||||
LocalAsset asset, {
|
||||
String group = kBackupGroup,
|
||||
int? priority,
|
||||
}) async {
|
||||
final entity = await _storageRepository.getAssetEntityForAsset(asset);
|
||||
if (entity == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
File? file;
|
||||
|
||||
/// iOS LivePhoto has two files: a photo and a video.
|
||||
/// They are uploaded separately, with video file being upload first, then returned with the assetId
|
||||
/// The assetId is then used as a metadata for the photo file upload task.
|
||||
///
|
||||
/// We implement two separate upload groups for this, the normal one for the video file
|
||||
/// and the higher priority group for the photo file because the video file is already uploaded.
|
||||
///
|
||||
/// The cancel operation will only cancel the video group (normal group), the photo group will not
|
||||
/// be touched, as the video file is already uploaded.
|
||||
|
||||
if (entity.isLivePhoto) {
|
||||
file = await _storageRepository.getMotionFileForAsset(asset);
|
||||
} else {
|
||||
file = await _storageRepository.getFileForAsset(asset.id);
|
||||
}
|
||||
|
||||
if (file == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final originalFileName = entity.isLivePhoto
|
||||
? p.setExtension(
|
||||
asset.name,
|
||||
p.extension(file.path),
|
||||
)
|
||||
: asset.name;
|
||||
|
||||
String metadata = UploadTaskMetadata(
|
||||
localAssetId: asset.id,
|
||||
isLivePhotos: entity.isLivePhoto,
|
||||
livePhotoVideoId: '',
|
||||
).toJson();
|
||||
|
||||
return buildUploadTask(
|
||||
file,
|
||||
originalFileName: originalFileName,
|
||||
deviceAssetId: asset.id,
|
||||
metadata: metadata,
|
||||
group: group,
|
||||
priority: priority,
|
||||
);
|
||||
}
|
||||
|
||||
Future<UploadTask?> _getLivePhotoUploadTask(
|
||||
LocalAsset asset,
|
||||
String livePhotoVideoId,
|
||||
) async {
|
||||
final entity = await _storageRepository.getAssetEntityForAsset(asset);
|
||||
if (entity == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final file = await _storageRepository.getFileForAsset(asset.id);
|
||||
if (file == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final fields = {
|
||||
'livePhotoVideoId': livePhotoVideoId,
|
||||
};
|
||||
|
||||
return buildUploadTask(
|
||||
file,
|
||||
originalFileName: asset.name,
|
||||
deviceAssetId: asset.id,
|
||||
fields: fields,
|
||||
group: kBackupLivePhotoGroup,
|
||||
priority: 0, // Highest priority to get upload immediately
|
||||
);
|
||||
}
|
||||
|
||||
Future<UploadTask> buildUploadTask(
|
||||
|
|
@ -74,26 +311,6 @@ class UploadService {
|
|||
String? deviceAssetId,
|
||||
String? metadata,
|
||||
int? priority,
|
||||
}) async {
|
||||
return _buildTask(
|
||||
deviceAssetId ?? hash(file.path).toString(),
|
||||
file,
|
||||
fields: fields,
|
||||
originalFileName: originalFileName,
|
||||
metadata: metadata,
|
||||
group: group,
|
||||
priority: priority,
|
||||
);
|
||||
}
|
||||
|
||||
Future<UploadTask> _buildTask(
|
||||
String id,
|
||||
File file, {
|
||||
required String group,
|
||||
Map<String, String>? fields,
|
||||
String? originalFileName,
|
||||
String? metadata,
|
||||
int? priority,
|
||||
}) async {
|
||||
final serverEndpoint = Store.get(StoreKey.serverEndpoint);
|
||||
final url = Uri.parse('$serverEndpoint/assets').toString();
|
||||
|
|
@ -106,7 +323,7 @@ class UploadService {
|
|||
final fileModifiedAt = stats.modified;
|
||||
final fieldsMap = {
|
||||
'filename': originalFileName ?? filename,
|
||||
'deviceAssetId': id,
|
||||
'deviceAssetId': deviceAssetId ?? '',
|
||||
'deviceId': deviceId,
|
||||
'fileCreatedAt': fileCreatedAt.toUtc().toIso8601String(),
|
||||
'fileModifiedAt': fileModifiedAt.toUtc().toIso8601String(),
|
||||
|
|
@ -116,7 +333,7 @@ class UploadService {
|
|||
};
|
||||
|
||||
return UploadTask(
|
||||
taskId: id,
|
||||
taskId: deviceAssetId,
|
||||
displayName: originalFileName ?? filename,
|
||||
httpRequestMethod: 'POST',
|
||||
url: url,
|
||||
|
|
@ -134,3 +351,64 @@ class UploadService {
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
class UploadTaskMetadata {
|
||||
final String localAssetId;
|
||||
final bool isLivePhotos;
|
||||
final String livePhotoVideoId;
|
||||
|
||||
const UploadTaskMetadata({
|
||||
required this.localAssetId,
|
||||
required this.isLivePhotos,
|
||||
required this.livePhotoVideoId,
|
||||
});
|
||||
|
||||
UploadTaskMetadata copyWith({
|
||||
String? localAssetId,
|
||||
bool? isLivePhotos,
|
||||
String? livePhotoVideoId,
|
||||
}) {
|
||||
return UploadTaskMetadata(
|
||||
localAssetId: localAssetId ?? this.localAssetId,
|
||||
isLivePhotos: isLivePhotos ?? this.isLivePhotos,
|
||||
livePhotoVideoId: livePhotoVideoId ?? this.livePhotoVideoId,
|
||||
);
|
||||
}
|
||||
|
||||
Map<String, dynamic> toMap() {
|
||||
return <String, dynamic>{
|
||||
'localAssetId': localAssetId,
|
||||
'isLivePhotos': isLivePhotos,
|
||||
'livePhotoVideoId': livePhotoVideoId,
|
||||
};
|
||||
}
|
||||
|
||||
factory UploadTaskMetadata.fromMap(Map<String, dynamic> map) {
|
||||
return UploadTaskMetadata(
|
||||
localAssetId: map['localAssetId'] as String,
|
||||
isLivePhotos: map['isLivePhotos'] as bool,
|
||||
livePhotoVideoId: map['livePhotoVideoId'] as String,
|
||||
);
|
||||
}
|
||||
|
||||
String toJson() => json.encode(toMap());
|
||||
|
||||
factory UploadTaskMetadata.fromJson(String source) =>
|
||||
UploadTaskMetadata.fromMap(json.decode(source) as Map<String, dynamic>);
|
||||
|
||||
@override
|
||||
String toString() =>
|
||||
'UploadTaskMetadata(localAssetId: $localAssetId, isLivePhotos: $isLivePhotos, livePhotoVideoId: $livePhotoVideoId)';
|
||||
|
||||
@override
|
||||
bool operator ==(covariant UploadTaskMetadata other) {
|
||||
if (identical(this, other)) return true;
|
||||
|
||||
return other.localAssetId == localAssetId &&
|
||||
other.isLivePhotos == isLivePhotos &&
|
||||
other.livePhotoVideoId == livePhotoVideoId;
|
||||
}
|
||||
|
||||
@override
|
||||
int get hashCode => localAssetId.hashCode ^ isLivePhotos.hashCode ^ livePhotoVideoId.hashCode;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue