인턴기록지
dt_ewl_update_historicals.py 코드 설명 본문
update_historical()
pull_time을 기준으로 이벤트 시간을 비교해 새로 추가된 new데이터들을 뽑아와 DB에 new데이터를 넣어주는 전체적인 과정을 거치는 main함수 격이다.
# main function of updating new data
def update_historical():
""" 디렉토리의 파일들 중 대여반납, 수리, 재배치 내역 파일인 경우
이력 데이터를 update하는 함수 들을 돌린다.
함수호출 순서
dt_ewl_history_load.get_pulltime()
-> dt_ewl_history_load.load_new_data()
-> load_data()
"""
params_database = dt_ewl_config.config(filename='database.ini',
section='database')
params_dir = dt_ewl_config.config(filename='database.ini',
section='file')
file_path = str(params_dir["folder"])
# Sort files in descending order of modification date
files = os.listdir(file_path)
for i in range(len(files)):
for j in range(len(files)):
if datetime.datetime.fromtimestamp(
os.path.getmtime(file_path+files[i])) > \
datetime.datetime.fromtimestamp(
os.path.getmtime(file_path+files[j])):
(files[i], files[j]) = (files[j], files[i])
# The 3 most recently modified txt files (repair, replacing, rental_return)
txt_file = []
for data_file in files:
if data_file[-3:] == 'txt':
txt_file.append(data_file)
print("txt file\n", txt_file)
# Execute function by reading files in directory
# 제일 최근 수정된 파일 중 txt파일만 모아 3개만 반복문 돌림 -> repair, replacing, rental
# master data까지 같은 폴더에 들어간다면 txt_file[0:3] -> txt_file[0:5]로 바꿔줘야 함
for filename in txt_file[0:3]:
if filename.find('ewl_rental_history') >= 0 :
logger.debug("filename is {}".format(filename))
# get pulltime of rental_history
time_value = dt_ewl_history_load.get_pulltime(params_database,
filename)
# get new data of rental_history
update_rows = dt_ewl_history_load.load_new_rental_data(params_dir,
filename,
time_value)
# insert new data into DB
load_rental_data(params_database,
update_rows)
elif filename.find('ewl_replacing') >= 0 :
logger.debug("filename is {}".format(filename))
# get pulltime of replacing_history
time_value = dt_ewl_history_load.get_pulltime(params_database,
filename)
# get new data of replacing_history
update_rows = dt_ewl_history_load.load_new_replacing_data(params_dir,
filename,
time_value)
# insert new data into DB
load_replacing_data(params_database,
update_rows)
elif filename.find('ewl_repair') >= 0 :
logger.debug("filename is {}".format(filename))
# get pulltime of repair_history
time_value = dt_ewl_history_load.get_pulltime(params_database,
filename)
# get new data of repair_history
update_rows = dt_ewl_history_load.load_new_repair_data(params_dir,
filename,
time_value)
# insert new data into DB
load_repair_data(params_database, update_rows)

