인턴기록지
[Druid] post-index-task-main 코드 분석 본문
bin/post-index-task --file quickstart/tutorial/compact.json --url http://localhost:8081
위의 명령어를 수행하는 post - index - task 파일을 수행하는 main파일을 분석해보았다.
메인함수
def main():
parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url')
parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
parser.add_argument('--user', type=str, default=None, help='Basic auth username')
parser.add_argument('--password', type=str, default=None, help='Basic auth password')
#입력받은 인자값을 args에 넣음
args = parser.parse_args()
submit_timeout_at = time.time() + args.submit_timeout
complete_timeout_at = time.time() + args.complete_timeout
task_contents = read_task_file(args)
task_json = json.loads(task_contents)
if task_json['type'] == "compact":
datasource = task_json['dataSource']
else:
datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
sys.stderr.write("Beginning indexing data for {0}\n".format(datasource))
task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
sys.stderr.write('\033[1m' + "Task log: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
sys.stderr.write('\033[1m' + "Task status: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id))
task_status = await_task_completion(args, task_id, complete_timeout_at)
sys.stderr.write("Task finished with status: {0}\n".format(task_status))
if task_status != 'SUCCESS':
sys.exit(1)
sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource))
load_timeout_at = time.time() + args.load_timeout
await_load_completion(args, datasource, load_timeout_at)
parser 사용
호출 당시 인자 값을 줘서 동작을 다르게 하고 싶을 때 import argparser 모듈을 활용한다.
parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
ArgumentParser()
인자값을 받을 수 있는 인스턴스 생성
description - 인자 도움말 전에 표시할 텍스트
parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url')
parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
parser.add_argument('--user', type=str, default=None, help='Basic auth username')
parser.add_argument('--password', type=str, default=None, help='Basic auth password')
add_argument()
입력받을 인자값 등록
name or flags - 옵션 문자열의 이름이나 리스트 (--url, -u)
action - 명령행에서 이 인자가 발견되 떼 수행 할 액션의 기본형
nargs - 소비되어야 하는 명령행 인자 수
default - 인자가 명령 행에 없는 경우 생성되는 값
type - 명령행 인자가 변환되어야 할 형
required - 명령행 옵션을 생략 할 수 있는지 아닌지
help - 인자가 하는 일에 대한 설명
metavar - 사용메시지에 사용되는 인자의 이름
입력받은 인자 값을 read_task_file함수의 인자 값으로 넘겨준다.
def read_task_file(args):
with open(args.file, 'r') as f:
contents = f.read()
# We don't use the parsed data, but we want to throw early if it's invalid
try:
json.loads(contents)
except Exception:
sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(Exception)))
sys.exit(1)
return contents
인자 값에 넣어준 파일을 읽어와서
json.load 함수를 이용해 json문자열인 contents를 python객체로 변환해 주고 리턴해준다.
json 파일을 인젝션 해줄 때 데이터 형식 json파일을 살펴보면
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "100_history_made",
"timestampSpec" : {
"column" : "rental_date",
"format" : "iso"
},
인젝션 할 때 타입과
인젝션 한 후 이름을 무엇으로 할 것인지 적어 놓은 부분이 있다. (datasource)
밑의 조건문에서 위 두개를 활용해 구분을 한다.
if task_json['type'] == "compact":
datasource = task_json['dataSource']
else:
datasource = json.loads(task_contents)["spec"]["dataSchema"]["dataSource"]
type이 compact와 index_parallel , index_hadoop, kill 등이 있다.
compact는 세그먼트들을 통합하는 타입이고 index_parallel은 일괄처리 타입이다.
따라서 조건문에서 type이 compact 일때에는 dataSource부분만 넣어주고
그렇지 않을 때에는 spec->dataSchema->dataSource 부분을 넣어준다.
인젝션 할 때 task id를 생성해주는 함수 post_task를 살펴보자
task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
# Keep trying until timeout_at, maybe die then
def post_task(args, task_json, timeout_at):
try:
# args에 받은 주소에 /druid/indexer/v1/task 붙여서 url에 넘겨줌
url = args.url.rstrip("/") + "/druid/indexer/v1/task"
# task_json이 string이라 Request 클래스에서는 string 타입이 안되고 bytes or request 객체 값만 가능
# 따라서 utf-8로 인코딩을 해주고 넣어준다.
# Request는 서버로 요청하는 함수이다
# urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)
task_json = task_json.encode('utf-8')
req = urllib.request.Request(url, task_json, {'Content-Type' : 'application/json'})
print(req)
# 객체 값 0x7f7c9dee3be0
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
# urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
# 서버로 요청한 객체 값 req을 urlopen 클래스로 넘겨준다 웹서버로 부터 받은 값을 reponse에 넣어준다
response = urllib.request.urlopen(req, None, response_timeout)
print(response)
# 객체 값 0x7f7c9e75ecf8
return response.read().rstrip()
except urllib.error.URLError as e:
if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <= 500:
# 4xx (problem with the request) or 500 (something wrong on the server)
raise_friendly_error(e)
elif time.time() >= timeout_at:
# No futher retries
raise_friendly_error(e)
elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303, 305, 307] and \
e.info().getheader("Location") is not None:
# Set the new location in args.url so it can be used by await_task_completion and re-issue the request
location = urlparse.urlparse(e.info().getheader("Location"))
args.url = "{0}://{1}".format(location.scheme, location.netloc)
sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url))
return post_task(args, task_json, timeout_at)
else:
# If at first you don't succeed, try, try again!
sleep_time = 5
if not args.quiet:
extra = ''
if hasattr(e, 'read'):
extra = e.read().rstrip()
sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip())
sys.stderr.write("\n")
time.sleep(sleep_time)
return post_task(args, task_json, timeout_at)
task_json = task_json.encode('utf-8') 중요! 이걸 추가하지 않아서 매우 시간을 많이 잡아먹었스 ㅜㅠ
task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"]
메인함수에서 response값을 읽어와 task란 값을 task_id에 넣어준다
sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id))
sys.stderr.write('\033[1m' + "Task log: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id))
sys.stderr.write('\033[1m' + "Task status: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id))
# 결과화면
[root@localhost apache-druid-0.19.0]# python3 quickstart/tutorial/post-index-task-main.py --file quickstart/tutorial/100-index.json
Beginning indexing data for test
<urllib.request.Request object at 0x7f7c9dee3be0>
<http.client.HTTPResponse object at 0x7f7c9e75ecf8>
Task started: index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z
Task log: http://localhost:8081/druid/indexer/v1/task/index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z/log
Task status: http://localhost:8081/druid/indexer/v1/task/index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z/status
task가 ingestion 성공하게 만드는 함수, 결과 출력
# Keep trying until timeout_at, maybe die then
def await_task_completion(args, task_id, timeout_at):
while True:
url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id)
req = urllib.request.Request(url)
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
response = urllib.request.urlopen(req, None, response_timeout)
response_obj = json.loads(response.read())
# 밑의 response_obj코드를 보면 status , statuscode가 있다. success or failed 둘 중 하나이면 리턴
response_status_code = response_obj["status"]["statusCode"]
if response_status_code in ['SUCCESS', 'FAILED']:
return response_status_code
else:
if time.time() < timeout_at:
if not args.quiet:
sys.stderr.write("Task {0} still running...\n".format(task_id))
timeleft = timeout_at - time.time()
time.sleep(min(5, timeleft))
else:
raise Exception("Task {0} did not finish in time!".format(task_id))
response_obj 는druid 의 ingestion tasks에서 확인 할 수 있다.
{
"id": "index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z",
"groupId": "index_parallel_test_naofhhea_2020-09-23T00:30:05.795Z",
"type": "index_parallel",
"createdTime": "2020-09-23T00:30:05.797Z",
"queueInsertionTime": "1970-01-01T00:00:00.000Z",
"statusCode": "SUCCESS",
"status": "SUCCESS",
"runnerStatusCode": "WAITING",
"duration": 4432,
"location": {
"host": "localhost",
"port": 8100,
"tlsPort": -1
},
"dataSource": "test",
"errorMsg": null
}
# success or failed 가 task_status에 들어감
task_status = await_task_completion(args, task_id, complete_timeout_at)
sys.stderr.write("Task finished with status: {0}\n".format(task_status))
if task_status != 'SUCCESS':
sys.exit(1)
# 결과화면
Task finished with status: SUCCESS
data를 쿼리문으로 사용할 수 있게 indexed해준 데이터를 클러스터에 올리는 함수
def await_load_completion(args, datasource, timeout_at):
while True:
url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus"
req = urllib.request.Request(url)
add_basic_auth_header(args, req)
timeleft = timeout_at - time.time()
response_timeout = min(max(timeleft, 5), 10)
response = urllib.request.urlopen(req, None, response_timeout)
response_obj = json.loads(response.read())
load_status = response_obj.get(datasource, 0.0)
# 진행 퍼센트를 보여줌
if load_status >= 100.0:
sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource))
return
else:
if time.time() < timeout_at:
if not args.quiet:
sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status))
timeleft = timeout_at - time.time()
time.sleep(min(5, timeleft))
else:
raise Exception("{0} was not loaded in time!".format(datasource))
참고
[Python] argparse 사용법 (파이썬 인자값 추가하기)
파이썬 스크립트를 개발할 때, 호출 당시 인자값을 줘서 동작을 다르게 하고 싶은 경우가 있습니다. 이때, 파이썬 내장함수인 argparse 모듈을 사용하여 원하는 기능을 개발할 수 있습니다. 아래 ��
brownbears.tistory.com
[파이썬] json 모듈로 JSON 데이터 다루기
Engineering Blog by Dale Seo
www.daleseo.com
'Druid > Project' 카테고리의 다른 글
| (인터넷연결없이) jdk8 & druid install (0) | 2020.09.24 |
|---|---|
| python2 code convert to python3 (0) | 2020.09.21 |
| [Druid] 이미 있는 Datasource에 데이터 추가하기 (0) | 2020.09.18 |
| 데이터 파일 python으로 드루이드에 자동으로 올리기 (0) | 2020.09.18 |
| [Druid] Writing an ingestion spec - rollup이 된 data (0) | 2020.09.17 |