十二 092014
 

查阅文档时看到一篇2013年12月3日写的文章,一直忘了贴出来,放在硬盘角落里厚厚一层灰了。
———————————————————————————
SQLite是一种嵌入式数据库,没有server进程,每个数据库均为单文件存储,因此在有多线程并发读写同一个数据库时,会因为文件读写锁造成并发写入或读取的失败率上升。
为了解决这个问题,基本的思路就是让存在多线程并发写入的情况收敛为顺序写入。在多线程这个策略无法更改的基础上,将数据库拆分成每个线程对应一个单独的文件就是比较适宜的解决方案。
我们的业务基本情况是有数量级为百或千的PC,这里假定为机器000到999,每30秒发送五组不同的数据,对应机器的五个不同指标,这里假定为指标A-E。为了方便归档和减少单个数据库文件的大小,需要按日期将数据库文件归类,每日、不同机器的数据存储到不到的数据库文件中。因此储存的目的目录结构为:
1111
对于每一项指标,都是通过一个异步队列到达存储层的,因此在数据到达的时间上并不一定按照指标实际产生的实际为序。这就使得在日期切换时,并不能保证在日期戳为20131203的第一个数据抵达后,不会再有日期戳为20131202的数据抵达。因此必须有一个缓冲时间段,在缓冲器内对同一机器的同一指标同时维护两个数据库连接,待旧日期戳数据包基本处理完毕后关闭旧的数据库连接。
为了达到这一效果,要在内存中维护一个数据结构,该结构保存了一个单调递增的日期戳和一个数据库连接池,连接池以日期哈希的形式保存数据库连接。程序启动时初始化该数据结构,每次有数据需要存储时,调用该结构的GetLink方法得到本数据包应该对应的数据库连接。在日期更替时,数据结构中的日期戳会指向最新的日期,防止切换过程中可能出现的连接池震荡。

type DbLink struct {
	Today string
	Changing bool
	Links map[string]*sql.DB
}

func NewDbLink(date string) (link *DbLink) {
	links := make(map[string]*sql.DB)
	link = &DbLink{
		date,
		false,
		links,
	}
	return
}

func (link *DbLink) GetLink(date string, hardware_addr string, indicator string) (dbLink *sql.DB, err error) {
	key := date + "_" + hardware_addr
	dbLink, ok := link.Links[key]
	// 如果已经存在,则认为没有日期变更,且数据库连接已经打开
	if ok {
		// fmt.Println("bingo!") //命中已经打开的数据库连接
		return dbLink, nil
	} else {
		// 否则为新的日期打开新的数据库连接,并延时关闭原有日期对应的数据库连接,且删除其在本结构体中的注册条目
		var dbPath, dbSourceName string
		dbPath = "../db/" + date + "/" + strings.Replace(hardware_addr, ":", "_", -1) + "/"
		dbSourceName = dbPath + indicator + ".db"
		os.MkdirAll(dbPath, 0666)
		link.Changing = true
		inComingDate, _ := strconv.Atoi(date)
		currentDate, _ := strconv.Atoi(link.Today)
		if inComingDate > currentDate {
			// 仅当后来的日期比保存的日期更晚时,更新结构体中的Today值
			link.Today = date
		}
		newLink, err := sql.Open("sqlite3", dbSourceName)
		link.Links[key] = newLink
		if err != nil {
			return nil, err
		}
		createTable(indicator, newLink)
		go func() {
			c := time.Tick(5 * time.Minute)
			for _ = range c {
				for k, v := range link.Links {
					// 如果缓存中有非Today的日期,表示已经过期,可以执行延时关闭
					if !strings.HasPrefix(k, link.Today) {
						v.Close()
						fmt.Println(k, "to be deleted")
						delete(link.Links, k)
						link.Changing = false
					}
				}
			}
		}()
		return newLink, nil
	}
	return nil, nil
}
162014
 

闭包使用的注意事项

for循环中使用闭包时,一定要显示向闭包函数传递某个循环变量,否则闭包只会使用第一次的值。如:

