queue abstract

master
cyhhao 2 years ago
parent 4111f842d6
commit e1b38802b8

@ -0,0 +1,77 @@
export class QueueTask {
maxRetry: number = 0
queueInterval: number = 0
maxPending: number = 0
retryInterval: number = 0
lastTaskScheduled: number = 0
pending: number = 0
checkPointWait!: Promise<void>
checkPointResolve: any
constructor({
maxRetry = 10,
queueInterval = 0,
maxPending = 0,
retryInterval = 0,
}) {
this.maxRetry = maxRetry
this.queueInterval = queueInterval
this.maxPending = maxPending
this.retryInterval = retryInterval
this.refreshCheckPoint()
}
async refreshCheckPoint() {
this.checkPointWait = new Promise((resolve) => {
this.checkPointResolve = resolve
})
}
async tickThread() {
return new Promise(async (resolve) => {
let now = Date.now().valueOf()
if (this.maxPending > 0) {
while (this.pending >= this.maxPending) {
await this.checkPointWait
this.checkPointWait = new Promise((resolve) => {
this.checkPointResolve = resolve
})
}
}
if (now - this.lastTaskScheduled < this.queueInterval) {
let delta = this.queueInterval - (now - this.lastTaskScheduled)
setTimeout(() => {
resolve(true)
}, delta)
this.lastTaskScheduled = delta
return
} else {
this.lastTaskScheduled = now
resolve(true)
return
}
})
}
async run(func: Function, retry: number = this.maxRetry): Promise<any> {
let lastError
for (let i = 0; i < retry; i++) {
await this.tickThread()
this.pending++
try {
let res = await func()
this.pending--
this.checkPointResolve(true)
return res
} catch (err: any) {
lastError = err
this.pending--
this.checkPointResolve(true)
new Promise((r) => setTimeout(r, this.retryInterval))
}
}
if (lastError) throw lastError
}
}

@ -4,6 +4,7 @@ import { ethers } from "ethers"
import ipfsConf from "../config/ipfs.js"
import axios from "axios"
import { Git3Protocol } from "../common/git3-protocol.js"
import { QueueTask } from "../common/queue-task.js"
export class SLIStorage implements Storage {
repoName: string
@ -16,8 +17,11 @@ export class SLIStorage implements Storage {
maxBatchSize = 20
commitTimer: any
storageIntervalLimit = 500
storageCallLastTime = 0
taskRunning = 0
waitTasks: Promise<void>
uploadDone: any
storageTask: QueueTask
constructor(protocol: Git3Protocol) {
this.repoName = protocol.repoName
@ -28,6 +32,17 @@ export class SLIStorage implements Storage {
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkaWQ6ZXRocjoweGFEQTdCOWFlQTdGNTc2ZDI5NzM0ZWUxY0Q2ODVFMzc2OWNCM2QwRDEiLCJpc3MiOiJuZnQtc3RvcmFnZSIsImlhdCI6MTY3NTQ5NDYwMDkzMiwibmFtZSI6ImZ2bS1oYWNrc29uIn0.YBqfsj_LTZSJPKc0OH586avnQNqove_Htzl5rrToXTk"
this.txManager = new TxManager(this.contract, protocol.chainId, protocol.netConfig.txConst)
this.waitTasks = new Promise((resolve, reject) => {
this.uploadDone = resolve
})
this.storageTask = new QueueTask({
maxRetry: 10,
queueInterval: 400,
maxPending: 20,
retryInterval: 500,
})
}
async repoRoles(): Promise<string[]> {
@ -75,10 +90,10 @@ export class SLIStorage implements Storage {
if (this.commitTimer) clearTimeout(this.commitTimer)
if (this.batchQueue.length >= this.maxBatchSize) {
await this.commitQueue("full")
this.commitQueue("full")
} else {
this.commitTimer = setTimeout(async () => {
await this.commitQueue("timeout")
this.commitTimer = setTimeout(() => {
this.commitQueue("timeout")
}, 3000)
}
@ -98,6 +113,7 @@ export class SLIStorage implements Storage {
}
async uploadCommit(): Promise<Status> {
await this.waitTasks
return Status.SUCCEED
// try {
// await this.commitQueue("uploadCommit")
@ -110,16 +126,32 @@ export class SLIStorage implements Storage {
}
async commitQueue(reason: string) {
this.taskRunning += 1
let queue = this.batchQueue
this.batchQueue = []
console.error(`[${reason}] commit queue length ${queue.length}`)
if (queue.length === 0) return
await this.txManager.SendCall("batchUpload", [
Buffer.from(this.repoName),
queue.map((i) => Buffer.from(i.path)),
queue.map((i) => Buffer.from(i.cid)),
])
let err
try {
await this.txManager.SendCall("batchUpload", [
Buffer.from(this.repoName),
queue.map((i) => Buffer.from(i.path)),
queue.map((i) => Buffer.from(i.cid)),
])
err = null
} catch (error: any) {
this.txManager.CancelAll()
console.error(`upload failed: ${error}`)
err = error
}
this.taskRunning -= 1
if (this.taskRunning === 0) {
if (err) this.uploadDone(Status.FAILED)
else this.uploadDone(Status.SUCCEED)
}
}
remove(path: string): Promise<Status> {
@ -159,26 +191,10 @@ export class SLIStorage implements Storage {
}
async storeIPFS(data: Buffer): Promise<string> {
const RETRY_TIMES = 10
const TIMEOUT = 30
let response
let lastError
// while (this.storageAPICallCount >= this.storageAPILimit) {
// await new Promise((r) => setTimeout(r, 1000))
// }
for (let i = 0; i < RETRY_TIMES; i++) {
try {
while (
Date.now().valueOf() - this.storageCallLastTime <
this.storageIntervalLimit
) {
await new Promise((r) => setTimeout(r, this.storageIntervalLimit / 2))
}
this.storageCallLastTime = Date.now().valueOf()
response = await axios.post("https://api.nft.storage/upload", data, {
try {
let cid = this.storageTask.run(async () => {
let response = await axios.post("https://api.nft.storage/upload", data, {
headers: {
"Content-Type": "application/octet-stream",
Authorization: this.auth,
@ -188,14 +204,29 @@ export class SLIStorage implements Storage {
if (response.status == 200) {
return response.data.value.cid
} else {
lastError = response.status
throw new Error(`response code: ${response.status}`)
}
} catch (e) {
//pass
lastError = e
new Promise((r) => setTimeout(r, 1000))
}
})
return cid
} catch (e) {
throw new Error(`store ipfs failed: ${e}`)
}
throw new Error(`store ipfs failed: ${response?.status} ${lastError}`)
// for (let i = 0; i < RETRY_TIMES; i++) {
// try {
// while (
// Date.now().valueOf() - this.storageCallLastTime <
// this.storageIntervalLimit
// ) {
// await new Promise((r) => setTimeout(r, this.storageIntervalLimit / 2))
// }
// this.storageCallLastTime = Date.now().valueOf()
// } catch (e) {
// //pass
// lastError = e
// new Promise((r) => setTimeout(r, 1000))
// }
// }
}
}

Loading…
Cancel
Save