0%

yum安装方法

1、安装

1
yum install -y iozone

2、参数

1
iozone -l 1 -u 1 -r 16k -s 64g -F

-l是最小进程数量lower

-u是最大进程数量upper

-r是读写的基本单位,16k作为读写的基本单位,根据模拟应用程序进行合理设置(目的是模拟真实应用)

-s指定默认读写的大小,建议不要指定的太小,一般指定的是内存的2倍

-F指定测试文件位置,可以是多个

编译安装方法

1
2
3
4
5
6
7
8
9
10
11
12
yum install gcc
wget http://www.iozone.org/src/current/iozone3_471.tar
tar xvf iozone3_471.tar
cd iozone3_471/src/current/

ARM平台:
make linux-arm
X86平台:
make linux-AMD64

cp iozone /usr/bin/
./iozone -+u -+d -+p -+t -z -l 1 -u 1 -r 4k -s 1G -F /home/node-3/1-FILE

Go并发编程案例解析学习笔记

课程地址:https://www.imooc.com/video/17021

课程教师:麦可同学

课程介绍

  • 并发编程基础知识介绍
  • 日志监控系统实战
  • 课程总结

准备知识

  • 有一定的编程基础
  • 了解Golang基本语法
  • 有并发编程经验就更好了

常见并发模型讲解

Golang并发实现

  • 程序并发执行(goroutine)
  • 多个goroutine间的数据同步与通信(channels)
  • 多个channels选择数据读取或者写入(select)

Goroutines(程序并发执行)

1
2
3
4
foo()		//执行函数foo,程序等待函数foo返回

go foo() //执行函数foo
bar() //不用等待foo返回

Channels(多个goroutines间的数据同步与通信)

1
2
3
4
5
6
c := make(chan string)	//创建一个channel
go func(){
time.Sleep(1 * time.Second)
c <- "message from closure" //发送数据到channel中
}() //这个()表示调用该函数
msg := <-c //阻塞直到接收到数据

Select(从多个channel中读取或写入数据)

1
2
3
4
5
6
7
8
select {
case v := <-c1:
fmt.Println("channel 1 sends", v)
case v := <-c2:
fmt.Println("channel 2 sends", v")
default: //可选
fmt.Println("neither channel was ready")
}

并发拓展:并发与并行

定义:

  • 并发:指同一时刻,系统通过调度,来回切换交替的运行多个任务,“看起来”是同时进行
  • 并行:指同一时刻,两个任务“真正的”同时进行

可以把《实时读取》《解析》《写入》拆成多个模块,使用多个goroutine。那么这么多个goroutine是并行执行还是并发执行呢?换句话说,多个goroutine执行,是一个CPU核心通过不断的切换时间片,并发的执行?还是将goroutine分散到多核的CPU并行的执行?这个问题Golang为我们屏蔽掉了,编程人员不需要考虑这个问题。

总结

  • 将复杂的任务拆分,通过goroutine去并发执行
  • 通过channel做数据的同步与通信

Golang中的面向对象

Golang中没有类和对象的概念,但是支持

  • struct
  • interface

传统的面向对象中,继承、封装、多态都可以基于这两个特性来实现。

封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 结构体,baz相当于成员变量,可以看做是类
type Foo struct {
baz string
}

// 接收者Receiver,接收之后,就可以使用结构体中的字段了,相当于成员函数
func (f *Foo) echo(){
fmt.Println(f.baz)
}

// 在main中初始化结构体,相当于实例化一个类,然后调用成员方法
func main(){
f := Foo{baz: "hello, struct"}
f.echo()
}

继承(个人认为这是设计模式中的组合)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Foo结构体
type Foo struct {
baz string
}

// 接收者Receiver,Foo成员函数
func (f *Foo) echo() {
fmt.Println(f.baz)
}

// 在Bar结构体中直接把Foo结构体写进来,这个叫匿名字段。这样写之后,Bar结构体就拥有了Foo结构体的所有特性
type Bar struct {
Foo
}

// 在main函数中初始化Bar结构体,然后直接调用echo()方法,echo()方法其实是Foo中的成员函数
func main() {
b := Bar{Foo{baz: "hello, struct"}}
b.echo()
}

多态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 定义一个interface,可以看成是一组方法的集合,通过interface定义一些对象的行为
type Foo interface {
qux()
}

// 定义Bar和Baz两个结构体
type Bar struct {}
type Baz struct {}

// Bar和Baz两个结构体分别实现了qux()函数,这样就可以说Bar和Baz都是Foo这种类型了。这里并没有显示的说我实现了Foo这个接口,只要结构体中实现了qux()这个函数,就认为它实现了这个接口,这就是所谓的非侵入式接口
func (b Bar) qux() {}
func (b Baz) qux() {}

// 在main中,定义了f的变量,它的类型是Foo。无论是Bar还是Baz都可以赋值给f这个变量,这也证明了Bar{}和Baz{}是同一类型。可以说类型相同,实现不同
func main() {
var f Foo
f = Bar{}
f = Baz{}
fmt.Println(f)
}

日志监控程序的实现

日志分析系统实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"strings"
"fmt"
"time"
)

type LogProcess struct {
rc chan string
wc chan string
path string // 读取文件的路径
influxDBDsn string // influx data source
}

func (l *LogProcess) ReadFromFile() { // 需要代码优化,只能从文件中读取数据
// 读取模块
line := "message"
l.rc <- line
}

func (l *LogProcess) Process() {
// 解析模块
data := <- l.rc
l.wc <- strings.ToUpper(data)

}

func (l *LogProcess) WriteToInfluxDB() { // 需要代码优化,只能向influxDB中写入数据
// 写入模块
fmt.Println(<-l.wc)
}

func main() {
lp := LogProcess{
rc : make(chan string),
wc : make(chan string),
path: "/tmp/access.log",
influxDBDsn: "",
}
go lp.ReadFromFile()
go lp.Process()
go lp.WriteToInfluxDB()
time.Sleep(time.Second)
}

代码优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
"strings"
"fmt"
"time"
)

// 定义接口,抽象读取模块
type Reader interface {
Read(rc chan string)
}

type ReadFromFile struct {
path string // 读取文件的路径
}

func (l *ReadFromFile) Read(rc chan string) {
// 读取模块
line := "message"
rc <- line
}

// 定义接口,抽象写入模块
type Writer interface {
Write(wc chan string)
}

type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}

func (l *WriteToInfluxDB) Write(wc chan string) {
// 写入模块
fmt.Println(<-wc)
}

type LogProcess struct {
rc chan string
wc chan string
read Reader
write Writer
}

func (l *LogProcess) Process() {
// 解析模块
data := <- l.rc
l.wc <- strings.ToUpper(data)
}

func main() {

r := &ReadFromFile{
path: "/tmp/access.log",
}

w := &WriteToInfluxDB{
influxDBDsn: "",
}

lp := LogProcess{
rc : make(chan string),
wc : make(chan string),
read: r,
write: w,
}

go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep(time.Second)
}

读取模块实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main

import (
"strings"
"fmt"
"time"
"os"
"bufio"
"io"
)

// 定义两个接口,抽象读取模块和写入模块
type Reader interface {
Read(rc chan []byte)
}

type ReadFromFile struct {
path string // 读取文件的路径
}

func (l *ReadFromFile) Read(rc chan []byte) {
// 读取模块

// 1. 打开文件
f,err := os.Open(l.path)
if err != nil {
panic(fmt.Sprintf("open file error : %s \n", err))
}

// 2. 从文件末尾开始逐行读取文件内容
f.Seek(0,2)
rd := bufio.NewReader(f)

// 3. 写入到Read Channel中
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error : %s", err.Error()))
}
rc <- line[:len(line) - 1]
}
}

type Writer interface {
Write(wc chan string)
}

type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}

func (l *WriteToInfluxDB) Write(wc chan string) {
// 写入模块
for v := range wc{
fmt.Println(v)
}
}

type LogProcess struct {
rc chan []byte
wc chan string
read Reader
write Writer
}

func (l *LogProcess) Process() {
// 解析模块
for v := range l.rc {
l.wc <- strings.ToUpper(string(v))
}
}

func main() {

r := &ReadFromFile{
path: "/tmp/access.log",
}

w := &WriteToInfluxDB{
influxDBDsn: "",
}

lp := LogProcess{
rc : make(chan []byte),
wc : make(chan string),
read: r,
write: w,
}

go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)

time.Sleep(60 * time.Second)
}

解析模块的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main

import (
"strings"
"fmt"
"time"
"os"
"bufio"
"io"
"regexp"
"log"
"strconv"
"net/url"
)

// 定义两个接口,抽象读取模块和写入模块
type Reader interface {
Read(rc chan []byte)
}

type ReadFromFile struct {
path string // 读取文件的路径
}

func (l *ReadFromFile) Read(rc chan []byte) {
// 读取模块

// 1. 打开文件
f,err := os.Open(l.path)
if err != nil {
panic(fmt.Sprintf("open file error : %s \n", err))
}

// 2. 从文件末尾开始逐行读取文件内容
f.Seek(0,2)
rd := bufio.NewReader(f)

// 3. 写入到Read Channel中
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error : %s", err.Error()))
}
rc <- line[:len(line) - 1]
}
}

type Writer interface {
Write(wc chan *Message)
}

type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}

func (l *WriteToInfluxDB) Write(wc chan *Message) {
// 写入模块
for v := range wc{
fmt.Println(v)
}
}

type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader
write Writer
}

type Message struct {
TimeLocal time.Time
ByteSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}

func (l *LogProcess) Process() {
// 解析模块
// 172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP:1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
loc , _ := time.LoadLocation("Asia/Shanghai")

// 1. 从Read Channel中读取每行日志数据
for v := range l.rc {

// 2. 正则提取所需的监控数据(path,status,method等)
ret := r.FindStringSubmatch(string(v))
if len(ret) != 14 {
log.Println("FindStringSubmatch failed : ", string(v))
continue
}

message := &Message{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("ParseInLocation failed : ", err.Error(), ret[4])
}
message.TimeLocal = t

byteSent, _ := strconv.Atoi(ret[8])
message.ByteSent = byteSent

reqSli := strings.Split(ret[6]," ")
if len(reqSli) != 3 {
log.Println("string.Split failed : ", ret[6])
continue
}
message.Method = reqSli[0]
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println("url parse failed : ", err)
continue
}
message.Path = u.Path

message.Scheme = ret[5]
message.Status = ret[7]

upstreamTime, err := strconv.ParseFloat(ret[12],64)
message.UpstreamTime = upstreamTime

requestTime, err := strconv.ParseFloat(ret[13], 64)
message.RequestTime = requestTime

// 3. 写入Write Channel
l.wc <- message
}
}

