2025-02-26 14:35:51 +00:00
|
|
|
import { BadRequestException, ForbiddenException, Injectable } from '@nestjs/common';
|
2025-02-21 04:37:57 +00:00
|
|
|
import { Insertable } from 'kysely';
|
2024-04-16 07:26:37 +02:00
|
|
|
import { DateTime } from 'luxon';
|
2025-02-21 04:37:57 +00:00
|
|
|
import { Writable } from 'node:stream';
|
2024-04-16 07:26:37 +02:00
|
|
|
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
|
2025-02-21 04:37:57 +00:00
|
|
|
import { SessionSyncCheckpoints } from 'src/db';
|
2025-03-10 12:05:39 -04:00
|
|
|
import { AssetResponseDto, hexOrBufferToBase64, mapAsset } from 'src/dtos/asset-response.dto';
|
2024-04-16 07:26:37 +02:00
|
|
|
import { AuthDto } from 'src/dtos/auth.dto';
|
2025-02-21 04:37:57 +00:00
|
|
|
import {
|
|
|
|
|
AssetDeltaSyncDto,
|
|
|
|
|
AssetDeltaSyncResponseDto,
|
|
|
|
|
AssetFullSyncDto,
|
|
|
|
|
SyncAckDeleteDto,
|
|
|
|
|
SyncAckSetDto,
|
|
|
|
|
SyncStreamDto,
|
|
|
|
|
} from 'src/dtos/sync.dto';
|
2025-05-06 12:12:48 -05:00
|
|
|
import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum';
|
2024-10-02 10:54:35 -04:00
|
|
|
import { BaseService } from 'src/services/base.service';
|
2025-02-21 04:37:57 +00:00
|
|
|
import { SyncAck } from 'src/types';
|
2024-06-14 18:29:32 -04:00
|
|
|
import { getMyPartnerIds } from 'src/utils/asset.util';
|
2024-04-29 05:24:21 +02:00
|
|
|
import { setIsEqual } from 'src/utils/set';
|
2025-02-21 04:37:57 +00:00
|
|
|
import { fromAck, serialize } from 'src/utils/sync';
|
2024-04-29 05:24:21 +02:00
|
|
|
|
|
|
|
|
/**
 * Canned delta-sync response telling the client to discard its incremental
 * state and run a full sync instead. It carries no upserted or deleted entries.
 */
const FULL_SYNC = {
  needsFullSync: true,
  deleted: [],
  upserted: [],
};
|
2025-03-10 12:05:39 -04:00
|
|
|
/**
 * Order in which requested sync types are emitted by `SyncService.stream`.
 * Types requested by the client are streamed following this list, not the
 * order they appear in the request.
 */
export const SYNC_TYPES_ORDER: SyncRequestType[] = [
  // one entry per line; presumably parents come before the entities that reference them — TODO confirm
  SyncRequestType.UsersV1,
  SyncRequestType.PartnersV1,
  SyncRequestType.AssetsV1,
  SyncRequestType.AssetExifsV1,
  SyncRequestType.PartnerAssetsV1,
  SyncRequestType.PartnerAssetExifsV1,
];
|
|
|
|
|
|
|
|
|
|
/**
 * Rejects a request that is not backed by a session.
 *
 * @throws ForbiddenException always
 */
function throwSessionRequired(): never {
  throw new ForbiddenException('Sync endpoints cannot be used with API keys');
}
|
2024-04-16 07:26:37 +02:00
|
|
|
|
2025-02-11 17:15:56 -05:00
|
|
|
/**
 * Client synchronisation service. Exposes the checkpoint ("ack") based
 * streaming sync as well as the full/delta asset sync endpoints.
 */
@Injectable()
export class SyncService extends BaseService {
  /**
   * Returns the sync checkpoints stored for the caller's session.
   *
   * @throws ForbiddenException when `auth` carries no session id
   */
  getAcks(auth: AuthDto) {
    return this.syncRepository.getCheckpoints(this.requireSessionId(auth));
  }

