connection.go 13 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "database/sql"
  7. "database/sql/driver"
  8. "encoding/binary"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "io/ioutil"
  13. "math"
  14. "net"
  15. "reflect"
  16. "strings"
  17. "time"
  18. )
  // Miscellaneous constants.
  const (
  	// NullByte is the zero byte used as a string terminator and as padding.
  	NullByte byte = 0
  	// MaxPacketSize is defined as MaxUint16.
  	// NOTE(review): the packet header written by addHeader carries a 3-byte
  	// length, so MaxUint24 may have been intended - confirm.
  	MaxPacketSize = MaxUint16
  )

  // MySQL packet data types. These select the wire encoding used by getInt,
  // getString, putInt and putString.
  const (
  	TypeNullTerminatedString = int(iota)
  	TypeFixedString
  	TypeFixedInt
  	TypeLenEncInt
  	TypeRestOfPacketString
  	TypeLenEncString
  )

  // Integer maximums.
  const (
  	MaxUint08 = 1<<8 - 1
  	MaxUint16 = 1<<16 - 1
  	MaxUint24 = 1<<24 - 1
  	MaxUint64 = 1<<64 - 1
  )

  // Packet statuses: the first payload byte that listen dispatches on.
  const (
  	StatusOK   = 0x00
  	StatusEOF  = 0xFE
  	StatusErr  = 0xFF
  	StatusAuth = 0x01
  )
  // Config holds the connection settings. newBinlogConfig fills it by decoding
  // a JSON document using the field tags below.
  type Config struct {
  	// Server address and credentials.
  	Host     string `json:"host"`
  	Port     int    `json:"port"`
  	User     string `json:"user"`
  	Pass     string `json:"password"`
  	Database string `json:"database"`

  	// TLS settings; Driver.Open only consults them when SSL is true.
  	SSL        bool   `json:"ssl"`
  	SSLCA      string `json:"ssl-ca"`
  	SSLCer     string `json:"ssl-cer"`
  	SSLKey     string `json:"ssl-key"`
  	VerifyCert bool   `json:"verify-cert"`

  	// Values sent in the binlog dump command.
  	ServerId   uint64 `json:"server-id"`
  	BinLogFile string `json:"binlog-file"`

  	// Timeout is the dial timeout. It carries no JSON tag.
  	Timeout time.Duration
  }
  54. func newBinlogConfig(dsn string) (*Config, error) {
  55. var err error
  56. b, err := ioutil.ReadFile(dsn)
  57. if err != nil {
  58. return nil, err
  59. }
  60. config := Config{}
  61. err = json.Unmarshal(b, &config)
  62. return &config, err
  63. }
  64. type Conn struct {
  65. Config *Config
  66. curConn net.Conn
  67. tcpConn *net.TCPConn
  68. secTCPConn *tls.Conn
  69. Handshake *Handshake
  70. HandshakeResponse *HandshakeResponse
  71. buffer *bufio.ReadWriter
  72. scanner *bufio.Scanner
  73. err error
  74. sequenceId uint64
  75. writeBuf *bytes.Buffer
  76. StausFlags *StatusFlags
  77. }
  78. func newBinlogConn(config *Config) Conn {
  79. return Conn{
  80. Config: config,
  81. sequenceId: 1,
  82. }
  83. }
  84. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  85. return nil, nil
  86. }
  87. func (c Conn) Close() error {
  88. return nil
  89. }
  90. func (c Conn) Begin() (driver.Tx, error) {
  91. return nil, nil
  92. }
  93. type Driver struct{}
  94. func (d Driver) Open(dsn string) (driver.Conn, error) {
  95. config, err := newBinlogConfig(dsn)
  96. if nil != err {
  97. return nil, err
  98. }
  99. c := newBinlogConn(config)
  100. var t interface{}
  101. dialer := net.Dialer{Timeout: c.Config.Timeout}
  102. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  103. t, err = dialer.Dial("tcp", addr)
  104. if err != nil {
  105. netErr, ok := err.(net.Error)
  106. if ok && !netErr.Temporary() {
  107. fmt.Printf("Error: %s", netErr.Error())
  108. return nil, err
  109. }
  110. } else {
  111. c.tcpConn = t.(*net.TCPConn)
  112. c.setConnection(t.(net.Conn))
  113. }
  114. err = c.decodeHandshakePacket()
  115. if err != nil {
  116. return nil, err
  117. }
  118. c.HandshakeResponse = c.NewHandshakeResponse()
  119. // If we are on SSL send SSL_Request packet now
  120. if c.Config.SSL {
  121. err = c.writeSSLRequestPacket()
  122. if err != nil {
  123. return nil, err
  124. }
  125. tlsConf := NewClientTLSConfig(
  126. c.Config.SSLKey,
  127. c.Config.SSLCer,
  128. []byte(c.Config.SSLCA),
  129. c.Config.VerifyCert,
  130. c.Config.Host,
  131. )
  132. c.secTCPConn = tls.Client(c.tcpConn, tlsConf)
  133. c.setConnection(c.secTCPConn)
  134. }
  135. err = c.writeHandshakeResponse()
  136. if err != nil {
  137. return nil, err
  138. }
  139. r, err := c.listen()
  140. switch r.(type) {
  141. case *OKPacket: // Login successful.
  142. fmt.Println("OK: Init bin log")
  143. bldc := &BinLogDumpCommand{
  144. Status: COMMAND_BIN_LOG_DUMP,
  145. Position: 0,
  146. Flags: BINLOG_DUMP_NON_BLOCK,
  147. ServerId: c.Config.ServerId,
  148. Filename: c.Config.BinLogFile,
  149. }
  150. c.writeBinLogDumpCommand(bldc)
  151. _, err := c.listen()
  152. if err != nil {
  153. return nil, err
  154. }
  155. case *ErrorPacket: // Bad login.
  156. case *AuthMoreDataPacket: // Something's fucked
  157. fmt.Println("AuthMoreData")
  158. }
  159. return c, err
  160. }
  161. func (c *Conn) listen() (interface{}, error) {
  162. ph, err := c.getPacketHeader()
  163. if err != nil {
  164. return nil, err
  165. }
  166. c.sequenceId++
  167. var res interface{}
  168. switch ph.Status {
  169. case StatusAuth:
  170. res, err := c.decodeAuthMoreDataResponsePacket(ph)
  171. if err != nil {
  172. return nil, err
  173. }
  174. switch res.Data {
  175. case SHA2_FAST_AUTH_SUCCESS:
  176. case SHA2_REQUEST_PUBLIC_KEY:
  177. case SHA2_PERFORM_FULL_AUTHENTICATION:
  178. c.putBytes(append([]byte(c.Config.Pass), NullByte))
  179. if c.Flush() != nil {
  180. return nil, c.Flush()
  181. }
  182. }
  183. case StatusEOF:
  184. fallthrough
  185. case StatusOK:
  186. res, err = c.decodeOKPacket(ph)
  187. if err != nil {
  188. return nil, err
  189. }
  190. case StatusErr:
  191. res, err = c.decodeErrorPacket(ph)
  192. if err != nil {
  193. return nil, err
  194. }
  195. err = errors.New(
  196. fmt.Sprintf(
  197. "Error %d: %s",
  198. res.(*ErrorPacket).ErrorCode,
  199. res.(*ErrorPacket).ErrorMessage,
  200. ))
  201. return res, err
  202. }
  203. err = c.scanner.Err()
  204. if err != nil {
  205. return nil, err
  206. }
  207. return res, nil
  208. }
  // PacketHeader holds the leading fields of a packet as read by
  // getPacketHeader: a 3-byte length, a 1-byte sequence id and a 1-byte status.
  type PacketHeader struct {
  	Length     uint64
  	SequenceID uint64
  	Status     uint64
  }
  214. func (c *Conn) getPacketHeader() (PacketHeader, error) {
  215. ph := PacketHeader{}
  216. ph.Length = c.getInt(TypeFixedInt, 3)
  217. ph.SequenceID = c.getInt(TypeFixedInt, 1)
  218. ph.Status = c.getInt(TypeFixedInt, 1)
  219. err := c.scanner.Err()
  220. if err != nil {
  221. return ph, err
  222. }
  223. return ph, nil
  224. }
  225. func init() {
  226. sql.Register("mysql-binlog", &Driver{})
  227. }
  228. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  229. b := make([]byte, 0)
  230. for i := uint64(0); i < l; i++ {
  231. didScan := c.scanner.Scan()
  232. if !didScan {
  233. return nil
  234. }
  235. b = append(b, c.scanner.Bytes()...)
  236. }
  237. return bytes.NewBuffer(b)
  238. }
  239. func (c *Conn) getBytesUntilEOF() *bytes.Buffer {
  240. l := uint64(1)
  241. s := c.readBytes(l)
  242. b := s.Bytes()
  243. for true {
  244. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  245. break
  246. }
  247. s := c.readBytes(uint64(l))
  248. if s == nil {
  249. return bytes.NewBuffer(b)
  250. }
  251. b = append(b, s.Bytes()...)
  252. }
  253. return bytes.NewBuffer(b)
  254. }
  255. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  256. l := uint64(1)
  257. s := c.readBytes(l)
  258. b := s.Bytes()
  259. for true {
  260. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  261. break
  262. }
  263. s = c.readBytes(uint64(l))
  264. b = append(b, s.Bytes()...)
  265. }
  266. return bytes.NewBuffer(b)
  267. }
  268. func (c *Conn) discardBytes(l int) {
  269. for i := 0; i < l; i++ {
  270. c.scanner.Scan()
  271. }
  272. }
  273. func (c *Conn) getInt(t int, l uint64) uint64 {
  274. var v uint64
  275. switch t {
  276. case TypeFixedInt:
  277. v = c.decFixedInt(l)
  278. case TypeLenEncInt:
  279. v = c.decLenEncInt()
  280. default:
  281. v = 0
  282. }
  283. return v
  284. }
  285. func (c *Conn) getString(t int, l uint64) string {
  286. var v string
  287. switch t {
  288. case TypeFixedString:
  289. v = c.decFixedString(l)
  290. case TypeLenEncString:
  291. v = string(c.decLenEncInt())
  292. case TypeNullTerminatedString:
  293. v = c.decNullTerminatedString()
  294. case TypeRestOfPacketString:
  295. v = c.decRestOfPacketString()
  296. default:
  297. v = ""
  298. }
  299. return v
  300. }
  301. func (c *Conn) decRestOfPacketString() string {
  302. b := c.getBytesUntilEOF()
  303. return string(b.Bytes())
  304. }
  305. func (c *Conn) decNullTerminatedString() string {
  306. b := c.getBytesUntilNull()
  307. return strings.TrimRight(b.String(), string(NullByte))
  308. }
  309. func (c *Conn) decFixedString(l uint64) string {
  310. b := c.readBytes(l)
  311. return b.String()
  312. }
  313. func (c *Conn) decLenEncInt() uint64 {
  314. var l uint16
  315. b := c.readBytes(1)
  316. br := bytes.NewReader(b.Bytes())
  317. _ = binary.Read(br, binary.LittleEndian, &l)
  318. if l > 0 {
  319. return c.decFixedInt(uint64(l))
  320. } else {
  321. return 0
  322. }
  323. }
  324. func (c *Conn) decFixedInt(l uint64) uint64 {
  325. var i uint64
  326. b := c.readBytes(l)
  327. if l <= 2 {
  328. var x uint16
  329. pb := c.padBytes(2, b.Bytes())
  330. br := bytes.NewReader(pb)
  331. _ = binary.Read(br, binary.LittleEndian, &x)
  332. i = uint64(x)
  333. } else if l <= 4 {
  334. var x uint32
  335. pb := c.padBytes(4, b.Bytes())
  336. br := bytes.NewReader(pb)
  337. _ = binary.Read(br, binary.LittleEndian, &x)
  338. i = uint64(x)
  339. } else if l <= 8 {
  340. var x uint64
  341. pb := c.padBytes(8, b.Bytes())
  342. br := bytes.NewReader(pb)
  343. _ = binary.Read(br, binary.LittleEndian, &x)
  344. i = x
  345. }
  346. return i
  347. }
  348. func (c *Conn) padBytes(l int, b []byte) []byte {
  349. bl := len(b)
  350. pl := l - bl
  351. for i := 0; i < pl; i++ {
  352. b = append(b, NullByte)
  353. }
  354. return b
  355. }
  356. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  357. b := make([]byte, 8)
  358. binary.LittleEndian.PutUint64(b, v)
  359. return b[:l]
  360. }
  361. func (c *Conn) encLenEncInt(v uint64) []byte {
  362. prefix := make([]byte, 1)
  363. var b []byte
  364. switch {
  365. case v < MaxUint08:
  366. b = make([]byte, 2)
  367. binary.LittleEndian.PutUint16(b, uint16(v))
  368. b = b[:1]
  369. case v >= MaxUint08 && v < MaxUint16:
  370. prefix[0] = 0xFC
  371. b = make([]byte, 3)
  372. binary.LittleEndian.PutUint16(b, uint16(v))
  373. b = b[:2]
  374. case v >= MaxUint16 && v < MaxUint24:
  375. prefix[0] = 0xFD
  376. b = make([]byte, 4)
  377. binary.LittleEndian.PutUint32(b, uint32(v))
  378. b = b[:3]
  379. case v >= MaxUint24 && v < MaxUint64:
  380. prefix[0] = 0xFE
  381. b = make([]byte, 9)
  382. binary.LittleEndian.PutUint64(b, uint64(v))
  383. }
  384. if len(b) > 1 {
  385. b = append(prefix, b...)
  386. }
  387. return b
  388. }
  389. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  390. l := len(b)
  391. t := reflect.TypeOf(s)
  392. v := reflect.New(t.Elem()).Elem()
  393. for i := uint(0); i < uint(v.NumField()); i++ {
  394. f := v.Field(int(i))
  395. var v bool
  396. switch {
  397. case l > 4:
  398. x := binary.LittleEndian.Uint64(b)
  399. flag := uint64(1 << i)
  400. v = x&flag > 0
  401. case l > 2:
  402. x := binary.LittleEndian.Uint32(b)
  403. flag := uint32(1 << i)
  404. v = x&flag > 0
  405. case l > 1:
  406. x := binary.LittleEndian.Uint16(b)
  407. flag := uint16(1 << i)
  408. v = x&flag > 0
  409. default:
  410. x := uint(b[0])
  411. flag := uint(1 << i)
  412. v = x&flag > 0
  413. }
  414. f.SetBool(v)
  415. }
  416. return v.Interface()
  417. }
  418. func (c *Conn) structToBitmask(s interface{}) []byte {
  419. t := reflect.TypeOf(s).Elem()
  420. sV := reflect.ValueOf(s).Elem()
  421. fC := uint(t.NumField())
  422. m := uint64(0)
  423. for i := uint(0); i < fC; i++ {
  424. f := sV.Field(int(i))
  425. v := f.Bool()
  426. if v {
  427. m |= 1 << i
  428. }
  429. }
  430. l := uint64(math.Ceil(float64(fC) / 8.0))
  431. b := make([]byte, 8)
  432. binary.LittleEndian.PutUint64(b, m)
  433. switch {
  434. case l > 4: // 64 bits
  435. b = b[:8]
  436. case l > 2: // 32 bits
  437. b = b[:4]
  438. case l > 1: // 16 bits
  439. b = b[:2]
  440. default: // 8 bits
  441. b = b[:1]
  442. }
  443. return b
  444. }
  445. func (c *Conn) putString(t int, v string) uint64 {
  446. b := make([]byte, 0)
  447. switch t {
  448. case TypeFixedString:
  449. b = c.encFixedString(v)
  450. case TypeNullTerminatedString:
  451. b = c.encNullTerminatedString(v)
  452. case TypeRestOfPacketString:
  453. b = c.encRestOfPacketString(v)
  454. }
  455. l, err := c.writeBuf.Write(b)
  456. if err != nil {
  457. c.err = err
  458. }
  459. return uint64(l)
  460. }
  461. func (c *Conn) encNullTerminatedString(v string) []byte {
  462. return append([]byte(v), NullByte)
  463. }
  464. func (c *Conn) encFixedString(v string) []byte {
  465. return []byte(v)
  466. }
  467. func (c *Conn) encRestOfPacketString(v string) []byte {
  468. s := c.encFixedString(v)
  469. return s
  470. }
  471. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  472. c.setupWriteBuffer()
  473. b := make([]byte, 0)
  474. switch t {
  475. case TypeFixedInt:
  476. b = c.encFixedLenInt(v, l)
  477. case TypeLenEncInt:
  478. b = c.encLenEncInt(v)
  479. }
  480. n, err := c.writeBuf.Write(b)
  481. if err != nil {
  482. c.err = err
  483. }
  484. return uint64(n)
  485. }
  486. func (c *Conn) putNullBytes(n uint64) uint64 {
  487. c.setupWriteBuffer()
  488. b := make([]byte, n)
  489. l, err := c.writeBuf.Write(b)
  490. if err != nil {
  491. c.err = err
  492. }
  493. return uint64(l)
  494. }
  495. func (c *Conn) putBytes(v []byte) uint64 {
  496. c.setupWriteBuffer()
  497. l, err := c.writeBuf.Write(v)
  498. if err != nil {
  499. c.err = err
  500. }
  501. return uint64(l)
  502. }
  503. func (c *Conn) Flush() error {
  504. if c.err != nil {
  505. return c.err
  506. }
  507. c.writeBuf = c.addHeader()
  508. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  509. if c.buffer.Flush() != nil {
  510. return c.buffer.Flush()
  511. }
  512. c.writeBuf = nil
  513. return nil
  514. }
  515. func (c *Conn) addHeader() *bytes.Buffer {
  516. pl := uint64(c.writeBuf.Len())
  517. sId := uint64(c.sequenceId)
  518. c.sequenceId++
  519. plB := c.encFixedLenInt(pl, 3)
  520. sIdB := c.encFixedLenInt(sId, 1)
  521. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  522. }
  523. func (c *Conn) setupWriteBuffer() {
  524. if c.writeBuf == nil {
  525. c.writeBuf = bytes.NewBuffer(nil)
  526. }
  527. }
  // StatusFlags currently has no fields. Conn refers to it through its
  // StausFlags field.
  type StatusFlags struct{}
  530. type OKPacket struct {
  531. PacketHeader
  532. Header uint64
  533. AffectedRows uint64
  534. LastInsertID uint64
  535. StatusFlags uint64
  536. Warnings uint64
  537. Info string
  538. SessionStateInfo string
  539. }
  540. func (c *Conn) decodeOKPacket(ph PacketHeader) (*OKPacket, error) {
  541. op := OKPacket{}
  542. op.PacketHeader = ph
  543. op.Header = ph.Status
  544. op.AffectedRows = c.getInt(TypeLenEncInt, 0)
  545. op.LastInsertID = c.getInt(TypeLenEncInt, 0)
  546. if c.HandshakeResponse.ClientFlag.Protocol41 {
  547. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  548. op.Warnings = c.getInt(TypeFixedInt, 1)
  549. } else if c.HandshakeResponse.ClientFlag.Transactions {
  550. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  551. }
  552. if c.HandshakeResponse.ClientFlag.SessionTrack {
  553. op.Info = c.getString(TypeRestOfPacketString, 0)
  554. } else {
  555. op.Info = c.getString(TypeRestOfPacketString, 0)
  556. }
  557. return &op, nil
  558. }
  559. type ErrorPacket struct {
  560. PacketHeader
  561. ErrorCode uint64
  562. ErrorMessage string
  563. SQLStateMarker string
  564. SQLState string
  565. }
  566. func (c *Conn) decodeErrorPacket(ph PacketHeader) (*ErrorPacket, error) {
  567. ep := ErrorPacket{}
  568. ep.PacketHeader = ph
  569. ep.ErrorCode = c.getInt(TypeFixedInt, 2)
  570. ep.SQLStateMarker = c.getString(TypeFixedString, 1)
  571. ep.SQLState = c.getString(TypeFixedString, 5)
  572. ep.ErrorMessage = c.getString(TypeRestOfPacketString, 0)
  573. err := c.scanner.Err()
  574. if err != nil {
  575. return nil, err
  576. }
  577. return &ep, nil
  578. }
  579. func (c *Conn) setConnection(nc net.Conn) {
  580. c.curConn = nc
  581. c.buffer = bufio.NewReadWriter(
  582. bufio.NewReader(c.curConn),
  583. bufio.NewWriter(c.curConn),
  584. )
  585. c.scanner = bufio.NewScanner(c.buffer.Reader)
  586. c.scanner.Split(bufio.ScanBytes)
  587. }