s_tajima:TechBlog

渋谷で働くインフラエンジニアのTechブログです。

fluent-plugin-bigqueryでログの書き込みが痕跡なく欠損するケースがある問題

fluent-plugin-bigqueryを使ってBigQueryにStreaming Insertでログを書き込む時に、
痕跡なくログが欠損するケースがあるのでは? という話です。

fluent-plugin-bigqueryでのログの書き込み処理/エラー処理はこのようになっています。
res.success? がtrueであればエラーはなく書き込みが成功しているという想定。
falseの時にはレスポンスのjsonerrorエラーの中身を見て、ログを吐くなどのエラー処理をするようです。

res = client().execute(
  api_method: @bq.tabledata.insert_all,
  parameters: {
    'projectId' => @project,
    'datasetId' => @dataset,
    'tableId' => table_id,
  },
  body_object: {
    "rows" => rows
  }
)
unless res.success?
  # api_error? -> client cache clear
  @cached_client = nil

  res_obj = extract_response_obj(res.body)
  message = res_obj['error']['message'] || res.body
  if res_obj
    if @auto_create_table and res_obj and res_obj['error']['code'] == 404 and /Not Found: Table/i =~ message.to_s
      # Table Not Found: Auto Create Table
      create_table(table_id)
      raise "table created. send rows next time."
    end
  end
  log.error "tabledata.insertAll API", project_id: @project, dataset: @dataset, table: table_id, code: res.status, message: message
  raise "failed to insert into bigquery" # TODO: error class
end

https://github.com/kaizenplatform/fluent-plugin-bigquery/blob/master/lib/fluent/plugin/out_bigquery.rb#L323-L349

ではres.success?とはどのような処理かというとこちら。
BigQueryのAPIのレスポンスコードをみて400系以上でなければtrueが返る仕様のようです。

##
# Check if request failed
#
# @!attribute [r] error?
# @return [TrueClass, FalseClass]
#   true if result of operation is an error
def error?
  return self.response.status >= 400
end

##
# Check if request was successful
#
# @!attribute [r] success?
# @return [TrueClass, FalseClass]
#   true if result of operation was successful
def success?
  return !self.error?
end

https://github.com/google/google-api-ruby-client/blob/v0.8.3/lib/google/api_client/result.rb#L81-L99

BigQueryの挙動を検証していると、レスポンスコードが20Xであるにもかかわらず、
レスポンスのjsonに、特定の行でエラーが含まれることがありました。
そのようなケースでは、fluent-plugin-bigqueryではログが欠損してしまいます。

(この問題だけでなく他のいろいろな経緯から、) 実運用ではfluent-plugin-bigqueryを使うのをやめ、
自前でワーカーを作り以下のようなエラー処理をして運用をはじめました。
エラーになった場合にはエラーの内容を見てリトライをかけるようにしています。

# @gclient = Google::APIClient.new( .. )
# @bq_api = @gclient.discovered_api("bigquery", "v2")
# result = @gclient.execute( :api_method => @bq_api.tabledata.insert_all .. )

def check_result(result, data)
  unless result.success?
    @logger.warn "Insert failed partically. (API call)"
    JSON.parse(result.body)["error"]["errors"].each do |error|
      @logger.warn "Error is #{error["message"]}"
    end
    @retryable = true
    return false
  end

  errors = check_result_errors(result.body)
  unless errors.nil?
    @logger.warn "Insert failed partically. (some rows)"
    @logger.warn "Error is #{errors}"
    @logger.warn "Error data #{data[0]}"
    @retryable = retryable?(errors)
    return false
  end

  @logger.debug "Insert successes. (rows:#{data.size})"
  return true
end

def check_result_errors(body)
  body = JSON.parse(body)
  return nil if body["insertErrors"].nil?

  errors = []
  body["insertErrors"].each do |error|
    errors += error["errors"].inject([]){|x, e| x << e["reason"]}
  end
  return errors.uniq
end

def retryable?(errors)
  return false if errors.include?("invalid")
  true
end

と、ここまでが去年くらいの話。

すると、今日TwitterのタイムラインでBigQueryのログの欠損の話があがっていたのを見かけてこの話を思い出しました。
しばらく間があいたので、状況が変わってるかなと思いしらべてみたけど改善されてはいない模様。

ちなみに、簡単に該当状況が起こる再現コードを書くとこんな感じ。

project_id     = "PROJECT_ID"
dataset_id     = "test"
table_id       = "test"

# schema col1:STRING, col2:INTEGER
rows = [
    {"json"=> {"col1" => "string", "col2" => 1}},
    {"json"=> {"col2" => "string", "col2" =>"string"}},
]

# @gclient = Google::APIClient.new( .. )
# @bq_api = @gclient.discovered_api("bigquery", "v2")
# result = @gclient.execute( :api_method => @bq_api.tabledata.insert_all .. )
result = @gclient.execute(
  :api_method => $bq_api.tabledata.insert_all,
  :parameters => {
    'projectId' => project_id,
    'datasetId' => dataset_id,
    'tableId' => table_id,
  },
  :body_object => {
    "rows" => rows
  }
)

puts result.success?
# => true

puts result.body
# =>
# {
#  "kind": "bigquery#tableDataInsertAllResponse",
#  "insertErrors": [
#   {
#    "index": 1,
#    "errors": [
#     {
#      "reason": "invalid",
#      "location": "Field:col2",
#      "message": "Could not convert value to integer (bad value or out of range)."
#     }
#    ]
#   },
#   {
#    "index": 0,
#    "errors": [
#     {
#      "reason": "stopped"
#     }
#    ]
#   }
#  ]
# }

正直fluent-plugin-bigqueryの問題と言うよりは、
google-api-client もしくは BigQueryのAPIの仕様の問題という気がするので、プラグイン側にIssueを立てたりするわけでもないんですが、
痕跡なくログの欠損がおきてしまうとまずいという環境で使っている方は注意したほうがいいかなと思います。

追記(2015/12/02 18:40): 案の定すでにPRがあった。 https://github.com/kaizenplatform/fluent-plugin-bigquery/pull/38