인턴기록지
pull_appended_data.py 새로 추가된 데이터 가져오기 본문
8월 1일에 데이터를 가져오고 9월 1일에 데이터를 가져오면 한달동안 새로운 데이터가 쌓인다.
이 데이터를 따로 뽑아서 8월 데이터가 들어있는 db에 추가를 해줘서 9월데이터와 같은지 비교하는 과정을 거쳤다.
여기서 중요한 것이 db의 ewl_pulllog의 pull_time을 보고 해당 파일의 pull_time보다 뒤에 추가된 데이터를 가져오는 것이다.

위와 같이 새벽 1시에 데이터를 가져왔다고 가정하고 진행한다.
<새로 추가된 데이터를 뽑는 과정>
#!/usr/bin/python3
import os
import datetime
import csv
from configparser import ConfigParser
import psycopg2
# append data insert db function
import load
## pull log비교해서 그 시간보다 이후의 데이터만 남도록 자르기(= append 된 것만) ##
FOLDER = os.path.abspath('/Users/hklee/digitaltwin/hk/append_data/9월')
sub_len = 100000
# ewl_pulllog 의 pull_time을 가져오는 함수
def get_pulltime(cursor, file_name):
if file_name.find('repair') >= 0:
pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_repair%';"
cursor.execute(pull_query)
elif file_name.find('replacing') >= 0:
pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_replacing%';"
cursor.execute(pull_query)
elif file_name.find('rental') >= 0:
pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_rental_return%';"
cursor.execute(pull_query)
pull_time = cursor.fetchall()
# print(pull_time[0][0])
# 여기서 pull_time은 datetime type이라서 str으로 바꿔준후 다시 시간형식에 맞춰 바꿔준후 리턴
# 이렇게 안하니 datetime(datetime(2020,02,05)) 이런식으로 나와서 곤란;
pull_time = datetime.datetime.strptime(str(pull_time[0][0]), "%Y-%m-%d %H:%M:%S")
return pull_time
# 위에서 가져온 pull_time과 데이터가 들어온 시간을 비교해 새로 들어온 데이터를 뽑아주는 함수
def load_new_data(file_name, time_value):
filepath = os.path.join(FOLDER, file_name)
try:
datafile = open(filepath, mode='r', encoding='utf-8')
csv_reader = csv.reader(datafile)
data_rows = list(csv_reader)
# 각 테이블의 일시 컬럼 index구해주기
if file_name.find('repair') >= 0:
out_time_column_index = data_rows[0].index("등록_일시")
in_time_column_index = data_rows[0].index("완료_일시")
file_type = 1
elif file_name.find('replacing') >= 0:
out_time_column_index = data_rows[0].index("분리_일시")
in_time_column_index = data_rows[0].index("결합_일시")
file_type = 2
elif file_name.find('rental_') >= 0:
out_time_column_index = data_rows[0].index("대여_일시")
in_time_column_index = data_rows[0].index("반납_일시")
file_type = 3
# print("timecolumn index", out_time_column_index, in_time_column_index)
columns = data_rows.pop(0)
# num_rows = len(data_rows)
# print("num rows ",num_rows)
# print("pull_time : ",time_value)
update_data_rows = []
update_data_rows.append(columns)
# print(update_data_rows)
for row in data_rows:
# out data o in data o
if row[out_time_column_index] != '' and row[in_time_column_index] != '':
if file_type == 1 : # repair
# 분리시간 >= pull_time 인 경우 (날짜 로만 비교 8월 1일 부터~)
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d")
if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 00, 00) :
update_data_rows.append(row)
elif file_type == 2 : # replacing
# 재배치의 경우에는 옛날 자전거의 경우 out in 데이터가 완벽하지 않으면 추가가 되지 않았다.
# 따라서 out 했을 때가 8/1 전이라해도 in 데이터가 없다면 데이터파일에 기록이 되지 않는다.
# 그래서 out도 시간을 비교해주고 8/1 이후에 들어온 in 데이터가 있어서 새로 데이터가 추가가 됐을수도 있으니
# in 시간도 비교해준다.
# 분리시간 > pull_time || 결합시간 > pull_time
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
vs_time_int = datetime.datetime.strptime(row[in_time_column_index], "%Y-%m-%d %H:%M")
if vs_time > time_value or vs_time_int > time_value:
update_data_rows.append(row)
else: # rental return
# 대여반납은 00:20분에 데이터가 나눠진다.
# 19분까지 8월 데이터고 20분 부터 9월데이터로 들어간다.
# 따라서 time포맷을 20분으로 바꿔준다.
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 20, 00) :
update_data_rows.append(row)
# out data o in data x
elif row[out_time_column_index] != '' and row[in_time_column_index] == '':
if file_type == 2 : # replacing
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y%m%d%H%M%S")
if vs_time > time_value:
update_data_rows.append(row)
elif file_type == 1 : # repair
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d")
if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 00, 00):
update_data_rows.append(row)
else : # rental return
vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 20, 00):
update_data_rows.append(row)
# out data x in data o 재배치에만 존재
elif row[out_time_column_index] == '' and row[in_time_column_index] != '' :
if datetime.datetime.strptime(row[in_time_column_index], "%Y-%m-%d %H:%M") >= time_value:
update_data_rows.append(row)
else: #이외의 데이터들
print("except data", row)
# append 된 데이터 txt로 write
test_path = os.path.abspath('/Users/hklee/Downloads/new_data_file/test_data')
if file_type == 1:
new_file = open(test_path+'\\repair_new_data.txt', mode='at', encoding='UTF-8')
elif file_type == 2:
new_file = open(test_path+'\\replacing_new_data.txt', mode='at', encoding='UTF-8')
elif file_type == 3:
new_file = open(test_path+'\\rental_return_new_data.txt', mode='at', encoding='UTF-8')
for row in update_data_rows:
for ix in range(len(row)):
if ix == len(row)-1:
new_file.write(row[ix])
else:
# 수리내역의 고장 부분은 하나로 묶여야 한다. ("기타, 체인")
if file_type == 1 and ix == 4 :
new_file.write('"')
new_file.writelines(row[ix])
new_file.write('"')
new_file.write(',')
else:
new_file.write(row[ix])
new_file.write(',')
new_file.write('\n')
new_file.close()
datafile.close()
print("pull_appended_data finish")
return new_file.name
except (Exception, psycopg2.DatabaseError) as error:
print(error)
def config(filename='database.ini', section='database'):
# create a parser
parser = ConfigParser()
# read config file
parser.read(filename)
# get section, default to postgresql
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
return db
if __name__ == '__main__':
""" Connect to the PostgreSQL database server """
conn = None
try:
# read connection parameters
params = config()
print(params)
# connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
# create a cursor
cur = conn.cursor()
for filename in os.listdir(FOLDER):
print('\n')
if filename.find('ewl_rental_history') >= 0 : #or filename.find('ewl_repair') >= 0 or filename.find('ewl_replacing') >= 0:
print(filename)
time_value = get_pulltime(cur, filename)
# print("value: ",time_value)
new_data_file = load_new_data(filename, time_value)
# 경로 변경할 때 밑에도new_data_file[49:] 인덱스 변경해야 함
# load.py 함수 이용해서 db 에 insert
load.load_init_data(cur, conn, new_data_file[49:])
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
print('Database connection closed.')'Python' 카테고리의 다른 글
| update_data.py 업데이트 된 데이터 db에 반영 (0) | 2020.11.13 |
|---|---|
| pull_updated_data.py 업데이트된 데이터 뽑기 (0) | 2020.11.13 |
| load_init_data.py (최종) (0) | 2020.11.13 |
| SQLite3 Install (linux) (0) | 2020.10.06 |
| centos7 virtualenv 설치/활성화 (0) | 2020.09.21 |