connection.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "database/sql"
  6. "database/sql/driver"
  7. "encoding/binary"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "math"
  13. "net"
  14. "reflect"
  15. "strings"
  16. "time"
  17. )
  18. // MySQL Packet Data Types
  19. const TypeNullTerminatedString = int(0)
  20. const TypeFixedString = int(1)
  21. const TypeFixedInt = int(2)
  22. const TypeLenEncInt = int(3)
  23. const TypeRestOfPacketString = int(4)
  24. // Integer Maximums
  25. const MaxUint8 = 1<<8 - 1
  26. const MaxUint16 = 1<<16 - 1
  27. const MaxUint24 = 1<<24 - 1
  28. const MaxUint64 = 1<<64 - 1
  29. // Misc. Constants
  30. const NullByte byte = 0
  31. const MaxPacketSize = MaxUint16
// Config holds the connection settings that newBinlogConfig decodes from a
// JSON file.
type Config struct {
	Host       string `json:"host"`        // server host name or address; joined with Port to dial
	Port       int    `json:"port"`        // server TCP port
	User       string `json:"user"`        // account name
	Pass       string `json:"password"`    // account password
	Database   string `json:"database"`    // database name
	SSL        bool   `json:"ssl"`         // NOTE(review): presumably requests TLS; not used in this part of the file
	VerifyCert bool   `json:"verify_cert"` // NOTE(review): presumably enables certificate verification; not used in this part of the file
	// Timeout is used as the dial timeout in Driver.Open. It has no json
	// tag, so encoding/json matches it by field name, and as a
	// time.Duration it decodes from an integer number of nanoseconds.
	Timeout time.Duration
}
  42. func splitByBytesFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
  43. if atEOF {
  44. return 0, nil, errors.New("scanner found EOF")
  45. }
  46. return 1, data[:1], nil
  47. }
  48. func newBinlogConfig(dsn string) (*Config, error) {
  49. var err error
  50. b, err := ioutil.ReadFile(dsn)
  51. if err != nil {
  52. return nil, err
  53. }
  54. config := Config{}
  55. err = json.Unmarshal(b, &config)
  56. return &config, err
  57. }
// Conn is a single connection to the server. It carries the socket, the
// buffered reader/writer layered on it, and the state of the packet that is
// currently being assembled for sending.
type Conn struct {
	Config            *Config            // settings this connection was created with
	tcpConn           *net.TCPConn       // underlying socket, set by Driver.Open after dialing
	Handshake         *Handshake         // NOTE(review): presumably the server's handshake packet; populated outside this part of the file
	HandshakeResponse *HandshakeResponse // NOTE(review): presumably the reply sent to the server; populated outside this part of the file
	buffer            *bufio.ReadWriter  // buffered I/O over tcpConn, created lazily by readBytes
	scanner           *bufio.Scanner     // byte-at-a-time scanner over buffer.Reader, created with buffer
	err               error              // write error recorded by the put* helpers; Flush returns it instead of sending
	sequenceId        uint64             // sequence number for the next outgoing packet; starts at 1, bumped by addHeader
	writeBuf          *bytes.Buffer      // payload of the outgoing packet being built by the put* helpers
}
  69. func newBinlogConn(config *Config) Conn {
  70. return Conn{
  71. Config: config,
  72. sequenceId: 1,
  73. }
  74. }
  75. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  76. return nil, nil
  77. }
  78. func (c Conn) Close() error {
  79. return nil
  80. }
  81. func (c Conn) Begin() (driver.Tx, error) {
  82. return nil, nil
  83. }
