Notice
Recent Posts
Recent Comments
Link
«   2026/04   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30
Tags
more
Archives
Today
Total
관리 메뉴

인턴기록지

[Druid] post-index-task-main 코드 분석 본문

Druid/Project

[Druid] post-index-task-main 코드 분석

인턴신분현경이 2020. 9. 21. 13:23
bin/post-index-task --file quickstart/tutorial/compact.json --url http://localhost:8081

위의 명령어를 수행하는 post - index - task 파일을 수행하는 main파일을 분석해보았다.

 

 

메인함수

def main():
  parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
  parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url')
  parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
  parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
  parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
  parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
  parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
  parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
  parser.add_argument('--user', type=str, default=None, help='Basic auth username')
  parser.add_argument('--password', type=str, default=None, help='Basic auth password')
 
  #입력받은 인자값을 args에 넣음
  args = parser.parse_args()

  submit_timeout_at = time.time() + args.submit_timeout
  complete_timeout_at = time.time() + args.complete_timeout

  task_contents = read_task_file(args)
  task_json = json.loads(task_contents)
  if task_json['type'] == "compact":
    datasource = task_json['dataSource']
  else:
    datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
  sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))

  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]

  sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
  sys.stderr.write('\033[1m' + "Task log:     " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
  sys.stderr.write('\033[1m' + "Task status:  " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id))

  task_status = await_task_completion(args, task_id, complete_timeout_at)
  sys.stderr.write("Task finished with status: {0}\n".format(task_status))
  if task_status != 'SUCCESS':
    sys.exit(1)

  sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource))
  load_timeout_at = time.time() + args.load_timeout
  await_load_completion(args, datasource, load_timeout_at)

parser 사용

호출 당시 인자 값을 줘서 동작을 다르게 하고 싶을 때 import argparser 모듈을 활용한다.

 parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')

ArgumentParser()

인자값을 받을 수 있는 인스턴스 생성

description - 인자 도움말 전에 표시할 텍스트

 

  parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url')
  parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
  parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
  parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
  parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
  parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
  parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
  parser.add_argument('--user', type=str, default=None, help='Basic auth username')
  parser.add_argument('--password', type=str, default=None, help='Basic auth password')

add_argument()

입력받을 인자값 등록

name or flags - 옵션 문자열의 이름이나 리스트 (--url, -u)

action - 명령행에서 이 인자가 발견되 떼 수행 할 액션의 기본형

nargs - 소비되어야 하는 명령행 인자 수

default -  인자가 명령 행에 없는 경우 생성되는 값

type - 명령행 인자가 변환되어야 할 형

required - 명령행 옵션을 생략 할 수 있는지 아닌지

help - 인자가 하는 일에 대한 설명

metavar - 사용메시지에 사용되는 인자의 이름

 


입력받은 인자 값을 read_task_file함수의 인자 값으로 넘겨준다.

def read_task_file(args):
  with open(args.file, 'r') as f:
    contents = f.read()
    # We don't use the parsed data, but we want to throw early if it's invalid
    try:
      json.loads(contents)
    except Exception:
      sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(Exception)))
      sys.exit(1)
    return contents

인자 값에 넣어준 파일을 읽어와서

json.load 함수를 이용해 json문자열인 contents를  python객체로 변환해 주고 리턴해준다.

 


json 파일을 인젝션 해줄 때 데이터 형식 json파일을 살펴보면

"type" : "index_parallel",
"spec" : {
        "dataSchema" : {
            "dataSource" : "100_history_made",
            "timestampSpec" : {
                "column" : "rental_date",
                "format" : "iso"
            },

인젝션 할 때 타입과

인젝션 한 후 이름을 무엇으로 할 것인지 적어 놓은 부분이 있다. (datasource)

 

밑의 조건문에서 위 두개를 활용해 구분을 한다.

  if task_json['type'] == "compact":
    datasource = task_json['dataSource']
  else:
    datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]

type이 compact와 index_parallel , index_hadoop, kill 등이 있다.

compact는 세그먼트들을 통합하는 타입이고 index_parallel은 일괄처리 타입이다.

 

따라서 조건문에서 type이 compact 일때에는 dataSource부분만 넣어주고 

그렇지 않을 때에는 spec->dataSchema->dataSource 부분을 넣어준다.

 


인젝션 할 때 task id를 생성해주는 함수 post_task를 살펴보자

 task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
# Keep trying until timeout_at, maybe die then
def post_task(args, task_json, timeout_at):
  try:
  # args에 받은 주소에 /druid/indexer/v1/task 붙여서 url에 넘겨줌
    url = args.url.rstrip("/") + "/druid/indexer/v1/task"
  # task_json이 string이라 Request 클래스에서는 string 타입이 안되고 bytes or request 객체 값만 가능
  # 따라서 utf-8로 인코딩을 해주고 넣어준다.
  # Request는 서버로 요청하는 함수이다
  # urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)
    task_json = task_json.encode('utf-8')
    req = urllib.request.Request(url, task_json, {'Content-Type' : 'application/json'})
    print(req)
  # 객체 값 0x7f7c9dee3be0
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
  # urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
  # 서버로 요청한 객체 값 req을 urlopen 클래스로 넘겨준다 웹서버로 부터 받은 값을 reponse에 넣어준다
    response = urllib.request.urlopen(req, None, response_timeout)
    print(response)
  # 객체 값 0x7f7c9e75ecf8
    return response.read().rstrip()
  except urllib.error.URLError as e:
    if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <= 500:
      # 4xx (problem with the request) or 500 (something wrong on the server)
      raise_friendly_error(e)
    elif time.time() >= timeout_at:
      # No futher retries
      raise_friendly_error(e)
    elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
        e.info().getheader("Location") is not None:
      # Set the new location in args.url so it can be used by await_task_completion and re-issue the request
      location = urlparse.urlparse(e.info().getheader("Location"))
      args.url = "{0}://{1}".format(location.scheme, location.netloc)
      sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url))
      return post_task(args, task_json, timeout_at)
    else:
      # If at first you don't succeed, try, try again!
      sleep_time = 5
      if not args.quiet:
        extra = ''
        if hasattr(e, 'read'):
          extra = e.read().rstrip()
        sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip())
        sys.stderr.write("\n")
      time.sleep(sleep_time)
      return post_task(args, task_json, timeout_at)

 

