refactor(server): jobs (#2023)

* refactor: job to domain

* chore: regenerate open api

* chore: tests

* fix: missing breaks

* fix: get asset with missing exif data

---------

Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
Jason Rasmussen 2023-03-20 11:55:28 -04:00 committed by GitHub
parent db6b14361d
commit 386eef046d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
68 changed files with 1355 additions and 907 deletions

View file

@ -6,7 +6,8 @@ import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import {
BackgroundTaskProcessor,
MachineLearningProcessor,
ClipEncodingProcessor,
ObjectTaggingProcessor,
SearchIndexProcessor,
StorageTemplateMigrationProcessor,
ThumbnailGeneratorProcessor,
@ -24,7 +25,8 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
ThumbnailGeneratorProcessor,
MetadataExtractionProcessor,
VideoTranscodeProcessor,
MachineLearningProcessor,
ObjectTaggingProcessor,
ClipEncodingProcessor,
StorageTemplateMigrationProcessor,
BackgroundTaskProcessor,
SearchIndexProcessor,

View file

@ -2,6 +2,7 @@ import {
AssetService,
IAssetJob,
IAssetUploadedJob,
IBaseJob,
IBulkEntityJob,
IDeleteFilesJob,
IUserDeletionJob,
@ -48,20 +49,35 @@ export class BackgroundTaskProcessor {
}
}
@Processor(QueueName.MACHINE_LEARNING)
export class MachineLearningProcessor {
@Processor(QueueName.OBJECT_TAGGING)
export class ObjectTaggingProcessor {
constructor(private smartInfoService: SmartInfoService) {}
@Process({ name: JobName.IMAGE_TAGGING, concurrency: 1 })
async onTagImage(job: Job<IAssetJob>) {
await this.smartInfoService.handleTagImage(job.data);
@Process({ name: JobName.QUEUE_OBJECT_TAGGING, concurrency: 1 })
async onQueueObjectTagging(job: Job<IBaseJob>) {
await this.smartInfoService.handleQueueObjectTagging(job.data);
}
@Process({ name: JobName.OBJECT_DETECTION, concurrency: 1 })
async onDetectObject(job: Job<IAssetJob>) {
@Process({ name: JobName.DETECT_OBJECTS, concurrency: 1 })
async onDetectObjects(job: Job<IAssetJob>) {
await this.smartInfoService.handleDetectObjects(job.data);
}
@Process({ name: JobName.CLASSIFY_IMAGE, concurrency: 1 })
async onClassifyImage(job: Job<IAssetJob>) {
await this.smartInfoService.handleClassifyImage(job.data);
}
}
@Processor(QueueName.CLIP_ENCODING)
export class ClipEncodingProcessor {
constructor(private smartInfoService: SmartInfoService) {}
@Process({ name: JobName.QUEUE_ENCODE_CLIP, concurrency: 1 })
async onQueueClipEncoding(job: Job<IBaseJob>) {
await this.smartInfoService.handleQueueEncodeClip(job.data);
}
@Process({ name: JobName.ENCODE_CLIP, concurrency: 1 })
async onEncodeClip(job: Job<IAssetJob>) {
await this.smartInfoService.handleEncodeClip(job.data);
@ -117,6 +133,11 @@ export class StorageTemplateMigrationProcessor {
export class ThumbnailGeneratorProcessor {
constructor(private mediaService: MediaService) {}
@Process({ name: JobName.QUEUE_GENERATE_THUMBNAILS, concurrency: 1 })
async handleQueueGenerateThumbnails(job: Job<IBaseJob>) {
await this.mediaService.handleQueueGenerateThumbnails(job.data);
}
@Process({ name: JobName.GENERATE_JPEG_THUMBNAIL, concurrency: 3 })
async handleGenerateJpegThumbnail(job: Job<IAssetJob>) {
await this.mediaService.handleGenerateJpegThumbnail(job.data);

View file

@ -1,11 +1,14 @@
import {
AssetCore,
getFileNameWithoutExtension,
IAssetRepository,
IAssetUploadedJob,
IBaseJob,
IJobRepository,
IReverseGeocodingJob,
JobName,
QueueName,
WithoutProperty,
} from '@app/domain';
import { AssetEntity, AssetType, ExifEntity } from '@app/infra';
import { Process, Processor } from '@nestjs/bull';
@ -85,8 +88,8 @@ export class MetadataExtractionProcessor {
private assetCore: AssetCore;
constructor(
@Inject(IAssetRepository) assetRepository: IAssetRepository,
@Inject(IJobRepository) jobRepository: IJobRepository,
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@InjectRepository(ExifEntity)
private exifRepository: Repository<ExifEntity>,
@ -148,6 +151,24 @@ export class MetadataExtractionProcessor {
return { country, state, city };
}
@Process(JobName.QUEUE_METADATA_EXTRACTION)
async handleQueueMetadataExtraction(job: Job<IBaseJob>) {
try {
const { force } = job.data;
const assets = force
? await this.assetRepository.getAll()
: await this.assetRepository.getWithout(WithoutProperty.EXIF);
for (const asset of assets) {
const fileName = asset.exifInfo?.imageName ?? getFileNameWithoutExtension(asset.originalPath);
const name = asset.type === AssetType.VIDEO ? JobName.EXTRACT_VIDEO_METADATA : JobName.EXIF_EXTRACTION;
await this.jobRepository.queue({ name, data: { asset, fileName } });
}
} catch (error: any) {
this.logger.error(`Unable to queue metadata extraction`, error?.stack);
}
}
@Process(JobName.EXIF_EXTRACTION)
async extractExifInfo(job: Job<IAssetUploadedJob>) {
try {

View file

@ -1,6 +1,15 @@
import { APP_UPLOAD_LOCATION } from '@app/common/constants';
import { AssetEntity } from '@app/infra';
import { IAssetJob, IAssetRepository, JobName, QueueName, SystemConfigService } from '@app/domain';
import { AssetEntity, AssetType } from '@app/infra';
import {
IAssetJob,
IAssetRepository,
IBaseJob,
IJobRepository,
JobName,
QueueName,
SystemConfigService,
WithoutProperty,
} from '@app/domain';
import { Process, Processor } from '@nestjs/bull';
import { Inject, Logger } from '@nestjs/common';
import { Job } from 'bull';
@ -12,11 +21,27 @@ export class VideoTranscodeProcessor {
readonly logger = new Logger(VideoTranscodeProcessor.name);
constructor(
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
private systemConfigService: SystemConfigService,
) {}
@Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 1 })
async handleQueueVideoConversion(job: Job<IBaseJob>): Promise<void> {
try {
const { force } = job.data;
const assets = force
? await this.assetRepository.getAll({ type: AssetType.VIDEO })
: await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO);
for (const asset of assets) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } });
}
} catch (error: any) {
this.logger.error('Failed to queue video conversions', error.stack);
}
}
@Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 })
async videoConversion(job: Job<IAssetJob>) {
async handleVideoConversion(job: Job<IAssetJob>) {
const { asset } = job.data;
const basePath = APP_UPLOAD_LOCATION;
const encodedVideoPath = `${basePath}/${asset.ownerId}/encoded-video`;