websocket upload notification - closed #24 (#25)

* Render when a new asset is uploaded from WebSocket notification
* Update Readme
This commit is contained in:
Alex 2022-02-14 10:40:41 -06:00 committed by GitHub
parent 7cc7fc0a0c
commit c234c95880
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 11037 additions and 69 deletions

View file

@ -0,0 +1,47 @@
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { CommunicationService } from './communication.service';
import { Socket, Server } from 'socket.io';
import { ImmichJwtService } from '../../modules/immich-jwt/immich-jwt.service';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { UserEntity } from '../user/entities/user.entity';
import { Repository } from 'typeorm';
@WebSocketGateway()
export class CommunicationGateway implements OnGatewayConnection, OnGatewayDisconnect {
constructor(
private immichJwtService: ImmichJwtService,
@InjectRepository(UserEntity)
private userRepository: Repository<UserEntity>,
) {}
@WebSocketServer() server: Server;
handleDisconnect(client: Socket) {
client.leave(client.nsp.name);
Logger.log(`Client ${client.id} disconnected`);
}
async handleConnection(client: Socket, ...args: any[]) {
Logger.log(`New websocket connection: ${client.id}`, 'NewWebSocketConnection');
const accessToken = client.handshake.headers.authorization.split(' ')[1];
const res = await this.immichJwtService.validateToken(accessToken);
if (!res.status) {
client.emit('error', 'unauthorized');
client.disconnect();
return;
}
const user = await this.userRepository.findOne({ where: { id: res.userId } });
if (!user) {
client.emit('error', 'unauthorized');
client.disconnect();
return;
}
client.join(user.id);
}
}

View file

@ -0,0 +1,16 @@
import { Module } from '@nestjs/common';
import { CommunicationService } from './communication.service';
import { CommunicationGateway } from './communication.gateway';
import { ImmichJwtModule } from '../../modules/immich-jwt/immich-jwt.module';
import { ImmichJwtService } from '../../modules/immich-jwt/immich-jwt.service';
import { JwtModule } from '@nestjs/jwt';
import { jwtConfig } from '../../config/jwt.config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UserEntity } from '../user/entities/user.entity';
@Module({
imports: [TypeOrmModule.forFeature([UserEntity]), ImmichJwtModule, JwtModule.register(jwtConfig)],
providers: [CommunicationGateway, CommunicationService, ImmichJwtService],
exports: [CommunicationGateway],
})
export class CommunicationModule {}

View file

@ -0,0 +1,4 @@
import { Injectable } from '@nestjs/common';
@Injectable()
export class CommunicationService {}

View file

