首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >TypeScript中的资源池

TypeScript中的资源池
EN

Code Review用户
提问于 2016-11-07 02:52:38
回答 1查看 842关注 0票数 2

作为一项练习,并且为了避免在每个节点库中包装回调API,我似乎把一个资源池类组合在一起。预期用途如下:

代码语言:javascript
复制
const pool = new Pool({
  createConnection() { return ... Promise<MyConnection> }
  destroyConnection(c: MyConnection) { return ... any }
})

const connection = await pool.resource()

//later...
pool.release(connection)

//maybe if error occurs in handlers setup in `createConnection`
pool.destroy(connection)

//and when finished
pool.drain().then(() => pool.close())

我的目标是让这种行为尽可能不令人惊讶。我想要反馈!

下面是指向要旨的链接。

代码语言:javascript
复制
import Queue from 'double-ended-queue'

const POOL_ID = Symbol('pool-id')
const IS_DIRTY = Symbol('dirty')

function isDirty (resource) {
  return resource[IS_DIRTY]
}

function poolId (resource) {
  return resource[POOL_ID]
}

function range (num) {
  return Array.from(Array(num))
}

class Deferred<T> {
  public promise: Promise<T>
  public resolve: (answer: T) => void
  public reject: (error: Error) => void
  constructor () {
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve
      this.reject = reject
    })
  }
}

function isDefined (thing: any) {
  return typeof thing !== 'undefined'
}

class Pool<T> {
  private _closing = false
  private _min: number = 0
  private _max: number = 10
  private _idle: number = 30000
  private _resourceIds: Queue<number>
  private _resources: Queue<{ expiration: number, resource: T}>
  private _issued: { [key: string]: T } = {}
  private _pending: Queue<Deferred<T>> = new Queue<Deferred<T>>()
  private _timeoutToken: any
  private _size: number = 0
  private _initializing = false
  private _createResource: () => Promise<T>
  private _destroyResource: (resource: T) => Promise<any>
  constructor ({ createConnection, destroyConnection, idle, min, max }) {
    this._max = max || this._max
    this._min = min || this._min
    this._idle = idle || this._idle
    this._resources = new Queue<{ expiration: number, resource: T}>(max)
    this._createResource = () => {
      this._size++
      return Promise.resolve(createConnection())
    }  
    this._destroyResource = (x) => {
      this._size--
      return Promise.resolve(destroyConnection(x))
    }
    this._resourceIds = new Queue(range(this.max).map((_, i) => 1 + i))

    if (this.min > 0) {
      this._initializing = true
      Promise.all(range(this.min).map(this._createResource))
      .then(resources => {
        this._resources.enqueue(
          ...resources.map(resource => ({ expiration: Date.now(), resource }))
        )
        this._pending.toArray().forEach(x => x.resolve(this.resource()))
        this._initializing = false
        this._pending.clear()
      })
    }
  }

  /**
   * Current number of resources in pool
   */
  public get size () { return this._size }

  /**
   * Current number of resources in use
   */
  public get issued () { return this.max - this._resourceIds.length }

  /**
   * Maximum size of pool
   */
  public get max () { return this._max }

  /**
   * Minimum size of pool
   */
  public get min () { return this._min }

  /**
   * Approximate lifetime (ms) of an unused resource
   */
  public get idle () { return this._idle }

  /**
   * Forcibly destroy all created resources
   * To close the pool gracefully, call #drain first
   * A subsequent call to #resource is a programming error  
   */
  public close () {
    this._closing = true
    return Promise.all(Object.keys(this._issued)
      .map(x => this._issued[x])
      .concat(this._resources.toArray().map(x => x.resource))
      .map(x => this.destroy(x)))
  }
  /**
   * Wait until all resources have been released
   * This will not prevent resources from being issued. (calls to #resource)
   * Usually followed by a call to #close.
   * Calling close while draining, will short circuit this process
   */
  public drain () {
    return new Promise ((resolve) => {
      !function wait () {
        if (this.issued === 0) {
          resolve()
        } else {
          !this._closing && setTimeout(wait, 100) || resolve()
        } 
      }()
    })
  }