for _, conn := range conns {
    go func(c Conn) {
        select {
        case ch <- c.DoQuery(query):         default:         }     }(conn) } 而不能这样 for _, conn := range conns {     go func() {         select {         case ch <- conn.DoQuery(query):         default:         }     }() } 第二种错误的用法中,闭包中的conn是不会变的。 使用WaitGroup

package util

import (
    ”sync”
)

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}
这种方法将WaitGroup包装起来,其他的struct中需要等待一系列的goroutine返回时,先生成一个WaitGroupWrapper对象,再用该对象的Wrap方法包裹要并行且等待的函数,一般是匿名函数的形式。最后调用者通过该对象的Wait()方法等待所有函数返回。

map类型不是线程安全的。对map数据并发读写时需要加锁。

var counter = struct {
    sync.RWMutex
    m map[string]int
}{m: make(map[string]int)}

counter.RLock()
n := counter.m["some_key"]
counter.Unlock()
fmt.Println(n)

实现类似try/catch的错误捕捉

用一个函数包装另一个,当内部程序panic时,外部程序通过recover收集错误,使整个进程得以继续。

package main

import ”fmt”

func safeHandler(cb func() int) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println(r)
        }
    }()
    cb()
}

func worker() int {
    b := []int{1,2}
    b[3] = 0
    return 1
}

func main() {
    safeHandler(worker)
    fmt.Println(“ok, we are safe to continue.”)
}

golang中的yield方式。

可以采用在生成器函数中向无缓冲管道写入,写入完毕后close管道。消费者for循环中读取管道,当管道close时自动退出循环。
package main

import ”fmt”

// 生成器
func generator(num …int) <-chan int {     c := make(chan int)     go func() {         for _, v := range num {             c <- v         }         close(c)     }()     return c } //消费者 func main() {     gen := generator(1, 1, 2, 3, 5, 8)     for n := range gen {         fmt.Println(n)     } } 这种生成方式要求消费者必须消费掉所有生成的数据,否则生成器一直阻塞并占用资源,形成了内存的泄露。为了避免这种情况的发生,可以在传递给生成器一个done的通道,当通道激活时生成器退出。利用channel的一个特性,即close执行时,在这个channel上的读取会立刻返回。我们不用手动向channel发送done信号,只需在defer函数中关闭它既可。因此上面的程序改为: package main import "fmt" // 生成器 func generator(done chan int, num ...int) <-chan int {     c := make(chan int)     go func() {         defer close(c)         for _, v := range num {             select {             case c <- v:             case <- done:                 return             }         }     }()     return c } //消费者 func main() {     done := make(chan int)     defer close(done)     gen := generator(done, 1, 1, 2, 3, 5, 8)     for n := range gen {         fmt.Println(n)     } } 这样,即使消费者没有完全耗尽所有要生产的数据,生产者也能在接到信号后自行退出。 使用什么样的import方式引入自定义的包?

应该使用唯一的路径来引入包,一般代码应该提交到google code或者github,这样使用

import ”github.com/seanluo/my/package”
引入的包,可以用go get的形式获取到。不宜采用本地的相对路径的形式,如import my/package,因为这种形式的引入必须手动将包路径添加到$GOPATH中去。

 Posted by at 10:22
072013
 

前一阵子因为工作需要,接触了一下Go语言。
起初看来这个语言的语法实在是非常的怪异,尤其是变量声明的方式,让从C语言之类转过来的人们非常不习惯。但是Golang的开发团队与给出了这些写变量声明的原因,而我看了之后觉得他们说的是对的:这种方式的声明在出现一大堆关于函数指针的嵌套定义的情况下,能够展示的比C语言更清晰。
另一方面,这是一种静态编译的语言,所以在移植性上非常优秀,基本上不会看到类似“缺少xxx.so”或者“找不到yyy.dll”之类的错误,脱离一堆依赖的感觉真的很好。
Golang的defer处理机制是把双刃剑。用得好,可以避免文件、Socket之类的不关闭造成的问题,但是新手用得不好,很有可能出现一些搞不清楚状况的panic。如一个file没有被真正打开过,却在defer中进行了关闭,就会出现关闭一个nil的调用。果断panic,报错的还不是defer这一行,而是return语句那行——defer里的东西正是这个时候被调用的。

 Posted by at 16:57