avatar

在golang中用mgo实现多数据源连接mongodb

前言:
在 golang 中用 mgo 实现多数据源连接 mongodb,达到主备连接,起到容灾的作用。

Code

utilpool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// mongodb.go
package db

import (
"log"
"time"

"github.com/globalsign/mgo"
)

const (
dbhost_master = "127.0.0.1:27017"
dbhost_slave = "127.0.0.2:27017"
authdb = "admin"
authuser = "user"
authpass = "123456"
timeout = 60 * time.Second
poollimit = 4096
)

var globalS *mgo.Session

func init() {
dialInfo := &mgo.DialInfo{
Addrs: []string{dbhost_master,dbhost_slave},
Timeout: timeout,
Source: authdb,
Username: authuser,
Password: authpass,
PoolLimit: poollimit,
}

s, err := mgo.DialWithInfo(dialInfo)
if err != nil {
log.Fatalf("Create Session: %s\n", err)
}
globalS = s
}

func connect(db, collection string) (*mgo.Session, *mgo.Collection) {
ms := globalS.Copy()
c := ms.DB(db).C(collection)
ms.SetMode(mgo.Monotonic, true)
return ms, c
}

func getDb(db string) (*mgo.Session, *mgo.Database) {
ms := globalS.Copy()
return ms, ms.DB(db)
}

func IsEmpty(db, collection string) bool {
ms, c := connect(db, collection)
defer ms.Close()
count, err := c.Count()
if err != nil {
log.Fatal(err)
}
return count == 0
}

func Count(db, collection string, query interface{}) (int, error) {
ms, c := connect(db, collection)
defer ms.Close()
return c.Find(query).Count()
}

func Insert(db, collection string, docs ...interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Insert(docs...)
}

func FindOne(db, collection string, query, selector, result interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Find(query).Select(selector).One(result)
}

func FindAll(db, collection string, query, selector, result interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Find(query).Select(selector).All(result)
}

func FindPage(db, collection string, page, limit int, query, selector, result interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Find(query).Select(selector).Skip(page * limit).Limit(limit).All(result)
}

func FindIter(db, collection string, query interface{}) *mgo.Iter {
ms, c := connect(db, collection)
defer ms.Close()

return c.Find(query).Iter()
}

func Update(db, collection string, selector, update interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Update(selector, update)
}

func Upsert(db, collection string, selector, update interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

_, err := c.Upsert(selector, update)
return err
}

func UpdateAll(db, collection string, selector, update interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

_, err := c.UpdateAll(selector, update)
return err
}

func Remove(db, collection string, selector interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

return c.Remove(selector)
}

func RemoveAll(db, collection string, selector interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()

_, err := c.RemoveAll(selector)
return err
}

//insert one or multi documents
func BulkInsert(db, collection string, docs ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()
bulk := c.Bulk()
bulk.Insert(docs...)
return bulk.Run()
}

func BulkRemove(db, collection string, selector ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()

bulk := c.Bulk()
bulk.Remove(selector...)
return bulk.Run()
}

func BulkRemoveAll(db, collection string, selector ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()
bulk := c.Bulk()
bulk.RemoveAll(selector...)
return bulk.Run()
}

func BulkUpdate(db, collection string, pairs ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()
bulk := c.Bulk()
bulk.Update(pairs...)
return bulk.Run()
}

func BulkUpdateAll(db, collection string, pairs ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()
bulk := c.Bulk()
bulk.UpdateAll(pairs...)
return bulk.Run()
}

func BulkUpsert(db, collection string, pairs ...interface{}) (*mgo.BulkResult, error) {
ms, c := connect(db, collection)
defer ms.Close()
bulk := c.Bulk()
bulk.Upsert(pairs...)
return bulk.Run()
}

func PipeAll(db, collection string, pipeline, result interface{}, allowDiskUse bool) error {
ms, c := connect(db, collection)
defer ms.Close()
var pipe *mgo.Pipe
if allowDiskUse {
pipe = c.Pipe(pipeline).AllowDiskUse()
} else {
pipe = c.Pipe(pipeline)
}
return pipe.All(result)
}

func PipeOne(db, collection string, pipeline, result interface{}, allowDiskUse bool) error {
ms, c := connect(db, collection)
defer ms.Close()
var pipe *mgo.Pipe
if allowDiskUse {
pipe = c.Pipe(pipeline).AllowDiskUse()
} else {
pipe = c.Pipe(pipeline)
}
return pipe.One(result)
}

func PipeIter(db, collection string, pipeline interface{}, allowDiskUse bool) *mgo.Iter {
ms, c := connect(db, collection)
defer ms.Close()
var pipe *mgo.Pipe
if allowDiskUse {
pipe = c.Pipe(pipeline).AllowDiskUse()
} else {
pipe = c.Pipe(pipeline)
}

return pipe.Iter()

}

func Explain(db, collection string, pipeline, result interface{}) error {
ms, c := connect(db, collection)
defer ms.Close()
pipe := c.Pipe(pipeline)
return pipe.Explain(result)
}
func GridFSCreate(db, prefix, name string) (*mgo.GridFile, error) {
ms, d := getDb(db)
defer ms.Close()
gridFs := d.GridFS(prefix)
return gridFs.Create(name)
}

func GridFSFindOne(db, prefix string, query, result interface{}) error {
ms, d := getDb(db)
defer ms.Close()
gridFs := d.GridFS(prefix)
return gridFs.Find(query).One(result)
}

