connection.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "database/sql"
  6. "database/sql/driver"
  7. "encoding/binary"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "math"
  12. "math/bits"
  13. "net"
  14. "reflect"
  15. "strings"
  16. "time"
  17. )
  18. // MySQL Packet Data Types
  19. const TypeNullTerminatedString = int(0)
  20. const TypeFixedString = int(1)
  21. const TypeFixedInt = int(2)
  22. const TypeLenEncInt = int(3)
  23. // Integer Maximums
  24. const MaxUint8 = 1<<8 - 1
  25. const MaxUint16 = 1<<16 - 1
  26. const MaxUint24 = 1<<24 - 1
  27. const MaxUint64 = 1<<64 - 1
  28. // Misc. Constants
  29. const NullByte byte = 0
  30. const MaxPacketSize = MaxUint16
  31. type Config struct {
  32. Host string `json:"host"`
  33. Port int `json:"port"`
  34. User string `json:"user"`
  35. Pass string `json:"password"`
  36. Database string `json:"database"`
  37. SSL bool `json:"ssl"`
  38. VerifyCert bool `json:"verify_cert"`
  39. Timeout time.Duration
  40. }
  41. func splitByBytesFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
  42. if atEOF {
  43. return 0, nil, errors.New("scanner found EOF")
  44. }
  45. return 1, data[:1], nil
  46. }
  47. func newBinlogConfig(dsn string) (*Config, error) {
  48. var err error
  49. config := Config{}
  50. err = json.Unmarshal([]byte(dsn), &config)
  51. return &config, err
  52. }
  53. type Conn struct {
  54. Config *Config
  55. tcpConn *net.TCPConn
  56. Handshake *Handshake
  57. buffer *bufio.ReadWriter
  58. scanner *bufio.Scanner
  59. err error
  60. sequenceId uint64
  61. writeBuf *bytes.Buffer
  62. }
  63. func newBinlogConn(config *Config) Conn {
  64. return Conn{
  65. Config: config,
  66. sequenceId: 0,
  67. }
  68. }
  69. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  70. return nil, nil
  71. }
  72. func (c Conn) Close() error {
  73. return nil
  74. }
  75. func (c Conn) Begin() (driver.Tx, error) {
  76. return nil, nil
  77. }
