Định hướng:
1. Đọc file danh sách (Input)
2. Lấy file trong mỗi dòng của danh sách, lưu file trong folder (Files)
3. Cập nhật dòng đã lấy vào file mới (Output): nếu thành công cập nhật cột TaiVe=1, nếu thất bại TaiVe=0
Với các trang chống bot:
- Delay time
- Dùng cookies đăng nhập bằng tài khoản
- Lý tưởng nhất là crawl trong Windows với trình duyệt thật
Script dưới đây là ví dụ chạy trong WSL Ubuntu, Chromium headless. Cách làm này ổn với đa số site thông thường.
Nếu gặp khó khăn cân nhắc xem hướng dẫn crawl trong Windows nhé.
# -*- coding: utf-8 -*-
import os
import re
import csv
import time
import json
import random
import shutil
import mimetypes

import requests
import pandas as pd
from urllib.parse import urljoin, unquote
from datetime import datetime

# Selenium + undetected-chromedriver
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    WebDriverException,
    NoSuchWindowException,
)
# ============== CONFIGURATION ==============
LOG_FILE = "3LayFiles.log"          # run log (appended to)
OUTPUT_CSV = "3LayFiles_Output.csv" # per-row results, written incrementally
COOKIE_FILE = "3LayFiles.json"      # browser-exported cookies (JSON list)
INPUT_CSV = "3LayFiles_Input.csv"   # must contain uuid / url / TaiVe columns
BASE_URL = "https://WebSiteNguonLayData.vn"
SAVE_ROOT = "files"                 # downloads go under files/<uuid>/...

UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36"
# Set HEADLESS=0 in the environment to run headful (requires an X server).
HEADLESS = os.environ.get("HEADLESS", "1") != "0"
# ============== UTILITIES ==============
def write_log(msg: str):
    """Append a timestamped message to LOG_FILE and echo it to stdout."""
    ts = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
    line = f"{ts} {msg}"
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
    print(line)
def to01(x):
    """Normalize a truthy flag to 0/1.

    A value that parses as an integer maps to 1 only when it equals 1.
    Non-numeric values map to 1 when they are a common truthy word
    ("true"/"yes"/"y", case-insensitive); everything else maps to 0.
    """
    text = str(x).strip()
    try:
        return 1 if int(text or "0") == 1 else 0
    except ValueError:  # non-numeric, e.g. "yes" / "True" / "nan"
        return 1 if text.lower() in ("true", "yes", "y") else 0