@ -16,16 +16,7 @@ export class UserService {
return 'This action adds a new user';
}
async findAll() {
try {
return 'welcome';
// return await this.userRepository.find();
// return await this.userRepository.query('select * from users');
} catch (e) {
console.log(e);
}
// return 'helloworld';
}
async findAll() {}
findOne(id: number) {
return `This action returns a #${id} user`;

View file

@ -14,6 +14,7 @@ import { BullModule } from '@nestjs/bull';
import { ImageOptimizeModule } from './modules/image-optimize/image-optimize.module';
import { ServerInfoModule } from './api-v1/server-info/server-info.module';
import { BackgroundTaskModule } from './modules/background-task/background-task.module';
import { CommunicationModule } from './api-v1/communication/communication.module';
@Module({
imports: [
@ -40,6 +41,8 @@ import { BackgroundTaskModule } from './modules/background-task/background-task.
ServerInfoModule,
BackgroundTaskModule,
CommunicationModule,
],
controllers: [],
providers: [],

View file

@ -1,11 +1,15 @@
import { NestFactory } from '@nestjs/core';
import { NestExpressApplication } from '@nestjs/platform-express';
import { AppModule } from './app.module';
import { RedisIoAdapter } from './middlewares/redis-io.adapter.middleware';
async function bootstrap() {
const app = await NestFactory.create<NestExpressApplication>(AppModule);
app.set('trust proxy');
app.useWebSocketAdapter(new RedisIoAdapter(app));
await app.listen(3000);
}
bootstrap();

View file

@ -0,0 +1,15 @@
import { IoAdapter } from '@nestjs/platform-socket.io';
import { RedisClient, createClient } from 'redis';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
const pubClient = createClient({ url: 'redis://immich_redis:6379' });
const subClient = pubClient.duplicate();
export class RedisIoAdapter extends IoAdapter {
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(createAdapter(pubClient, subClient));
return server;
}
}

View file

@ -5,11 +5,16 @@ import { join } from 'path';
import { AssetModule } from '../../api-v1/asset/asset.module';
import { AssetService } from '../../api-v1/asset/asset.service';
import { AssetEntity } from '../../api-v1/asset/entities/asset.entity';
import { CommunicationGateway } from '../../api-v1/communication/communication.gateway';
import { CommunicationModule } from '../../api-v1/communication/communication.module';
import { UserEntity } from '../../api-v1/user/entities/user.entity';
import { ImmichJwtModule } from '../immich-jwt/immich-jwt.module';
import { ImageOptimizeProcessor } from './image-optimize.processor';
import { AssetOptimizeService } from './image-optimize.service';
@Module({
imports: [
CommunicationModule,
BullModule.registerQueue({
name: 'optimize',
defaultJobOptions: {

View file

@ -8,14 +8,16 @@ import { existsSync, mkdirSync, readFile } from 'fs';
import { ConfigService } from '@nestjs/config';
import ffmpeg from 'fluent-ffmpeg';
import { APP_UPLOAD_LOCATION } from '../../constants/upload_location.constant';
import { WebSocketServer } from '@nestjs/websockets';
import { Socket, Server as SocketIoServer } from 'socket.io';
import { CommunicationGateway } from '../../api-v1/communication/communication.gateway';
@Processor('optimize')
export class ImageOptimizeProcessor {
constructor(
private wsCommunicateionGateway: CommunicationGateway,
@InjectRepository(AssetEntity)
private assetRepository: Repository<AssetEntity>,
private configService: ConfigService,
) {}
@Process('resize-image')
@ -55,7 +57,12 @@ export class ImageOptimizeProcessor {
return;
}
await this.assetRepository.update(savedAsset, { resizePath: desitnation });
const res = await this.assetRepository.update(savedAsset, { resizePath: desitnation });
if (res.affected) {
this.wsCommunicateionGateway.server
.to(savedAsset.userId)
.emit('on_upload_success', JSON.stringify(savedAsset));
}
});
} else {
sharp(data)
@ -66,7 +73,12 @@ export class ImageOptimizeProcessor {
return;
}
await this.assetRepository.update(savedAsset, { resizePath: resizePath });
const res = await this.assetRepository.update(savedAsset, { resizePath: resizePath });
if (res.affected) {
this.wsCommunicateionGateway.server
.to(savedAsset.userId)
.emit('on_upload_success', JSON.stringify(savedAsset));
}
});
}
});
@ -95,7 +107,12 @@ export class ImageOptimizeProcessor {
filename: `${filename}.png`,
})
.on('end', async (a) => {
await this.assetRepository.update(savedAsset, { resizePath: `${resizeDir}/${filename}.png` });
const res = await this.assetRepository.update(savedAsset, { resizePath: `${resizeDir}/${filename}.png` });
if (res.affected) {
this.wsCommunicateionGateway.server
.to(savedAsset.userId)
.emit('on_upload_success', JSON.stringify(savedAsset));
}
});
return 'ok';

View file

@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
import { JwtPayloadDto } from '../../api-v1/auth/dto/jwt-payload.dto';
import { jwtSecret } from '../../constants/jwt.constant';
@Injectable()
export class ImmichJwtService {
@ -11,4 +12,20 @@ export class ImmichJwtService {
...payload,
});
}
public async validateToken(accessToken: string) {
try {
const payload = await this.jwtService.verify(accessToken, { secret: jwtSecret });
return {
userId: payload['userId'],
status: true,
};
} catch (e) {
Logger.error('Error validating token from websocket request', 'ValidateWebsocketToken');
return {
userId: null,
status: false,
};
}
}
}