// Driver is the database/sql driver for this package; its Open method
// creates connections. It is registered under the name "mysql-binlog" by
// this package's init function.
type Driver struct{}
  85. func (d Driver) Open(dsn string) (driver.Conn, error) {
  86. config, err := newBinlogConfig(dsn)
  87. if nil != err {
  88. return nil, err
  89. }
  90. c := newBinlogConn(config)
  91. dialer := net.Dialer{Timeout: c.Config.Timeout}
  92. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  93. t, err := dialer.Dial("tcp", addr)
  94. if err != nil {
  95. netErr, ok := err.(net.Error)
  96. if ok && !netErr.Temporary() {
  97. fmt.Printf("Error: %s", netErr.Error())
  98. return nil, err
  99. }
  100. } else {
  101. c.tcpConn = t.(*net.TCPConn)
  102. }
  103. err = c.decodeHandshakePacket()
  104. if err != nil {
  105. return nil, err
  106. }
  107. err = c.writeHandshakeResponse()
  108. if err != nil {
  109. return nil, err
  110. }
  111. err = c.listen()
  112. return c, err
  113. }
  114. func (c *Conn) listen() error {
  115. ph, err := c.getPacketHeader()
  116. if err != nil {
  117. return err
  118. }
  119. switch ph.Status {
  120. case 0x01:
  121. p, err := c.decodeAuthMoreDataResponsePacket(ph)
  122. if err != nil {
  123. return err
  124. }
  125. fmt.Printf("%+v", p)
  126. case 0x00:
  127. fmt.Println("OK")
  128. case 0xFE:
  129. fmt.Println("EOF")
  130. case 0xFF:
  131. fmt.Println("ERROR")
  132. }
  133. err = c.scanner.Err()
  134. if err != nil {
  135. return err
  136. }
  137. err = c.listen() // Listen forever until we get an error.
  138. if err != nil {
  139. return err
  140. }
  141. return nil
  142. }
// PacketHeader holds the leading bytes of an incoming packet as read by
// getPacketHeader.
type PacketHeader struct {
	Length     uint64 // decoded from the first 3 bytes
	SequenceID uint64 // decoded from the next byte
	Status     uint64 // the byte following the sequence ID; listen dispatches on it
}
  148. func (c *Conn) getPacketHeader() (PacketHeader, error) {
  149. ph := PacketHeader{}
  150. ph.Length = c.getInt(TypeFixedInt, 3)
  151. ph.SequenceID = c.getInt(TypeFixedInt, 1)
  152. ph.Status = c.getInt(TypeFixedInt, 1)
  153. err := c.scanner.Err()
  154. if err != nil {
  155. return ph, err
  156. }
  157. return ph, nil
  158. }
