인턴기록지
[Druid] 400만개 데이터 ingestion (csv spec) 본문
전처리가 끝난 데이터를 ewl_events_log 테이블에 넣어준다.
이 테이블의 데이터를 드루이드와 연결해서 가져오면 너무 좋지만
내 컴퓨터환경에서는 postgresql과 어딘가 아다리가 안맞아서 연결이 되지 않는다. 그러나 길리더님 환경에서는 올라감;
따라서 된다는 가정하에 sql문을 생각해보았다.
일단 초기에 지금까지 모은 데이터를 druid에 넣어준다.
매 달들어오는 데이터들을 append시켜주기 때문에 처음에 넣어준 데이터에 append해서 넣어준다.
위의 과정을 해주기 위해 일단 8월1일 pull한 것이라 생각하고 8월1일 00:19 분까지의 데이터들을 csv로 뽑아서 넣어줬다.
드루이드의 장점인가? 400만개 정도 ingestion하는데 1분? 안쪽으로 밖에 걸리지 않았다.
[root@localhost tutorial]# python post-index-task-main.py --file events_log-index.json
Beginning indexing data for events
<class 'str'>
<urllib.request.Request object at 0x7f2036ffb908>
Task started: index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z
Task log: http://localhost:8081/druid/indexer/v1/task/index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z/log
Task status: http://localhost:8081/druid/indexer/v1/task/index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z/status
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task finished with status: SUCCESS
Completed indexing data for events. Now loading indexed data onto the cluster...
events loading complete! You may now query your data
ingestion spec
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "events",
"timestampSpec": {
"column": "event_time",
"format": "auto"
},
"dimensionsSpec" : {
"dimensions" : [
"bcycl_no",
"station_no",
"event_type",
"cleansing_done",
"cleansing_result",
"last_update_time"
]
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "month",
"queryGranularity" : "minute",
"rollup" : false
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "events_log_0801.csv"
},
"inputFormat" : {
"type" : "csv",
"findColumnsFromHeader" : true
},
"appendToExisting" : true
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 500000,
"maxRowsInMemory" : 5000000
}
}
}
중간에 버벅거림
(밑과 같이 저게 계속나오면서 50분걸리다가 갑자기 어느 순간부터 1분도 안걸리게 데이터가 들어갔다. 인터넷문젠강,,?)
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
이제 8월 1일~ 8월 31일까지의 데이터 (새로 들어온 데이터라고 가정)을 넣어보자
이 데이터는 sql문으로 가져오기 위해서는 밑의 sql문으로 실행
select * from ewl_events_log
where event_time >= (select pull_time::timestamp - '40 minutes'::interval from ewl_pulllog order by pull_time desc limit 1)
and (event_type = 'return' or event_type = 'rental')
order by event_time asc
ewl_pulllog가 데이터파일을 언제 가져왔는지 기록해주는 테이블이니 pull_time 을 내림차순으로 하나만 뽑으면 제일 최근 가져온 데이터 파일의 pull_time을 가져온다.
이 풀타임에서 40분을 빼준이유는 대여반납 데이터는 8월1일 01:00에 가져오지만 데이터가 8월 1일 00:19분 까지의 데이터가 들어가 있다. (수리, 재배치내역은 7월 31일 23:59 까지 들어가 있다)
따라서 8월 1일 00:20분 이후 부터의 대여반납 이력들은 9월 1일 01시에 pull 해올 때 기록이 되어서 들어온다.
그래서 40분을 빼준거다.
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "events",
"timestampSpec": {
"column": "event_time",
"format": "auto"
},
"dimensionsSpec" : {
"dimensions" : [
"bcycl_no",
"station_no",
"event_type",
"cleansing_done",
"cleansing_result",
"last_update_time"
]
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "month",
"queryGranularity" : "minute",
"rollup" : false
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "events_log_0901.csv"
},
"inputFormat" : {
"type" : "csv",
"findColumnsFromHeader" : true
},
"appendToExisting" : true
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 500000,
"maxRowsInMemory" : 5000000
}
}
}
위의 json으로 넣어준다.
여기서 제일 주목할 부분은 appendToExistiong 이다.
저게 false이면 overwrite가 되서 초기데이터가 없어지고 새로 들어온 데이터들이 덮어진다.
따라서 true로 설정해서 데이터가 추가되도록 해준다.
python post-index-task-main.py --file events_log-index.json
위의 명령어에서 --file을 생략하고 싶어서 post-index-task-main.py를 수정해줬다.
def main():
parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8081/', help='Druid Overlord url')
parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
# parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
# 위와 같이 required 조건이 있는데 디폴트 값으로 ingestion .json을 넣어주었다.
parser.add_argument('--file', '-f', type=str, default='events_test-index.json', help='Query JSON file')
parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
parser.add_argument('--user', type=str, default=None, help='Basic auth username')
parser.add_argument('--password', type=str, default=None, help='Basic auth password')
args = parser.parse_args()
[root@localhost tutorial]# python post-index-task-main.py
Beginning indexing data for events
<class 'str'>
<urllib.request.Request object at 0x7fa30dcdf898>
Task started: index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z
Task log: http://localhost:8081/druid/indexer/v1/task/index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z/log
Task status: http://localhost:8081/druid/indexer/v1/task/index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z/status
Task index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z still running...
Task index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z still running...
Task finished with status: SUCCESS
Completed indexing data for events. Now loading indexed data onto the cluster...
events loading complete! You may now query your data
실행!
10만개 정도의 데이터인데 7초 만에 완료;;;;;;;;;;;;;;;
'Druid > Project' 카테고리의 다른 글
| [Druid] SQL ingestion spec - postgresql (postgresql database data) on docker (0) | 2020.11.26 |
|---|---|
| [Druid] post-index-task-main.py 수정 (최종본!) (0) | 2020.11.26 |
| [Druid] ingestion spec for csv file (csv 파일로 ingestion) (0) | 2020.11.26 |
| (인터넷연결없이) jdk8 & druid install (0) | 2020.09.24 |
| python2 code convert to python3 (0) | 2020.09.21 |