Why is my selenium script blocked in production in headless mode, while it works completely fine in local enviroment? i use python
i have a selenium web scrapper, that runs well in my local development environment even in headless mode. However when i deploy it in production in a Linode Ubuntu VPS, somehow it fails with a Timeout exceeded message.Any help would be highly appreciated.
I use django management commands to run the script, and it fetches data from a japanese car website.
Here is the code
import json
from numbers import Number
from pathlib import Path
import traceback
import asyncio
import warnings
import time
import pandas as pd
from inspect import Traceback
import re
from bs4 import BeautifulSoup
from django.views.i18n import set_language
from google_currency import convert
from deep_translator import GoogleTranslator,LingueeTranslator
import undetected_chromedriver as UC
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.select import Select
from selenium_stealth import stealth
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException, NoAlertPresentException
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import service
from base_selenium_browser import BaseSeleniumBrowser
from basebrowser import BaseBrowser
from bs4 import BeautifulSoup
import random
from random import randint
from bs4 import SoupStrainer
from cleaning.cleaning_data import DataProcessor
from saving_data.saving_data import SavingData
from webdriver_manager.chrome import ChromeDriverManager
BASE_PATH = Path(__file__).resolve().parent.parent
class PriceFinderBot(BaseSeleniumBrowser):
def __init__(self):
self.__data_saver = SavingData()
options = webdriver.ChromeOptions()
# Core headless settings
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument('--ignore-certificate-errors')
options.add_argument('--allow-insecure-localhost')
# Stealth settings
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument("--disable-infobars")
options.add_argument("--disable-web-security")
options.add_argument("--disable-extensions")
options.add_argument("--disable-popup-blocking")
options.add_argument("--ignore-certificate-errors")
# Window size and other properties
options.add_argument("--window-size=1920,1080")
options.add_argument("--start-maximized")
user_agents = [
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
]
options.add_argument(f"user-agent={random.choice(user_agents)}")
proxy = self.get_random_proxy()
print(f"The proxy is {proxy['https']}")
# options.add_argument(f"--proxy-server={proxy['http']}")
caps = DesiredCapabilities.CHROME
caps['goog:loggingPrefs'] = {'performance': 'ALL'}
caps['chromeOptions'] = {
'args': options.arguments,
'excludeSwitches': ['enable-automation']
}
(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
prefs = {
"profile.default_content_setting_values.notifications": 2,
"profile.password_manager_leak_detection": False,
"profile.password_manager_enabled": False,
"credentials_enable_service": False,
# "profile.managed_default_content_settings.images": 2,
}
options.add_experimental_option("prefs", prefs)
# options.add_argument("--window-size=1920,1080")
self.__data_processor = DataProcessor()
self.__data_saver = SavingData()
self.__data_filters = "bf_data_filters"
self.__driver_path = "./"
self.__waiting_time = 60
self.__car_name_filters = "bf_car_name_filters"
self.__driver = webdriver.Chrome(
service = Service(ChromeDriverManager().install()),
options=options,
)
self.__car_name_individual_filters = "bf_car_individual_filters"
self.__car_name_filter_name = "MultiForm[0].MakerCode"
# stealth(
# self.__driver,
# languages=['en-US','en'],
# vendor="Google Inc.",
# platform="Win32",
# webgl_vendor="Intel Inc.",
# renderer="Intel Iris OpenGL Engine",
# fix_hairline=True
# )
super(PriceFinderBot, self).__init__("Price Finder Auto Bot v2", "www.iauc.co.jp/service", use_ssl=True)
def get_random_proxy(self):
with open("proxies.proxies", "r") as file:
proxies = file.read().split("\n")
selected_proxy = random.choice(proxies)
print(f"Selected proxy : {selected_proxy}")
return {
"http": f"http://{selected_proxy}",
"https": f"https://{selected_proxy}"
}
def login(self):
wait = WebDriverWait(self.__driver, 20)
# 1. Navigate to the website
self.__driver.maximize_window()
url = self._join_base("/")
self.__driver.get(url)
member_id = os.environ.get("IAUC_USERNAME")
member_password = os.environ.get("IAUC_PASSWORD")
# 2. get the login link and click it
login_button = self.__wait_to_clickable((By.CLASS_NAME, 'login-btn'))
login_button.click()
if not member_id or not member_password:
return
# 3. filling in the member id
member_id_input = self.__wait_to_visible((By.NAME, 'id'))
member_id_input.send_keys(member_id)
# 4. Filling in the password
member_password_input = self.__wait_to_visible((By.NAME, 'password'))
member_password_input.send_keys(member_password)
# 5. Get the login button and click it
login_submit_btn = self.__wait_to_clickable((By.ID, 'login_button'))
login_submit_btn.submit()
try:
logout_btn = self.__wait_to_clickable((By.ID, 'logout'))
print(f"[*] Successfully logged into the system")
except NoSuchElementException as e:
print(f"[-] Incorrect memberid/password")
try:
self.clear_session()
self.logout()
except NoSuchElementException as ex:
print(f"[-] Some other error has occurred: {ex}")
except TimeoutException as e:
print(f"[-] Login button not found ")
try:
self.clear_session()
except Exception as e:
print(f"[-] Weird Error while loggin in {e}")
def logout(self):
logout= self.__wait_to_clickable((By.ID, 'logout'))
logout.click()
def clear_session(self):
session_clear = self.__wait_to_visible((By.ID, 'session-clear'))
if session_clear:
sess_clear = self.__wait_to_clickable((By.CLASS_NAME, 'button-yes'))
print(f"[*] Session limit has exceeded, resetting it")
sess_clear.click()
def start(self):
self.login()
cars = self.get_car_without_price(5)
for car in cars:
self.__get_average_price(car)
self.logout()
self.__driver.quit()
def __wait_to_clickable(self,selector_tuple):
element = WebDriverWait(self.__driver,self.__waiting_time).until(
EC.element_to_be_clickable(selector_tuple)
)
return element
def __wait_many_to_presence(self,selector_tuple):
elements = WebDriverWait(self.__driver,self.__waiting_time).until(
EC.presence_of_all_elements_located(selector_tuple)
)
return elements
def __wait_to_visible(self,selector_tuple):
element = WebDriverWait(self.__driver,self.__waiting_time).until(
EC.visibility_of_element_located(selector_tuple)
)
return element
def __wait_for_element_tobe_clickable(self,el):
element = WebDriverWait(self.__driver,self.__waiting_time).until(
EC.element_to_be_clickable(el)
)
return element
def __wait_visible_child(self,parent,selector_tuple):
element = WebDriverWait(parent,self.__waiting_time).until(
EC.visibility_of_element_located(selector_tuple)
)
return element
def get_car_without_price(self,limit):
return self.__data_saver.get_cars_without_price(limit)
def accept_alert(self,timeout):
try:
# Wait for the alert to be present
WebDriverWait(self.__driver, timeout).until(EC.alert_is_present())
alert = self.__driver.switch_to.alert
print(f"Alert text: {alert.text}")
alert.accept() # or alert.dismiss()
except TimeoutException:
print("No alert appeared within the specified time.")
except NoAlertPresentException:
print("No alert is currently present.")
def exec(self):
pass
def convertibleToNumber(self,value):
try:
int(value)
float(value)
return True
except:
return False
def checkValidNo(self,value, default):
if value is None:
return default
elif isinstance(value,str) and not self.convertibleToNumber(value):
return default
else:
return value
def smallest_bigger_value_within(self,value):
values = [0,10,20,30,40,50,60,70,80,90,100,110,120,150]
adjusted_value = value/1000
length = len(values)
i = 0
while(i<length-1):
if values[i] <= adjusted_value:
i = i+1
continue
else:
break
return (values[i-1],values[i])
def __get_average_price(self,car):
#Changing the language after login in
wait = WebDriverWait(self.__driver, 20)
try:
language_switch = self.__wait_to_visible((By.ID,'toggle_lang'))
lang_class = language_switch.get_attribute('class')
if lang_class.strip() == "jp":
language_switch.click()
print(f"[*] Successfully changed the language")
desired_make = car['modelMake']
desired_model = car['modelName']
desired_year = car["year"]
desired_model_code = car['modelCode']
mileage = car['mileage']
mileage_standard= self.checkValidNo(mileage, 70000)
score = car['score']
score_from = self.checkValidNo(score, 3)
score_to = float(self.checkValidNo(score, 3)) + 0.5
original_window = self.__driver.current_window_handle
no_of_windows = len(self.__driver.window_handles)
# # 6. Navigate to markt prices
market_price_btn = self.__wait_to_clickable((By.ID,'gmenu_market'))
market_price_btn.click()
wait.until(EC.number_of_windows_to_be(no_of_windows+1))
# Open a new tab
self.__driver.switch_to.window(self.__driver.window_handles[-1])
select_all_days_btn = self.__wait_to_clickable((By.ID,'btn_vehicle_all'))
select_all_days_btn.click()
print("Button vehicle all is clicked")
#7. Proceed by pressing next
next_button = self.__wait_to_clickable((By.CLASS_NAME,'page-next-button'))
if next_button.is_enabled():
next_button.click()
print(f"[*] Successfully moved to the auctions page")
else:
print("The form is not ready to go to the next stage")
#8 Fill in the form by choosing a make and model
# car_makes_input = self.__driver.find_elements(By.XPATH,'//*[@id="domestic-maker"]/ul/li[1]/input')
# car_makes_text_div = self.__driver.find_elements(By.XPATH,'//*[@id="domestic-maker"]/ul/li[3]/div[3]')
car_makes_domestic_div = self.__wait_to_visible((By.ID,'domestic-maker'))
car_makes_li = self.__driver.find_elements(By.XPATH,'//*[@id="domestic-maker"]/ul/li')
imported_makes_li = self.__driver.find_elements(By.XPATH, '//*[@id="foreign-maker"]/ul/li')
for idx,make_li in enumerate(car_makes_li):
li_input = make_li.find_element(By.XPATH,f"//*[@id='domestic-maker']/ul/li[{idx+1}]/div[1]")
li_div = make_li.find_element(By.XPATH,f"//*[@id='domestic-maker']/ul/li[{idx+1}]/div[3]")
if li_div.text.lower() == desired_make.lower():
li_input.click()
break
for idx,imported_make_li in enumerate(imported_makes_li):
li_input = imported_make_li.find_element(By.TAG_NAME,'input')
li_div = self.__driver.find_element(By.XPATH,f"//*[@id='foreign-maker']/ul/li[{idx+1}]/div[3]")
if li_div.text.lower() == desired_make.lower():
print("We found the make, we can now click")
li_input.click()
break
#9. Choose the models specified
car_model_lis = self.__driver.find_elements(By.XPATH, '//*[@id="box-type"]/ul/li')
for idx,model_li in enumerate(car_model_lis):
model_span = self.__driver.find_element(By.XPATH,f"//*[@id='box-type']/ul/li[{idx+1}]/div[2]/span[1]")
if desired_model.lower() in model_span.text.lower():
model_li_input = model_li.find_elements(By.TAG_NAME,'div')[0]
model_li_input.click()
break
#Select the 12 result button
search_period = self.__wait_to_clickable((By.ID,'searchPeriod'))
search_period_select = Select(search_period)
search_period_select.select_by_value('12')
# Expand the search
accordion_expand_btn = self.__wait_to_clickable((By.ID,'search-accordion'))
accordion_expand_btn.click()
WebDriverWait(self.__driver,self.__waiting_time).until(
EC.invisibility_of_element_located((By.ID, "loading"))
)
# Find the type/model code select
model_codes = self.__wait_many_to_presence((By.XPATH,'//*[@id="box-model"]/ul/li'))
for model_code in model_codes:
code_div = self.__wait_visible_child(model_code,(By.CLASS_NAME,'detail-check-body'))
if code_div.text == desired_model_code:
element = self.__wait_for_element_tobe_clickable(model_code)
element.click()
print(f"[*] Already set the modelCode")
#Select yeear from and year to
year_from_select = self.__wait_to_clickable((By.NAME,'modelOfYearFrom'))
year_from_sel = Select(year_from_select)
year_from_sel.select_by_value(str(desired_year))
print(" Already set year from")
# Select modelOfYearTo
year_to_select = self.__wait_to_clickable((By.NAME, 'modelOfYearTo'))
year_to_sel = Select(year_to_select)
year_to_sel.select_by_value(str(desired_year))
print("Already set year to")
# Mileage from
mileage_from,mileage_to = self.smallest_bigger_value_within(mileage_standard)
print(mileage_from)
print(mileage_to)
print(mileage_standard)
print(car['mileage'])
mileage_from_select = self.__wait_to_clickable((By.NAME,'mileageFrom'))
mileage_from_sel = Select(mileage_from_select)
mileage_from_sel.select_by_value(str(mileage_from))
print("Already set mileage from ")
mileage_to_select = self.__wait_to_clickable((By.NAME, 'mileageTo'))
mileage_to_sel = Select(mileage_to_select)
mileage_to_sel.select_by_value(str(mileage_to))
print("Already set mileage to")
# Rating from
rate_from_select = self.__wait_to_clickable((By.NAME, 'rateFrom'))
rate_from_sel = Select(rate_from_select)
rate_from_sel.select_by_value(str(score_from))
print("Already set rate from")
# Rating to
rate_to_select = self.__wait_to_clickable((By.NAME, 'rateTo'))
rate_to_sel = Select(rate_to_select)
rate_to_sel.select_by_value(str(score_to))
print("Already seet rate to")
filter_next_btn = self.__wait_to_clickable((By.ID,'next-bottom'))
if filter_next_btn.is_enabled():
filter_next_btn.click()
# choose a desired year
strainer = SoupStrainer('table')
data_table_soup = BeautifulSoup(self.__driver.page_source, 'html.parser', parse_only=strainer)
try:
cars_table = data_table_soup.find('table',id='carlist')
if cars_table is None:
print("No cars were found")
return
cars_rows = cars_table.find_all('tr',class_ ='line-auction')
car_info_list = []
for idx in range(0,len(cars_rows),3):
car_info = {}
first_row = cars_rows[idx]
second_row = cars_rows[idx+1]
third_row = cars_rows[idx+2]
first_row_tds = first_row.find_all('td',class_ = 'open-detail')
second_row_tds = second_row.find_all('td',class_='open-detail')
third_row_tds = third_row.find_all('td',class_='open-detail')
for index,car_td in enumerate(first_row_tds):
if index == 0 or index==2 : continue
if index == 1:
model = car_td.text.strip()
cleaned = re.sub(r'[一-龠ぁ-ゔァ-ヴー々〆〤ァ-ン゙゚\u3000-\u303f\uFF00-\uFFEF\u4E00-\u9FAF]|[^\w\s]|_', ' ', model)
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
car_info['model']=cleaned
if index == 3:
location = car_td.text
car_info['location'] = location
for index,second_car_td in enumerate(second_row_tds):
if index==0:
auction_site = second_car_td.text
car_info['auction_site'] = auction_site
if index==1:
year = second_car_td.find('p').text
car_info['year'] = year
if index ==2:
engine_type = second_car_td.find_all('p')[0].text
engine_size = second_car_td.find_all('p')[1].text
car_info['engine_type'] = engine_type
car_info['engine_size'] = engine_size
if index == 3:
mileage = second_car_td.find_all('p')[1].text
car_info['mileage']= mileage
if index ==4:
color = second_car_td.find_all('p')[0].text
color_no = second_car_td.find_all('p')[1].text
color_dict = {'color':color,'color_no':color_no}
car_info['color'] = color_dict
if index ==5:
transmission = second_car_td.find_all('p')[0].text
air_conditioner = second_car_td.find_all('p')[1].text
car_info['transmission'] = transmission
car_info['air_conditioner'] = air_conditioner
if index == 6:
score = second_car_td.find_all('p')[0].text
ext_int = second_car_td.find_all('p')[1].text
car_info['score'] = score
car_info['exterior_interior'] = ext_int.replace('\u00a0','')
if index == 7:
start_price = second_car_td.find_all('p')[1].text
car_info['start_price'] = start_price
if index==8:
final_price_string = second_car_td.find('div').text
print(final_price_string)
# final_price = "".join(char for char in final_price_string if char.isdigit())
final_price= figure = re.search(r"(?<=\b\w{3} \d{2})[\d,]+", final_price_string.strip("\n ")).group().replace(",",'')
# final_status = "".join(char for char in final_price if not char.isdigit())
final_status = re.sub(r"(?<=\b\w{3} \d{2})[\d,]+", "", final_price_string.strip("\n "))
car_info['final_price'] = final_price
car_info['final_status'] = final_status
# car_info['status'] = final_status
for index, third_row_td in enumerate(third_row_tds):
if index == 0: continue
if index == 1:
lot_no = third_row_td.text
car_info['lot_no'] = lot_no
car_info_list.append(car_info)
average_price = self.calculate_average(car_info_list)
self.__data_saver.update_car(car,average_price)
with open('iauc.json','w') as iauc_file:
iauc_file.write(json.dumps(car_info_list))
print(f"[*] Successfully written data on iauc.json")
# time.sleep(randint(3, 7))
except Exception as e:
print(f"[-] An error has occured {e}")
print(traceback.format_exc())
self.logout()
self.__driver.quit()
except Exception as e:
print(f"[-] An error has occured {e}")
self.logout()
print(f"[*] Successfully logged out of the system")
self.__driver.quit()
def calculate_average(self,carInfo):
print(f"Car Info : {carInfo}")
if not carInfo:
return
models = [car['model'] for car in carInfo]
auction_sites = [car['auction_site'] for car in carInfo]
years = [car['year'] for car in carInfo]
engine_types = [car['engine_type'] for car in carInfo]
engine_sizes = [car['engine_size'] for car in carInfo]
mileages = [car['mileage'] for car in carInfo]
transmissions = [car['transmission'] for car in carInfo]
air_conditioners = [car['air_conditioner'] for car in carInfo]
scores = [car['score'] for car in carInfo]
exterior_interiors = [car['exterior_interior'] for car in carInfo]
start_prices = [float(''.join(car['start_price'].split(','))) for car in carInfo]
final_price = [float(''.join(car['final_price'].split(','))) for car in carInfo]
final_statuses = [car['final_status'] for car in carInfo]
lot_numbers = [car['lot_no'] for car in carInfo]
df = pd.DataFrame({
"MODEL": models,
"AUCTION SITE": auction_sites,
"YEAR": years,
"ENGINE TYPE": engine_types,
"ENGINE SIZE": engine_sizes,
"MILEAGE": mileages,
"TRANSMISSIONS": transmissions,
"AIR CONDITIONERS": air_conditioners,
"SCORES": scores,
"EXTERIOR-INTERIOR": exterior_interiors,
"START PRICE": start_prices,
"FINAL PRICE": final_price,
"FINAL STATUS": final_statuses,
"LOT_NO": lot_numbers
})
print(df.describe())
print(f"The average price is : {df['FINAL PRICE'].mean().astype(int)}")
return df["FINAL PRICE"].mean().astype(int)
if __name__ == "__main__":
price_bot = PriceFinderBot()
price_bot.start() ```
And here is the output
[Error Output][1]
[1]: https://i.sstatic.net/6GTDLIBM.png