func main() {

r := &ReadFromFile{
path: "/tmp/access.log",
}

w := &WriteToInfluxDB{
influxDBDsn: "",
}

lp := LogProcess{
rc : make(chan []byte),
wc : make(chan *Message),
read: r,
write: w,
}

go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)

time.Sleep(60 * time.Second)
}

写入模块流程讲解

在URL中,db=mydb指定database。使用逗号做分隔’,’,cpu_usage表示measurement。host=server01,region=us-west value=0.64 1434055562000000000表示points。host=server01,region=us-west表示points中的tags,value=0.64表示points中的fields,1434055562000000000表示points中的time。

influxDB提供了Golang的客户端,可以使用这个客户端很方便的写入数据https://github.com/influxdata/influxdb/tree/master/client,首先先引入包"github.com/influxdata/influxdb/client/v2"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package main

import (
"strings"
"fmt"
"time"
"os"
"bufio"
"io"
"regexp"
"log"
"strconv"
"net/url"

"github.com/influxdata/influxdb/client/v2"
"flag"
)

// 定义两个接口,抽象读取模块和写入模块
type Reader interface {
Read(rc chan []byte)
}

type ReadFromFile struct {
path string // 读取文件的路径
}

func (l *ReadFromFile) Read(rc chan []byte) {
// 读取模块

// 1. 打开文件
f,err := os.Open(l.path)
if err != nil {
panic(fmt.Sprintf("open file error : %s \n", err))
}

// 2. 从文件末尾开始逐行读取文件内容
f.Seek(0,2)
rd := bufio.NewReader(f)

// 3. 写入到Read Channel中
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error : %s", err.Error()))
}
rc <- line[:len(line) - 1]
}
}

type Writer interface {
Write(wc chan *Message)
}

type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}

func (l *WriteToInfluxDB) Write(wc chan *Message) {
// 写入模块
// 1.初始化influxDB client
// 2.从Write Channel中读取监控数据
// 3.构造数据并写入influxDB

infSli := strings.Split(l.influxDBDsn, "@")

// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: infSli[0],
Username: infSli[1],
Password: infSli[2],
})
if err != nil {
log.Fatal(err)
}


for v := range wc{
fmt.Println(v)
defer c.Close()

// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: infSli[3],
Precision: infSli[4],
})
if err != nil {
log.Fatal(err)
}

// Create a point and add to batch
tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
fields := map[string]interface{}{
"UpstreamTime": v.UpstreamTime,
"RequestTime": v.RequestTime,
"BytesSent": v.ByteSent,
}

pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)

// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}

// Close client resources
if err := c.Close(); err != nil {
log.Fatal(err)
}
fmt.Println("write success")
}
}

type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader
write Writer
}

type Message struct {
TimeLocal time.Time
ByteSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}

func (l *LogProcess) Process() {
// 解析模块
// 172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP:1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
loc , _ := time.LoadLocation("Asia/Shanghai")

// 1. 从Read Channel中读取每行日志数据
for v := range l.rc {

// 2. 正则提取所需的监控数据(path,status,method等)
ret := r.FindStringSubmatch(string(v))
if len(ret) != 14 {
log.Println("FindStringSubmatch failed : ", string(v))
continue
}

message := &Message{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("ParseInLocation failed : ", err.Error(), ret[4])
}
message.TimeLocal = t

byteSent, _ := strconv.Atoi(ret[8])
message.ByteSent = byteSent

reqSli := strings.Split(ret[6]," ")
if len(reqSli) != 3 {
log.Println("string.Split failed : ", ret[6])
continue
}
message.Method = reqSli[0]
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println("url parse failed : ", err)
continue
}
message.Path = u.Path

message.Scheme = ret[5]
message.Status = ret[7]

upstreamTime, err := strconv.ParseFloat(ret[12],64)
message.UpstreamTime = upstreamTime

requestTime, err := strconv.ParseFloat(ret[13], 64)
message.RequestTime = requestTime

// 3. 写入Write Channel
l.wc <- message
}
}

func main() {

var path, influxDsn string
flag.StringVar(&path, "path", "/tmp/access.log", "read file path")
flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@nginx@s", "influx data source")
flag.Parse()

r := &ReadFromFile{
path: path,
}

w := &WriteToInfluxDB{
influxDBDsn: influxDsn,
}

lp := LogProcess{
rc : make(chan []byte),
wc : make(chan *Message),
read: r,
write: w,
}

go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)

time.Sleep(6000 * time.Second)
}

监控模块的实现

1、总处理日志行数

2、系统吞吐量

3、read channel长度

4、write channel长度

5、运行总时间

6、错误数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package main

import (
"strings"
"fmt"
"time"
"os"
"bufio"
"io"
"regexp"
"log"
"strconv"
"net/url"

"github.com/influxdata/influxdb/client/v2"
"flag"
"net/http"
"encoding/json"
)

// 定义两个接口,抽象读取模块和写入模块
type Reader interface {
Read(rc chan []byte)
}

type ReadFromFile struct {
path string // 读取文件的路径
}

func (l *ReadFromFile) Read(rc chan []byte) {
// 读取模块

// 1. 打开文件
f,err := os.Open(l.path)
if err != nil {
panic(fmt.Sprintf("open file error : %s \n", err))
}

// 2. 从文件末尾开始逐行读取文件内容
f.Seek(0,2)
rd := bufio.NewReader(f)

// 3. 写入到Read Channel中
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error : %s", err.Error()))
}
TypeMonitorChan <- TypeHandleLine
rc <- line[:len(line) - 1]
}
}

type Writer interface {
Write(wc chan *Message)
}

type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}

func (l *WriteToInfluxDB) Write(wc chan *Message) {
// 写入模块
// 1.初始化influxDB client
// 2.从Write Channel中读取监控数据
// 3.构造数据并写入influxDB

infSli := strings.Split(l.influxDBDsn, "@")

// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: infSli[0],
Username: infSli[1],
Password: infSli[2],
})
if err != nil {
log.Fatal(err)
}


for v := range wc{
fmt.Println(v)
defer c.Close()

// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: infSli[3],
Precision: infSli[4],
})
if err != nil {
log.Fatal(err)
}

// Create a point and add to batch
tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
fields := map[string]interface{}{
"UpstreamTime": v.UpstreamTime,
"RequestTime": v.RequestTime,
"BytesSent": v.ByteSent,
}

pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)

// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}

// Close client resources
if err := c.Close(); err != nil {
log.Fatal(err)
}
fmt.Println("write success")
}
}

type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader
write Writer
}

type Message struct {
TimeLocal time.Time
ByteSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}

// 系统状态监控
type SystemInfo struct {
HandleLine int `json:"HandleLine"` // 总处理日志行数
Tps float64 `json:"tps"` // 系统吞吐量
ReadChanLen int `json:"ReadChanLen"` // read channel 长度
WriteChanLen int `json:"WriteChanLen"` // write channel 长度
RunTime string `json:"RunTime"` // 运行总时间
ErrNum int `json:"ErrNum"` // 错误数
}

const (
TypeHandleLine = 0
TypeErrNum = 1
)
var TypeMonitorChan = make(chan int, 200)

type Monitor struct {
startTime time.Time
data SystemInfo
tpsSli []int
}

func (m *Monitor) start(lp *LogProcess) {
// 消费数据
go func() {
for n := range TypeMonitorChan {
switch n {
case TypeErrNum:
m.data.ErrNum += 1
case TypeHandleLine:
m.data.HandleLine += 1
}
}
}()

ticker := time.NewTicker(time.Second * 5)
go func() {
for {
<-ticker.C
m.tpsSli = append(m.tpsSli, m.data.HandleLine)
if len(m.tpsSli) > 2 {
m.tpsSli = m.tpsSli[1:]
}
}
}()

http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
m.data.RunTime = time.Now().Sub(m.startTime).String()
m.data.WriteChanLen = len(lp.wc)
m.data.ReadChanLen = len(lp.rc)

if len(m.tpsSli) >= 2 {
m.data.Tps = float64(m.tpsSli[1] - m.tpsSli[0]) / 5
}

ret, _ := json.MarshalIndent(m.data,"","\t")
io.WriteString(writer, string(ret))
})
http.ListenAndServe(":9193", nil)
}

func (l *LogProcess) Process() {
// 解析模块
// 172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP:1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
loc , _ := time.LoadLocation("Asia/Shanghai")

// 1. 从Read Channel中读取每行日志数据
for v := range l.rc {

// 2. 正则提取所需的监控数据(path,status,method等)
ret := r.FindStringSubmatch(string(v))
if len(ret) != 14 {
TypeMonitorChan <- TypeErrNum
log.Println("FindStringSubmatch failed : ", string(v))
continue
}

message := &Message{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
TypeMonitorChan <- TypeErrNum
log.Println("ParseInLocation failed : ", err.Error(), ret[4])
continue
}
message.TimeLocal = t

byteSent, _ := strconv.Atoi(ret[8])
message.ByteSent = byteSent

reqSli := strings.Split(ret[6]," ")
if len(reqSli) != 3 {
TypeMonitorChan <- TypeErrNum
log.Println("string.Split failed : ", ret[6])
continue
}
message.Method = reqSli[0]
u, err := url.Parse(reqSli[1])
if err != nil {
TypeMonitorChan <- TypeErrNum
log.Println("url parse failed : ", err)
continue
}
message.Path = u.Path

message.Scheme = ret[5]
message.Status = ret[7]

upstreamTime, err := strconv.ParseFloat(ret[12],64)
message.UpstreamTime = upstreamTime

requestTime, err := strconv.ParseFloat(ret[13], 64)
message.RequestTime = requestTime

// 3. 写入Write Channel
l.wc <- message
}
}

