queue to batch upload

master
cyhhao 2 years ago
parent e3bb76e7bd
commit abe9c727d9

@ -37,8 +37,8 @@ export class TxManager {
this.cancel = false this.cancel = false
this.blockTimeSec = constOptions.blockTimeSec || 3 this.blockTimeSec = constOptions.blockTimeSec || 3
this.gasLimitRatio = constOptions.gasLimitRatio || 1.2 this.gasLimitRatio = constOptions.gasLimitRatio || 1.2
this.rbfTimes = constOptions.rbfTimes || 3 this.rbfTimes = constOptions.rbfTimes || 5
this.boardcastTimes = constOptions.boardcastTimes || 3 this.boardcastTimes = constOptions.boardcastTimes || 5
this.waitDistance = constOptions.waitDistance || 10 this.waitDistance = constOptions.waitDistance || 10
this.minRBFRatio = constOptions.minRBFRatio || 1.3 this.minRBFRatio = constOptions.minRBFRatio || 1.3
} }
@ -87,6 +87,7 @@ export class TxManager {
} }
async SendCall(_method: string, _args: any[]): Promise<any> { async SendCall(_method: string, _args: any[]): Promise<any> {
let lastError: any = null
const nonce = await this.getNonce() const nonce = await this.getNonce()
if (this.queueCurrNonce < 0) this.queueCurrNonce = nonce if (this.queueCurrNonce < 0) this.queueCurrNonce = nonce
@ -180,6 +181,7 @@ export class TxManager {
} else { } else {
console.error("[tx-manager] sendTransaction", nonce, e.code, e.message) console.error("[tx-manager] sendTransaction", nonce, e.code, e.message)
} }
lastError = e
// console.error( // console.error(
// "[tx-manager] sendTransaction", // "[tx-manager] sendTransaction",
// nonce, // nonce,
@ -239,6 +241,7 @@ export class TxManager {
return receipt return receipt
} catch (e) { } catch (e) {
// ignore // ignore
lastError = e
} }
} else { } else {
// send first time failed, wait 1s then try again // send first time failed, wait 1s then try again
@ -249,6 +252,6 @@ export class TxManager {
rbfCount++ rbfCount++
} }
throw new Error(`send tx failed: ${nonce}`) throw new Error(`send tx failed: ${nonce} ${lastError}`)
} }
} }

File diff suppressed because one or more lines are too long

