
Druid/Project

[Druid] Modifying post-index-task-main.py (final version!)

인턴신분현경이 2020. 11. 26. 17:01

The command to submit an ingestion task to Druid:

/bin/post-index-task --file test-index.json --url http://localhost:8081
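The contents of test-index.json depend on the data being loaded. As a rough sketch, a minimal native-batch ingestion spec (the datasource name, input path, and column names here are all hypothetical, not from the original post) could be generated like this:

```python
# Sketch of a minimal Druid native-batch ingestion spec; every name below
# (test_datasource, /tmp/data, time/city/sensor columns) is a placeholder.
import json

spec = {
    "type": "index_parallel",
    "spec": {
        "ioConfig": {
            "type": "index_parallel",
            "inputSource": {"type": "local", "baseDir": "/tmp/data", "filter": "test.csv"},
            "inputFormat": {"type": "csv", "findColumnsFromHeader": True},
        },
        "dataSchema": {
            "dataSource": "test_datasource",
            "timestampSpec": {"column": "time", "format": "auto"},
            "dimensionsSpec": {"dimensions": ["city", "sensor"]},
            "granularitySpec": {"segmentGranularity": "day"},
        },
        "tuningConfig": {"type": "index_parallel"},
    },
}

# Write the spec to the file that post-index-task will submit
with open("test-index.json", "w") as f:
    json.dump(spec, f, indent=2)
```

The actual spec must match your data's schema; see the Druid ingestion docs for the full set of fields.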

 

The post-index-task script above looks like this:

#!/bin/bash -eu

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

PWD="$(pwd)"
WHEREAMI="$(dirname "$0")"
WHEREAMI="$(cd "$WHEREAMI" && pwd)"

if [ -x "$(command -v python2)" ]
then
  exec python2 "$WHEREAMI/post-index-task-main" "$@"
else
  exec "$WHEREAMI/post-index-task-main" "$@"
fi

 

Depending on which Python interpreter is available, it executes post-index-task-main. So the file that actually talks to Druid is

post-index-task-main.py.

 

The original post-index-task-main.py was written for Python 2, but

our digital twin project uses Python 3, so we went through the process of converting it to Python 3. That process is covered in the links at the bottom of this post.

 

Why call python ~.py directly instead of using /bin/post-index-task?

Because we are building an automation program that runs monthly, weekly, or daily: it fetches data from the Sejong City server, preprocesses it, and loads the results into PostgreSQL and Druid. If we run it as python ~.py, the automation code can simply import the script and call its functions directly!
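As a sketch of that idea (the module name and helper below are assumptions, not part of the original script): once post-index-task-main.py is importable, the automation job can build the same args object that argparse would produce in main() and call post_task / await_task_completion directly.

```python
# Hypothetical wiring for the automation job; assumes the script has been
# made importable (e.g. renamed post_index_task_main.py and put on sys.path).
import json
import time
from types import SimpleNamespace

def build_ingest_args(task_file, overlord_url="http://localhost:8081"):
    # Mirror the argparse defaults from main(); only file/url vary per run.
    return SimpleNamespace(
        file=task_file,
        url=overlord_url,
        coordinator_url=overlord_url,
        user=None,
        password=None,
        quiet=False,
    )

def run_ingestion(task_file):
    # Wiring sketch (left as comments because the import is an assumption):
    #   import post_index_task_main as pit
    #   args = build_ingest_args(task_file)
    #   contents = pit.read_task_file(args)
    #   task_id = json.loads(pit.post_task(args, contents, time.time() + 120))["task"]
    #   return pit.await_task_completion(args, task_id, time.time() + 14400)
    pass

args = build_ingest_args("test-index.json")
print(args.url)  # → http://localhost:8081
```

A scheduler (cron, or a loop in the automation program) can then call run_ingestion() after each preprocessing run instead of shelling out to bin/post-index-task.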

 

The finished post-index-task-main.py.

Set the default of the --file argument in the main function to the name of the JSON file you want to submit.
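For example (the filename below is hypothetical; substitute your own spec), hard-coding that default looks like:

```python
import argparse

parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
# Point the default at your own task spec so the script runs without flags
parser.add_argument('--file', '-f', type=str, default='my-ingestion-spec.json',
                    help='Query JSON file')

args = parser.parse_args([])  # no CLI arguments -> the default is used
print(args.file)  # → my-ingestion-spec.json
```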

#!/usr/bin/python3
import argparse
import base64
import json
import re
import sys
import time
import urllib.request, urllib.parse, urllib.error
from urllib.parse import urlparse

def read_task_file(args):
  with open(args.file, 'r') as f:
    contents = f.read()
    # We don't use the parsed data, but we want to throw early if it's invalid
    try:
      json.loads(contents)
    except Exception as e:
      sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(e)))
      sys.exit(1)
    return contents

def add_basic_auth_header(args, req):
  if (args.user is not None):
    # b64encode needs bytes in Python 3; decode back to str for the header
    credentials = ('%s:%s' % (args.user, args.password)).encode('utf-8')
    basic_auth_encoded = base64.b64encode(credentials).decode('ascii')
    req.add_header("Authorization", "Basic %s" % basic_auth_encoded)

# Keep trying until timeout_at, maybe die then
def post_task(args, task_json, timeout_at):
  try:
    url = args.url.rstrip("/") + "/druid/indexer/v1/task"
    # Encode into a separate variable: the request body must be bytes in
    # Python 3, and retries re-enter this function with the original str.
    body = task_json.encode('utf-8')
    req = urllib.request.Request(url, body, {'Content-Type' : 'application/json'})
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
    response = urllib.request.urlopen(req, None, response_timeout)
    return response.read().rstrip()
  except urllib.error.URLError as e:
    if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <= 500:
      # 4xx (problem with the request) or 500 (something wrong on the server)
      raise_friendly_error(e)
    elif time.time() >= timeout_at:
      # No further retries
      raise_friendly_error(e)
    elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
        e.info().get("Location") is not None:
      # Set the new location in args.url so it can be used by await_task_completion and re-issue the request
      location = urlparse(e.info().get("Location"))
      args.url = "{0}://{1}".format(location.scheme, location.netloc)
      sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url))
      return post_task(args, task_json, timeout_at)
    else:
      # If at first you don't succeed, try, try again!
      sleep_time = 5
      if not args.quiet:
        extra = ''
        if hasattr(e, 'read'):
          extra = e.read().decode('utf-8', 'replace').rstrip()
        sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip())
        sys.stderr.write("\n")
      time.sleep(sleep_time)
      return post_task(args, task_json, timeout_at)

# Keep trying until timeout_at, maybe die then
def await_task_completion(args, task_id, timeout_at):
  while True:
    url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id)
    req = urllib.request.Request(url)
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
    response = urllib.request.urlopen(req, None, response_timeout)
    response_obj = json.loads(response.read())
    response_status_code = response_obj["status"]["statusCode"]
    if response_status_code in ['SUCCESS', 'FAILED']:
      return response_status_code
    else:
      if time.time() < timeout_at:
        if not args.quiet:
          sys.stderr.write("Task {0} still running...\n".format(task_id))
        timeleft = timeout_at - time.time()
        time.sleep(min(5, timeleft))
      else:
        raise Exception("Task {0} did not finish in time!".format(task_id))

def raise_friendly_error(e):
  if isinstance(e, urllib.error.HTTPError):
    text = e.read().decode('utf-8', 'replace').strip()
    reresult = re.search(r'<pre>(.*?)</pre>', text, re.DOTALL)
    if reresult:
      text = reresult.group(1).strip()
    raise Exception("HTTP Error {0}: {1}, check overlord log for more details.\n{2}".format(e.code, e.reason, text))
  raise e

def await_load_completion(args, datasource, timeout_at):
  while True:
    url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
    req = urllib.request.Request(url)
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
    response = urllib.request.urlopen(req, None, response_timeout)
    response_obj = json.loads(response.read())
    load_status = response_obj.get(datasource, 0.0)
    if load_status >= 100.0:
      sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource))
      return
    else:
      if time.time() < timeout_at:
        if not args.quiet:
          sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status))
        timeleft = timeout_at - time.time()
        time.sleep(min(5, timeleft))
      else:
        raise Exception("{0} was not loaded in time!".format(datasource))

def main():
  parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
  parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8081/', help='Druid Overlord url')
  parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
  parser.add_argument('--file', '-f', type=str, default='~~~~~~.json', help='Query JSON file')
  parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
  parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
  parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
  parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
  parser.add_argument('--user', type=str, default=None, help='Basic auth username')
  parser.add_argument('--password', type=str, default=None, help='Basic auth password')
  args = parser.parse_args()

  submit_timeout_at = time.time() + args.submit_timeout
  complete_timeout_at = time.time() + args.complete_timeout

  task_contents = read_task_file(args)
  task_json = json.loads(task_contents)
  if task_json['type'] == "compact":
    datasource = task_json['dataSource']
  else:
    datasource = task_json["spec"]["dataSchema"]["dataSource"]
  sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))

  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]

  sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
  sys.stderr.write('\033[1m' + "Task log:     " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
  sys.stderr.write('\033[1m' + "Task status:  " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id))

  task_status = await_task_completion(args, task_id, complete_timeout_at)
  sys.stderr.write("Task finished with status: {0}\n".format(task_status))
  if task_status != 'SUCCESS':
    sys.exit(1)

  sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource))
  load_timeout_at = time.time() + args.load_timeout
  await_load_completion(args, datasource, load_timeout_at)

try:
  main()
except KeyboardInterrupt:
  sys.exit(1)

 

Run it! Confirmed that it works!

python post-index-task-main.py

post-index-task-main code analysis: hk-intern2.tistory.com/27

python2 code convert to python3 (using 2to3): hk-intern2.tistory.com/30