// Driver is the database/sql driver type of this package; init registers an
// instance of it under the name "mysql-binlog".
type Driver struct{}
  79. func (d Driver) Open(dsn string) (driver.Conn, error) {
  80. config, err := newBinlogConfig(dsn)
  81. if nil != err {
  82. return nil, err
  83. }
  84. c := newBinlogConn(config)
  85. dialer := net.Dialer{Timeout: c.Config.Timeout}
  86. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  87. t, err := dialer.Dial("tcp", addr)
  88. c.tcpConn = t.(*net.TCPConn)
  89. if err != nil {
  90. netErr, ok := err.(net.Error)
  91. if ok && netErr.Temporary() {
  92. fmt.Printf("Error: %s", netErr.Error())
  93. return nil, err
  94. }
  95. }
  96. err = c.decodeHandshakePacket()
  97. if err != nil {
  98. return nil, err
  99. }
  100. err = c.writeHandshakeResponse()
  101. if err != nil {
  102. return nil, err
  103. }
  104. return c, err
  105. }
  106. func init() {
  107. sql.Register("mysql-binlog", &Driver{})
  108. }
  109. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  110. if c.buffer == nil {
  111. c.buffer = bufio.NewReadWriter(
  112. bufio.NewReader(c.tcpConn),
  113. bufio.NewWriter(c.tcpConn),
  114. )
  115. c.scanner = bufio.NewScanner(c.buffer.Reader)
  116. c.scanner.Split(splitByBytesFunc)
  117. }
  118. b := make([]byte, 0)
  119. for i := uint64(0); i < l; i++ {
  120. c.scanner.Scan()
  121. b = append(b, c.scanner.Bytes()...)
  122. }
  123. return bytes.NewBuffer(b)
  124. }
  125. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  126. l := uint64(1)
  127. s := c.readBytes(l)
  128. b := s.Bytes()
  129. for true {
  130. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  131. break
  132. }
  133. s = c.readBytes(uint64(l))
  134. b = append(b, s.Bytes()...)
  135. }
  136. return bytes.NewBuffer(b)
  137. }
  138. func (c *Conn) discardBytes(l int) {
  139. for i := 0; i < l; i++ {
  140. c.scanner.Scan()
  141. }
  142. }
  143. func (c *Conn) getInt(t int, l uint64) uint64 {
  144. var v uint64
  145. switch t {
  146. case TypeFixedInt:
  147. v = c.decFixedInt(l)
  148. default:
  149. v = 0
  150. }
  151. return v
  152. }
  153. func (c *Conn) getString(t int, l uint64) string {
  154. var v string
  155. switch t {
  156. case TypeFixedString:
  157. v = c.decFixedString(l)
  158. case TypeNullTerminatedString:
  159. v = c.decNullTerminatedString()
  160. default:
  161. v = ""
  162. }
  163. return v
  164. }
  165. func (c *Conn) decNullTerminatedString() string {
  166. b := c.getBytesUntilNull()
  167. return strings.TrimRight(b.String(), string(NullByte))
  168. }
  169. func (c *Conn) decFixedString(l uint64) string {
  170. b := c.readBytes(l)
  171. return b.String()
  172. }
  173. func (c *Conn) decFixedInt(l uint64) uint64 {
  174. var i uint64
  175. b := c.readBytes(l)
  176. i, _ = binary.ReadUvarint(b)
  177. return i
  178. }
  179. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  180. b := make([]byte, 8)
  181. binary.LittleEndian.PutUint64(b, v)
  182. return b[:l]
  183. }
  184. func (c *Conn) encLenEncInt(v uint64) []byte {
  185. prefix := make([]byte, 1)
  186. var b []byte
  187. switch {
  188. case v < MaxUint8:
  189. b = make([]byte, 2)
  190. binary.LittleEndian.PutUint16(b, uint16(v))
  191. b = b[:1]
  192. case v >= MaxUint8 && v < MaxUint16:
  193. prefix[0] = 0xFC
  194. b = make([]byte, 3)
  195. binary.LittleEndian.PutUint16(b, uint16(v))
  196. b = b[:2]
  197. case v >= MaxUint16 && v < MaxUint24:
  198. prefix[0] = 0xFD
  199. b = make([]byte, 4)
  200. binary.LittleEndian.PutUint32(b, uint32(v))
  201. b = b[:3]
  202. case v >= MaxUint24 && v < MaxUint64:
  203. prefix[0] = 0xFE
  204. b = make([]byte, 9)
  205. binary.LittleEndian.PutUint64(b, uint64(v))
  206. }
  207. if len(b) > 1 {
  208. b = append(prefix, b...)
  209. }
  210. return b
  211. }
  212. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  213. l := len(b)
  214. t := reflect.TypeOf(s)
  215. v := reflect.New(t.Elem()).Elem()
  216. for i := uint(0); i < uint(v.NumField()); i++ {
  217. f := v.Field(int(i))
  218. var v bool
  219. switch {
  220. case l > 4:
  221. x := binary.LittleEndian.Uint64(b)
  222. flag := uint64(1 << i)
  223. v = x&flag > 0
  224. case l > 2:
  225. x := binary.LittleEndian.Uint32(b)
  226. flag := uint32(1 << i)
  227. v = x&flag > 0
  228. case l > 1:
  229. x := binary.LittleEndian.Uint16(b)
  230. flag := uint16(1 << i)
  231. v = x&flag > 0
  232. default:
  233. x := uint(b[0])
  234. flag := uint(1 << i)
  235. v = x&flag > 0
  236. }
  237. f.SetBool(v)
  238. }
  239. return v.Interface()
  240. }
  241. func (c *Conn) structToBitmask(s interface{}) []byte {
  242. t := reflect.TypeOf(s).Elem()
  243. sV := reflect.ValueOf(s).Elem()
  244. fC := uint(t.NumField())
  245. m := uint64(0)
  246. for i := uint(0); i < fC; i++ {
  247. f := sV.Field(int(i))
  248. v := f.Bool()
  249. if v {
  250. m |= 1 << i
  251. }
  252. }
  253. l := uint64(math.Ceil(float64(fC) / 8.0))
  254. b := make([]byte, 8)
  255. binary.BigEndian.PutUint64(b, bits.Reverse64(m))
  256. switch {
  257. case l > 4: // 64 bits
  258. b = b[:8]
  259. case l > 2: // 32 bits
  260. b = b[:4]
  261. case l > 1: // 16 bits
  262. b = b[:2]
  263. default: // 8 bits
  264. b = b[:1]
  265. }
  266. return b
  267. }
  268. func (c *Conn) putString(t int, v string) uint64 {
  269. b := make([]byte, 0)
  270. switch t {
  271. case TypeFixedString:
  272. b = c.encFixedString(v)
  273. case TypeNullTerminatedString:
  274. b = c.encNullTerminatedString(v)
  275. }
  276. l, err := c.writeBuf.Write(b)
  277. if err != nil {
  278. c.err = err
  279. }
  280. return uint64(l)
  281. }
  282. func (c *Conn) encNullTerminatedString(v string) []byte {
  283. return append([]byte(v), NullByte)
  284. }
  285. func (c *Conn) encFixedString(v string) []byte {
  286. return []byte(v)
  287. }
  288. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  289. c.setupWriteBuffer()
  290. b := make([]byte, 0)
  291. switch t {
  292. case TypeFixedInt:
  293. b = c.encFixedLenInt(v, l)
  294. case TypeLenEncInt:
  295. b = c.encLenEncInt(v)
  296. }
  297. n, err := c.writeBuf.Write(b)
  298. if err != nil {
  299. c.err = err
  300. }
  301. return uint64(n)
  302. }
  303. func (c *Conn) putNullBytes(n uint64) uint64 {
  304. c.setupWriteBuffer()
  305. b := make([]byte, n)
  306. l, err := c.writeBuf.Write(b)
  307. if err != nil {
  308. c.err = err
  309. }
  310. return uint64(l)
  311. }
  312. func (c *Conn) putBytes(v []byte) uint64 {
  313. c.setupWriteBuffer()
  314. l, err := c.writeBuf.Write(v)
  315. if err != nil {
  316. c.err = err
  317. }
  318. return uint64(l)
  319. }
  320. func (c *Conn) Flush() error {
  321. if c.err != nil {
  322. return c.err
  323. }
  324. c.writeBuf = c.addHeader()
  325. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  326. if c.buffer.Flush() != nil {
  327. return c.buffer.Flush()
  328. }
  329. return nil
  330. }
  331. func (c *Conn) addHeader() *bytes.Buffer {
  332. pl := uint64(c.writeBuf.Len()) + 4
  333. sId := uint64(c.sequenceId)
  334. c.sequenceId++
  335. plB := c.encFixedLenInt(pl, 3)
  336. sIdB := c.encFixedLenInt(sId, 1)
  337. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  338. }
  339. func (c *Conn) setupWriteBuffer() {
  340. if c.writeBuf == nil {
  341. c.writeBuf = bytes.NewBuffer(nil)
  342. }
  343. }
  344. func Reverse(s string) string {
  345. var b strings.Builder
  346. b.Grow(len(s))
  347. for i := len(s) - 1; i >= 0; i-- {
  348. b.WriteByte(s[i])
  349. }
  350. return b.String()
  351. }