load_repair_data()
새로 추가된 수리내역데이터를 수리하기위해 빼간 내역과 수리 후 집어넣은 내역 총 2개로 나눠서 DB에 넣어주기 위한 과정을 거치는 함수이다.
# Create a list for td_vw_breakdown table schema
def load_repair_data(database, tuple_rows):
""" 새로 추가된 수리내역 데이터를 repair_in(뺀 이력), repair_out(넣은 이력) 으로 나눠서
해당 데이터 리스트를 insert_tuple함수로 보낸다.
:param database: Postgresql info
:param tuple_rows: 새로 추가된 수리내역 데이터 List
"""
# remove header
tuple_rows.pop(0)
# set table_name
table_name = 'td_vw_breakdown_info'
logger.debug("table_name is {}".format(table_name))
try:
# connect DB
conn = psycopg2.connect(**database)
cursor = conn.cursor()
num_rows = len(tuple_rows)
# set last_update_time
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# get column by each table
table_info_query = """SELECT * FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}';"""
cursor.execute(table_info_query)
tb_bind_seprat_columns = cursor.fetchall()
bind_seprat_tuple_list = []
# data pre-processing
for arow in tuple_rows :
arow.append(f"{current_time}")
# change null value -> None
for i in range(len(tb_bind_seprat_columns)):
if arow[i] == "" :
arow[i] = None
bind_seprat_tuple_list.append(tuple(arow))
# data를 2개의 리스트로 나눠줌
clean_out = [] # repair_in
clean_in = [] # repair_out
# Put the data in the list according to the ewl_evets_log schema
# repair_in - 자전거번호, 정류소번호, 빼간 시간, repair_in, cleansing_done, cleansing_result, last_update_time
# repair_out- 자전거번호, 정류소번호, 빼간 시간, repair_out, cleansing_done, cleansing_result, last_update_time
for l in bind_seprat_tuple_list:
# 자전거를 넣은 시간이 not null인 경우 -> 데이터 2개로 쪼개기
if l[6] is not None :
clean_out.append([l[1], None, l[5],
'repair_in', False, 'Y', l[7]])
clean_in.append([l[1], None, l[6],
'repair_out', False, 'Y', l[7]])
# 자전거를 넣은 시간이 null인 경우 -> 자전거 뺀 데이터만 넣어주기
elif l[6] is None :
clean_out.append([l[1], None, l[5],
'repair_in', False, 'Y', l[7]])
else:
logger.error("{} Data not applicable".format(l))
# check duplicate data
clean_out = check_duplicate(clean_out)
clean_in = check_duplicate(clean_in)
start = 0
while start < num_rows:
end = start + SUB_LEN
if end > num_rows :
end = num_rows
rows_to_insert = clean_out[start:end]
insert_tuple(cursor, conn, 'ewl_events_log', rows_to_insert)
rows_to_insert = clean_in[start:end]
insert_tuple(cursor, conn, 'ewl_events_log', rows_to_insert)
start = end
except psycopg2.DatabaseError as error:
logger.error(f"load_repair_data() {error}")
load_replacing_data()
전체적으로는 load_repair_data()와 비슷한데 다른 부분만 설명하겠당
# data pre-processing
for arow in tuple_rows :
arow.append(f"{current_time}")
# 재배치내역의 ex) 20200813024513 인 경우 처리 -> timsestamp형식으로 바꾸고 초 단위 자름
# 위 경우는 bind 데이터가 없는 seprat경우만 있는 데이터이므로 밑의 조건문 걸어줌
if arow[3] == '' and arow[4] == '':
arow[2] = datetime.datetime.strptime(arow[2], "%Y%m%d%H%M%S")
arow[2] = datetime.datetime(arow[2].year, arow[2].month,
arow[2].day, arow[2].hour,
arow[2].minute, 00)
# null value -> None
for i in range(len(tb_bind_seprat_columns)):
if arow[i] == "" :
arow[i] = None
bind_seprat_tuple_list.append(tuple(arow))
# data를 2개의 리스트로 나눠줌-> seprat, bind
clean_out = []
clean_in = []
# Put the data in the list according to the ewl_evets_log schema
for l in bind_seprat_tuple_list:
# seprat, bind 데이터 둘다 존재할 경우
if l[2] is not None and l[4] is not None :
clean_out.append([l[0], l[1], l[2],
'seprat', False, 'Y', l[6]])
clean_in.append([l[0], l[3], l[4],
'bind', False, 'Y', l[6]])
# bind 데이터만 존재하는 경우
elif l[1] is None and l[2] is None:
clean_in.append([l[0], l[3], l[4],
'bind', False, 'Y', l[6]])
# seprat 데이터만 존재하는 경우
elif l[3] is None and l[4] is None:
clean_out.append([l[0], l[1], l[2],
'seprat', False, 'Y', l[6]])
else:
logger.error("{} Data not applicable".format(l))
load_rental_data()
rental return data를 2개로 나눠주는 함수도 위와 비슷하고 밑에만 다르기 때문에 밑에만 설명
# data를 2개의 리스트로 나눠줌
clean_out = []
clean_in = []
# Put the data in the list according to the ewl_evets_log schema
for l in bind_seprat_tuple_list:
# rental return 둘다 데이터가 있는 경우
if l[2] is not None and l[4] is not None:
clean_out.append([l[0], l[3], l[2],
'rental', False, 'Y', l[10]])
clean_in.append([l[0], l[5], l[4],
'return', False, 'Y', l[10]])
# rental 데이터만 있는 경우
elif l[2] is not None and l[4] is None:
clean_out.append([l[0], l[3], l[2],
'rental', False, 'Y', l[10]])
else:
logger.error("{} Data not applicable".format(l))'Python' 카테고리의 다른 글
| dt_ewl_history_load.py 코드 설명 (0) | 2020.12.08 |
|---|---|
| [Python] Sphinx 사용법 (0) | 2020.12.02 |
| update_data.py 업데이트 된 데이터 db에 반영 (0) | 2020.11.13 |
| pull_updated_data.py 업데이트된 데이터 뽑기 (0) | 2020.11.13 |
| pull_appended_data.py 새로 추가된 데이터 가져오기 (0) | 2020.11.13 |