task_json = task_json.encode('utf-8') 중요! 이걸 추가하지 않아서 매우 시간을 많이 잡아먹었스 ㅜㅠ

 

  task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]

메인함수에서 response값을 읽어와 task란 값을 task_id에 넣어준다

 

  sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
  sys.stderr.write('\033[1m' + "Task log:     " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
  sys.stderr.write('\033[1m' + "Task status:  " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id))
  
  # 결과화면
  [root@localhost apache-druid-0.19.0]# python3 quickstart/tutorial/post-index-task-main.py  --file quickstart/tutorial/100-index.json
Beginning indexing data for test
<urllib.request.Request object at 0x7f7c9dee3be0>
<http.client.HTTPResponse object at 0x7f7c9e75ecf8>
Task started: index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z
Task log:     http://localhost:8081/druid/indexer/v1/task/index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z/log
Task status:  http://localhost:8081/druid/indexer/v1/task/index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z/status

task가 ingestion 성공하게 만드는 함수, 결과 출력 

# Keep trying until timeout_at, maybe die then
def await_task_completion(args, task_id, timeout_at):
  while True:
    url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id)
    req = urllib.request.Request(url)
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
    response = urllib.request.urlopen(req, None, response_timeout)
    response_obj = json.loads(response.read())
 # 밑의 response_obj코드를 보면 status , statuscode가 있다. success or failed 둘 중 하나이면 리턴
    response_status_code = response_obj["status"]["statusCode"]
    if response_status_code in ['SUCCESS', 'FAILED']:
      return response_status_code
    else:
      if time.time() < timeout_at:
        if not args.quiet:
          sys.stderr.write("Task {0} still running...\n".format(task_id))
        timeleft = timeout_at - time.time()
        time.sleep(min(5, timeleft))
      else:
        raise Exception("Task {0} did not finish in time!".format(task_id))

 

response_obj 는druid 의 ingestion tasks에서 확인 할 수 있다.

{
  "id": "index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z",
  "groupId": "index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z",
  "type": "index_parallel",
  "createdTime": "2020-09-23T00:30:05.797Z",
  "queueInsertionTime": "1970-01-01T00:00:00.000Z",
  "statusCode": "SUCCESS",
  "status": "SUCCESS",
  "runnerStatusCode": "WAITING",
  "duration": 4432,
  "location": {
    "host": "localhost",
    "port": 8100,
    "tlsPort": -1
  },
  "dataSource": "test",
  "errorMsg": null
}

 

# success or failed 가 task_status에 들어감
  task_status = await_task_completion(args, task_id, complete_timeout_at)
  sys.stderr.write("Task finished with status: {0}\n".format(task_status))
  if task_status != 'SUCCESS':
    sys.exit(1)
    
 # 결과화면
 Task finished with status: SUCCESS

 


data를 쿼리문으로 사용할 수 있게  indexed해준 데이터를 클러스터에 올리는 함수

def await_load_completion(args, datasource, timeout_at):
  while True:
    url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
    req = urllib.request.Request(url)
    add_basic_auth_header(args, req)
    timeleft = timeout_at - time.time()
    response_timeout = min(max(timeleft, 5), 10)
    response = urllib.request.urlopen(req, None, response_timeout)
    response_obj = json.loads(response.read())
    load_status = response_obj.get(datasource, 0.0)
 # 진행 퍼센트를 보여줌
    if load_status >= 100.0:
      sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource))
      return
    else:
      if time.time() < timeout_at:
        if not args.quiet:
          sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status))
        timeleft = timeout_at - time.time()
        time.sleep(min(5, timeleft))
      else:
        raise Exception("{0} was not loaded in time!".format(datasource))

 

 

 


참고

brownbears.tistory.com/413

 

[Python] argparse 사용법 (파이썬 인자값 추가하기)

파이썬 스크립트를 개발할 때, 호출 당시 인자값을 줘서 동작을 다르게 하고 싶은 경우가 있습니다. 이때, 파이썬 내장함수인 argparse 모듈을 사용하여 원하는 기능을 개발할 수 있습니다. 아래 ��

brownbears.tistory.com

www.daleseo.com/python-json/

 

[파이썬] json 모듈로 JSON 데이터 다루기

Engineering Blog by Dale Seo

www.daleseo.com