func main() {

var path, influxDsn string
flag.StringVar(&path, "path", "/tmp/access.log", "read file path")
flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@nginx@s", "influx data source")
flag.Parse()

r := &ReadFromFile{
path: path,
}

w := &WriteToInfluxDB{
influxDBDsn: influxDsn,
}

lp := LogProcess{
rc : make(chan []byte),
wc : make(chan *Message),
read: r,
write: w,
}

go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)

m := &Monitor{
startTime: time.Now(),
data: SystemInfo{},
}
m.start(&lp)

//time.Sleep(6000 * time.Second)
}

Package flag

import “flag”

Package flag实现了command-line flag解析

Usage:

定义flags使用flag.String(), Bool(), Int()等。

以下声明了一个整型flag,名字是-flagname,存储在指针ip中,类型为*int。

1
2
import "flag"
var ip = flag.Int("flagname", 1234, "help message for flagname")

如果你喜欢,你可以使用Var()函数将flag绑定到一个变量。

1
2
3
4
var flagvar int
func init() {
flag.IntVar(&flagvar, "flagname", 1234, "help message for flagname")
}

或者,您可以创建满足Value接口(指针接收器)的自定义flags

1
flag.Var(&flagVal, "name", "help message for flagname")

对于这样的flags,默认值是变量的初始值。

所有flags定义后,调用:

1
flag.Parse()

将命令行解析为所定义的flags。

flags可以直接使用。如果你使用flags本身,它们都是指针;如果你绑定到变量,它们就是值。

1
2
fmt.Println("ip has value ", *ip)
fmt.Println("flagvar has value ", flagvar)

解析后,flags后面的参数可作为slice,flag.Args()。或者独立的一个值,flag.Arg(i)。参数索引,从0到flag.NArg() - 1。

Command line flag语法:

1
2
3
-flag
-flag=x
-flag x // non-boolean flags only

可以使用一个或两个减号;它们是等价的。最后一种形式不允许用于boolean flags

1
cmd -x *

其中*是一个Unix shell通配符,如果有一个名为0或false等文件时它将会更改。您必须使用-flag=false形式来关闭boolean flag。

flag解析在第一个non-flag参数之前(”-“是一个non-flag参数)或终止符“–”之后停止。

整数flags接受1234,0664,0x1234并且可能是负数。Boolean flags可能是:

1
1, 0, t, f, T, F, true, false, TRUE, FALSE, True, False

Duration flags接受任何有效的time.ParseDuration。

默认的一组command-line flags由顶层函数控制。FlagSet类型允许您定义独立的flags集,例如在command-line interface中实现子命令。FlagSet的方法类似于command-line flag集的顶层函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// These examples demonstrate more intricate uses of the flag package.
package main

import (
"errors"
"flag"
"fmt"
"strings"
"time"
)

// Example 1: A single string flag called "species" with default value "gopher".
// 示例1:一个名为“species”的字符串flag,默认值为“gopher”。
var species = flag.String("species", "gopher", "the species we are studying")

// Example 2: Two flags sharing a variable, so we can have a shorthand.
// The order of initialization is undefined, so make sure both use the
// same default value. They must be set up with an init function.
// 示例2:两个flags共享一个变量,所以我们可以用简写。
// 初始化的顺序是未定义的,所以确保两者都使用相同的默认值。 它们必须用init函数来设置。
var gopherType string

func init() {
const (
defaultGopher = "pocket"
usage = "the variety of gopher"
)
flag.StringVar(&gopherType, "gopher_type", defaultGopher, usage)
flag.StringVar(&gopherType, "g", defaultGopher, usage+" (shorthand)")
}

// Example 3: A user-defined flag type, a slice of durations.
type interval []time.Duration

// String is the method to format the flag's value, part of the flag.Value interface.
// The String method's output will be used in diagnostics.
func (i *interval) String() string {
return fmt.Sprint(*i)
}

// Set is the method to set the flag value, part of the flag.Value interface.
// Set's argument is a string to be parsed to set the flag.
// It's a comma-separated list, so we split it.
func (i *interval) Set(value string) error {
// If we wanted to allow the flag to be set multiple times,
// accumulating values, we would delete this if statement.
// That would permit usages such as
// -deltaT 10s -deltaT 15s
// and other combinations.
if len(*i) > 0 {
return errors.New("interval flag already set")
}
for _, dt := range strings.Split(value, ",") {
duration, err := time.ParseDuration(dt)
if err != nil {
return err
}
*i = append(*i, duration)
}
return nil
}

// Define a flag to accumulate durations. Because it has a special type,
// we need to use the Var function and therefore create the flag during
// init.

var intervalFlag interval

func init() {
// Tie the command-line flag to the intervalFlag variable and
// set a usage message.
flag.Var(&intervalFlag, "deltaT", "comma-separated list of intervals to use between events")
}

func main() {
// All the interesting pieces are with the variables declared above, but
// to enable the flag package to see the flags defined there, one must
// execute, typically at the start of main (not init!):
// flag.Parse()
// We don't run it here because this is not a main function and
// the testing suite has already parsed the flags.
}

原文:https://golang.org/pkg/flag/

在近期项目中,想针对Prometheus的ceph_exporter做定制化修改,但是遇到了一些麻烦。

使用go get github.com/digitalocean/ceph_exporter 命令down下来的代码和GitHub上的master分支或tag中的代码都不一样,这是为什么呢?如果go get指向目标Repository的master分支,那么master分支怎样保证任何时刻都是可用的呢?于是带着疑问开始寻找答案。

package管理工具

当你在代码中引用package时,通常使用看起来像URL的导入路径,例如github.com/jpoehls/gophermail。当使用go build构建代码时,Go工具使用此路径在GOPATH中查找这些软件package,如果找不到,则失败。

如何拉取这些package?

1
2
1、手动下载。你可以使用git clone这些package到你的GOPATH中。
2、使用go get命令( download and install packages and dependencies )。go get只是简单地将导入路径作为URL来对待,并尝试通过HTTP或HTTPS来下载它。它足够聪明,可以处理Git,Mercurial,Bazaar和Subversion。Go对GitHub和Bitbucket等常用的代码管理站点有特殊的支持,同时也支持自定义URL。

如何管理GitHub中单个Repository中的多个Versions?

有一种解决方法,可以让你在同一个Repository中,保存软件package的多个版本,并使用branches/tags来区分它们。go get支持自定义URL,你可以将版本号插入到package的导入路径中。这意味着需要编写一个代理服务,用于解析URL,并将请求代理到存储Repository的branch/tag上。

幸运的是,有人已经为我们做了这些工作。GoPkg.in完全符合上述内容。

例如:

使用此方法管理gophermail package。这意味着,不是使用github.com/jpoehls/gophermail导入软件package,而是使用gopkg.in/jpoehls/gophermail.v0。.v0是因为gophermail还没有达到1.0。当我发布1.0并声明稳定的API时,导入路径将更改为gopkg.in/jpoehls/gophermail.v1。

弄到这,好像和下面Prometheus的ceph_exporter输出没太大关系啊。。。。。。。。等我再研究研究。。。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@centos7 ~]# go version
go version go1.10 linux/amd64
[root@centos7 ~]# go get -v github.com/digitalocean/ceph_exporter
github.com/digitalocean/ceph_exporter (download)
github.com/digitalocean/ceph_exporter/vendor/github.com/ceph/go-ceph/rados
github.com/digitalocean/ceph_exporter/vendor/github.com/beorn7/perks/quantile
github.com/digitalocean/ceph_exporter/vendor/github.com/golang/protobuf/proto
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/client_model/go
github.com/digitalocean/ceph_exporter/vendor/github.com/matttproud/golang_protobuf_extensions/pbutil
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/common/model
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/common/expfmt
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/procfs/xfs
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/procfs
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/client_golang/prometheus
github.com/digitalocean/ceph_exporter/collectors
github.com/digitalocean/ceph_exporter/vendor/github.com/prometheus/client_golang/prometheus/promhttp
github.com/digitalocean/ceph_exporter/vendor/gopkg.in/yaml.v2

参考链接:

【1】http://zduck.com/2014/go-and-package-versioning
【2】https://stackoverflow.com/questions/24855081/how-do-i-import-a-specific-version-of-a-package-using-go-get

【3】https://ieevee.com/tech/2017/07/10/go-import.html

工欲善其事,必先利其器。这篇文章记录怎样在Linux上使用Vundle管理Vim插件,提高效率。

步骤很简单,大体如下:

系统环境

1
2
3
4
5
6
7
[root@centos7 ~]# yum install -y redhat-lsb
[root@centos7 ~]# lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 7.4.1708 (Core)
Release: 7.4.1708
Codename: Core

Install vim and git

1
[root@centos7 ~]# yum install -y git vim

Set up Vundle

1
[root@centos7 ~]# git clone https://github.com/VundleVim/Vundle.vim.git ~/.vim/bundle/Vundle.vim

Configure Plugins

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@centos7 ~]# vim ~/.vimrc
set nocompatible " be iMproved, required
filetype off " required

" set the runtime path to include Vundle and initialize
set rtp+=~/.vim/bundle/Vundle.vim
call vundle#begin()
" alternatively, pass a path where Vundle should install plugins
"call vundle#begin('~/some/path/here')

" let Vundle manage Vundle, required
Plugin 'VundleVim/Vundle.vim'

" All of your Plugins must be added before the following line
call vundle#end() " required
filetype plugin indent on " required
" To ignore plugin indent changes, instead use:
"filetype plugin on
"
" Brief help
" :PluginList - lists configured plugins
" :PluginInstall - installs plugins; append `!` to update or just :PluginUpdate
" :PluginSearch foo - searches for foo; append `!` to refresh local cache
" :PluginClean - confirms removal of unused plugins; append `!` to auto-approve removal
"
" see :h vundle for more details or wiki for FAQ
" Put your non-Plugin stuff after this line

打开~/.vimrc,并添加你要安装的Plugins

1
2
3
4
5
[root@centos7 ~]# vim ~/.vimrc
call vundle#begin()
" 在其之间添加你要安装的Plugins,例如我这里安装一个NERDTree
Plugin 'scrooloose/nerdtree'
call vundle#end()

