Notice
Recent Posts
Recent Comments
Link
«   2026/04   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30
Tags
more
Archives
Today
Total
관리 메뉴

인턴기록지

pull_appended_data.py 새로 추가된 데이터 가져오기 본문

Python

pull_appended_data.py 새로 추가된 데이터 가져오기

인턴신분현경이 2020. 11. 13. 11:24

8월 1일에 데이터를 가져오고 9월 1일에 데이터를 가져오면 한달동안 새로운 데이터가 쌓인다.

이 데이터를 따로 뽑아서 8월 데이터가 들어있는 db에 추가를 해줘서 9월데이터와 같은지 비교하는 과정을 거쳤다.

 

여기서 중요한 것이 db의 ewl_pulllog의 pull_time을 보고 해당 파일의 pull_time보다 뒤에 추가된 데이터를 가져오는 것이다.

ewl_pulllog table

위와 같이 새벽 1시에 데이터를 가져왔다고 가정하고 진행한다.

 

<새로 추가된 데이터를 뽑는 과정>

#!/usr/bin/python3
import os
import datetime
import csv
from configparser import ConfigParser
import psycopg2
# append data insert db function
import load

## pull log비교해서  그 시간보다 이후의 데이터만 남도록 자르기(= append 된 것만) ##

FOLDER = os.path.abspath('/Users/hklee/digitaltwin/hk/append_data/9월')
sub_len = 100000

# ewl_pulllog 의 pull_time을 가져오는 함수
def get_pulltime(cursor, file_name):
    if file_name.find('repair') >= 0:
        pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_repair%';"
        cursor.execute(pull_query)
    elif file_name.find('replacing') >= 0:
        pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_replacing%';"
        cursor.execute(pull_query)
    elif file_name.find('rental') >= 0:
        pull_query = "SELECT pull_time FROM ewl_pulllog where file_name like 'ewl_rental_return%';"
        cursor.execute(pull_query)

    pull_time = cursor.fetchall()
    # print(pull_time[0][0])
    # 여기서 pull_time은 datetime type이라서 str으로 바꿔준후 다시 시간형식에 맞춰 바꿔준후 리턴
    # 이렇게 안하니 datetime(datetime(2020,02,05)) 이런식으로 나와서 곤란;
    pull_time = datetime.datetime.strptime(str(pull_time[0][0]), "%Y-%m-%d %H:%M:%S")

    return pull_time

