From e1b38802b86fcd6a25e1ed21e494c5287f73b1d4 Mon Sep 17 00:00:00 2001 From: cyhhao Date: Sat, 18 Feb 2023 00:01:29 +0800 Subject: [PATCH] queue abstract --- src/common/queue-task.ts | 77 ++++++++++++++++++++++++++++ src/storage/SLIStorage.ts | 103 +++++++++++++++++++++++++------------- 2 files changed, 144 insertions(+), 36 deletions(-) create mode 100644 src/common/queue-task.ts diff --git a/src/common/queue-task.ts b/src/common/queue-task.ts new file mode 100644 index 0000000..f96ac9c --- /dev/null +++ b/src/common/queue-task.ts @@ -0,0 +1,77 @@ +export class QueueTask { + maxRetry: number = 0 + queueInterval: number = 0 + maxPending: number = 0 + retryInterval: number = 0 + + lastTaskScheduled: number = 0 + pending: number = 0 + + checkPointWait!: Promise + checkPointResolve: any + + constructor({ + maxRetry = 10, + queueInterval = 0, + maxPending = 0, + retryInterval = 0, + }) { + this.maxRetry = maxRetry + this.queueInterval = queueInterval + this.maxPending = maxPending + this.retryInterval = retryInterval + this.refreshCheckPoint() + } + + async refreshCheckPoint() { + this.checkPointWait = new Promise((resolve) => { + this.checkPointResolve = resolve + }) + } + + async tickThread() { + return new Promise(async (resolve) => { + let now = Date.now().valueOf() + if (this.maxPending > 0) { + while (this.pending >= this.maxPending) { + await this.checkPointWait + this.checkPointWait = new Promise((resolve) => { + this.checkPointResolve = resolve + }) + } + } + if (now - this.lastTaskScheduled < this.queueInterval) { + let delta = this.queueInterval - (now - this.lastTaskScheduled) + setTimeout(() => { + resolve(true) + }, delta) + this.lastTaskScheduled = delta + return + } else { + this.lastTaskScheduled = now + resolve(true) + return + } + }) + } + + async run(func: Function, retry: number = this.maxRetry): Promise { + let lastError + for (let i = 0; i < retry; i++) { + await this.tickThread() + this.pending++ + try { + let res = await func() + this.pending-- + this.checkPointResolve(true) + return res + } catch (err: any) { + lastError = err + this.pending-- + this.checkPointResolve(true) + new Promise((r) => setTimeout(r, this.retryInterval)) + } + } + if (lastError) throw lastError + } +} diff --git a/src/storage/SLIStorage.ts b/src/storage/SLIStorage.ts index a2ee0e9..a09fd6d 100644 --- a/src/storage/SLIStorage.ts +++ b/src/storage/SLIStorage.ts @@ -4,6 +4,7 @@ import { ethers } from "ethers" import ipfsConf from "../config/ipfs.js" import axios from "axios" import { Git3Protocol } from "../common/git3-protocol.js" +import { QueueTask } from "../common/queue-task.js" export class SLIStorage implements Storage { repoName: string @@ -16,8 +17,11 @@ export class SLIStorage implements Storage { maxBatchSize = 20 commitTimer: any - storageIntervalLimit = 500 - storageCallLastTime = 0 + taskRunning = 0 + waitTasks: Promise + uploadDone: any + + storageTask: QueueTask constructor(protocol: Git3Protocol) { this.repoName = protocol.repoName @@ -28,6 +32,17 @@ export class SLIStorage implements Storage { "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkaWQ6ZXRocjoweGFEQTdCOWFlQTdGNTc2ZDI5NzM0ZWUxY0Q2ODVFMzc2OWNCM2QwRDEiLCJpc3MiOiJuZnQtc3RvcmFnZSIsImlhdCI6MTY3NTQ5NDYwMDkzMiwibmFtZSI6ImZ2bS1oYWNrc29uIn0.YBqfsj_LTZSJPKc0OH586avnQNqove_Htzl5rrToXTk" this.txManager = new TxManager(this.contract, protocol.chainId, protocol.netConfig.txConst) + + this.waitTasks = new Promise((resolve, reject) => { + this.uploadDone = resolve + }) + + this.storageTask = new QueueTask({ + maxRetry: 10, + queueInterval: 400, + maxPending: 20, + retryInterval: 500, + }) } async repoRoles(): Promise { @@ -75,10 +90,10 @@ export class SLIStorage implements Storage { if (this.commitTimer) clearTimeout(this.commitTimer) if (this.batchQueue.length >= this.maxBatchSize) { - await this.commitQueue("full") + this.commitQueue("full") } else { - this.commitTimer = setTimeout(async () => { - await this.commitQueue("timeout") + this.commitTimer = setTimeout(() => { + this.commitQueue("timeout") }, 3000) } @@ -98,6 +113,7 @@ export class SLIStorage implements Storage { } async uploadCommit(): Promise { + await this.waitTasks return Status.SUCCEED // try { // await this.commitQueue("uploadCommit") @@ -110,16 +126,32 @@ export class SLIStorage implements Storage { } async commitQueue(reason: string) { + this.taskRunning += 1 + let queue = this.batchQueue this.batchQueue = [] console.error(`[${reason}] commit queue length ${queue.length}`) if (queue.length === 0) return - await this.txManager.SendCall("batchUpload", [ - Buffer.from(this.repoName), - queue.map((i) => Buffer.from(i.path)), - queue.map((i) => Buffer.from(i.cid)), - ]) + + let err + try { + await this.txManager.SendCall("batchUpload", [ + Buffer.from(this.repoName), + queue.map((i) => Buffer.from(i.path)), + queue.map((i) => Buffer.from(i.cid)), + ]) + err = null + } catch (error: any) { + this.txManager.CancelAll() + console.error(`upload failed: ${error}`) + err = error + } + this.taskRunning -= 1 + if (this.taskRunning === 0) { + if (err) this.uploadDone(Status.FAILED) + else this.uploadDone(Status.SUCCEED) + } } remove(path: string): Promise { @@ -159,26 +191,10 @@ export class SLIStorage implements Storage { } async storeIPFS(data: Buffer): Promise { - const RETRY_TIMES = 10 const TIMEOUT = 30 - let response - let lastError - - // while (this.storageAPICallCount >= this.storageAPILimit) { - // await new Promise((r) => setTimeout(r, 1000)) - // } - - for (let i = 0; i < RETRY_TIMES; i++) { - try { - while ( - Date.now().valueOf() - this.storageCallLastTime < - this.storageIntervalLimit - ) { - await new Promise((r) => setTimeout(r, this.storageIntervalLimit / 2)) - } - this.storageCallLastTime = Date.now().valueOf() - - response = await axios.post("https://api.nft.storage/upload", data, { + try { + let cid = this.storageTask.run(async () => { + let response = await axios.post("https://api.nft.storage/upload", data, { headers: { "Content-Type": "application/octet-stream", Authorization: this.auth, @@ -188,14 +204,29 @@ export class SLIStorage implements Storage { if (response.status == 200) { return response.data.value.cid } else { - lastError = response.status + throw new Error(`response code: ${response.status}`) } - } catch (e) { - //pass - lastError = e - new Promise((r) => setTimeout(r, 1000)) - } + }) + return cid + } catch (e) { + throw new Error(`store ipfs failed: ${e}`) } - throw new Error(`store ipfs failed: ${response?.status} ${lastError}`) + + // for (let i = 0; i < RETRY_TIMES; i++) { + // try { + // while ( + // Date.now().valueOf() - this.storageCallLastTime < + // this.storageIntervalLimit + // ) { + // await new Promise((r) => setTimeout(r, this.storageIntervalLimit / 2)) + // } + // this.storageCallLastTime = Date.now().valueOf() + + // } catch (e) { + // //pass + // lastError = e + // new Promise((r) => setTimeout(r, 1000)) + // } + // } } }