Install Plugins

启动vim并运行:PluginInstall

1
2
[root@centos7 ~]# vim ~/.vimrc
:PluginInstall

配置已安装NERDTree插件

1
2
3
[root@centos7 ~]# vim ~/.vimrc
添加以下,注意<F2>后面有空格,配置好以后,使用vim打开任意文件就可以使用F2来调出NERDTree了
map <F2> :NERDTreeToggle<CR>

当然vim不止这一个插件,这里只是介绍一下怎样配置,其他好玩的插件请自己发掘。

vim常用选项

http://gohom.win/2015/06/08/Vim-Setup/

Centos 7 安装vdbench

1
2
3
4
5
6
7
1、首先安装Java JDK和一些工具包,这里使用的是java-1.7.0
[root@cephL ~]# sudo yum install -y java-1.7.0-openjdk java-1.7.0-openjdk-devel unzip
[root@cephL ~]# mkdir vdbench && cd vdbench
2、把下载后的vdbench拷贝到该目录并解压,这里使用的是vdbench50406
[root@cephL vdbench]# unzip vdbench50406.zip
3、测试是否可以运行
[root@cephL vdbench]# ./vdbench -t

运行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1、准备测试目录
[root@cephL node-1]# pwd
/root/node-1
2、到vdbench目录准备配置文件
[root@cephL ~]# cd /root/vdbench/
[root@cephL vdbench]# vi filesystem.conf
fsd=fsd1,anchor=/root/node-1,depth=2,width=2,files=2,size=128k
fwd=fwd1,fsd=fsd1,operation=read,xfersize=4k,fileio=sequential,fileselect=random,threads=2
rd=rd1,fwd=fwd1,fwdrate=max,format=yes,elapsed=10,interval=1

该配置文件表示:
第一行:
fsd、fwd、rd:是唯一标识
depth:从/root/node-1目录开始,在其中创建2层深度的目录(深度)
width:从/root/node-1目录开始,每层目录创建2个平级目录(广度)
files:在使用depth和width创建的目录中,最深层每个目录创建2个文件
size:每个文件大小为128k
openflags:
第二行:
operation:值为read,表示每个线程,根据fileselect的值(这里是随机)选择一个文件后,打开该文件进行读取
xfersize:连续读取4k blocks(xfersize=4k)直到文件结束(size=128k),关闭文件并随机选择另一个文件
fileio:表示文件IO的方式,random或者sequential
fileselect:值为random,表示每个线程随机选择一个文件
threads:值为2,表示启动2个线程(线程默认值为1)
第三行:
fwdrate:每秒有多少file system operations,max为无限制
format:值为yes,表示创建完整的目录结构,包括所有文件初始化到所需的128k大小
elapsed:持续运行时间,默认设置为30(以秒为单位)。注意:至少是interval的2倍,
interval:该参数指定每个报告间隔时间(以秒为单位)

3、开始一个简单的测试
[root@cephL vdbench]# ./vdbench -f filesystem.conf
4、查看被测目录中生成的测试文件
[root@cephL ~]# tree /root/node-1/ -h
/root/node-1/ --- depth 0
├── [ 68] no_dismount.txt
├── [ 44] vdb.1_1.dir --- depth 1 width 1
│   ├── [ 50] vdb.2_1.dir --- depth 2 width 1
│   │   ├── [128K] vdb_f0001.file --- depth 2 width 1 files 1
│   │   └── [128K] vdb_f0002.file --- depth 2 width 1 files 2
│   └── [ 50] vdb.2_2.dir --- depth 2 width 2
│   ├── [128K] vdb_f0001.file --- depth 2 width 2 files 1
│   └── [128K] vdb_f0002.file --- depth 2 width 2 files 2
├── [ 44] vdb.1_2.dir --- depth 1 width 2
│   ├── [ 50] vdb.2_1.dir --- depth 2 width 1
│   │   ├── [128K] vdb_f0001.file --- depth 2 width 1 files 1
│   │   └── [128K] vdb_f0002.file --- depth 2 width 1 files 2
│   └── [ 50] vdb.2_2.dir --- depth 2 width 2
│   ├── [128K] vdb_f0001.file --- depth 2 width 2 files 1
│   └── [128K] vdb_f0002.file --- depth 2 width 2 files 2
└── [ 159] vdb_control.file

6 directories, 10 files

其他常用参数

openflags

1
2
这个参数可以指定在SD, WD, FSD, FWD, RD中
可用

sizes(注意这里有s)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
这个参数可以指定在FSD中
该参数指定文件的大小(size),可以指定一个或一组文件的大小(size)。
例如:sizes=(32k,50,64k,50),其中第一个数字表示文件大小(size),第二个数字表示必须具有此大小(size)的文件的百分比。如果指定一组文件,百分比加起来必须为100。
当您指定sizes=(nnn,0)时,vdbench将创建平均大小(size)为'nnn'字节的文件

以上标出(size)可能与下面的规则有关,所以保留原文。以下规则个人理解为,如果大于某个size规则(如果有小数点),那么必须是某个规则的倍数。
有一些规则与最终使用的文件大小(size)有关:
如果size > 10m,size将为1m的倍数。(个人理解,size为10.1m是不行的,必须是1m的倍数)
如果size > 1m,size将为100k的倍数。
如果size > 100k,size将为10k的倍数。
如果size < 100k,size将为1k的倍数。

关于百分比的实验:
实验1
在8个文件中,4个1k的文件,4个2k的文件
[root@cephL vdbench]# vi filesystem.conf
fsd=fsd1,anchor=/root/node-1,depth=2,width=2,files=2,sizes=(1k,50,2k,50)
fwd=fwd1,fsd=fsd1,operation=read,xfersize=4k,fileio=sequential,fileselect=random,threads=2
rd=rd1,fwd=fwd1,fwdrate=max,format=yes,elapsed=10,interval=1

[root@cephL vdbench]# ./vdbench -f filesystem.conf
[root@cephL ~]# tree node-1/ -h
node-1/
├── [ 68] no_dismount.txt
├── [ 44] vdb.1_1.dir
│   ├── [ 50] vdb.2_1.dir
│   │   ├── [2.0K] vdb_f0001.file
│   │   └── [1.0K] vdb_f0002.file
│   └── [ 50] vdb.2_2.dir
│   ├── [1.0K] vdb_f0001.file
│   └── [2.0K] vdb_f0002.file
├── [ 44] vdb.1_2.dir
│   ├── [ 50] vdb.2_1.dir
│   │   ├── [1.0K] vdb_f0001.file
│   │   └── [1.0K] vdb_f0002.file
│   └── [ 50] vdb.2_2.dir
│   ├── [2.0K] vdb_f0001.file
│   └── [2.0K] vdb_f0002.file
└── [ 194] vdb_control.file

6 directories, 10 files

实验2
在8个文件中,1个1k的文件,3个2k的文件,3个3k的文件,1个4k的文件。不知道为什么。。。。。。。。
[root@cephL vdbench]# vi filesystem.conf
fsd=fsd1,anchor=/root/node-1,depth=2,width=2,files=2,sizes=(1k,25,2k,25,3k,25,4k,25)
fwd=fwd1,fsd=fsd1,operation=read,xfersize=4k,fileio=sequential,fileselect=random,threads=2
rd=rd1,fwd=fwd1,fwdrate=max,format=yes,elapsed=10,interval=1

[root@cephL vdbench]# ./vdbench -f filesystem.conf
[root@cephL ~]# tree node-1/ -h
node-1/
├── [ 68] no_dismount.txt
├── [ 44] vdb.1_1.dir
│   ├── [ 50] vdb.2_1.dir
│   │   ├── [3.0K] vdb_f0001.file
│   │   └── [2.0K] vdb_f0002.file
│   └── [ 50] vdb.2_2.dir
│   ├── [1.0K] vdb_f0001.file
│   └── [3.0K] vdb_f0002.file
├── [ 44] vdb.1_2.dir
│   ├── [ 50] vdb.2_1.dir
│   │   ├── [2.0K] vdb_f0001.file
│   │   └── [2.0K] vdb_f0002.file
│   └── [ 50] vdb.2_2.dir
│   ├── [4.0K] vdb_f0001.file
│   └── [3.0K] vdb_f0002.file
└── [ 234] vdb_control.file

6 directories, 10 files

多机测试

应用场景为多个NFS Client挂在相同NFS Server的读写。

1
2
3
4
5
6
7
8
9
10
11
12
13
hd=default,vdbench=/home/vdbench,user=root,shell=ssh
hd=hd1,system=node1
hd=hd2,system=node2
hd=hd3,system=node3

fsd=fsd1,anchor=/mnt/test863,depth=1,width=10,files=10000,size=20m,shared=yes
fwd=format,threads=6,xfersize=1m
fwd=default,xfersize=1m,fileio=random,fileselect=random,rdpct=100,threads=6
fwd=fwd1,fsd=fsd1,host=hd1
fwd=fwd2,fsd=fsd1,host=hd2
fwd=fwd3,fsd=fsd1,host=hd3

rd=rd1,fwd=fwd*,fwdrate=max,format=(restart,only),elapsed=600,interval=1

参数解析:

有3台测试节点node1、node2、node3。每台测试节点的/home/vdbench/目录都存在可执行vdbench二进制文件(位置必须相同),使用root用户通过ssh方式连接(节点间需要做ssh免密),每台测试节点的测试目录为/mnt/test863,目录深度为1,最深层目录中的目录宽度为10,最深层每个目录中有10000个文件,每个文件大小20mb

关于shared=yes

随着Vdbench运行多个slaves和可选的多个hosts,slaves和hosts之间关于文件状态的通信变得困难。使所有这些slaves设备相互通信所涉及的开销变得过于昂贵。您不希望一个slave删除另一个slave当前正在读取或写入的文件。因此,Vdbench不允许您跨slaves和hosts共享FSD。

当然,在你开始使用庞大的文件系统之前,这一切听起来都很棒。 您刚刚填满了500 TB的磁盘文件,然后您决定要与一个或多个远程主机共享该数据。 从头开始重新创建整个文件结构需要很长时间。 该怎么办?