  /**
   * Stores the given acks as checkpoints for the caller's session.
   *
   * @throws ForbiddenException when `auth` carries no session id
   * @throws BadRequestException when an ack has an unknown entity type, or
   *   when two acks in the request share the same entity type
   */
  async setAcks(auth: AuthDto, dto: SyncAckSetDto) {
    const sessionId = this.requireSessionId(auth);
    const knownTypes = Object.values(SyncEntityType);

    // keyed by entity type so a second ack of the same type can be rejected
    const pending = new Map<string, Insertable<SessionSyncCheckpoints>>();

    for (const ack of dto.acks) {
      const { type } = fromAck(ack);

      // TODO proper ack validation via class validator
      if (!knownTypes.includes(type)) {
        throw new BadRequestException(`Invalid ack type: ${type}`);
      }

      if (pending.has(type)) {
        throw new BadRequestException('Only one ack per type is allowed');
      }

      pending.set(type, { sessionId, type, ack });
    }

    await this.syncRepository.upsertCheckpoints([...pending.values()]);
  }

  /**
   * Removes the checkpoints of the given types for the caller's session.
   *
   * @throws ForbiddenException when `auth` carries no session id
   */
  async deleteAcks(auth: AuthDto, dto: SyncAckDeleteDto) {
    const sessionId = this.requireSessionId(auth);
    await this.syncRepository.deleteCheckpoints(sessionId, dto.types);
  }

  /**
   * Writes serialized sync items for every requested type to `response` and
   * then ends the stream. Types are processed in `SYNC_TYPES_ORDER`; for each
   * type, deletes (where that type has them) are written before upserts, and
   * each query is given the session's stored checkpoint for that entity type.
   *
   * @throws ForbiddenException when `auth` carries no session id
   */
  async stream(auth: AuthDto, response: Writable, dto: SyncStreamDto) {
    const sessionId = this.requireSessionId(auth);
    const userId = auth.user.id;

    const stored = await this.syncRepository.getCheckpoints(sessionId);
    const checkpointMap: Partial<Record<SyncEntityType, SyncAck>> = Object.fromEntries(
      stored.map((checkpoint) => [checkpoint.type, fromAck(checkpoint.ack)]),
    );

    const requested = SYNC_TYPES_ORDER.filter((candidate) => dto.types.includes(candidate));

    for (const type of requested) {
      switch (type) {
        case SyncRequestType.UsersV1: {
          for await (const { id: updateId, ...data } of this.syncRepository.getUserDeletes(
            checkpointMap[SyncEntityType.UserDeleteV1],
          )) {
            response.write(serialize({ type: SyncEntityType.UserDeleteV1, updateId, data }));
          }

          for await (const { updateId, ...data } of this.syncRepository.getUserUpserts(
            checkpointMap[SyncEntityType.UserV1],
          )) {
            response.write(serialize({ type: SyncEntityType.UserV1, updateId, data }));
          }

          break;
        }

        case SyncRequestType.PartnersV1: {
          for await (const { id: updateId, ...data } of this.syncRepository.getPartnerDeletes(
            userId,
            checkpointMap[SyncEntityType.PartnerDeleteV1],
          )) {
            response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, updateId, data }));
          }

          for await (const { updateId, ...data } of this.syncRepository.getPartnerUpserts(
            userId,
            checkpointMap[SyncEntityType.PartnerV1],
          )) {
            response.write(serialize({ type: SyncEntityType.PartnerV1, updateId, data }));
          }

          break;
        }

        case SyncRequestType.AssetsV1: {
          for await (const { id: updateId, ...data } of this.syncRepository.getAssetDeletes(
            userId,
            checkpointMap[SyncEntityType.AssetDeleteV1],
          )) {
            response.write(serialize({ type: SyncEntityType.AssetDeleteV1, updateId, data }));
          }

          for await (const { updateId, checksum, thumbhash, ...rest } of this.syncRepository.getAssetUpserts(
            userId,
            checkpointMap[SyncEntityType.AssetV1],
          )) {
            // checksum and thumbhash are sent base64 encoded; a missing thumbhash is sent as null
            const data = {
              ...rest,
              checksum: hexOrBufferToBase64(checksum),
              thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
            };
            response.write(serialize({ type: SyncEntityType.AssetV1, updateId, data }));
          }

          break;
        }

        case SyncRequestType.AssetExifsV1: {
          for await (const { updateId, ...data } of this.syncRepository.getAssetExifsUpserts(
            userId,
            checkpointMap[SyncEntityType.AssetExifV1],
          )) {
            response.write(serialize({ type: SyncEntityType.AssetExifV1, updateId, data }));
          }

          break;
        }