  /**
   * Destroy a resource. This is useful if you deterime the resource is in an error state.
   * It can be called at any time. 
   * If a destroyed resource is currently issued it will also be released
   */
  public destroy (resource) {
    if (isDirty(resource)) {
      return Promise.resolve()
    }
    resource[IS_DIRTY] = true
    if (poolId(resource)) {
      this.release(resource)
    }

    return this._destroyResource(resource)
  }

  /**
   * Immediately release a resource back into the pool. 
   * If the resource has not also been destroyed it may be recycled immediately.
   * Released resources that remain unused for #idle milliseconds will be destroyed.
   */
  public release (resource) {
    const resourceId = poolId(resource)

    if (!isDirty(resource) && this._pending.length) {
      return this._pending.dequeue().resolve(resource)
    }
    delete this._issued[resourceId]
    delete resource[POOL_ID]

    this._resourceIds.enqueue(resourceId)
    !isDirty(resource) && this._queuePossibleDestruction(resource)
  }

  /**
   * Request a resource, if none exist, request will be queued or one will be created. 
   * Otherwise, previously released resources are issued.
   */
  public resource () {
    if (this._closing) {
      return Promise.reject(new Error('Cannot issue resource while pool is closing...'))
    }

    if (!this._resourceIds.length || this._initializing) {
      const futureAvailableResource = new Deferred<T>()
      this._pending.enqueue(futureAvailableResource)
      return futureAvailableResource.promise
    }

    const resourceId = this._resourceIds.dequeue()

    if (this._resources.length) {
      const { resource } = this._resources.dequeue()
      if (isDirty(resource)) {
        this._resourceIds.enqueue(resourceId)
        return this.resource()
      }
      return Promise.resolve(this._dispatchResource(resource, resourceId))
    }

    return this._createResource().then(resource => {
      return this._dispatchResource(resource, resourceId)
    })
  }

  private _dispatchResource (resource: any, resourceId: number) { 
    resource[POOL_ID] = resourceId
    this._issued[resourceId] = resource
    return resource
  }

  private _queuePossibleDestruction (resource :T) {
    this._resources.enqueue({ expiration: Date.now() + this._idle, resource })
    if (!this._timeoutToken) {
      this._scheduleNextCleanup(this._idle)
    }
  }

  private _cleanup () {
    if (this.size === this._min || !this._resources.length) {
      return this._timeoutToken = null
    }

    const { expiration } = this._resources.peekFront(),
      expiredMsAgo = expiration - Date.now()

    if (expiredMsAgo <= 0) {
      const { resource } = this._resources.dequeue()
      this.destroy(resource)
      this._scheduleNextCleanup(100)
    } else {
      this._scheduleNextCleanup(expiredMsAgo + 1)
    }
  }

  private _scheduleNextCleanup(ms: number) {
    if (!this._closing) {
      this._timeoutToken = setTimeout(() => {
        this._cleanup()    
      }, ms)
    }
  }
}
EN

回答 1

Code Review用户

发布于 2018-03-10 16:09:12

有趣的用例!核心功能让我想起了我自己的AsyncQueue实现。

不过,我承认,我对您的业务逻辑并没有太多的了解。我希望下面的笔记对你还是有用的。

  • 在对象析构构造函数({ createConnection,destroyConnection,空闲,min,max })中使用默认值({ this._max = max欧元/ this._max this._min = min \x- this._min this._idle =空闲- this._idle构造函数({ createConnection,destroyConnection,空闲= 30000,min = 0,max = 10 }) ){
  • 我将声明size为一个函数,而不是一个getter属性,因为它可以并且将在池的生存期内返回一个不同的值。函数调用更好地传递读取当前值的信息,而读取的属性可能被误认为是读取常数的一次性操作。因此,我发现其他值(如maxmin )的getter属性非常适合! (附带说明:为什么Array#length不是一个函数也可能被同样的论证所争议)。
  • 添加操作符优先级不完全明确// In漏出的括号!this._closing && setTimeout(等待,100)而今,我甚至更倾向于使用if-否则块,而不是双嵌套的短路逻辑。
  • async注释返回承诺的函数。目前,一些函数在多个分支上没有返回类型注释和返回承诺。很容易引入另一个分支并直接返回一个原始值(例如return this.size),而不是包装在承诺中。使用类型系统可以有效地防止这些错误。
票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/146355

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档