Email Scraper Python

In diesem praxisnahen Tutorial zeige ich dir, wie du mit einem maßgeschneiderten Email Scraper auf Basis von Python, Scrapy und Playwright automatisch E-Mail-Adressen von Webseiten extrahierst – inklusive direkter Speicherung in Google Drive

Du willst automatisiert E-Mail-Adressen von Websites extrahieren – z. B. von Kontaktseiten – und die Ergebnisse direkt in Google Drive speichern?
Dann ist dieses Setup genau das Richtige für dich. In diesem Tutorial zeige ich dir, wie du mit Python, Scrapy und Playwright einen vollautomatischen Email Scraper baust – inklusive Google Drive Upload.

Was bringt ein Email Scraper?

  • Du sparst dir das manuelle Durchforsten von Websites.
  • Du bekommst saubere Listen mit E-Mail-Adressen als CSV.
  • Alles läuft automatisch: Von der URL-Liste bis zur Speicherung in Google Drive.
  • Perfekt für Sales, Recruiting, Outreach oder interne Datenaufbereitung.

Was brauch ich dafür?

  • Python 3
  • VS Code oder beliebigen Editor
  • scrapy + scrapy-playwright
  • pydrive2 für den Google Drive Upload
  • Eine Google API mit client_secrets.json
  • Optional: SerpAPI für automatisierte Google-Suchen

 

Vorbereitung: Pakete installieren

Öffne dein Terminal im Projektordner und führe Folgendes aus:


pip install scrapy scrapy-playwright pydrive2 python-dotenv
playwright install

  

Du brauchst außerdem eine Datei urls.txt, in der du alle Ziel-Websites einträgst (eine URL pro Zeile).

Der Email Scraper mit Playwright

Das ist der Kern: Ein Scrapy-Spider mit Playwright, der E-Mails erkennt – auch verschleierte Schreibweisen wie max (at) firma.de.
Er öffnet automatisch Seiten, klickt z. B. auf „Kontakt“-Links und versucht, E-Mails zu extrahieren.


import scrapy
import re
import os
import datetime
import csv
from scrapy_playwright.page import PageMethod


class EmailSpider(scrapy.Spider):
    name = "email_spider"

    custom_settings = {
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 1,
        "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        "DEFAULT_REQUEST_HEADERS": {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
        },
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": False,
            "args": [
                "--no-sandbox",
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ],
        },
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
        "PLAYWRIGHT_DEFAULT_NAVIGATION_TIMEOUT": 60000,
        "DOWNLOAD_TIMEOUT": 90,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.found_emails = set()
        self.rows = []

    def start_requests(self):
        try:
            with open("urls.txt", "r") as f:
                urls = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            raise ValueError("urls.txt nicht gefunden. Lege eine Datei mit URLs im Projektordner an.")

        for url in urls:
            self.logger.info(f"Starte URL: {url}")
            yield scrapy.Request(
                url=url,
                callback=self.parse,
                errback=self.handle_error,
                meta={
                    "playwright": True,
                    "playwright_include_page": True,
                    "playwright_page_methods": [
                        PageMethod("add_init_script", self._stealth_script()),
                        PageMethod("eval_on_selector_all", "[id*='cookie'], [id*='consent'], [id*='overlay']", "els => els.forEach(el => el.remove())"),
                        PageMethod("wait_for_selector", "body"),
                        PageMethod("evaluate", """() => {
                            const link = Array.from(document.querySelectorAll("a")).find(a =>
                                a.href.toLowerCase().includes("kontakt") ||
                                a.href.toLowerCase().includes("impressum")
                            );
                            if (link) link.click();
                        }"""),
                        PageMethod("wait_for_timeout", 2000),
                    ],
                }
            )

    def parse(self, response):
        self.logger.info(f"Analysiere Seite: {response.url}")
        text_content = " ".join(response.xpath('//body//text()').getall())

        raw_emails = re.findall(
            r"[a-zA-Z0-9._%+-]+\s?(?:@|[\[\(]{1}at[\]\)]{1}|\s+at\s+)\s?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            text_content
        )
        cleaned_emails = set(
            re.sub(r"\s?(?:\[\s*at\s*\]|\(at\)|\sat\s)", "@", e.replace(" ", ""))
            for e in raw_emails
        )

        new_emails = cleaned_emails - self.found_emails
        if new_emails:
            self.logger.info(f"✅ {len(new_emails)} neue E-Mails auf {response.url} gefunden")
        else:
            self.logger.info(f"❌ Keine E-Mails auf {response.url} gefunden")

        for email in new_emails:
            self.found_emails.add(email)
            self.rows.append({"source": response.url, "email": email})
            yield {"source": response.url, "email": email}

    def handle_error(self, failure):
        self.logger.warning(f"❌ Fehler bei {failure.request.url}: {failure.value}")

    def _stealth_script(self):
        return """
        () => {
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
            Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
            Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
            Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
            Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications'
                    ? Promise.resolve({ state: Notification.permission })
                    : originalQuery(parameters)
            );
            return true;
        }
        """

    def closed(self, reason):
        self.logger.info("🚀 Scraper abgeschlossen, lade direkt als Google Sheet hoch ...")
        try:
            from drive_upload import upload_to_drive
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
            filename = f"emails_{timestamp}.csv"

            with open(filename, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=["source", "email"])
                writer.writeheader()
                for row in self.rows:
                    writer.writerow(row)

            upload_to_drive(filename)
            os.remove(filename)

        except Exception as e:
            self.logger.error(f"❌ Fehler beim Google Drive Upload: {e}")

    