        case SyncRequestType.PartnerAssetsV1: {
          for await (const { id: updateId, ...data } of this.syncRepository.getPartnerAssetDeletes(
            userId,
            checkpointMap[SyncEntityType.PartnerAssetDeleteV1],
          )) {
            response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, updateId, data }));
          }

          for await (const { updateId, checksum, thumbhash, ...rest } of this.syncRepository.getPartnerAssetsUpserts(
            userId,
            checkpointMap[SyncEntityType.PartnerAssetV1],
          )) {
            // checksum and thumbhash are sent base64 encoded; a missing thumbhash is sent as null
            const data = {
              ...rest,
              checksum: hexOrBufferToBase64(checksum),
              thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
            };
            response.write(serialize({ type: SyncEntityType.PartnerAssetV1, updateId, data }));
          }

          break;
        }

        case SyncRequestType.PartnerAssetExifsV1: {
          for await (const { updateId, ...data } of this.syncRepository.getPartnerAssetExifsUpserts(
            userId,
            checkpointMap[SyncEntityType.PartnerAssetExifV1],
          )) {
            response.write(serialize({ type: SyncEntityType.PartnerAssetExifV1, updateId, data }));
          }

          break;
        }

        default: {
          this.logger.warn(`Unsupported sync type: ${type}`);
          break;
        }
      }
    }

    response.end();
  }

  /**
   * Returns one page of assets for a full sync, mapped to response DTOs with
   * metadata and stack information included.
   *
   * Assets are read for `dto.userId` when it is set, otherwise for the
   * caller. `Permission.TIMELINE_READ` on that user is required.
   */
  async getFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise<AssetResponseDto[]> {
    const { updatedUntil, lastId, limit } = dto;

    // mobile implementation is faster if this is a single id
    const ownerId = dto.userId || auth.user.id;
    await this.requireAccess({ auth, permission: Permission.TIMELINE_READ, ids: [ownerId] });

    const page = await this.assetRepository.getAllForUserFullSync({ ownerId, updatedUntil, lastId, limit });

    return page.map((asset) => mapAsset(asset, { auth, stripMetadata: false, withStack: true }));
  }

  /**
   * Returns the assets changed and deleted since `dto.updatedAfter` for the
   * caller and their partners, or the `FULL_SYNC` marker when an incremental
   * answer is not possible:
   *  - the client's last sync is older than `AUDIT_LOG_MAX_DURATION`,
   *  - `dto.userIds` is not exactly the caller plus their current partners,
   *  - the number of changed assets reaches the query limit.
   */
  async getDeltaSync(auth: AuthDto, dto: AssetDeltaSyncDto): Promise<AssetDeltaSyncResponseDto> {
    const { updatedAfter } = dto;
    const callerId = auth.user.id;

    // last sync is older than the maximum audit log duration (the original note says 100 days)
    const elapsed = DateTime.now().diff(DateTime.fromJSDate(updatedAfter));
    if (elapsed > AUDIT_LOG_MAX_DURATION) {
      return FULL_SYNC;
    }

    // app does not have the correct partners synced
    const partnerIds = await getMyPartnerIds({ userId: callerId, repository: this.partnerRepository });
    const userIds = [callerId, ...partnerIds];
    const expectedUsers = new Set(userIds);
    const requestedUsers = new Set(dto.userIds);
    if (!setIsEqual(expectedUsers, requestedUsers)) {
      return FULL_SYNC;
    }

    await this.requireAccess({ auth, permission: Permission.TIMELINE_READ, ids: dto.userIds });

    const maxChanges = 10_000;
    const upserted = await this.assetRepository.getChangedDeltaSync({ limit: maxChanges, updatedAfter, userIds });

    // too many changes, need to do a full sync
    if (upserted.length === maxChanges) {
      return FULL_SYNC;
    }

    const deleted = await this.auditRepository.getAfter(updatedAfter, {
      userIds,
      entityType: EntityType.ASSET,
      action: DatabaseAction.DELETE,
    });

    // the caller's own assets are always returned; assets owned by anyone
    // else are returned only when they have timeline visibility
    const visible = upserted.filter(
      (asset) => asset.ownerId === callerId || asset.visibility === AssetVisibility.TIMELINE,
    );

    const mapped = visible.map((asset) =>
      mapAsset(asset, {
        auth,
        stripMetadata: false,
        // stack information is only included for assets owned by the caller
        withStack: asset.ownerId === callerId,
      }),
    );

    return { needsFullSync: false, upserted: mapped, deleted };
  }

  /**
   * Returns the session id carried by `auth`.
   *
   * @throws ForbiddenException when `auth` has no session id
   */
  private requireSessionId(auth: AuthDto) {
    const sessionId = auth.session?.id;
    if (!sessionId) {
      return throwSessionRequired();
    }

    return sessionId;
  }
}
|