指定’shared = yes’时,Vdbench将允许您共享文件系统定义(FSD)。 它通过允许每个slave设备仅使用FSD文件结构中定义的每个“第n”文件来实现这一点,其中“n”等于slave数量。(It does this by allowing each slave to use only every ‘nth’ file as is defined in the FSD file structure, where ‘n’ equals the amount of slaves.)

这意味着不同的host不会互相踩到脚趾,但有一个例外:当您指定’format = yes’时,Vdbench首先删除已存在的文件结构。由于这可能是一个旧的文件结构,Vdbench无法传播文件删除周围,让每个slave删除他的’第n’文件。因此,每个slave设备都会尝试删除所有文件,但如果删除失败则不会生成错误消息(因为不同的slave设备只是删除了它)。这些失败的删除将被计算并报告在“Miscellaneous statistics”中的“FILE_DELETE_SHARED”计数器下。但是,“FILE_DELETES”计数器可以包含高于存在的文件数量的计数。我已经看到多个slaves设备能够同时删除同一个文件而没有操作系统将任何错误传递给Java的情况。

关于rdpct(Read Percentage)

此参数允许您混合读取和写入。 使用operation=read只允许你做read,operation=write只允许你做write。 但是,指定rdpct=,您将能够在同一个选定文件中混合读取和写入。请注意,对于sequential(顺序),这没有多大意义。您可以以读取块1,写入块2,读取块3等。对于随机I/O,这非常有意义。

关于format=

  • no

    不需要任何格式,但现有文件结构必须与 FSD 定义的结构相匹配。

  • yes

    Vdbench 将首先删除当前文件结构,然后再次创建文件结构。然后,它将执行您在 RD 中的请求。

  • restart

    Vdbench将仅创建尚未创建的文件,并且还将扩展未达到其正确大小的文件。 (这是totalsize和workingsetsize可以发挥作用的地方)。

  • only

    与’yes’相同,但Vdbench不会执行当前的RD。

  • dir(ectories)

    与‘yes’相同,但它只会创建目录。

  • clean

    Vdbench只会删除当前的文件结构,而不会执行当前的RD。

  • once

    这将覆盖每个forxxx参数循环完成format的默认行为。

  • limited

    format将在elapsed=seconds之后终止,而不是所有文件或为totalsize=选择的文件已格式化之后终止。

  • complete

    只能与’format=no’一起使用,并且会告诉Vdbench format已经完成,但是Vdbench不应该尝试通过目录搜索来验证每个目录和文件的状态。 当然,如果一个或多个目录或文件丢失或文件未达到其预期大小,结果不可预测。在测试期间删除或创建目录或文件时非常危险。

Welcome to RocksDB

RocksDB:A Persistent Key-Value Store for Flash and RAM Storage

RocksDB是一个嵌入式key-value store C ++库,其中keys和values是任意byte streams。RocksDB由Facebook Database Engineering Team开发和维护,基于LevelDB二次开发,并对LevelDB API提供向后兼容。它支持原子读取和写入。RocksDB借鉴了开源leveldb项目中的重要代码以及Apache HBase的重要思想。最初的代码是从开源的leveldb 1.5中分离出来的。它还建立在Facebook之前在RocksDB开发的代码和想法之上。

RocksDB针对Flash进行了优化,延迟极低。RocksDB使用Log Structured Database Engine进行存储,完全用C ++编写。一个名为RocksJava的Java版本目前正在开发中。

RocksDB具有高度灵活的配置选项,可以调整以运行在各种生产环境上的不同设备之上,包括pure memory,Flash,hard disks或HDFS。它支持各种压缩算法并且为production support和debugging提供良好的工具。

Features

  • 专门希望存储在本地Flash drives或RAM中,高达几TB数据的应用程序服务器而设计

  • 针对fast storage设备,存储small到medium size key-values进行了优化( flash devices 或 in-memory )

  • 与CPU数量线性扩展,以便在具有多核的处理器上运行良好

    Features Not in LevelDB

RocksDB引入了几十个新的主要features。不在LevelDB中的feature列表

Assumptions and Goals

Performance

RocksDB设计主要专注于fast storage和server workloads。它应该利用Flash或RAM子系统提供的高速率读/写全部潜力。它应该支持高效的point lookups以及range scans。它应该可配置为支持high random-read workloads,high update workloads或两者的组合(最优解)。其架构应易于调整Read Amplification, Write Amplification 和 Space Amplification。

Production Support

Compatibility

ceph L版已经发布很久了,官方说默认使用BlueStore作为OSD的存储后端,在Cephalocon APAC 2018上也是讨论的焦点之一。

提到BlueStore,不得不说一说Ceph的STORAGE DEVICES。

STORAGE DEVICES

Ceph守护进程将数据存储在磁盘上:

1
2
3
4
5
6
Ceph OSDs ( Object Storage Daemons )
Client端的大多数数据写入Ceph后被存储的地方,一般而言,每个OSD都由单一存储设备支持,如传统硬盘(HDD)或固态硬盘(SSD)。
OSD还可以由多种设备组合,如存储数据的HDD和存储某些元数据的SSD(或SSD的分区)。
群集中OSD的数量通常取决于你要存储的数据量,还需要考虑每个存储设备的容量以及冗余级别和存储类型(replication或erasure coding)。
Ceph Monitor
管理关键群集状态,如cluster membership和authentication信息。对于较小的集群,需要几千兆字节(几个GB),然而对于较大的集群,monitor的数据库可以达到几十甚至几百千兆(几十个GB甚至几百个GB)。

OSD BACKENDS

OSD可以通过两种方式管理存储的数据。从Luminous 12.2.z发行版开始,新的默认(推荐)后端是 BlueStore。在Luminous之前,默认(也是唯一的选择)是 FileStore。

BLUESTORE

BlueStore是专门用于Ceph OSD管理磁盘上的数据的专用存储后端。在过去十年间,受到了FileStore管理OSD经验的启发.
BlueStore的主要功能包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
直接管理存储设备 ( Direct management of storage devices )
BlueStore使用原始块设备或分区。这避免了任何可能限制性能或增加复杂性的抽象层(如像XFS这样的本地文件系统)。

使用RocksDB进行元数据管理 ( Metadata management with RocksDB )
为了管理内部元数据,我们嵌入了RocksDB的key/value数据库。例如在磁盘上,从object names到block locations的映射。

完整的数据和元数据校验 ( Full data and metadata checksumming )
默认情况下,写入BlueStore的所有数据和元数据都受到一个或多个校验和的保护。没有数据或元数据在未经过验证的情况下,就从磁盘读取或返回给用户。

内置压缩 ( Inline compression )
写入的数据在写入磁盘之前可以选择压缩。

多设备元数据分层 ( Multi-device metadata tiering )
BlueStore允许将其内部journal(预写日志,write-ahead log)写入单独的高速设备(如SSD,NVMe或NVDIMM)以提高性能。
如果有大量更快速的存储可用,则内部元数据也可以存储在更快的设备上。

高效的写时复制 ( Efficient copy-on-write )
RBD和CephFS快照依赖于copy-on-write clone机制,也在BlueStore中得到了有效的实现。这将为常规快照和erasure coded池提供高效的IO(依靠clone实现高效的two-phase commits)

http://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/
http://docs.ceph.com/docs/master/rados/operations/bluestore-migration/

FILESTORE

FileStore是在Ceph中存储objects的传统方法。它依赖于标准文件系统(通常是XFS)和某个元数据的key/value数据库(传统上是LevelDB,现在是RocksDB)结合使用。
FileStore经过良好测试并广泛用于生产,但由于其整体设计和对传统文件系统存储object数据的依赖性,因此存在许多性能缺陷。
尽管FileStore通常能够在大多数与POSIX兼容的文件系统(包括btrfs和ext4)上运行,但我们只建议使用XFS。
btrfs和ext4都有已知的bug和缺陷,使用它们可能会导致数据丢失。默认情况下,所有的Ceph提供的工具都将使用XFS。

http://docs.ceph.com/docs/master/rados/configuration/filestore-config-ref/

在ceph L版代码结构改动比较大,增加了CEPH-MGR向外部监测和管理系统提供额外的监测接口,今天就用虚拟机搭建实验环境玩一玩。

环境信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@cephL ~]# yum install -y redhat-lsb
[root@cephL ~]# lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 7.4.1708 (Core)
Release: 7.4.1708
Codename: Core

[root@cephL ~]# lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
sda 8:0 0 40G 0 disk
├─sda1 8:1 0 1G 0 part /boot
└─sda2 8:2 0 39G 0 part
├─centos-root 253:0 0 36G 0 lvm /
└─centos-swap 253:1 0 3G 0 lvm [SWAP]
sdb 8:16 0 30G 0 disk
sdc 8:32 0 30G 0 disk
sr0 11:0 1 1024M 0 rom

安装

安装pip和ceph-deploy

1
2
3
4
5
6
7
[root@cephL ~]# curl "https://bootstrap.pypa.io/get-pip.py" -o "get-pip.py"
[root@cephL ~]# python get-pip.py
[root@cephL ~]# python -m pip install -U pip
[root@cephL ~]# pip install --upgrade setuptools
[root@cephL ~]# pip install ceph-deploy
[root@cephL ~]# ceph-deploy --version
2.0.0

安装ceph软件包

1
2
[root@cephL ~]# mkdir ceph-deploy && cd ceph-deploy
[root@cephL ceph-deploy]# ceph-deploy install cephL --release luminous

开始部署一个新的集群,然后为它写一个CLUSTER.conf和keyring

1
[root@cephL ceph-deploy]# ceph-deploy new --public-network 192.168.56.101/24  --cluster-network 192.168.56.101/24 cephL

部署MON

1
2
3
[root@cephL ceph-deploy]# ceph-deploy mon create-initial
[root@cephL ceph-deploy]# ceph-deploy mon create cephL
ceph 1110 1 0 12:57 ? 00:00:01 /usr/bin/ceph-mon -f --cluster ceph --id cephL --setuser ceph --setgroup ceph

部署OSD

bluestore方法

