Go语言中的sync.Pool怎么使用

寻技术 Go编程 2023年07月31日 116

本篇内容介绍了“Go语言中的sync.Pool怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

    1. 简介

    本文将介绍 Go 语言中的

    sync.Pool
    并发原语,包括
    sync.Pool
    的基本使用方法、使用注意事项等的内容。能够更好得使用
    sync.Pool
    来减少对象的重复创建,最大限度实现对象的重复使用,减少程序GC的压力,以及提升程序的性能。

    2. 问题引入

    2.1 问题描述

    这里我们实现一个简单的JSON序列化器,能够实现将一个

    map[string]int
    序列化为一个JSON字符串,实现如下:
    func IntToStringMap(m map[string]int) (string, error) {
       // 定义一个bytes.Buffer,用于缓存数据
       var buf bytes.Buffer
       buf.Write([]byte("{"))
       for k, v := range m {
          buf.WriteString(fmt.Sprintf(`"%s":%d,`, k, v))
       }
       if len(m) > 0 {
          buf.Truncate(buf.Len() - 1) // 去掉最后一个逗号
       }
       buf.Write([]byte("}"))
       return buf.String(), nil
    }

    这里使用

    bytes.Buffer
    来缓存数据,然后按照
    key:value
    的形式,将数据生成一个字符串,然后返回,实现是比较简单的。

    每次调用

    IntToStringMap
    方法时,都会创建一个
    bytes.Buffer
    来缓存中间结果,而
    bytes.Buffer
    其实是可以被重用的,因为序列化规则和其并没有太大的关系,其只是作为一个缓存区来使用而已。

    但是当前的实现为每次调用

    IntToStringMap
    时,都会创建一个
    bytes.Buffer
    ,如果在一个应用中,请求并发量非常高时,频繁创建和销毁
    bytes.Buffer
    将会带来较大的性能开销,会导致对象的频繁分配和垃圾回收,增加了内存使用量和垃圾回收的压力。

    那有什么方法能够让

    bytes.Buffer
    能够最大程度得被重复利用呢,避免重复的创建和回收呢?

    2.2 解决方案

    其实我们可以发现,为了让

    bytes.Buffer
    能够被重复利用,避免重复的创建和回收,我们此时只需要将
    bytes.Buffer
    缓存起来,在需要时,将其从缓存中取出;当用完后,便又将其放回到缓存池当中。这样子,便不需要每次调用
    IntToStringMap
    方法时,就创建一个
    bytes.Buffer

    这里我们可以自己实现一个缓存池,当需要对象时,可以从缓存池中获取,当不需要对象时,可以将对象放回缓存池中。

    IntToStringMap
    方法需要
    bytes.Buffer
    时,便从该缓存池中取,当用完后,便重新放回缓存池中,等待下一次的获取。下面是一个使用切片实现的一个
    bytes.Buffer
    缓存池。
    type BytesBufferPool struct {
       mu   sync.Mutex
       pool []*bytes.Buffer
    }
    
    func (p *BytesBufferPool) Get() *bytes.Buffer {
       p.mu.Lock()
       defer p.mu.Unlock()
       n := len(p.pool)
       if n == 0 {
          // 当缓存池中没有对象时,创建一个bytes.Buffer
          return &bytes.Buffer{}
       }
       // 有对象时,取出切片最后一个元素返回
       v := p.pool[n-1]
       p.pool[n-1] = nil
       p.pool = p.pool[:n-1]
       return v
    }
    
    func (p *BytesBufferPool) Put(buffer *bytes.Buffer) {
       if buffer == nil {
          return
       }
       // 将bytes.Buffer放入到切片当中
       p.mu.Lock()
       defer p.mu.Unlock()
       obj.Reset()
       p.pool = append(p.pool, buffer)
    }

    上面

    BytesBufferPool
    实现了一个
    bytes.Buffer
    的缓存池,其中
    Get
    方法用于从缓存池中取对象,如果没有对象,就创建一个新的对象返回;
    Put
    方法用于将对象重新放入
    BytesBufferPool
    当中,下面使用
    BytesBufferPool
    来优化
    IntToStringMap
    // 首先定义一个BytesBufferPool
    var buffers BytesBufferPool
    
    func IntToStringMap(m map[string]int) (string, error) {
       // bytes.Buffer不再自己创建,而是从BytesBufferPool中取出
       buf := buffers.Get()
       // 函数结束后,将bytes.Buffer重新放回缓存池当中
       defer buffers.Put(buf)
       buf.Write([]byte("{"))
       for k, v := range m {
          buf.WriteString(fmt.Sprintf(`"%s":%d,`, k, v))
       }
       if len(m) > 0 {
          buf.Truncate(buf.Len() - 1) // 去掉最后一个逗号
       }
       buf.Write([]byte("}"))
       return buf.String(), nil
    }

    到这里我们通过自己实现了一个缓存池,成功对

    InitToStringMap
    函数进行了优化,减少了
    bytes.Buffer
    对象频繁的创建和回收,在一定程度上提高了对象的频繁创建和回收。

    但是,

    BytesBufferPool
    这个缓存池的实现,其实存在几点问题,其一,只能用于缓存
    bytes.Buffer
    对象;其二,不能根据系统的实际情况,动态调整对象池中缓存对象的数量。假如某段时间并发量较高,
    bytes.Buffer
    对象被大量创建,用完后,重新放回
    BytesBufferPool
    之后,将永远不会被回收,这有可能导致内存浪费,严重一点,也会导致内存泄漏。

    既然自定义缓存池存在这些问题,那我们不禁要问,Go语言标准库中有没有提供了更方便的方式,来帮助我们缓存对象呢?

    别说,还真有,Go标准库提供了

    sync.Pool
    ,可以用来缓存那些需要频繁创建和销毁的对象,而且它支持缓存任何类型的对象,同时
    sync.Pool
    是可以根据系统的实际情况来调整缓存池中对象的数量,如果一个对象长时间未被使用,此时将会被回收掉。

    相对于自己实现的缓冲池,

    sync.Pool
    的性能更高,充分利用多核cpu的能力,同时也能够根据系统当前使用对象的负载,来动态调整缓冲池中对象的数量,而且使用起来也比较简单,可以说是实现无状态对象缓存池的不二之选。

    下面我们来看看

    sync.Pool
    的基本使用方式,然后将其应用到
    IntToStringMap
    方法的实现当中。

    3. 基本使用

    3.1 使用方式

    3.1.1 sync.Pool的基本定义

    sync.Pool
    的定义如下: 提供了
    Get
    ,
    Put
    两个方法:
    type Pool struct {
      noCopy noCopy
    
      local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
      localSize uintptr        // size of the local array
    
      victim     unsafe.Pointer // local from previous cycle
      victimSize uintptr        // size of victims array
    
      New func() any
    }
    func (p *Pool) Put(x any) {}
    func (p *Pool) Get() any {}
    • Get
      方法: 从
      sync.Pool
      中取出缓存对象
    • Put
      方法: 将缓存对象放入到
      sync.Pool
      当中
    • New
      函数: 在创建
      sync.Pool
      时,需要传入一个
      New
      函数,当
      Get
      方法获取不到对象时,此时将会调用
      New
      函数创建新的对象返回。

    3.1.2 使用方式

    当使用

    sync.Pool
    时,通常需要以下几个步骤:
    • 首先使用

      sync.Pool
      定义一个对象缓冲池
    • 在需要使用到对象时,从缓冲池中取出

    • 当使用完之后,重新将对象放回缓冲池中

    下面是一个简单的代码的示例,展示了使用

    sync.Pool
    大概的代码结构:
    type struct data{
        // 定义一些属性
    }
    //1. 创建一个data对象的缓存池
    var dataPool = sync.Pool{New: func() interface{} {
       return &data{}
    }}
    
    func Operation_A(){
        // 2. 需要用到data对象的地方,从缓存池中取出
        d := dataPool.Get().(*data)
        // 执行后续操作
        // 3. 将对象重新放入缓存池中
        dataPool.Put(d)
    }

    3.2 使用例子

    下面我们使用

    sync.Pool
    来对
    IntToStringMap
    进行改造,实现对
    bytes.Buffer
    对象的重用,同时也能够自动根据系统当前的状况,自动调整缓冲池中对象的数量。
    // 1. 定义一个bytes.Buffer的对象缓冲池
    var buffers sync.Pool = sync.Pool{
       New: func() interface{} {
          return &bytes.Buffer{}
       },
    }
    func IntToStringMap(m map[string]int) (string, error) {
       // 2. 在需要的时候,从缓冲池中取出一个bytes.Buffer对象
       buf := buffers.Get().(*bytes.Buffer)
       buf.Reset()
       // 3. 用完之后,将其重新放入缓冲池中
       defer buffers.Put(buf)
       buf.Write([]byte("{"))
       for k, v := range m {
          buf.WriteString(fmt.Sprintf(`"%s":%d,`, k, v))
       }
       if len(m) > 0 {
          buf.Truncate(buf.Len() - 1) // 去掉最后一个逗号
       }
       buf.Write([]byte("}"))
       return buf.String(), nil
    }

    上面我们使用

    sync.Pool
    实现了一个
    bytes.Buffer
    的缓冲池,在
    IntToStringMap
    函数中,我们从
    buffers
    中获取一个
    bytes.Buffer
    对象,并在函数结束时将其放回池中,避免了频繁创建和销毁
    bytes.Buffer
    对象的开销。

    同时,由于

    sync.Pool
    IntToStringMap
    调用不频繁的情况下,能够自动回收
    sync.Pool
    中的
    bytes.Buffer
    对象,无需用户操心,也能减小内存的压力。而且其底层实现也有考虑到多核cpu并发执行,每一个processor都会有其对应的本地缓存,在一定程度也减少了多线程加锁的开销。

    从上面可以看出,

    sync.Pool
    使用起来非常简单,但是其还是存在一些注意事项,如果使用不当的话,还是有可能会导致内存泄漏等问题的,下面就来介绍
    sync.Pool
    使用时的注意事项。

    4.使用注意事项

    4.1 需要注意放入对象的大小

    如果不注意放入

    sync.Pool
    缓冲池中对象的大小,可能出现
    sync.Pool
    中只存在几个对象,却占据了大量的内存,导致内存泄漏。

    这里对于有固定大小的对象,并不需要太过注意放入

    sync.Pool
    中对象的大小,这种场景出现内存泄漏的可能性小之又小。但是,如果放入
    sync.Pool
    中的对象存在自动扩容的机制,如果不注意放入
    sync.Pool
    中对象的大小,此时将很有可能导致内存泄漏。下面来看一个例子:
    func Sprintf(format string, a ...any) string {
       p := newPrinter()
       p.doPrintf(format, a)
       s := string(p.buf)
       p.free()
       return s
    }

    Sprintf
    方法根据传入的format和对应的参数,完成组装,返回对应的字符串结果。按照普通的思路,此时只需要申请一个
    byte
    数组,然后根据一定规则,将
    format
    参数
    的内容放入
    byte
    数组中,最终将
    byte
    数组转换为字符串返回即可。

    按照上面这个思路我们发现,其实每次使用到的

    byte
    数组是可复用的,并不需要重复构建。

    实际上

    Sprintf
    方法的实现也是如此,
    byte
    数组其实并非每次创建一个新的,而是会对其进行复用。其实现了一个
    pp
    结构体,
    format
    参数
    按照一定规则组装成字符串的职责,交付给
    pp
    结构体,同时
    byte
    数组作为
    pp
    结构体的成员变量。

    然后将

    pp
    的实例放入
    sync.Pool
    当中,实现
    pp
    重复使用目的,从而简介避免了重复创建
    byte
    数组导致频繁的GC,同时也提升了性能。下面是
    newPrinter
    方法的逻辑,获取
    pp
    结构体,都是从
    sync.Pool
    中获取:
    var ppFree = sync.Pool{
       New: func() any { return new(pp) },
    }
    
    // newPrinter allocates a new pp struct or grabs a cached one.
    func newPrinter() *pp {
        // 从ppFree中获取pp
       p := ppFree.Get().(*pp)
       // 执行一些初始化逻辑
       p.panicking = false
       p.erroring = false
       p.wrapErrs = false
       p.fmt.init(&p.buf)
       return p
    }

    下面回到上面的

    byte
    数组,此时其作为
    pp
    结构体的一个成员变量,用于字符串格式化的中间结果,定义如下:
    // Use simple []byte instead of bytes.Buffer to avoid large dependency.
    type buffer []byte
    
    type pp struct {
       buf buffer
       // 省略掉其他不相关的字段
    }

    这里看起来似乎没啥问题,但是其实是有可能存在内存浪费甚至内存泄漏的问题。假如此时存在一个非常长的字符串需要格式化,此时调用

    Sprintf
    来实现格式化,此时
    pp
    结构体中的
    buffer
    也同样需要不断扩容,直到能够存储整个字符串的长度为止,此时
    pp
    结构体中的
    buffer
    将会占据比较大的内存。

    Sprintf
    方法完成之后,重新将
    pp
    结构体放入
    sync.Pool
    当中,此时
    pp
    结构体中的
    buffer
    占据的内存将不会被释放。

    但是,如果下次调用

    Sprintf
    方法来格式化的字符串,长度并没有那么长,但是此时从
    sync.Pool
    中取出的
    pp
    结构体中的
    byte数组
    长度却是上次扩容之后的
    byte数组
    ,此时将会导致内存浪费,严重点甚至可能导致内存泄漏。

    因此,因为

    pp
    对象中
    buffer
    字段占据的内存是会自动扩容的,对象的大小是不固定的,因此将
    pp
    对象重新放入
    sync.Pool
    中时,需要注意放入对象的大小,如果太大,可能会导致内存泄漏或者内存浪费的情况,此时可以直接抛弃,不重新放入
    sync.Pool
    当中。事实上,
    pp
    结构体重新放入
    sync.Pool
    也是基于该逻辑,其会先判断
    pp
    结构体中
    buffer
    字段占据的内存大小,如果太大,此时将不会重新放入
    sync.Pool
    当中,而是直接丢弃,具体如下:
    func (p *pp) free() {
       // 如果byte数组的大小超过一定限度,此时将会直接返回
       if cap(p.buf) > 64<<10 {
          return
       }
    
       p.buf = p.buf[:0]
       p.arg = nil
       p.value = reflect.Value{}
       p.wrappedErr = nil
       
       // 否则,则重新放回sync.Pool当中
       ppFree.Put(p)
    }

    基于以上总结,如果

    sync.Pool
    中存储的对象占据的内存大小是不固定的话,此时需要注意放入对象的大小,防止内存泄漏或者内存浪费。

    4.2 不要往sync.Pool中放入数据库连接/TCP连接

    TCP连接和数据库连接等资源的获取和释放通常需要遵循一定的规范,比如需要在连接完成后显式地关闭连接等,这些规范是基于网络协议、数据库协议等规范而制定的,如果这些规范没有被正确遵守,就可能导致连接泄漏、连接池资源耗尽等问题。

    当使用

    sync.Pool
    存储连接对象时,如果这些连接对象并没有显式的关闭,那么它们就会在内存中一直存在,直到进程结束。如果连接对象数量过多,那么这些未关闭的连接对象就会占用过多的内存资源,导致内存泄漏等问题。

    举个例子,假设有一个对象

    Conn
    表示数据库连接,它的
    Close
    方法用于关闭连接。如果将
    Conn
    对象放入
    sync.Pool
    中,并在从池中取出并使用后没有手动调用
    Close
    方法归还对象,那么这些连接就会一直保持打开状态,直到程序退出或达到连接数限制等情况。这可能会导致资源耗尽或其他一些问题。

    以下是一个简单的示例代码,使用

    sync.Pool
    存储TCP连接对象,演示了连接对象泄漏的情况:
    import (
       "fmt"
       "net"
       "sync"
       "time"
    )
    
    var pool = &sync.Pool{
       New: func() interface{} {
          conn, err := net.Dial("tcp", "localhost:8000")
          if err != nil {
             panic(err)
          }
          return conn
       },
    }
    
    func main() {
    
       // 模拟使用连接
       for i := 0; i < 100; i++ {
          conn := pool.Get().(net.Conn)
          time.Sleep(100 * time.Millisecond)
          fmt.Fprintf(conn, "GET / HTTP/1.0
    
    ")
          // 不关闭连接
          // 不在使用连接时,释放连接对象到池中即可
          pool.Put(conn)
       }
    
    }

    在上面的代码中,我们使用

    net.Dial
    创建了一个 TCP 连接,并将其存储到
    sync.Pool
    中。在模拟使用连接时,我们从池中获取连接对象,向服务器发送一个简单的 HTTP 请求,然后将连接对象释放到池中。但是,我们没有显式地关闭连接对象。如果连接对象的数量很大,那么这些未关闭的连接对象就会占用大量的内存资源,导致内存泄漏等问题。

    因此,对于数据库连接或者TCP连接这种资源的释放需要遵循一定的规范,此时不应该使用

    sync.Pool
    来复用,可以自己实现数据库连接池等方式来实现连接的复用。
    关闭

    用微信“扫一扫”