Upload nach Google Drive

Nach dem Scrape wird die CSV automatisch in Google Drive hochgeladen – mit Zeitstempel.
Das funktioniert über PyDrive2 und deine gespeicherten Credentials (client_secrets.json + mycreds.txt).


from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

import os

def upload_to_drive(local_file_path, remote_filename=None):
    print("▶️ Upload gestartet …")

    if not os.path.exists(local_file_path):
        print(f"⚠️ Datei nicht gefunden: {local_file_path}")
        return

    try:
        print("🔐 Authentifiziere mit Google …")

        gauth = GoogleAuth()
        gauth.settings['client_config_file'] = 'client_secrets.json'
        gauth.settings['oauth_scope'] = ['https://www.googleapis.com/auth/drive.file']

        gauth.LoadCredentialsFile("mycreds.txt")

        if gauth.credentials is None:
            print("🌐 Öffne Browser zur Anmeldung …")
            gauth.LocalWebserverAuth()
        elif gauth.access_token_expired:
            print("♻️ Token abgelaufen, erneuere …")
            gauth.Refresh()
        else:
            print("✅ Token gültig, verwende gespeicherte Credentials …")
            gauth.Authorize()

        gauth.SaveCredentialsFile("mycreds.txt")
    except Exception as e:
        print(f"❌ Authentifizierungsfehler: {e}")
        return


    

Bonus: Google Search Scraper

Wenn du gar keine URL-Liste hast, kannst du mit SerpAPI automatisiert bei Google suchen – z. B. nach „Eventagenturen München“ – und die Treffer als CSV speichern.


import scrapy
import requests
import csv
import os

class GoogleSearchSpider(scrapy.Spider):
    name = "google_search"

    def start_requests(self):
        self.visited_urls = []
        query = "Eventagenturen München"
        urls = self.get_google_results(query, pages=3)
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def get_google_results(self, query, pages=3):
        api_key = "HIER"  # 🔑 Deinen API Key eintragen
        results = []
        for page in range(pages):
            start = page * 10
            params = {
                "engine": "google",
                "q": query,
                "start": start,
                "api_key": api_key
            }
            response = requests.get("https://serpapi.com/search", params=params)
            data = response.json()
            for result in data.get("organic_results", []):
                link = result.get("link")
                if link:
                    results.append(link)
        return results

    def parse(self, response):
        url = response.url
        self.logger.info(f"✅ Besucht: {url}")
        self.visited_urls.append(url)
        yield {"url": url}

    def closed(self, reason):
        # Datei speichern beim Beenden
        filename = "google_urls.csv"
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["url"])
            for url in self.visited_urls:
                writer.writerow([url])
        self.logger.info(f"📁 URLs gespeichert in: {os.path.abspath(filename)}")


    

Wie lass ich den Crawler durchlaufen?

  1. URLs in urls.txt eintragen
  2. Spider starten über den folgenden Befehl
  3. Beim ersten Lauf wirst du in deinem Browser zur Google-Freigabe weitergeleitet. Danach ist das Token gespeichert.

 


scrapy crawl email_spider

  
a

Du brauchst Unterstützung?

Wenn du Unterstützung brauchst oder dein Setup nicht funktioniert, schreib mir einfach.
Ich helfe gerne – oder bau dir den Scraper so, wie du ihn brauchst.

Name *
Telefonnummer *
E-Mail-Adresse *
Deine Nachricht
Picture of Stefan

Stefan

Contentrise Media

About me

Mein Name ist Stefan & ich bin die Person hinter Contentrise. Ganz nach dem Motto „here to create“ produziere ich hochwertige Videos für die unterschiedlichsten Use Cases.

Recent Posts

Recent Projects