1
2
3
4
5
# 在创建osd时,L版默认是bluestore
[root@cephL ceph-deploy]# ceph-deploy osd create --data /dev/sdb cephL
ceph 1514 1 0 12:57 ? 00:00:01 /usr/bin/ceph-osd -f --cluster ceph --id 0 --setuser ceph --setgroup ceph
[root@cephL ceph-deploy]# ceph-deploy osd create --data /dev/sdc cephL
ceph 1518 1 0 12:57 ? 00:00:01 /usr/bin/ceph-osd -f --cluster ceph --id 1 --setuser ceph --setgroup ceph

遇到问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[root@cephL ceph-deploy]# ceph -s
2018-04-10 12:00:19.660298 7fd1fe0ae700 -1 auth: unable to find a keyring on /etc/ceph/ceph.client.admin.keyring,/etc/ceph/ceph.keyring,/etc/ceph/keyring,/etc/ceph/keyring.bin,: (2) No such file or directory
2018-04-10 12:00:19.660310 7fd1fe0ae700 -1 monclient: ERROR: missing keyring, cannot use cephx for authentication
2018-04-10 12:00:19.660312 7fd1fe0ae700 0 librados: client.admin initialization error (2) No such file or directory
[errno 2] error connecting to the cluster

[root@cephL ceph-deploy]# chmod +r *
[root@cephL ceph-deploy]# cp ceph.client.admin.keyring /etc/ceph/
[root@cephL ceph-deploy]# ceph -s
cluster:
id: 765752b7-1f77-4d0d-bc18-936b8ad409fd
health: HEALTH_WARN
no active mgr

services:
mon: 1 daemons, quorum cephL
mgr: no daemons active
osd: 2 osds: 2 up, 2 in

data:
pools: 0 pools, 0 pgs
objects: 0 objects, 0 bytes
usage: 0 kB used, 0 kB / 0 kB avail
pgs:

filestore方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# 如果是filestore则需要对data device和journal device先做GPT partition
--data DATA The OSD data logical volume (vg/lv) or absolute path to device
--journal JOURNAL Logical Volume (vg/lv) or path to GPT partition

[root@cephL ceph-deploy]# fdisk /dev/sdb
WARNING: fdisk GPT support is currently new, and therefore in an experimental phase. Use at your own discretion.
欢迎使用 fdisk (util-linux 2.23.2)。
更改将停留在内存中,直到您决定将更改写入磁盘。
使用写入命令前请三思。
命令(输入 m 获取帮助):g
Building a new GPT disklabel (GUID: 80097CEF-475B-4161-ACC7-7164F6A39DD2)
命令(输入 m 获取帮助):n
分区号 (1-128,默认 1):
第一个扇区 (2048-62914526,默认 2048):
Last sector, +sectors or +size{K,M,G,T,P} (2048-62914526,默认 62914526):
已创建分区 1
命令(输入 m 获取帮助):w
The partition table has been altered!
Calling ioctl() to re-read partition table.
正在同步磁盘。

[root@cephL ceph-deploy]# fdisk /dev/sdc
WARNING: fdisk GPT support is currently new, and therefore in an experimental phase. Use at your own discretion.
欢迎使用 fdisk (util-linux 2.23.2)。
更改将停留在内存中,直到您决定将更改写入磁盘。
使用写入命令前请三思。
命令(输入 m 获取帮助):g
Building a new GPT disklabel (GUID: 21DFA98C-5BCF-40E7-A120-3DEDEA6600ED)
命令(输入 m 获取帮助):n
分区号 (1-128,默认 1):
第一个扇区 (2048-62914526,默认 2048):
Last sector, +sectors or +size{K,M,G,T,P} (2048-62914526,默认 62914526):
已创建分区 1
命令(输入 m 获取帮助):w
The partition table has been altered!
Calling ioctl() to re-read partition table.
正在同步磁盘。

[root@cephL ceph-deploy]# lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
sda 8:0 0 40G 0 disk
├─sda1 8:1 0 1G 0 part /boot
└─sda2 8:2 0 39G 0 part
├─centos-root 253:0 0 36G 0 lvm /
└─centos-swap 253:1 0 3G 0 lvm [SWAP]
sdb 8:16 0 30G 0 disk
└─sdb1 8:17 0 30G 0 part
sdc 8:32 0 30G 0 disk
└─sdc1 8:33 0 30G 0 part
sr0 11:0 1 1024M 0 rom

[root@cephL ceph-deploy]# ceph-deploy osd create --filestore --fs-type xfs --data /dev/sdb1 --journal /dev/sdc1 cephL
[ceph_deploy.conf][DEBUG ] found configuration file at: /root/.cephdeploy.conf
[ceph_deploy.cli][INFO ] Invoked (2.0.0): /usr/bin/ceph-deploy osd create --filestore --fs-type xfs --data /dev/sdb1 --journal /dev/sdc1 cephL
[ceph_deploy.cli][INFO ] ceph-deploy options:
[ceph_deploy.cli][INFO ] verbose : False
[ceph_deploy.cli][INFO ] bluestore : None
[ceph_deploy.cli][INFO ] cd_conf : <ceph_deploy.conf.cephdeploy.Conf instance at 0x22c7320>
[ceph_deploy.cli][INFO ] cluster : ceph
[ceph_deploy.cli][INFO ] fs_type : xfs
[ceph_deploy.cli][INFO ] block_wal : None
[ceph_deploy.cli][INFO ] default_release : False
[ceph_deploy.cli][INFO ] username : None
[ceph_deploy.cli][INFO ] journal : /dev/sdc1
[ceph_deploy.cli][INFO ] subcommand : create
[ceph_deploy.cli][INFO ] host : cephL
[ceph_deploy.cli][INFO ] filestore : True
[ceph_deploy.cli][INFO ] func : <function osd at 0x225ae60>
[ceph_deploy.cli][INFO ] ceph_conf : None
[ceph_deploy.cli][INFO ] zap_disk : False
[ceph_deploy.cli][INFO ] data : /dev/sdb1
[ceph_deploy.cli][INFO ] block_db : None
[ceph_deploy.cli][INFO ] dmcrypt : False
[ceph_deploy.cli][INFO ] overwrite_conf : False
[ceph_deploy.cli][INFO ] dmcrypt_key_dir : /etc/ceph/dmcrypt-keys
[ceph_deploy.cli][INFO ] quiet : False
[ceph_deploy.cli][INFO ] debug : False
[ceph_deploy.osd][DEBUG ] Creating OSD on cluster ceph with data device /dev/sdb1
[cephL][DEBUG ] connected to host: cephL
[cephL][DEBUG ] detect platform information from remote host
[cephL][DEBUG ] detect machine type
[cephL][DEBUG ] find the location of an executable
[ceph_deploy.osd][INFO ] Distro info: CentOS Linux 7.4.1708 Core
[ceph_deploy.osd][DEBUG ] Deploying osd to cephL
[cephL][DEBUG ] write cluster configuration to /etc/ceph/{cluster}.conf
[cephL][DEBUG ] find the location of an executable
[cephL][INFO ] Running command: /usr/sbin/ceph-volume --cluster ceph lvm create --filestore --data /dev/sdb1 --journal /dev/sdc1
[cephL][DEBUG ] Running command: /bin/ceph-authtool --gen-print-key
[cephL][DEBUG ] Running command: /bin/ceph --cluster ceph --name client.bootstrap-osd --keyring /var/lib/ceph/bootstrap-osd/ceph.keyring -i - osd new 8b7be4a6-b563-434e-b030-132880a10d31
[cephL][DEBUG ] Running command: vgcreate --force --yes ceph-8e2515c1-6170-4299-b82c-a5a47681f946 /dev/sdb1
[cephL][DEBUG ] stdout: Physical volume "/dev/sdb1" successfully created.
[cephL][DEBUG ] stdout: Volume group "ceph-8e2515c1-6170-4299-b82c-a5a47681f946" successfully created
[cephL][DEBUG ] Running command: lvcreate --yes -l 100%FREE -n osd-data-8b7be4a6-b563-434e-b030-132880a10d31 ceph-8e2515c1-6170-4299-b82c-a5a47681f946
[cephL][DEBUG ] stdout: Logical volume "osd-data-8b7be4a6-b563-434e-b030-132880a10d31" created.
[cephL][DEBUG ] Running command: /bin/ceph-authtool --gen-print-key
[cephL][DEBUG ] Running command: mkfs -t xfs -f -i size=2048 /dev/ceph-8e2515c1-6170-4299-b82c-a5a47681f946/osd-data-8b7be4a6-b563-434e-b030-132880a10d31
[cephL][DEBUG ] stdout: meta-data=/dev/ceph-8e2515c1-6170-4299-b82c-a5a47681f946/osd-data-8b7be4a6-b563-434e-b030-132880a10d31 isize=2048 agcount=4, agsize=1965824 blks
[cephL][DEBUG ] = sectsz=512 attr=2, projid32bit=1
[cephL][DEBUG ] = crc=1 finobt=0, sparse=0
[cephL][DEBUG ] data = bsize=4096 blocks=7863296, imaxpct=25
[cephL][DEBUG ] = sunit=0 swidth=0 blks
[cephL][DEBUG ] naming =version 2 bsize=4096 ascii-ci=0 ftype=1
[cephL][DEBUG ] log =internal log bsize=4096 blocks=3839, version=2
[cephL][DEBUG ] = sectsz=512 sunit=0 blks, lazy-count=1
[cephL][DEBUG ] realtime =none extsz=4096 blocks=0, rtextents=0
[cephL][DEBUG ] Running command: mount -t xfs -o rw,noatime,inode64 /dev/ceph-8e2515c1-6170-4299-b82c-a5a47681f946/osd-data-8b7be4a6-b563-434e-b030-132880a10d31 /var/lib/ceph/osd/ceph-0
[cephL][DEBUG ] Running command: chown -R ceph:ceph /dev/sdc1
[cephL][DEBUG ] Running command: ln -s /dev/sdc1 /var/lib/ceph/osd/ceph-0/journal
[cephL][DEBUG ] Running command: ceph --cluster ceph --name client.bootstrap-osd --keyring /var/lib/ceph/bootstrap-osd/ceph.keyring mon getmap -o /var/lib/ceph/osd/ceph-0/activate.monmap
[cephL][DEBUG ] stderr: got monmap epoch 1
[cephL][DEBUG ] Running command: chown -R ceph:ceph /dev/sdc1
[cephL][DEBUG ] Running command: chown -R ceph:ceph /var/lib/ceph/osd/ceph-0/
[cephL][DEBUG ] Running command: ceph-osd --cluster ceph --osd-objectstore filestore --mkfs -i 0 --monmap /var/lib/ceph/osd/ceph-0/activate.monmap --osd-data /var/lib/ceph/osd/ceph-0/ --osd-journal /var/lib/ceph/osd/ceph-0/journal --osd-uuid 8b7be4a6-b563-434e-b030-132880a10d31 --setuser ceph --setgroup ceph
[cephL][DEBUG ] stderr: 2018-05-07 23:01:34.834993 7f315e466d00 -1 journal check: ondisk fsid 00000000-0000-0000-0000-000000000000 doesn't match expected 8b7be4a6-b563-434e-b030-132880a10d31, invalid (someone else's?) journal
[cephL][DEBUG ] stderr: 2018-05-07 23:01:34.865621 7f315e466d00 -1 journal do_read_entry(4096): bad header magic
[cephL][DEBUG ] 2018-05-07 23:01:34.865667 7f315e466d00 -1 journal do_read_entry(4096): bad header magic
[cephL][DEBUG ] 2018-05-07 23:01:34.865988 7f315e466d00 -1 read_settings error reading settings: (2) No such file or directory
[cephL][DEBUG ] stderr: 2018-05-07 23:01:34.916284 7f315e466d00 -1 created object store /var/lib/ceph/osd/ceph-0/ for osd.0 fsid 39f3b85e-ee3c-4d8d-93c2-7f7c8aa47121
[cephL][DEBUG ] Running command: ceph-authtool /var/lib/ceph/osd/ceph-0/keyring --create-keyring --name osd.0 --add-key AQBDavBa0IPpIBAAlQxlaWxNrnTX/uaOMdZEQw==
[cephL][DEBUG ] stdout: creating /var/lib/ceph/osd/ceph-0/keyring
[cephL][DEBUG ] added entity osd.0 auth auth(auid = 18446744073709551615 key=AQBDavBa0IPpIBAAlQxlaWxNrnTX/uaOMdZEQw== with 0 caps)
[cephL][DEBUG ] Running command: chown -R ceph:ceph /var/lib/ceph/osd/ceph-0/keyring
[cephL][DEBUG ] --> ceph-volume lvm prepare successful for: /dev/sdb1
[cephL][DEBUG ] Running command: ln -snf /dev/sdc1 /var/lib/ceph/osd/ceph-0/journal
[cephL][DEBUG ] Running command: chown -R ceph:ceph /dev/sdc1
[cephL][DEBUG ] Running command: systemctl enable ceph-volume@lvm-0-8b7be4a6-b563-434e-b030-132880a10d31
[cephL][DEBUG ] stderr: Created symlink from /etc/systemd/system/multi-user.target.wants/ceph-volume@lvm-0-8b7be4a6-b563-434e-b030-132880a10d31.service to /usr/lib/systemd/system/ceph-volume@.service.
[cephL][DEBUG ] Running command: systemctl start ceph-osd@0
[cephL][DEBUG ] --> ceph-volume lvm activate successful for osd ID: 0
[cephL][DEBUG ] --> ceph-volume lvm create successful for: /dev/sdb1
[cephL][INFO ] checking OSD status...
[cephL][DEBUG ] find the location of an executable
[cephL][INFO ] Running command: /bin/ceph --cluster=ceph osd stat --format=json
[ceph_deploy.osd][DEBUG ] Host cephL is now ready for osd use.

