# compete_jobs/common_task.py
#
# File-listing metadata captured with this copy: 172 lines, 8.2 KiB, Python.

import pandas as pd
import boto3
from datetime import datetime
import glob
from naukri.search_india import NaukriJobScraper
from naukri.jobdata_india import NaukriJobDetailScraper
from naukri.search_gulf_r import main as gulfSearch, output_filename_csv as gulf_search_file
from naukri.jobdata_gulf_r import NaukriGulfJobDetailScraper
import time
import os
import sys
def upload_file_to_bucket(localFilePath, localFileName, today_date):
    """Upload a local file to the ``compete-syndication`` S3 bucket.

    The object is stored under the key ``naukri/<today_date>/<localFileName>``.
    The S3 client is created without explicit credentials, so boto3 resolves
    them on its own.

    Args:
        localFilePath: Path of the file on local disk.
        localFileName: Object name to use below the dated prefix.
        today_date: Date string used as the prefix component of the key.
    """
    destination_bucket = 'compete-syndication'
    object_key = f'naukri/{today_date}/{localFileName}'

    boto3.client('s3').upload_file(localFilePath, destination_bucket, object_key)
    print(f'File "{localFilePath}" uploaded to S3 bucket "{destination_bucket}" as "{object_key}"')
def read_s3_file(filenameInS3):
    """Load a CSV object from the ``compete-syndication`` bucket and print it.

    The object is read from the key ``naukri/<filenameInS3>`` and parsed with
    ``pandas.read_csv``.

    Args:
        filenameInS3: Object name relative to the ``naukri/`` prefix.

    Returns:
        The parsed ``pandas.DataFrame`` (it is also printed to stdout).
    """
    s3_bucket = 'compete-syndication'
    s3_file_key = f'naukri/{filenameInS3}'

    # SECURITY: access keys must never live in source control. The client is
    # built without explicit keys so boto3 resolves credentials itself
    # (environment variables, shared config, or an attached role), the same
    # way upload_file_to_bucket does.
    # NOTE(review): any key pair that was ever committed in this file should
    # be treated as leaked and rotated.
    s3_client = boto3.client('s3')

    s3_object = s3_client.get_object(Bucket=s3_bucket, Key=s3_file_key)
    # The response 'Body' is a file-like stream that read_csv consumes directly.
    df = pd.read_csv(s3_object['Body'])
    print(df)
    return df
def do_the_difference(today_file, last_file, column_for_diff, fresh_output, expired_output, common_output):
    """Diff two CSV snapshots on one key column and write three result CSVs.

    Both inputs are de-duplicated on ``column_for_diff`` (first occurrence
    kept) before comparison. Frame shapes are printed along the way.

    Args:
        today_file: Path of the current snapshot CSV.
        last_file: Path of the previous snapshot CSV.
        column_for_diff: Name of the column used as the comparison key.
        fresh_output: Destination CSV for rows whose key is only in today's file.
        expired_output: Destination CSV for rows whose key is only in the
            previous file.
        common_output: Destination CSV for the inner join of both snapshots
            (overlapping non-key columns get pandas' default ``_x``/``_y``
            suffixes).
    """

    def rows_missing_from(base, other):
        # Left anti-join: keep the rows of `base` whose key has no match in
        # `other`. Overlapping non-key columns from `other` are kept with an
        # '_ignored' suffix, exactly as the merge produces them.
        joined = pd.merge(
            base,
            other,
            on=column_for_diff,
            how='left',
            indicator=True,
            suffixes=('', '_ignored'),
        )
        unmatched = joined[joined['_merge'] == 'left_only']
        return unmatched.drop(columns=['_merge'])

    current = pd.read_csv(today_file)
    previous = pd.read_csv(last_file)
    print(current.shape, previous.shape)

    current = current.drop_duplicates(subset=[column_for_diff], keep='first')
    previous = previous.drop_duplicates(subset=[column_for_diff], keep='first')
    print(current.shape, previous.shape)

    fresh = rows_missing_from(current, previous)
    fresh.to_csv(fresh_output, index=False)

    expired = rows_missing_from(previous, current)
    expired.to_csv(expired_output, index=False)
    print(fresh.shape, expired.shape)

    shared = pd.merge(current, previous, on=column_for_diff, how='inner')
    print(shared.shape)
    shared.to_csv(common_output, index=False)