func GridFSFindAll(db, prefix string, query, result interface{}) error {
ms, d := getDb(db)
defer ms.Close()
gridFs := d.GridFS(prefix)
return gridFs.Find(query).All(result)
}

func GridFSOpen(db, prefix, name string) (*mgo.GridFile, error) {
ms, d := getDb(db)
defer ms.Close()
gridFs := d.GridFS(prefix)
return gridFs.Open(name)
}

func GridFSRemove(db, prefix, name string) error {
ms, d := getDb(db)
defer ms.Close()
gridFs := d.GridFS(prefix)
return gridFs.Remove(name)
}

TestCode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// drop.go
package main

import (
"fmt"
db "mongo/mongodb"
"time"

"github.com/globalsign/mgo/bson"
)

type Data struct {
Id bson.ObjectId `bson:"_id"`
Title string `bson:"title"`
Des string `bson:"des"`
Content string `bson:"content"`
Date time.Time `bson:"date"`
}

const (
database = "Test"
collection = "TestModel"
)

// Examples : the operation of mgo to MongoDB
func main() {
//insert one document
data := &Data{
Id: bson.NewObjectId(),
Title: "博客的标题 1",
Des: "博客描述信息 1",
Content: "博客的具体内容 1",
Date: time.Now(),
}

err := db.Insert(database, collection, data)
if err != nil {
fmt.Println("insert one doc", err)
}

// find one with all fields
var result Data
err = db.FindOne(database, collection, bson.M{"_id": bson.ObjectIdHex("5b3db2334d661ff46ee14b9c")}, nil, &result)
fmt.Println("find one with all fields", result)

// find one without id field
var result1 Data
err = db.FindOne(database, collection, bson.M{"_id": bson.ObjectIdHex("5b3db2334d661ff46ee14b9c")}, bson.M{"_id": 0}, &result1)
fmt.Println("find one without id field", result1)

//find all documents
var allResult []Data
err = db.FindAll(database, collection, nil, nil, &allResult)
fmt.Println("find all docs", allResult)

// find all documents with query and selector
var allResult1 []Data
err = db.FindAll(database, collection, bson.M{"title": "博客的标题 1"}, bson.M{"_id": 0}, &allResult1)
fmt.Println("find all docs with query and selector", allResult1)

//find documents with page and limit
var resultWithPage []Data
err = db.FindPage(database, collection, 0, 4, nil, bson.M{"_id": 0}, &resultWithPage)
fmt.Println("find docs with page and limit", resultWithPage)

//find the cursor
var iterAll []Data
iter := db.FindIter(database, collection, nil)
err = iter.All(&iterAll)
fmt.Println("find cursor ", iterAll)

//update one document
err = db.Update(database, collection, bson.M{"_id": bson.ObjectIdHex("5b3db2334d661ff46ee14b9c")}, bson.M{"$set": bson.M{
"title": "更新后的标题",
"des": "更新后的描述信息",
"date": time.Now(),
}})

if err != nil {
fmt.Println("upate one error", err)
}

//update all docments
/*err = db.UpdateAll(database, collection, nil, bson.M{"$set": bson.M{
"title": "更新所有的标题",
"date": time.Now(),
}})
if err != nil {
fmt.Println("update all docs error ", err)
}*/

//delete one docment
err = db.Remove(database, collection, bson.M{"_id": bson.ObjectIdHex("5b3db2334d661ff46ee14b99")})
if err != nil {
fmt.Println("remove one doc error", err)
}

//upsert the docment
err = db.Upsert(database, collection, bson.M{"title": "Title Upsert"}, bson.M{"$set": bson.M{
"des": "描述Upsert",
"date": time.Now(),
"Content": "内容Upsert",
}})
if err != nil {
fmt.Println("upsert docment error", err)
}

//bulk insert docments
d1 := &Data{
Id: bson.NewObjectId(),
Title: "bulk title",
Des: "bulk Des",
Content: "bulk content",
Date: time.Now(),
}
d2 := &Data{
Id: bson.NewObjectId(),
Title: "bulk title",
Des: "bulk Des",
Content: "bulk content",
Date: time.Now(),
}

insertResult, _ := db.BulkInsert(database, collection, d1, d2)
fmt.Println("bulk insert docs", insertResult)

//bulk update
up1 := bson.M{"title": "bulk update title"}
up2 := bson.M{"$set": bson.M{"title": "bulk update title"}}

up3 := bson.M{"_id": bson.ObjectIdHex("5b3dbd7a9d5e3e314c93d150")}
up4 := bson.M{"$set": bson.M{"des": "bulk update des"}}

updateResult, _ := db.BulkUpdate(database, collection, up1, up2, up3, up4)
fmt.Println("bulk update result", updateResult)

//bulk update all
up5 := bson.M{"title": "bulk title"}
up6 := bson.M{"$set": bson.M{"title": "bulk update title"}}

updateAllResult, _ := db.BulkUpdateAll(database, collection, up5, up6)
fmt.Println("bulk update result", updateAllResult)

}
文章作者: luochenxi
文章链接: https://luochenxi.github.io/2019/05/09/yuque/%E5%9C%A8golang%E4%B8%AD%E7%94%A8mgo%E5%AE%9E%E7%8E%B0%E5%A4%9A%E6%95%B0%E6%8D%AE%E6%BA%90%E8%BF%9E%E6%8E%A5mongodb/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Kirio

评论