@ -41,9 +41,11 @@ const evmNetworks: Record<number, any> = {
}, },
], ],
txConst: { txConst: {
blockTimeSec: 15, blockTimeSec: 12,
rbfTimes: 6,
boardcastTimes: 5,
}, },
contracts: { git3: "0x80F4b977F9C1d21FF6fDDd56C3CA59eeD5745B58" }, contracts: { git3: "0x51bb7F23193b88696D25EAec7E3293a2C96e55Ee" },
}, },
3334: { 3334: {
name: "Web3Q Galileo", name: "Web3Q Galileo",

@ -145,6 +145,10 @@ class Git {
for (let obj of objects) { for (let obj of objects) {
pendings.push(this.putObject(obj)) pendings.push(this.putObject(obj))
} }
let resault = await this.storage.uploadCommit()
if(resault!= Status.SUCCEED){
return `error ${dst} upload commit fail`
}
let resaults = await Promise.all(pendings) let resaults = await Promise.all(pendings)
for (let res of resaults) { for (let res of resaults) {
if (res != Status.SUCCEED) { if (res != Status.SUCCEED) {

@ -13,17 +13,14 @@ export class ETHStorage implements Storage {
this.repoName = protocol.repoName this.repoName = protocol.repoName
this.contract = protocol.contract this.contract = protocol.contract
this.wallet = protocol.wallet this.wallet = protocol.wallet
this.txManager = new TxManager( this.txManager = new TxManager(this.contract, protocol.chainId, protocol.netConfig.txConst)
this.contract, }
protocol.chainId, async uploadCommit(): Promise<Status> {
protocol.netConfig.txConst return Promise.resolve(Status.SUCCEED)
)
} }
async repoRoles(): Promise<string[]> { async repoRoles(): Promise<string[]> {
let owner = await this.contract.repoNameToOwner( let owner = await this.contract.repoNameToOwner(Buffer.from(this.repoName))
Buffer.from(this.repoName)
)
if (owner === ethers.constants.AddressZero) return [] if (owner === ethers.constants.AddressZero) return []
return [owner] return [owner]
} }
@ -34,10 +31,7 @@ export class ETHStorage implements Storage {
} }
async download(path: string): Promise<[Status, Buffer]> { async download(path: string): Promise<[Status, Buffer]> {
const res = await this.contract.download( const res = await this.contract.download(Buffer.from(this.repoName), Buffer.from(path))
Buffer.from(this.repoName),
Buffer.from(path)
)
const buffer = Buffer.from(res[0].slice(2), "hex") const buffer = Buffer.from(res[0].slice(2), "hex")
console.error(`=== download file ${path} succeed ===`) console.error(`=== download file ${path} succeed ===`)
return [Status.SUCCEED, buffer] return [Status.SUCCEED, buffer]
@ -66,9 +60,7 @@ export class ETHStorage implements Storage {
} }
async listRefs(): Promise<Ref[]> { async listRefs(): Promise<Ref[]> {
const res: string[][] = await this.contract.listRefs( const res: string[][] = await this.contract.listRefs(Buffer.from(this.repoName))
Buffer.from(this.repoName)
)
let refs = res.map((i) => ({ let refs = res.map((i) => ({
ref: Buffer.from(i[1].slice(2), "hex") ref: Buffer.from(i[1].slice(2), "hex")
.toString("utf8") .toString("utf8")
@ -96,10 +88,7 @@ export class ETHStorage implements Storage {
} }
async removeRef(path: string): Promise<Status> { async removeRef(path: string): Promise<Status> {
await this.contract.delRef( await this.contract.delRef(Buffer.from(this.repoName), Buffer.from(path))
Buffer.from(this.repoName),
Buffer.from(path)
)
return Status.SUCCEED return Status.SUCCEED
} }
} }

@ -10,32 +10,28 @@ export class SLIStorage implements Storage {
wallet: ethers.Wallet wallet: ethers.Wallet
contract: ethers.Contract contract: ethers.Contract
txManager: TxManager txManager: TxManager
auth: string auth: string
batchQueue: Record<string, string>[] = []
maxBatchSize = 20
commitTimer: any
storageIntervalLimit = 500
storageCallLastTime = 0
constructor(protocol: Git3Protocol) { constructor(protocol: Git3Protocol) {
this.repoName = protocol.repoName this.repoName = protocol.repoName
this.contract = protocol.contract this.contract = protocol.contract
this.wallet = protocol.wallet this.wallet = protocol.wallet
this.txManager = new TxManager( this.txManager = new TxManager(this.contract, protocol.chainId, protocol.netConfig.txConst)
this.contract,
protocol.chainId,
protocol.netConfig.txConst
)
this.auth = this.auth =
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkaWQ6ZXRocjoweGFEQTdCOWFlQTdGNTc2ZDI5NzM0ZWUxY0Q2ODVFMzc2OWNCM2QwRDEiLCJpc3MiOiJuZnQtc3RvcmFnZSIsImlhdCI6MTY3NTQ5NDYwMDkzMiwibmFtZSI6ImZ2bS1oYWNrc29uIn0.YBqfsj_LTZSJPKc0OH586avnQNqove_Htzl5rrToXTk" "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkaWQ6ZXRocjoweGFEQTdCOWFlQTdGNTc2ZDI5NzM0ZWUxY0Q2ODVFMzc2OWNCM2QwRDEiLCJpc3MiOiJuZnQtc3RvcmFnZSIsImlhdCI6MTY3NTQ5NDYwMDkzMiwibmFtZSI6ImZ2bS1oYWNrc29uIn0.YBqfsj_LTZSJPKc0OH586avnQNqove_Htzl5rrToXTk"
this.txManager = new TxManager( this.txManager = new TxManager(this.contract, protocol.chainId, protocol.netConfig.txConst)
this.contract,
protocol.chainId,
protocol.netConfig.txConst
)
} }
async repoRoles(): Promise<string[]> { async repoRoles(): Promise<string[]> {
let owner = await this.contract.repoNameToOwner( let owner = await this.contract.repoNameToOwner(Buffer.from(this.repoName))
Buffer.from(this.repoName)
)
if (owner === ethers.constants.AddressZero) return [] if (owner === ethers.constants.AddressZero) return []
return [owner] return [owner]
} }
@ -46,17 +42,11 @@ export class SLIStorage implements Storage {
} }
async download(path: string): Promise<[Status, Buffer]> { async download(path: string): Promise<[Status, Buffer]> {
const res = await this.contract.download( const res = await this.contract.download(Buffer.from(this.repoName), Buffer.from(path))
Buffer.from(this.repoName),
Buffer.from(path)
)
const buffer = Buffer.from(res.slice(2), "hex") const buffer = Buffer.from(res.slice(2), "hex")
const cid = buffer.toString("utf8") const cid = buffer.toString("utf8")
for (let i = 0; i < ipfsConf.gateways.length; i++) { for (let i = 0; i < ipfsConf.gateways.length; i++) {
let gateway = let gateway = ipfsConf.gateways[Math.floor(Math.random() * ipfsConf.gateways.length)] //random get rpc
ipfsConf.gateways[
Math.floor(Math.random() * ipfsConf.gateways.length)
] //random get rpc
try { try {
let response = await axios.get(gateway + cid, { let response = await axios.get(gateway + cid, {
responseType: "arraybuffer", responseType: "arraybuffer",
@ -79,12 +69,25 @@ export class SLIStorage implements Storage {
try { try {
console.error(`=== uploading file ${path} ===`) console.error(`=== uploading file ${path} ===`)
const cid = await this.storeIPFS(file) const cid = await this.storeIPFS(file)
await this.txManager.SendCall("upload", [ console.error(`ipfs cid: ${cid}`)
Buffer.from(this.repoName), this.batchQueue.push({ path, cid })
Buffer.from(path),
Buffer.from(cid), if (this.commitTimer) clearTimeout(this.commitTimer)
])
console.error(`=== upload ${path} ${cid.slice(0, 6)} succeed ===`) if (this.batchQueue.length >= this.maxBatchSize) {
await this.commitQueue("full")
} else {
this.commitTimer = setTimeout(async () => {
await this.commitQueue("timeout")
}, 3000)
}
// await this.txManager.SendCall("upload", [
// Buffer.from(this.repoName),
// Buffer.from(path),
// Buffer.from(cid),
// ])
console.error(`=== upload ${path} ${cid.slice(-6, -1)} succeed ===`)
return Status.SUCCEED return Status.SUCCEED
} catch (error: any) { } catch (error: any) {
@ -94,14 +97,37 @@ export class SLIStorage implements Storage {
} }
} }
async uploadCommit(): Promise<Status> {
return Status.SUCCEED
// try {
// await this.commitQueue("uploadCommit")
// return Status.SUCCEED
// } catch (error: any) {
// this.txManager.CancelAll()
// console.error(`uploadCommit failed: ${error}`)
// return Status.FAILED
// }
}
async commitQueue(reason: string) {
let queue = this.batchQueue
this.batchQueue = []
console.error(`[${reason}] commit queue length ${queue.length}`)
if (queue.length === 0) return
await this.txManager.SendCall("batchUpload", [
Buffer.from(this.repoName),
queue.map((i) => Buffer.from(i.path)),
queue.map((i) => Buffer.from(i.cid)),
])
}
remove(path: string): Promise<Status> { remove(path: string): Promise<Status> {
throw new Error("Method not implemented.") throw new Error("Method not implemented.")
} }
async listRefs(): Promise<Ref[]> { async listRefs(): Promise<Ref[]> {
const res: string[][] = await this.contract.listRefs( const res: string[][] = await this.contract.listRefs(Buffer.from(this.repoName))
Buffer.from(this.repoName)
)
let refs = res.map((i) => ({ let refs = res.map((i) => ({
ref: Buffer.from(i[1].slice(2), "hex") ref: Buffer.from(i[1].slice(2), "hex")
.toString("utf8") .toString("utf8")
@ -128,10 +154,7 @@ export class SLIStorage implements Storage {
} }
async removeRef(path: string): Promise<Status> { async removeRef(path: string): Promise<Status> {
await this.contract.delRef( await this.contract.delRef(Buffer.from(this.repoName), Buffer.from(path))
Buffer.from(this.repoName),
Buffer.from(path)
)
return Status.SUCCEED return Status.SUCCEED
} }
@ -139,26 +162,40 @@ export class SLIStorage implements Storage {
const RETRY_TIMES = 10 const RETRY_TIMES = 10
const TIMEOUT = 30 const TIMEOUT = 30
let response let response
let lastError
// while (this.storageAPICallCount >= this.storageAPILimit) {
// await new Promise((r) => setTimeout(r, 1000))
// }
for (let i = 0; i < RETRY_TIMES; i++) { for (let i = 0; i < RETRY_TIMES; i++) {
try { try {
response = await axios.post( while (
"https://api.nft.storage/upload", Date.now().valueOf() - this.storageCallLastTime <
data, this.storageIntervalLimit
{ ) {
await new Promise((r) => setTimeout(r, this.storageIntervalLimit / 2))
}
this.storageCallLastTime = Date.now().valueOf()
response = await axios.post("https://api.nft.storage/upload", data, {
headers: { headers: {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
Authorization: this.auth, Authorization: this.auth,
}, },
timeout: TIMEOUT * 1000, timeout: TIMEOUT * 1000,
} })
)
if (response.status == 200) { if (response.status == 200) {
return response.data.value.cid return response.data.value.cid
} else {
lastError = response.status
} }
} catch (e) { } catch (e) {
//pass //pass
lastError = e
new Promise((r) => setTimeout(r, 1000))
} }
} }
throw new Error(`store ipfs failed: ${response?.status}`) throw new Error(`store ipfs failed: ${response?.status} ${lastError}`)
} }
} }

@ -19,4 +19,7 @@ export interface Storage {
listRefs(): Promise<Ref[]> listRefs(): Promise<Ref[]>
setRef(path: string, sha: string): Promise<Status> setRef(path: string, sha: string): Promise<Status>
removeRef(path: string): Promise<Status> removeRef(path: string): Promise<Status>
// for batch upload
uploadCommit(): Promise<Status>
} }

@ -0,0 +1,26 @@
import axios from "axios"
import FormData from "form-data"
async function main() {
let auth =
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkaWQ6ZXRocjoweGFEQTdCOWFlQTdGNTc2ZDI5NzM0ZWUxY0Q2ODVFMzc2OWNCM2QwRDEiLCJpc3MiOiJuZnQtc3RvcmFnZSIsImlhdCI6MTY3NTQ5NDYwMDkzMiwibmFtZSI6ImZ2bS1oYWNrc29uIn0.YBqfsj_LTZSJPKc0OH586avnQNqove_Htzl5rrToXTk"
let data = new FormData()
data.append("file", Buffer.from("hello world1"), {
filename: "hello.txt",
})
data.append("file", Buffer.from("hello world2"), {
filename: "hello.txt",
})
let response = await axios.post("https://api.nft.storage/upload", data, {
headers: {
"Content-Type": "multipart/form-data",
Authorization: auth,
},
})
console.log(response.status, JSON.stringify(response.data))
}
main()
Loading…
Cancel
Save