def extract_date_from_filename(filename):
    """Parse the ``DD-MM-YYYY`` date that follows the last underscore in a name.

    Every ``.csv`` occurrence is stripped from that trailing segment before
    parsing.

    Args:
        filename: File name or path whose last ``_``-separated segment is a
            date such as ``09-10-2023.csv``.

    Returns:
        The parsed ``datetime`` (time fields are zero).

    Raises:
        ValueError: If the trailing segment does not match ``%d-%m-%Y``.
    """
    trailing_segment = filename.rsplit("_", 1)[-1]
    date_text = trailing_segment.replace(".csv", "")
    return datetime.strptime(date_text, "%d-%m-%Y")
def find_second_latest_file(folder_path, search_pattern):
    """Return the file with the second most recent date embedded in its name.

    Files matching ``search_pattern`` inside ``folder_path`` are ordered
    newest-first by the date that ``extract_date_from_filename`` reads from
    each path.

    Args:
        folder_path: Directory to search.
        search_pattern: Glob pattern, relative to ``folder_path``.

    Returns:
        The path of the second-newest file, or ``None`` when fewer than two
        files match (a message is printed in either case).
    """
    candidates = sorted(
        glob.glob(os.path.join(folder_path, search_pattern)),
        key=extract_date_from_filename,
        reverse=True,
    )

    if len(candidates) < 2:
        print("There are not enough files in the folder to find the second latest file.")
        return None

    runner_up = candidates[1]
    print("Second latest file:", runner_up)
    return runner_up
def run_india_scraper(today_date):
    """Run the daily India pipeline: search scrape, diff, detail scrape, upload.

    Steps, in order:
      1. Run ``NaukriJobScraper`` and record its duration in the search stats
         file.
      2. Diff today's search result against the second-newest search result
         file on the ``jdURL`` column (fresh / expired / common CSVs).
      3. Run ``NaukriJobDetailScraper`` on the fresh-jobs CSV and record its
         duration in the daily stats file.
      4. Upload the archive (expired) and active (detail) CSVs to S3.

    Args:
        today_date: Date string (the caller passes ``DD-MM-YYYY``) used in
            every local file name and in the S3 key prefix.

    Raises:
        ValueError: If no earlier search result exists to diff against.
    """
    india_search_input_file = "naukri/_industry_urls.csv"
    india_search_output_file = f"india_data/daily_search_results/search_result_india_{today_date}.csv"
    india_search_error_file = f"india_data/daily_error_folder/search_error_india_{today_date}.csv"
    india_search_stats_file = f"india_data/daily_stats_folder/stats_india_search_{today_date}.txt"

    # --- Search scrape -----------------------------------------------------
    # presumably writes its results to india_search_output_file, which the
    # diff below reads — confirm against NaukriJobScraper.
    start_time = time.time()
    scraper = NaukriJobScraper(india_search_input_file, india_search_output_file, india_search_error_file)
    scraper.scrape()
    duration_hours = (time.time() - start_time) / 3600
    print(f"Search program took {duration_hours:.2f} hours to run.")
    with open(india_search_stats_file, "a") as stat:
        stat.write(f"Search program took {duration_hours:.2f} hours to run. \n")

    # --- Diff against the previous run --------------------------------------
    # Today's file is expected to be the newest match, so the second-newest
    # is the previous run's result.
    folder_path = "india_data/daily_search_results/"
    search_pattern = "search_result_india_*.csv"
    last_file = find_second_latest_file(folder_path, search_pattern)
    if last_file is None:
        # Fail with a clear message instead of letting pandas choke on None.
        raise ValueError(
            f"No previous India search result found in {folder_path}; "
            "cannot compute the daily difference."
        )

    fresh_output = f"india_data/daily_process_folder/new_jobs_on_{today_date}.csv"
    expired_output = f"india_data/daily_upload_folder/Compete_1_India_Archive_{today_date}.csv"
    common_output = f"india_data/daily_common_folder/common_data_on_{today_date}.csv"
    do_the_difference(india_search_output_file, last_file, 'jdURL',
                      fresh_output, expired_output, common_output)

    # --- Detail scrape for the newly seen jobs ------------------------------
    india_detail_file = f"india_data/daily_upload_folder/Compete_1_India_Active_{today_date}.csv"
    india_detail_error_file = f"india_data/daily_error_folder/error_on_India_detail_{today_date}.txt"
    start_time = time.time()
    scraper = NaukriJobDetailScraper(fresh_output, india_detail_file, india_detail_error_file)
    scraper.scrape()
    duration_hours = (time.time() - start_time) / 3600
    print(f"Jobdata program took {duration_hours:.2f} hours to run.")
    with open(f'india_data/daily_stats_folder/stats_file_of_{today_date}.txt', "a") as stat:
        stat.write(f"Jobdata program took {duration_hours:.2f} hours to run.\n")

    # --- Upload --------------------------------------------------------------
    # Keep using the date this run started with. Re-reading the clock here
    # would, for a run that crosses midnight, upload the files under a
    # different date prefix and name than the local files carry.
    upload_file_to_bucket(expired_output, f"Compete_1_India_Archive_{today_date}.csv", today_date)
    upload_file_to_bucket(india_detail_file, f"Compete_1_India_Active_{today_date}.csv", today_date)