移除OSD

1
2
3
4
5
6
7
8
9
10
11
12
# 使OSD进入out状态
[root@cephL ceph-deploy]# ceph osd out 0
marked out osd.0.
# 观察数据迁移
[root@cephL ceph-deploy]# ceph -w
# 停止对应的OSD进程
[root@cephL ceph-deploy]# sudo systemctl stop ceph-osd@0
# 清除数据
[root@cephL ceph-deploy]# ceph osd purge 0 --yes-i-really-mean-it
purged osd.0
# 在ceph.conf中移除osd配置
[root@cephL ceph-deploy]# vi /etc/ceph/ceph.conf

部署CEPH-MGR

1
2
3
4
5
6
7
8
9
10
install netstat tool
[root@cephL ~]# yum -y install net-tools

[root@cephL ceph-deploy]# ceph-deploy mgr create cephL:cephLMGR
ceph 1111 1 0 12:57 ? 00:00:08 /usr/bin/ceph-mgr -f --cluster ceph --id cephLMGR --setuser ceph --setgroup ceph
[root@cephL ceph-deploy]# ceph mgr module enable dashboard

open 7000 port
[root@cephL ceph-deploy]# sudo firewall-cmd --zone=public --add-port=7000/tcp --permanent
[root@cephL ceph-deploy]# sudo firewall-cmd --reload

相关命令

1
2
3
[root@cephL ceph-deploy]# ceph mgr module ls
[root@cephL ceph-deploy]# ceph mgr services
[root@cephL ceph-deploy]# ceph tell mgr help

部署MDS并创建CEPH FS

1
2
[root@cephL ceph-deploy]# ceph-deploy mds create cephL
ceph 2150 1 0 13:00 ? 00:00:00 /usr/bin/ceph-mds -f --cluster ceph --id cephL --setuser ceph --setgroup ceph

Ceph文件系统至少需要两个RADOS pool,一个用于存储数据,一个用于存储元数据。

配置这些pool时,可以考虑:

​ 对元数据pool使用更多的replication数量,因为该pool中的任何数据丢失都可能导致整个文件系统无法访问。

​ 为元数据pool使用SSD等较低延迟的存储设备,因为这将直接影响客户端上文件系统操作的延迟。

1
2
3
4
5
ceph osd pool create cephfs_data <pg_num>
ceph osd pool create cephfs_metadata <pg_num>
例如:
[root@cephL ceph-deploy]# ceph osd pool create cephfs_data 32
[root@cephL ceph-deploy]# ceph osd pool create cephfs_metadata 32

更改pool的副本数

1
2
3
4
ceph osd pool set {poolname} size {num-replicas}
例如:
[root@cephL ceph-deploy]# ceph osd pool set cephfs_data size 1
[root@cephL ceph-deploy]# ceph osd pool set cephfs_data size 1

一旦创建了pool,就可以使用fs new命令启用文件系统:

1
2
3
ceph fs new <fs_name> <metadata> <data>
例如:
ceph fs new cephFS cephfs_metadata cephfs_data

一旦创建了文件系统,您的MDS将能够进入active状态。例如,在single MDS system中:

1
2
[root@cephL ceph-deploy]# ceph mds stat
cephFS-1/1/1 up {0=cephL=up:active}

一旦创建了文件系统并且MDS处于active状态,你就可以挂载文件系统了。如果您创建了多个文件系统,在挂载文件系统时,选择使用哪一个。

如果创建了多个文件系统,并且client在挂载时没有指定挂载哪个文件系统,你可以使用ceph fs set-default命令来设置client默认看到的文件系统。

挂载CEPH FS ( File System ) 有两种方式:

KERNEL DRIVER

要挂载Ceph文件系统,您可以在知道monitor主机IP地址的情况下使用mount命令,或使用mount.ceph utility将monitor主机名解析为IP地址。例如:

1
2
3
4
5
6
7
sudo mkdir /mnt/mycephfs
sudo mount -t ceph 192.168.0.1:6789:/ /mnt/mycephfs
例如:
[root@cephL ceph-deploy]# sudo mount -t ceph 192.168.56.101:6789:/ /mnt/mycephfs
mount error 22 = Invalid argument
Ceph 10.x (Jewel)版本开始,如果使用kernel方式(无论是krbd还是cephFS)官方推荐至少使用4.x的kernel。
如果无法升级linux kernel,那么映射rbd请使用librbd方式,cephFS请使用fuse方式。

如果挂载Ceph文件系统时开启了cephx authentication,您必须指定user和secret。

1
sudo mount -t ceph 192.168.0.1:6789:/ /mnt/mycephfs -o name=admin,secret=AQATSKdNGBnwLhAAnNDKnH65FmVKpXZJVasUeQ==

上述用法在Bash history中留下了secret。更安全的方法是从文件中读取secret。 例如:

1
sudo mount -t ceph 192.168.0.1:6789:/ /mnt/mycephfs -o name=admin,secretfile=/etc/ceph/admin.secret

如果您有多个文件系统,请使用mds_namespace选项指定要挂载的文件系统,例如-o mds_namespace=myfs

要卸载Ceph文件系统,可以使用umount命令。 例如:

1
2
sudo umount /mnt/mycephfs
提示:在执行此命令之前,请确保您不在挂载的目录中。

FUSE

在用户空间(FUSE)中挂载Ceph文件系统之前,请确保客户端主机具有Ceph配置文件的副本以及Ceph元数据服务器的CAPS keyring。

在您的客户端主机上,将Ceph配置文件从monitor主机复制到/etc/ceph目录。

1
2
sudo mkdir -p /etc/ceph
sudo scp {user}@{server-machine}:/etc/ceph/ceph.conf /etc/ceph/ceph.conf

在您的客户端主机上,将monitor主机的Ceph keyring复制到/etc/ceph目录。

1
sudo scp {user}@{server-machine}:/etc/ceph/ceph.keyring /etc/ceph/ceph.keyring

确保Ceph配置文件和keyring在您的客户端机器上设置了适当的权限(例如,chmod 644)。

要将Ceph文件系统挂在为FUSE,可以使用ceph-fuse命令。 例如:

1
2
sudo mkdir /home/usernname/cephfs
sudo ceph-fuse -m 192.168.0.1:6789 /home/username/cephfs

如果您拥有多个文件系统,请使用 –client_mds_namespace 命令行参数指定要挂载哪一个文件系统,或者向ceph.conf中添加client_mds_namespace设置。