# 위에서 가져온 pull_time과 데이터가 들어온 시간을 비교해 새로 들어온 데이터를 뽑아주는 함수
def load_new_data(file_name, time_value):
    filepath = os.path.join(FOLDER, file_name)

    try:
        datafile = open(filepath, mode='r', encoding='utf-8')
        csv_reader = csv.reader(datafile)

        data_rows = list(csv_reader)

        # 각 테이블의 일시 컬럼 index구해주기
        if file_name.find('repair') >= 0:
            out_time_column_index = data_rows[0].index("등록_일시")
            in_time_column_index = data_rows[0].index("완료_일시")
            file_type = 1
        elif file_name.find('replacing') >= 0:
            out_time_column_index = data_rows[0].index("분리_일시")
            in_time_column_index = data_rows[0].index("결합_일시")
            file_type = 2
        elif file_name.find('rental_') >= 0:
            out_time_column_index = data_rows[0].index("대여_일시")
            in_time_column_index = data_rows[0].index("반납_일시")
            file_type = 3
        # print("timecolumn index", out_time_column_index, in_time_column_index)

        columns = data_rows.pop(0)
        # num_rows = len(data_rows)
        # print("num rows ",num_rows)
        # print("pull_time : ",time_value)

        update_data_rows = []
        update_data_rows.append(columns)
        # print(update_data_rows)

        for row in data_rows:
            # out data o in data o
            if row[out_time_column_index] != '' and row[in_time_column_index] != '':
                if file_type == 1 : # repair
                    # 분리시간 >= pull_time 인 경우 (날짜 로만 비교 8월 1일 부터~)
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d")
                    if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 00, 00) :
                        update_data_rows.append(row)
                elif file_type == 2 :   # replacing
                	# 재배치의 경우에는 옛날 자전거의 경우 out in 데이터가 완벽하지 않으면 추가가 되지 않았다.
                    # 따라서 out 했을 때가 8/1 전이라해도 in 데이터가 없다면 데이터파일에 기록이 되지 않는다.
                    # 그래서 out도 시간을 비교해주고 8/1 이후에 들어온 in 데이터가 있어서 새로 데이터가 추가가 됐을수도 있으니
                    # in 시간도 비교해준다.
                    # 분리시간 > pull_time  || 결합시간 > pull_time
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
                    vs_time_int = datetime.datetime.strptime(row[in_time_column_index], "%Y-%m-%d %H:%M")
                    if vs_time > time_value or vs_time_int > time_value:
                        update_data_rows.append(row)
                else:   # rental return
                	# 대여반납은 00:20분에 데이터가 나눠진다.
                    # 19분까지 8월 데이터고 20분 부터 9월데이터로 들어간다.
                    # 따라서 time포맷을 20분으로 바꿔준다.
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
                    if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 20, 00) :
                        update_data_rows.append(row)
            # out data o in data x
            elif row[out_time_column_index] != '' and row[in_time_column_index] == '':
                if file_type == 2 : # replacing
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y%m%d%H%M%S")
                    if vs_time > time_value:
                        update_data_rows.append(row)
                elif file_type == 1 :   # repair
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d")
                    if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 00, 00):
                        update_data_rows.append(row)
                else :  # rental return
                    vs_time = datetime.datetime.strptime(row[out_time_column_index], "%Y-%m-%d %H:%M")
                    if vs_time >= datetime.datetime(time_value.year, time_value.month, time_value.day, 00, 20, 00):
                        update_data_rows.append(row)
            # out data x in data o  재배치에만 존재
            elif row[out_time_column_index] == '' and row[in_time_column_index] != '' :
                if datetime.datetime.strptime(row[in_time_column_index], "%Y-%m-%d %H:%M") >= time_value:
                    update_data_rows.append(row)
            
            else:   #이외의 데이터들
                print("except data", row)

        #   append 된 데이터 txt로 write
        test_path = os.path.abspath('/Users/hklee/Downloads/new_data_file/test_data')
        if file_type == 1:
            new_file = open(test_path+'\\repair_new_data.txt', mode='at', encoding='UTF-8')
        elif file_type == 2:
            new_file = open(test_path+'\\replacing_new_data.txt', mode='at', encoding='UTF-8')
        elif file_type == 3:
            new_file = open(test_path+'\\rental_return_new_data.txt', mode='at', encoding='UTF-8')

        for row in update_data_rows:
            for ix in range(len(row)):
                if ix == len(row)-1:
                    new_file.write(row[ix])
                else:
                	# 수리내역의 고장 부분은 하나로 묶여야 한다. ("기타, 체인")
                    if file_type == 1 and ix == 4 :
                        new_file.write('"')
                        new_file.writelines(row[ix])
                        new_file.write('"')
                        new_file.write(',')
                    else:
                        new_file.write(row[ix])
                        new_file.write(',')
            new_file.write('\n')
        new_file.close()

        datafile.close()
        print("pull_appended_data finish")
        return new_file.name

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

def config(filename='database.ini', section='database'):
    # create a parser
    parser = ConfigParser()
    # read config file
    parser.read(filename)

    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))

    return db


if __name__ == '__main__':
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config()
        print(params)
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params)
        # create a cursor
        cur = conn.cursor()

        for filename in os.listdir(FOLDER):
            print('\n')
            if filename.find('ewl_rental_history') >= 0 : #or filename.find('ewl_repair') >= 0 or filename.find('ewl_replacing') >= 0:
                print(filename)
                time_value = get_pulltime(cur, filename)
                # print("value: ",time_value)
                new_data_file = load_new_data(filename, time_value)
                # 경로 변경할 때 밑에도new_data_file[49:] 인덱스 변경해야 함
                # load.py 함수 이용해서 db 에 insert
                load.load_init_data(cur, conn, new_data_file[49:])

        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    finally:
        if conn is not None:
            conn.close()
            print('Database connection closed.')