def run_gulf_scraper(today_date):
    """Run the daily Gulf pipeline: search scrape, diff, upload, detail scrape.

    Steps, in order:
      1. Run the Gulf search (``gulfSearch``).
      2. Diff its output file (``gulf_search_file``) against the second-newest
         Gulf search result on the ``jdURL`` column.
      3. Upload the archive (expired) CSV to S3.
      4. Run ``NaukriGulfJobDetailScraper`` on the fresh-jobs CSV, record its
         duration in the daily stats file, and upload the active CSV.

    Args:
        today_date: Date string (the caller passes ``DD-MM-YYYY``) used in
            every local file name and in the S3 key prefix.

    Raises:
        ValueError: If no earlier search result exists to diff against.
    """
    gulfSearch()

    # Today's file is expected to be the newest match, so the second-newest
    # is the previous run's result.
    folder_path = "gulf_data/daily_search_results/"
    search_pattern = "search_result_gulf_*.csv"
    last_file = find_second_latest_file(folder_path, search_pattern)
    if last_file is None:
        # Fail with a clear message instead of letting pandas choke on None.
        raise ValueError(
            f"No previous Gulf search result found in {folder_path}; "
            "cannot compute the daily difference."
        )

    fresh_output = f"gulf_data/daily_process_folder/new_jobs_on_{today_date}.csv"
    expired_output = f"gulf_data/daily_upload_folder/Compete_1_Gulf_Archive_{today_date}.csv"
    common_output = f"gulf_data/daily_common_folder/common_data_on_{today_date}.csv"
    do_the_difference(gulf_search_file, last_file, "jdURL", fresh_output, expired_output, common_output)
    upload_file_to_bucket(expired_output, f"Compete_1_Gulf_Archive_{today_date}.csv", today_date)

    start_time = time.time()
    gulf_detail_file = f"gulf_data/daily_upload_folder/Compete_1_Gulf_Active_{today_date}.csv"
    # Gulf errors get their own log. This path used to point at the India
    # detail error file, which mixed the two pipelines' errors together.
    gulf_detail_error_file = f"gulf_data/daily_error_folder/error_on_Gulf_detail_{today_date}.txt"
    # The Gulf error folder may not exist yet, so create it before the
    # scraper tries to write there.
    os.makedirs(os.path.dirname(gulf_detail_error_file), exist_ok=True)
    scraper = NaukriGulfJobDetailScraper(fresh_output, gulf_detail_file, gulf_detail_error_file)
    scraper.scrape()
    duration_hours = (time.time() - start_time) / 3600
    print(f"Jobdata program took {duration_hours:.2f} hours to run.")
    with open(f'gulf_data/daily_stats_folder/stats_file_of_{today_date}.txt', "a") as stat:
        stat.write(f"Jobdata program took {duration_hours:.2f} hours to run.\n")

    upload_file_to_bucket(gulf_detail_file, f"Compete_1_Gulf_Active_{today_date}.csv", today_date)
if __name__ == "__main__":
    # Entry point: `python common_task.py [gulf|india]`.
    #
    # The command line is validated before anything touches S3, so a bad
    # invocation has no remote side effects and exits with status 1.
    if len(sys.argv) != 2:
        print("Usage: python common_task.py [gulf|india]")
        sys.exit(1)

    option = sys.argv[1].lower()
    runners = {'gulf': run_gulf_scraper, 'india': run_india_scraper}
    if option not in runners:
        print("Invalid argument. Please use 'gulf' or 'india' as the argument.")
        sys.exit(1)

    today_date = datetime.now().strftime('%d-%m-%Y')
    bucket_name = 'compete-syndication'
    # A zero-byte object whose key ends in '/' acts as the dated "folder".
    folder_key = f'naukri/{today_date}/'

    # SECURITY: no access keys in source. boto3 resolves credentials itself
    # (environment variables, shared config, or an attached role).
    # NOTE(review): any key pair that was ever committed in this file should
    # be treated as leaked and rotated.
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket_name, Key=folder_key)

    runners[option](today_date)