def load_cookies_from_json(path: str):
    """Read a browser-exported cookie dump, keeping only name/value.

    The domain is pinned to the target site; it is applied again when the
    cookies are added to the WebDriver session.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    return [
        {"name": c["name"], "value": c["value"], "domain": "WebSiteNguonLayData.vn"}
        for c in raw
    ]
def guess_extension(href: str, content_type: str):
    """Infer a file extension from the Content-Type, falling back to the URL.

    Returns ".bin" when nothing better can be determined.
    """
    ext = mimetypes.guess_extension(content_type or "")
    if ext and ext != ".bin":
        return ext
    href = unquote((href or "").lower())
    # Check ".docx" before ".doc" since the former contains the latter.
    if ".pdf" in href:
        return ".pdf"
    if ".docx" in href:
        return ".docx"
    if ".doc" in href:
        return ".doc"
    return ".bin"
def clean_filename(text: str):
    """Sanitize *text* into a safe, underscore-separated file name.

    Strips everything except word chars / whitespace / hyphens, collapses
    whitespace to "_", and truncates to 80 characters. Empty input yields
    "file".
    """
    text = re.sub(r"[^\w\s\-]", "", text or "")
    text = re.sub(r"\s+", "_", text.strip())
    return (text or "file")[:80]
def unique_path(path: str):
    """Return *path*, or "<stem>_<n><ext>" with the smallest n that is free.

    Never overwrites an existing file.
    """
    if not os.path.exists(path):
        return path
    base, ext = os.path.splitext(path)
    i = 1
    while True:
        cand = f"{base}_{i}{ext}"
        if not os.path.exists(cand):
            return cand
        i += 1
# ============== DRIVER (undetected-chromedriver) ==============
def make_driver():
    """Build an undetected-chromedriver Chrome instance tuned for WSL/headless.

    Honors the CHROME_BIN env var (falls back to the .deb google-chrome
    path, then to UC's default browser). Returns a live WebDriver.
    """
    opts: ChromeOptions = uc.ChromeOptions()
    if HEADLESS:
        opts.add_argument("--headless=new")
    for flag in (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--no-first-run",
        "--no-default-browser-check",
        "--disable-background-networking",
        "--disable-sync",
        "--disable-extensions",
        "--disable-features=TranslateUI,AutomationControlled",
        f"--user-agent={UA}",
    ):
        opts.add_argument(flag)

    # Prefer a .deb Chrome when present (more stable than snap Chromium).
    chrome_bin = os.environ.get("CHROME_BIN", "").strip() or "/usr/bin/google-chrome-stable"
    if os.path.exists(chrome_bin):
        opts.binary_location = chrome_bin
        write_log(f"🧭 Dùng trình duyệt: {chrome_bin}")
    else:
        write_log("🧭 Dùng trình duyệt mặc định của UC (có thể là snap Chromium)")

    # use_subprocess=True is more reliable under WSL.
    driver = uc.Chrome(options=opts, use_subprocess=True)
    caps = driver.capabilities
    write_log(f"✅ UC WebDriver OK: {caps.get('browserName')} {caps.get('browserVersion')}, headless={HEADLESS}")
    return driver
# ============== CLOUDFLARE ==============
def wait_cloudflare_clearance(driver, max_wait=60):
    """Wait for the Cloudflare challenge to clear.

    Returns True once the title no longer looks like a challenge page AND a
    cf_clearance cookie is present; False after *max_wait* seconds.
    """
    start = time.time()
    while time.time() - start < max_wait:
        title = (driver.title or "").lower()
        has_clearance = any(c.get("name") == "cf_clearance" for c in driver.get_cookies())
        write_log(f"🔍 CF check | title='{driver.title}' | cf_clearance={has_clearance}")
        if "just a moment" not in title and "cloudflare" not in title and has_clearance:
            return True
        time.sleep(1.5)
    return False
# ============== OPEN TAB / FIND LINKS ==============
def open_download_tab_or_fallback(driver, uuid, timeout_tab=12):
    """Try to open the download ("Tải về") tab via several selectors.

    If no selector yields links inside #tab8, fall back to scanning the
    whole page for download links. Returns (web_elements, used_fallback).
    """
    selectors = [
        (By.ID, "aTabTaiVe"),
        (By.CSS_SELECTOR, 'a[href="#tab8"]'),
        (By.CSS_SELECTOR, 'a[data-target="#tab8"]'),
        (By.XPATH, "//a[contains(normalize-space(.),'Tải về') or contains(@title,'Tải về')]"),
        (By.XPATH, "//li[a[contains(normalize-space(.),'Tải về')]]/a"),
    ]
    for by, sel in selectors:
        try:
            el = WebDriverWait(driver, 6).until(EC.element_to_be_clickable((by, sel)))
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
            time.sleep(0.2)
            # JS click avoids "element click intercepted" issues.
            driver.execute_script("arguments[0].click();", el)
            try:
                WebDriverWait(driver, timeout_tab).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, '#tab8'))
                )
            except TimeoutException:
                pass  # the pane may already be present in the DOM
            links = driver.find_elements(By.CSS_SELECTOR, '#tab8 a[href*="download.aspx"]')
            if links:
                return links, False
        except Exception:
            continue  # try the next selector

    # Fallback 1: CSS scan over the whole page.
    links_all = driver.find_elements(By.CSS_SELECTOR, 'a[href*="download.aspx"]')
    if links_all:
        write_log(f"🔎 {uuid}: Fallback CSS toàn trang: {len(links_all)} link")
        return links_all, True

    # Fallback 2: case-insensitive XPath scan.
    links_all2 = driver.find_elements(
        By.XPATH, "//a[contains(translate(@href,'DOWNLOAD','download'),'download.aspx')]"
    )
    if links_all2:
        write_log(f"🔎 {uuid}: Fallback XPath toàn trang: {len(links_all2)} link")
        return links_all2, True

    return [], False
# ============== DOWNLOAD FILE ==============
def classify_and_save_file(href, link_text, session, uuid, word_counter):
    """Download one link and save it under SAVE_ROOT/<uuid>/{pdf,doc,other}/.

    Returns (success, word_counter); word_counter numbers the unnamed Word
    attachments so they get distinct file names.
    """
    full_url = urljoin(BASE_URL, href)
    headers = {"User-Agent": UA, "Referer": BASE_URL}
    try:
        # Context manager releases the streamed connection even on error
        # paths (the original leaked it on non-200 responses).
        with session.get(full_url, headers=headers, timeout=120, stream=True) as resp:
            if resp.status_code != 200:
                write_log(f"❌ {uuid}: Lỗi tải {full_url} (HTTP {resp.status_code})")
                return False, word_counter

            # Prefer the server-provided name from Content-Disposition.
            cd = resp.headers.get("Content-Disposition", "") or ""
            filename = None
            m = re.search(r'filename="?([^";]+)"?', cd)
            if m:
                filename = m.group(1)

            if not filename:
                ext = guess_extension(href, resp.headers.get("Content-Type"))
                href_lower = (href or "").lower()
                # Unnamed Word attachments get sequential "w<uuid>_<n>" names.
                if "vietnamesehyperlink" in href_lower or (ext == ".doc" and "part=-1" in href_lower):
                    word_counter += 1
                    filename = f"w{uuid}_{word_counter}{ext}"
                else:
                    filename = clean_filename(link_text) + ext

            # Route into a subfolder by extension.
            ext = os.path.splitext(filename)[-1].lower()
            subfolder = "pdf" if ext == ".pdf" else ("doc" if ext in [".doc", ".docx"] else "other")
            folder = os.path.join(SAVE_ROOT, str(uuid), subfolder)
            os.makedirs(folder, exist_ok=True)
            path = unique_path(os.path.join(folder, filename))

            with open(path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        write_log(f"✅ {uuid}: Đã tải {path}")
        return True, word_counter
    except Exception as e:
        write_log(f"❌ {uuid}: Exception khi tải {full_url} - {e}")
        return False, word_counter
# ============== PER-ROW FLOW ==============
def download_files_for_row(uuid, url, cookies):
    """Process one input row: open the page, pass Cloudflare, download files.

    Returns True when the row was handled (fewer than 3 failed files),
    False when it should be retried later. Always quits the driver.
    """
    error_count = 0
    word_counter = 0
    driver = make_driver()
    success = True
    try:
        # 1) Visit BASE first so cookies attach to the right domain
        #    (a cf_clearance captured from a real browser helps here).
        driver.get(BASE_URL)
        time.sleep(1.0)
        for c in cookies:
            try:
                driver.add_cookie({"name": c["name"], "value": c["value"]})
            except Exception:
                pass  # best-effort: skip cookies the browser rejects
        write_log(f"🍪 Sau khi nạp cookie: total={len(driver.get_cookies())} | cf_clearance="
                  f"{any(x['name']=='cf_clearance' for x in driver.get_cookies())}")

        # 2) Open the document URL.
        driver.get(url)
        WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        write_log(f"📄 Title sau khi mở URL: {driver.title}")

        # 3) Wait out the Cloudflare challenge, if shown.
        if not wait_cloudflare_clearance(driver, max_wait=60):
            write_log(f"❌ {uuid}: Bị chặn Cloudflare (không qua được trong 60s)")
            return False
        write_log(f"✅ {uuid}: Qua Cloudflare")

        # 4) Collect download links (tab or whole-page fallback).
        links, used_fallback = open_download_tab_or_fallback(driver, uuid)
        if not links:
            write_log(f"❌ {uuid}: Không tìm thấy tab/link Tải về (kể cả fallback)")
            return False
        source = "fallback" if used_fallback else "tab8"
        write_log(f"🔗 {uuid}: Tìm thấy {len(links)} link (nguồn: {source})")

        # 5) Mirror WebDriver cookies into a requests.Session for downloads.
        session = requests.Session()
        session.headers.update({"User-Agent": UA, "Referer": BASE_URL})
        for ck in driver.get_cookies():
            try:
                session.cookies.set(ck["name"], ck["value"], domain=".WebSiteNguonLayData.vn")
            except Exception:
                pass

        # 6) Download each link; give up on the row after 3 failed files.
        for link in links:
            href = link.get_attribute("href")
            text = (link.text or "").strip()
            ok, word_counter = classify_and_save_file(href, text, session, uuid, word_counter)
            if not ok:
                error_count += 1
                if error_count >= 3:
                    write_log(f"⚠️ {uuid}: Lỗi liên tiếp 3 file – bỏ dòng này")
                    success = False
                    break
    except (NoSuchWindowException, WebDriverException, TimeoutException) as e:
        write_log(f"❌ {uuid}: Lỗi WebDriver - {e}")
        success = False
    except Exception as e:
        write_log(f"❌ {uuid}: Lỗi không xác định - {e}")
        success = False
    finally:
        # Always release the browser, even on early returns above.
        try:
            driver.quit()
        except Exception:
            pass
    return success
# ============== MAIN FLOW ==============
def run():
    """Read INPUT_CSV, process rows with TaiVe==0, append results to OUTPUT_CSV.

    Raises RuntimeError when a required column (uuid/url/TaiVe) is missing.
    Each row is flushed to OUTPUT_CSV immediately so progress survives a crash.
    """
    # --- Input ---
    df = pd.read_csv(INPUT_CSV, encoding="utf-8")
    # Drop index columns pandas may have written ("Unnamed: 0", ...).
    df = df.loc[:, ~df.columns.str.contains('^Unnamed', case=False, na=False)]
    for col in ["uuid", "url", "TaiVe"]:
        if col not in df.columns:
            raise RuntimeError("Thiếu cột bắt buộc: uuid/url/TaiVe")
    df["uuid"] = df["uuid"].astype(str).str.strip()
    df["url"] = df["url"].astype(str).str.strip()
    df["TaiVe"] = df["TaiVe"].map(to01).fillna(0).astype(int)

    cookies = load_cookies_from_json(COOKIE_FILE)
    write_log(f"📊 Tổng dòng: {len(df)} | Sẽ xử lý TaiVe==0: {(df['TaiVe'] == 0).sum()}")

    # --- Output header ---
    fieldnames = list(df.columns)
    with open(OUTPUT_CSV, "w", newline='', encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

    # --- Row loop ---
    for _, row in df.iterrows():
        uuid = str(row["uuid"]).strip()
        url = str(row["url"]).strip()

        if row["TaiVe"] == 0:
            write_log(f"🚀 Bắt đầu xử lý {uuid}")
            ok = download_files_for_row(uuid, url, cookies)
            row["TaiVe"] = 1 if ok else 0
            write_log(f"📝 Cập nhật {uuid}: TaiVe={row['TaiVe']}")
            # Random pause between rows to look less like a bot.
            sleep_time = random.uniform(60, 150)
            write_log(f"🕒 Nghỉ {sleep_time:.2f} giây...")
            time.sleep(sleep_time)
        else:
            write_log(f"⏭️ Bỏ qua {uuid} (TaiVe=1) – ghi nguyên trạng")

        # Append the row immediately (crash-resilient progress).
        with open(OUTPUT_CSV, "a", newline='', encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writerow({k: row.get(k, "") for k in fieldnames})
if __name__ == "__main__":
    # Ensure the download root exists (created if missing, never cleared).
    os.makedirs(SAVE_ROOT, exist_ok=True)
    run()
# (residue from the source page, kept as a comment: "Đăng nhập để gửi ý kiến")