waka.dev

2025-12-07 RubyのTCPソケットでMySQLにクエリを発行する

前回 Ruby の TCPソケット通信で MySQL の認証を通すやり方を書いたので、その続き。
認証を通せれば、Rubyから SQL を発行して実行できるようになるのでやり方をまとめる。

MySQL には各種処理を実行するためのコマンドというのがあり、SQL の実行は COM_QUERY というコマンドになる。

主要なコマンド一覧

コマンド 渡すもの
COM_QUIT 0x01 なし
COM_INIT_DB 0x02 database名
COM_QUERY 0x03 SQLクエリ文字列
COM_PING 0x0E なし
COM_BINLOG_DUMP 0x12 binlog位置情報
COM_REGISTER_SLAVE 0x15 スレーブ情報
COM_STMT_PREPARE 0x16 SQLクエリ
COM_STMT_EXECUTE 0x17 statement_id + パラメータ
COM_STMT_CLOSE 0x19 statement_id

コマンド値は 8ビット符号なし整数でパケット送信する。

例えば、接続確認用の PING コマンドの実行と結果の確認はこんな感じで取れる。

socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

sequence_id = 0

def read_packet
  header = socket.read(4)

  packet_length = header[0].unpack1('C') | (header[1].unpack1('C') << 8) | (header[2].unpack1('C') << 16)
  sequence_id = header[3]unpack1('C')
  payload = socket.read(packet_length)

  { packet_length:, payload: }
end

def send_packet(payload)
  sequence_id += 1

  packet_length = payload.length
  header = [packet_length].pack('V')[0..2] + [sequence_id].pack('C')
  socket.write(header + payload)
end

payload = [0x0E].pack('C') # PINGコマンド
send_packet(payload)

response = read_packet(payload)
if response[:payload][0].unpack1('C') == 0x00 # 成功
  puts 'PONG'
else
  puts 'Oops'
end

COM_QUERY コマンドを実行する

SQL を実行させるための COM_QUERY コマンドのペイロードはこれでOK

payload = [0x03].pack('C') + sql.encode('UTF-8')

クエリ実行のパケットを送信すると、MySQL でクエリが実行されて結果をパケット受信できるようになる。
実行結果は OK Packet / ERR Packet / Result Set の3パターンあって、構造が違うのでそれぞれパースする必要がある。

どのパターンかはパケットのペイロードの 1 バイト目を見ればわかるようになっている。

  • 0x00 -> OK Packet
  • 0xFF -> Err Packet
  • 0x01~0xFA -> Result Set
  • 0xFB -> Local INFILE Request

LOCAL INFILE Request は特殊なので今回は未対応。

OK Packet の場合

結果セットを返さないクエリが成功したときがこれ。
メジャーなところだと、INSERT / UPDATE / DELETE文を実行したときこれが返る。

パケットの構造はこんな感じ。

サイズ フィールド
1 byte integer header 0x00が入る
length-encoded integer affected_rows
length-encoded integer last_insert_id
2 bytes integer status_flags サーバ状態を表すフラグ
2 bytes integer warnings
N bytes string info NULL終端

length-encoded integer は最初の 1 バイトで長さが決まる整数。
最初の 1 バイトごとにこんな感じで長さと値を取る。

  • 0x00 から 0xFA(250)
    • 最初の 1 バイトがそのまま値になる
  • 0xFC
    • 後続の 2 バイトが値
  • 0xFD
    • 後続の 3 バイトが値
  • 0xFE
    • 後続の 8 バイトが値
  • 0xFB
    • NULL
def length_encoded_integer(payload, offset)
  first_byte = payload[offset].unpack1('C')

  case first_byte
  when 0..250
    { value: first_byte, bytes_read: 1 }
  when 0xFC
    value = payload[(offset + 1)..(offset + 2)].unpack1('s<')
    { value: value, bytes_read: 3 }
  when 0xFD
    value = payload[(offset + 1)..(offset + 3)].unpack1('V') & 0xFFFFFF
    { value: value, bytes_read: 4 }
  when 0xFE
    value = payload[(offset + 1)..(offset + 8)].unpack1('Q<')
    { value: value, bytes_read: 9 }
  else # Included 0xFB
    { value: nil, bytes_read: 1 }
  end
end

ERR Packet の場合

文字通りクエリ実行がエラーになった時がこれ。

パケットの構造はこんな感じ。