// init registers Driver with database/sql under the name "mysql-binlog".
func init() {
	sql.Register("mysql-binlog", &Driver{})
}
  162. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  163. if c.buffer == nil {
  164. c.buffer = bufio.NewReadWriter(
  165. bufio.NewReader(c.tcpConn),
  166. bufio.NewWriter(c.tcpConn),
  167. )
  168. c.scanner = bufio.NewScanner(c.buffer.Reader)
  169. c.scanner.Split(splitByBytesFunc)
  170. }
  171. b := make([]byte, 0)
  172. for i := uint64(0); i < l; i++ {
  173. c.scanner.Scan()
  174. b = append(b, c.scanner.Bytes()...)
  175. }
  176. return bytes.NewBuffer(b)
  177. }
  178. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  179. l := uint64(1)
  180. s := c.readBytes(l)
  181. b := s.Bytes()
  182. for true {
  183. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  184. break
  185. }
  186. s = c.readBytes(uint64(l))
  187. b = append(b, s.Bytes()...)
  188. }
  189. return bytes.NewBuffer(b)
  190. }
  191. func (c *Conn) discardBytes(l int) {
  192. for i := 0; i < l; i++ {
  193. c.scanner.Scan()
  194. }
  195. }
  196. func (c *Conn) getInt(t int, l uint64) uint64 {
  197. var v uint64
  198. switch t {
  199. case TypeFixedInt:
  200. v = c.decFixedInt(l)
  201. default:
  202. v = 0
  203. }
  204. return v
  205. }
  206. func (c *Conn) getString(t int, l uint64) string {
  207. var v string
  208. switch t {
  209. case TypeFixedString:
  210. v = c.decFixedString(l)
  211. case TypeNullTerminatedString:
  212. v = c.decNullTerminatedString()
  213. default:
  214. v = ""
  215. }
  216. return v
  217. }
  218. func (c *Conn) decNullTerminatedString() string {
  219. b := c.getBytesUntilNull()
  220. return strings.TrimRight(b.String(), string(NullByte))
  221. }
  222. func (c *Conn) decFixedString(l uint64) string {
  223. b := c.readBytes(l)
  224. return b.String()
  225. }
  226. func (c *Conn) decFixedInt(l uint64) uint64 {
  227. var i uint64
  228. b := c.readBytes(l)
  229. if l <= 2 {
  230. var x uint16
  231. pb := c.padBytes(2, b.Bytes())
  232. br := bytes.NewReader(pb)
  233. _ = binary.Read(br, binary.LittleEndian, &x)
  234. i = uint64(x)
  235. } else if l <= 4 {
  236. var x uint32
  237. pb := c.padBytes(4, b.Bytes())
  238. br := bytes.NewReader(pb)
  239. _ = binary.Read(br, binary.LittleEndian, &x)
  240. i = uint64(x)
  241. } else if l <= 8 {
  242. var x uint64
  243. pb := c.padBytes(8, b.Bytes())
  244. br := bytes.NewReader(pb)
  245. _ = binary.Read(br, binary.LittleEndian, &x)
  246. i = x
  247. }
  248. return i
  249. }
  250. func (c *Conn) padBytes(l int, b []byte) []byte {
  251. bl := len(b)
  252. pl := l - bl
  253. for i := 0; i < pl; i++ {
  254. b = append(b, NullByte)
  255. }
  256. return b
  257. }
  258. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  259. b := make([]byte, 8)
  260. binary.LittleEndian.PutUint64(b, v)
  261. return b[:l]
  262. }
  263. func (c *Conn) encLenEncInt(v uint64) []byte {
  264. prefix := make([]byte, 1)
  265. var b []byte
  266. switch {
  267. case v < MaxUint8:
  268. b = make([]byte, 2)
  269. binary.LittleEndian.PutUint16(b, uint16(v))
  270. b = b[:1]
  271. case v >= MaxUint8 && v < MaxUint16:
  272. prefix[0] = 0xFC
  273. b = make([]byte, 3)
  274. binary.LittleEndian.PutUint16(b, uint16(v))
  275. b = b[:2]
  276. case v >= MaxUint16 && v < MaxUint24:
  277. prefix[0] = 0xFD
  278. b = make([]byte, 4)
  279. binary.LittleEndian.PutUint32(b, uint32(v))
  280. b = b[:3]
  281. case v >= MaxUint24 && v < MaxUint64:
  282. prefix[0] = 0xFE
  283. b = make([]byte, 9)
  284. binary.LittleEndian.PutUint64(b, uint64(v))
  285. }
  286. if len(b) > 1 {
  287. b = append(prefix, b...)
  288. }
  289. return b
  290. }
  291. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  292. l := len(b)
  293. t := reflect.TypeOf(s)
  294. v := reflect.New(t.Elem()).Elem()
  295. for i := uint(0); i < uint(v.NumField()); i++ {
  296. f := v.Field(int(i))
  297. var v bool
  298. switch {
  299. case l > 4:
  300. x := binary.LittleEndian.Uint64(b)
  301. flag := uint64(1 << i)
  302. v = x&flag > 0
  303. case l > 2:
  304. x := binary.LittleEndian.Uint32(b)
  305. flag := uint32(1 << i)
  306. v = x&flag > 0
  307. case l > 1:
  308. x := binary.LittleEndian.Uint16(b)
  309. flag := uint16(1 << i)
  310. v = x&flag > 0
  311. default:
  312. x := uint(b[0])
  313. flag := uint(1 << i)
  314. v = x&flag > 0
  315. }
  316. f.SetBool(v)
  317. }
  318. return v.Interface()
  319. }
  320. func (c *Conn) structToBitmask(s interface{}) []byte {
  321. t := reflect.TypeOf(s).Elem()
  322. sV := reflect.ValueOf(s).Elem()
  323. fC := uint(t.NumField())
  324. m := uint64(0)
  325. for i := uint(0); i < fC; i++ {
  326. f := sV.Field(int(i))
  327. v := f.Bool()
  328. if v {
  329. m |= 1 << i
  330. }
  331. }
  332. l := uint64(math.Ceil(float64(fC) / 8.0))
  333. b := make([]byte, 8)
  334. binary.LittleEndian.PutUint64(b, m)
  335. switch {
  336. case l > 4: // 64 bits
  337. b = b[:8]
  338. case l > 2: // 32 bits
  339. b = b[:4]
  340. case l > 1: // 16 bits
  341. b = b[:2]
  342. default: // 8 bits
  343. b = b[:1]
  344. }
  345. return b
  346. }
  347. func (c *Conn) putString(t int, v string) uint64 {
  348. b := make([]byte, 0)
  349. switch t {
  350. case TypeFixedString:
  351. b = c.encFixedString(v)
  352. case TypeNullTerminatedString:
  353. b = c.encNullTerminatedString(v)
  354. case TypeRestOfPacketString:
  355. b = c.encRestOfPacketString(v)
  356. }
  357. l, err := c.writeBuf.Write(b)
  358. if err != nil {
  359. c.err = err
  360. }
  361. return uint64(l)
  362. }
  363. func (c *Conn) encNullTerminatedString(v string) []byte {
  364. return append([]byte(v), NullByte)
  365. }
  366. func (c *Conn) encFixedString(v string) []byte {
  367. return []byte(v)
  368. }
  369. func (c *Conn) encRestOfPacketString(v string) []byte {
  370. s := c.encFixedString(v)
  371. return s
  372. }
  373. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  374. c.setupWriteBuffer()
  375. b := make([]byte, 0)
  376. switch t {
  377. case TypeFixedInt:
  378. b = c.encFixedLenInt(v, l)
  379. case TypeLenEncInt:
  380. b = c.encLenEncInt(v)
  381. }
  382. n, err := c.writeBuf.Write(b)
  383. if err != nil {
  384. c.err = err
  385. }
  386. return uint64(n)
  387. }
  388. func (c *Conn) putNullBytes(n uint64) uint64 {
  389. c.setupWriteBuffer()
  390. b := make([]byte, n)
  391. l, err := c.writeBuf.Write(b)
  392. if err != nil {
  393. c.err = err
  394. }
  395. return uint64(l)
  396. }
  397. func (c *Conn) putBytes(v []byte) uint64 {
  398. c.setupWriteBuffer()
  399. l, err := c.writeBuf.Write(v)
  400. if err != nil {
  401. c.err = err
  402. }
  403. return uint64(l)
  404. }
  405. func (c *Conn) Flush() error {
  406. if c.err != nil {
  407. return c.err
  408. }
  409. c.writeBuf = c.addHeader()
  410. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  411. if c.buffer.Flush() != nil {
  412. return c.buffer.Flush()
  413. }
  414. return nil
  415. }
  416. func (c *Conn) addHeader() *bytes.Buffer {
  417. pl := uint64(c.writeBuf.Len())
  418. sId := uint64(c.sequenceId)
  419. c.sequenceId++
  420. plB := c.encFixedLenInt(pl, 3)
  421. sIdB := c.encFixedLenInt(sId, 1)
  422. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  423. }
  424. func (c *Conn) setupWriteBuffer() {
  425. if c.writeBuf == nil {
  426. c.writeBuf = bytes.NewBuffer(nil)
  427. }
  428. }