要自动挂载ceph-fuse,您可以在system fstab中添加一个条目。此外还可以使用ceph-fuse@.service和ceph-fuse.target systemd units。通常这些unit文件为ceph-fuse描述默认的dependencies和推荐的execution context。例如使用ceph-fuse挂载到/mnt:

1
sudo systemctl start ceph-fuse@/mnt.service

持久化挂载点可通过以下方式进行设置:

1
sudo systemctl enable ceph-fuse@/mnt.service

部署RGW

Ceph Object Gateway原来叫RADOS Gateway,它是构建在librados之上的对象存储接口,为应用程序提供了一个RESTful gateway,用户可以通过HTTP协议访问Ceph存储集群。

Ceph Object Storage支持两个接口:

  • S3-compatible:与Amazon S3 RESTful API中一些子集兼容的接口,提供对象存储功能。

  • Swift-compatible:与OpenStack Swift API中一些子集兼容的接口,提供对象存储功能。

Ceph Object Storage使用Ceph Object Gateway daemon (radosgw),它是一个HTTP server,用于与Ceph存储集群进行交互。由于它提供了与OpenStack Swift和Amazon S3兼容的接口,因此Ceph Object Gateway具有自己的用户管理。Ceph Object Gateway可以将数据存储在与Ceph Filesystem和Ceph Block Device相同的Ceph存储集群中。但是我相信在生产环境中不会这么做,如果数据量大的话会影响Ceph Filesystem和Ceph Block Device的性能,个人一般会独立出一个Ceph Object Gateway集群。S3和Swift API共享一个通用的namespace,因此您可以使用一个API编写数据并使用另一个API检索它。

1
Note:Ceph Object Storage 不使用 Ceph Metadata Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 必须部署MGR,才能部署RGW

[root@cephL ceph-deploy]# ceph-deploy rgw create cephL:RGW
root 2799 1 0 13:13 ? 00:00:00 /usr/bin/radosgw -f --cluster ceph --name client.rgw.RGW --setuser ceph --setgroup ceph

# 重启RGW
[root@cephL ~]# systemctl restart ceph-radosgw@rgw.cephL.service
[root@cephL ~]# systemctl restart ceph-radosgw@rgw

问题一,这难道是ceph-deploy 2.0.0的坑?
[root@cephL ~]# tailf /var/log/ceph/ceph-client.rgw.log
2018-05-11 22:30:31.999421 7f537c31fe00 0 ceph version 12.2.4 (52085d5249a80c5f5121a76d6288429f35e4e77b) luminous (stable), process (unknown), pid 3450
2018-05-11 22:30:32.021546 7f537c31fe00 -1 auth: unable to find a keyring on /var/lib/ceph/radosgw/ceph-rgw/keyring: (2) No such file or directory
2018-05-11 22:30:32.021561 7f537c31fe00 -1 monclient: ERROR: missing keyring, cannot use cephx for authentication
2018-05-11 22:30:32.021563 7f537c31fe00 0 librados: client.rgw initialization error (2) No such file or directory
2018-05-11 22:30:32.022900 7f537c31fe00 -1 Couldn't init storage provider (RADOS)

[root@cephL radosgw]# pwd
/var/lib/ceph/radosgw
[root@cephL radosgw]# ls
ceph-rgw.RGW
[root@cephL radosgw]# mv ceph-rgw.RGW ceph-rgw


配置变动

在L版中,删除pool的操作做了强制限制。需要在/etc/ceph/ceph.conf中加入相关参数才允许删除pool。

1
2
# 允许删除pool,需要添加
mon allow pool delete = true

Go命令

Go发行版包含了一个名为go的命令,实现Go包和命令的自动下载,编译,安装和测试。

动机

Go从开始就明确目标:能够仅使用源代码本身的信息来构建Go代码,而不需要编写makefile或Makefile。

起初,在没有Go编译器的时候,为了达到自动化构建的目的,Go单独使用一个程序生成makefile。新的go命令的目的使我们回到这个理想。

Go的惯例

该go命令要求代码遵循几个关键的惯例:

1
2
3
4
5
1、导入路径源自源码的URL。对于Bitbucket、GitHub、Google Code和Launchpad来说, 其代码仓库的根目录由该仓库的主URL确定,无需 http:// 前缀。 子目录名附加在该路径之后。
例如https://github.com/golang/example这个项目,仓库根目录的导入路径为“github.com/golang/example”。 stringutil 包存储在子目录中,因此它的导入路径为“github.com/golang/example/stringutil”。
2、本地文件系统中存储源码的位置源自导入路径。特别地,首选路径为 $GOPATH/src/<导入路径>。若 $GOPATH 未设置, go命令会回到标准Go包存储源码的地方,即 $GOROOT/src/pkg/<导入路径>。 若 $GOPATH 设置了多个路径,go命令就会尝试这些目录的每一个 <目录>/src/<导入路径>。
3、源码树中的每个目录都对应于单个包。通过将一个目录限定为单个包, 我们无需先指定目录,再从该目录中指定包来创建混合的导入路径了。此外, 大部分文件管理工具和用户界面,都是将目录作为基本工作单元的。 将基本的Go单元—包—同化为文件系统的结构,也就意味着文件系统工具成了Go包的工具。 复制、移动或删除一个包就对应于复制、移动或删除一个目录。
4、包只通过源码中的信息构建。这会让它更容易适应构建环境和条件的改变。 例如,若我们允许额外的配置(如编译器标志或命令行选项等),那么每当构建工具被更改后, 相应的配置也需要更新;它天生还会绑定至特定的工具链。

go 命令基础

首先配置GOPATH,进入此目录后,我们就可以添加一些源码了。假设我们想要使用codesearch项目中的索引包, 以及一个左倾红黑树包。我们可以用“go get”子命令安装二者:

1
2
$ go get code.google.com/p/codesearch/index
$ go get github.com/petar/GoLLRB/llrb

这两个包现在已被下载并安装到我们的 $GOPATH 目录中了。该目录树现在包含了 src/code.google.com/p/codesearch/index/ 和 src/github.com/petar/GoLLRB/llrb/ 这两个目录,以及那些库和依赖的已编译包 (在 pkg/ 中)

“go list”子命令会列出其对应实参的导入路径,而模式”./…” 意为从当前目录(”./“)开始,查找该目录下的所有包(”…”):

1
2
3
yujiangdeMBP-13:go yujiang$ go list ./...
github.com/prometheus/prometheus/cmd/prometheus
github.com/prometheus/prometheus/cmd/promtool

我们也可以测试这些包:

1
go test ./...

“go install”子命令会将包的最新副本安装到pkg目录中。由于 go 命令会分析依赖关系,因此,除非该包导入的其它包已过期,否则 “go install” 也会递归地根据依赖关系安装它们。

go get

可以根据要求和实际情况从互联网上下载或更新指定的代码包及其依赖包,并对它们进行编译和安装。

1
2
[root@centos7 ~]# go help get
usage: go get [-d] [-f] [-fix] [-insecure] [-t] [-u] [-v] [build flags] [packages]

-d标志指示在下载软件包后停止,不安装软件包。

-f标志仅在设置-u时有效,该标记会让命令程序忽略掉对已下载代码包的导入路径的检查。如果下载并安装的代码包所属的项目是你从别人那里Fork过来的,那么这样做就尤为重要了。

-fix标志指示命令程序在下载代码包后先执行修正动作,而后再进行编译和安装。

-insecure标志允许命令程序使用非安全的scheme(如HTTP)去下载指定的代码包。如果你用的代码仓库(如公司内部的Gitlab)没有HTTPS支持,可以添加此标记。请在确定安全的情况下使用它。

-t标志指示让命令程序同时下载并安装指定的代码包中的测试源码文件中依赖的代码包。

-u标志指示让命令利用网络来更新已有代码包及其依赖包。默认情况下,该命令只会从网络上下载本地不存在的代码包,而不会更新已有的代码包。

-v标志启用详细的进度和调试输出。

-x标志可以看到go get命令执行过程中所使用的所有命令

Get也接受build flags来控制安装。请参阅go help build

当检出一个新的包时,get创建目标目录GOPATH/src/<import-path>。如果GOPATH包含多个条目,get使用第一个条目。欲了解更多详情,请参阅:go help gopath

在checkout或update软件包时,请查找与本地安装的Go版本相匹配的branch或tag。最重要的规则是,如果本地安装运行版本“go1”,则搜索名为“go1”的branch或tag。如果不存在这样的版本,它将检索软件包的默认branch。

当去checkout或update git repository时,它还会更新repository引用的任何git子模块。

永远不要checksout或update存储在vendor目录中的代码。

有关指定软件包的更多信息,请参阅go help packages

有关go get如何查找要下载的源代码的更多信息,请参阅go help importpath

另请参见:go buildgo installgo clean

参考资料

【1】http://wiki.jikexueyuan.com/project/go-command-tutorial/0.3.html

配置 Golang 开发环境

1、首先根据你所使用的系统,下载对应的Golang包。

国内可以在这里下载:https://studygolang.com/dl

2、然后配置Golang的环境变量,配置方法类似于Java环境变量

持久化方法:编辑 ~/.bash_profile 文件,配置自己的目录,例如:

1
2
3
4
5
6
7
8
9
10
11
12
GOPATH="/Users/yujiang/go"
GOROOT="/usr/local/go"

# 测试段
export GOPATH="/root/gocodes"
export GOROOT=/usr/local/go
export GOBIN=$GOROOT/bin
export GOPKG=$GOROOT/pkg/tool/linux_amd64
export GOARCH=amd64
export GOOS=linux
export GOPATH=/Golang
export PATH=$PATH:$GOBIN:$GOPKG:$GOPATH/bin

使配置生效
source ~/.bash_profile

主要的环境变量有:

GOROOT:你所安装的go可执行文件的目录

GOROOT="/usr/local/go"

GOPATH:是自己的项目所在目录,也就是所谓的工作目录,可以配置多个目录,使用’;’分隔。

GOPATH如果配置多个目录,使用go install/go get时,会默认匹配第一个目录,后面的忽略。

GOPATH="/Users/yujiang/go"

3、工作目录结构

通常在你的工作目录中,包含3个子目录

1
2
3
bin 存放编译后的二进制文件
src 存放源码文件
pkg 存放编译后的库文件*.a