サイズ フィールド
1 byte integer header 0xFFが入る
2 bytes integer error_code
1 byte integer sql_state_marker "#"固定
5 bytes string sql_state
N bytes string error_message NULL終端

Result Set の場合

SELECT みたいな結果行レコードが返るクエリがこれ。

こいつは複雑で、複数のパケットで構成される。

  • カラム数パケット
    • この後何回カラム定義パケットを取ればいいかを伝える
  • カラム定義パケット
  • EOFパケット
    • カラム定義パケットが終わったことを伝える
  • 行データパケット
  • EOFパケット
    • 行データパケットが終わったことを伝える

カラム数パケット

サイズ フィールド
1 byte integer column_count カラムの数が入る

カラム定義パケット

サイズ フィールド
length-encoded string catalog "def" というダミー値固定
length-encoded string schema データベース名
length-encoded string table テーブル名(エイリアス)
length-encoded string org_table 物理テーブル名
length-encoded string name カラム名(エイリアス)
length-encoded string org_name 物理カラム名
length-encoded integer length_of_fixed_fields "0x0c" 固定
2 bytes integer charset
4 bytes integer column_length カラムの最大長
1 byte integer column_type データ型
2 bytes integer flags カラムの属性を表すビットフラグ
1 byte integer decimals 小数点以下の桁数
2 bytes integer filter "0x00 0x00" 固定

length-encoded stringlength-encoded integer + length-encoded integer が返す値の長さの文字列。
こんな感じで取る。

def length_encoded_string(payload, offset)
  length_info = length_encoded_integer(payload, offset)
  return { value: '', bytes_read: length_info[:bytes_read] } if length_info[:value].nil?

  string_start = length_info[:bytes_read]
  string_end = string_start + length_info[:value] - 1
  value = payload[string_start..string_end] || ''

  { value: value, bytes_read: length_info[:bytes_read] + length_info[:value] }
end

EOF パケット

サイズ フィールド
1 byte integer eof "0xFE" 固定

行データパケット
カラム数分 length-encoded string が繰り返され、取れる値がカラムの値になる。
ただし、1 バイト目が 0xFB の場合は値が NULL になっているので最初に NULL チェックする。

これらをまとめると、Result Set パケットを処理するコードはこんな感じになる。

# カラム定義パケットのパース
def parse_column_definition(payload)
  offset = 0

  catalog = length_encoded_string(payload, offset)
  offset += catalog[:bytes_read]

  schema = length_encoded_string(payload, offset)
  offset += schema[:bytes_read]

  table = length_encoded_string(payload, offset)
  offset += table[:bytes_read]

  org_table = length_encoded_string(payload, offset)
  offset += org_table[:bytes_read]

  name = length_encoded_string(payload, offset)
  offset += name[:bytes_read]

  org_name = length_encoded_string(payload, offset)
  offset += org_name[:bytes_read]

  # length of fixed-length fields (1 byte)
  offset += 1

  charset = payload[offset..(offset + 1)].unpack1('v')
  offset += 2

  column_length = payload[offset..(offset + 3)].unpack1('V')
  offset += 4

  column_type = payload[offset].unpack1('C')

  {
    schema: schema[:value],
    table: table[:value],
    org_table: org_table[:value],
    name: name[:value],
    org_name: org_name[:value],
    charset: charset,
    column_length: column_length,
    column_type: column_type
  }
end

# 行データパケットのパース
def parse_row_data(payload, columns)
  first_byte = payload[0].unpack1('C')
  row = {}
  offset = 0

  columns.each do |column|
    column_name_key = column[:name].downcase.to_sym

    if offset >= payload.length
      row[column_name_key] = nil
      next
    end

    if first_byte == 0xFB
      # NULL value
      row[column_name_key] = nil
      offset += 1
    else
      # row data (length-encoded string)
      value = length_encoded_string(payload, offset)
      row[column_name_key] = value[:value]
      offset += value[:bytes_read]
    end
  end

  row
end

# カラム数パケット、カラム定義パケットを処理
columns = []
column_count = length_encoded_integer(payload, 0)[:value].to_i
column_count.times do
  column_packet = read_packet
  column_info = parse_column_definition(column_packet[:payload])
  columns << column_info
end

# EOF パケットを読み進める
connection.read_packet

# 行データパケットを処理
rows = []
loop do
  row_packet = read_packet

  # EOF パケットになったら終了
  if row_packet[:payload][0].unpack1('C') == 0xFE
    break
  end

  rows << parse_row_data(row_packet[:payload], columns)
end

puts rows