Notice
Recent Posts
Recent Comments
Link
«   2026/04   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30
Tags
more
Archives
Today
Total
관리 메뉴

인턴기록지

[Druid] 400만개 데이터 ingestion (csv spec) 본문

Druid/Project

[Druid] 400만개 데이터 ingestion (csv spec)

인턴신분현경이 2020. 11. 26. 16:46

전처리가 끝난 데이터를 ewl_events_log 테이블에 넣어준다.

이 테이블의 데이터를 드루이드와 연결해서 가져오면 너무 좋지만

 

내 컴퓨터환경에서는 postgresql과 어딘가 아다리가 안맞아서 연결이 되지 않는다. 그러나 길리더님 환경에서는 올라감;

따라서 된다는 가정하에 sql문을 생각해보았다. 

일단 초기에 지금까지 모은 데이터를 druid에 넣어준다. 

매 달들어오는 데이터들을 append시켜주기 때문에 처음에 넣어준 데이터에 append해서 넣어준다.

 

위의 과정을 해주기 위해 일단 8월1일 pull한 것이라 생각하고 8월1일 00:19 분까지의 데이터들을 csv로 뽑아서 넣어줬다.

드루이드의 장점인가? 400만개 정도 ingestion하는데 1분? 안쪽으로 밖에 걸리지 않았다.

[root@localhost tutorial]# python post-index-task-main.py --file events_log-index.json
Beginning indexing data for events
<class 'str'>
<urllib.request.Request object at 0x7f2036ffb908>
Task started: index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z
Task log:     http://localhost:8081/druid/indexer/v1/task/index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z/log
Task status:  http://localhost:8081/druid/indexer/v1/task/index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z/status
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task index_parallel_events_dblinkci_2020-11-26T15:36:54.347Z still running...
Task finished with status: SUCCESS
Completed indexing data for events. Now loading indexed data onto the cluster...
events loading complete! You may now query your data

ingestion spec

{
    "type" : "index_parallel",
    "spec" : {
      "dataSchema" : {
        "dataSource" : "events",
          "timestampSpec": {
            "column": "event_time",
            "format": "auto"
          },
          "dimensionsSpec" : {
            "dimensions" : [
              "bcycl_no",
              "station_no",
              "event_type",
              "cleansing_done",
              "cleansing_result",
              "last_update_time"
            ]
          },
          "metricsSpec" : [],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "month",
            "queryGranularity" : "minute",
            "rollup" : false
          }
      },
      "ioConfig" : {
        "type" : "index_parallel",
        "inputSource" : {
          "type" : "local",
          "baseDir" : "quickstart/tutorial/",
          "filter" : "events_log_0801.csv"
        },
        "inputFormat" : {
          "type" : "csv",
          "findColumnsFromHeader" : true
        },
        "appendToExisting" : true
      },
      "tuningConfig" : {
        "type" : "index_parallel",
        "maxRowsPerSegment" : 500000,
        "maxRowsInMemory" : 5000000
      }
    }
  }
  

 

중간에 버벅거림

더보기

(밑과 같이 저게 계속나오면서 50분걸리다가 갑자기 어느 순간부터 1분도 안걸리게 데이터가 들어갔다. 인터넷문젠강,,?)

events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...
events is 0.0% finished loading...

이제 8월 1일~ 8월 31일까지의 데이터 (새로 들어온 데이터라고 가정)을 넣어보자

이 데이터는 sql문으로 가져오기 위해서는 밑의 sql문으로 실행

select * from ewl_events_log
where event_time >= (select pull_time::timestamp - '40 minutes'::interval from ewl_pulllog order by pull_time desc limit 1)
and (event_type = 'return' or event_type = 'rental')
order by event_time asc

ewl_pulllog가 데이터파일을 언제 가져왔는지 기록해주는 테이블이니 pull_time 을 내림차순으로 하나만 뽑으면 제일 최근 가져온 데이터 파일의 pull_time을 가져온다.

이 풀타임에서 40분을 빼준이유는 대여반납 데이터는 8월1일 01:00에 가져오지만 데이터가 8월 1일 00:19분 까지의 데이터가 들어가 있다. (수리, 재배치내역은 7월 31일 23:59 까지 들어가 있다) 

 

따라서 8월 1일 00:20분 이후 부터의 대여반납 이력들은 9월 1일 01시에 pull 해올 때 기록이 되어서 들어온다.

그래서 40분을 빼준거다.

 

{
    "type" : "index_parallel",
    "spec" : {
      "dataSchema" : {
        "dataSource" : "events",
          "timestampSpec": {
            "column": "event_time",
            "format": "auto"
          },
          "dimensionsSpec" : {
            "dimensions" : [
              "bcycl_no",
              "station_no",
              "event_type",
              "cleansing_done",
              "cleansing_result",
              "last_update_time"
            ]
          },
          "metricsSpec" : [],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "month",
            "queryGranularity" : "minute",
            "rollup" : false
          }
      },
      "ioConfig" : {
        "type" : "index_parallel",
        "inputSource" : {
          "type" : "local",
          "baseDir" : "quickstart/tutorial/",
          "filter" : "events_log_0901.csv"
        },
        "inputFormat" : {
          "type" : "csv",
          "findColumnsFromHeader" : true
        },
        "appendToExisting" : true
      },
      "tuningConfig" : {
        "type" : "index_parallel",
        "maxRowsPerSegment" : 500000,
        "maxRowsInMemory" : 5000000
      }
    }
  }
  

위의 json으로 넣어준다. 

 

여기서 제일 주목할 부분은 appendToExistiong 이다.

저게 false이면 overwrite가 되서 초기데이터가 없어지고 새로 들어온 데이터들이 덮어진다.

 

따라서 true로 설정해서 데이터가 추가되도록 해준다. 

python post-index-task-main.py --file events_log-index.json
위의 명령어에서 --file을 생략하고 싶어서 post-index-task-main.py를 수정해줬다.

def main():
  parser = argparse.ArgumentParser(description='Post Druid indexing tasks.')
  parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8081/', help='Druid Overlord url')
  parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url')
  # parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file')
  # 위와 같이 required 조건이 있는데 디폴트 값으로 ingestion .json을 넣어주었다.
  parser.add_argument('--file', '-f', type=str, default='events_test-index.json', help='Query JSON file')
  parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks')
  parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks')
  parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load')
  parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors')
  parser.add_argument('--user', type=str, default=None, help='Basic auth username')
  parser.add_argument('--password', type=str, default=None, help='Basic auth password')
  args = parser.parse_args()

 

[root@localhost tutorial]# python post-index-task-main.py
Beginning indexing data for events
<class 'str'>
<urllib.request.Request object at 0x7fa30dcdf898>
Task started: index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z
Task log:     http://localhost:8081/druid/indexer/v1/task/index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z/log
Task status:  http://localhost:8081/druid/indexer/v1/task/index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z/status
Task index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z still running...
Task index_parallel_events_lboljdma_2020-11-26T16:33:52.880Z still running...
Task finished with status: SUCCESS
Completed indexing data for events. Now loading indexed data onto the cluster...
events loading complete! You may now query your data

실행!

10만개 정도의 데이터인데 7초 만에 완료;;;;;;;;;;;;;;;