Email Scraper Python

In diesem praxisnahen Tutorial zeige ich dir, wie du mit einem maßgeschneiderten Email Scraper auf Basis von Python, Scrapy und Playwright automatisch E-Mail-Adressen von Webseiten extrahierst – inklusive direkter Speicherung in Google Drive

Du willst automatisiert E-Mail-Adressen von Websites extrahieren – z. B. von Kontaktseiten – und die Ergebnisse direkt in Google Drive speichern?
Dann ist dieses Setup genau das Richtige für dich. In diesem Tutorial zeige ich dir, wie du mit Python, Scrapy und Playwright einen vollautomatischen Email Scraper baust – inklusive Google Drive Upload.

Was bringt ein Email Scraper?

  • Du sparst dir das manuelle Durchforsten von Websites.
  • Du bekommst saubere Listen mit E-Mail-Adressen als CSV.
  • Alles läuft automatisch: Von der URL-Liste bis zur Speicherung in Google Drive.
  • Perfekt für Sales, Recruiting, Outreach oder interne Datenaufbereitung.


Um die passenden Webadressen für den Scraper zu bekommen, verwende ich das Tool Scraptable. Damit kannst du gezielt nach Unternehmen in einem bestimmten Umkreis suchen – zum Beispiel alle Restaurants im Umkreis von 10 km um Berlin. Scraptable greift dabei auf die öffentlich sichtbaren Informationen aus der Google-Suche bzw. Google Maps zu und liefert dir strukturierte Daten wie Website-URL, Telefonnummer, Adresse, Öffnungszeiten & Branchenkategorie.

Was in den Ergebnissen jedoch nicht enthalten ist, sind E-Mail-Adressen – und genau hier kommt mein E-Mail-Scraper ins Spiel: Er nimmt die gesammelten Websites aus Scraptable (z. B. per urls.txt) und durchsucht jede davon automatisch nach öffentlich sichtbaren E-Mails. So entsteht eine vollständige CSV-Liste, perfekt für gezieltes Outreach oder interne Auswertungen.

Was brauch ich dafür?

  • Python 3
  • VS Code oder beliebigen Editor
  • scrapy + scrapy-playwright
  • pydrive2 für den Google Drive Upload
  • Eine Google API mit client_secrets.json
  • Optional: SerpAPI für automatisierte Google-Suchen

 

Vorbereitung: Pakete installieren

Öffne dein Terminal im Projektordner und führe Folgendes aus:


pip install scrapy scrapy-playwright pydrive2 python-dotenv
playwright install

  

Du brauchst außerdem eine Datei urls.txt, in der du alle Ziel-Websites einträgst (eine URL pro Zeile).

Der Email Scraper mit Playwright

Das ist der Kern: Ein Scrapy-Spider mit Playwright, der E-Mails erkennt – auch verschleierte Schreibweisen wie max (at) firma.de.
Er öffnet automatisch Seiten, klickt z. B. auf „Kontakt“-Links und versucht, E-Mails zu extrahieren.


import scrapy
import re
import os
import datetime
import csv
import asyncio
from urllib.parse import urlparse, urlsplit
from scrapy_playwright.page import PageMethod


class EmailSpider(scrapy.Spider):
    name = "email_spider"

    custom_settings = {
        "ROBOTSTXT_OBEY": False,
        "DOWNLOAD_DELAY": 0.3,
        "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        "DEFAULT_REQUEST_HEADERS": {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
            "Accept-Encoding": "gzip, deflate, br",
        },
        "PLAYWRIGHT_BROWSER_TYPE": "chromium",
        "PLAYWRIGHT_LAUNCH_OPTIONS": {
            "headless": False,
            "args": [
                "--no-sandbox",
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--window-size=1920,1080",
            ],
        },
        "DOWNLOAD_HANDLERS": {
            "http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
            "https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
        },
        "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
        # harte Obergrenzen – wir wollen lieber überspringen
        "PLAYWRIGHT_DEFAULT_NAVIGATION_TIMEOUT": 6000,  # 6s
        "DOWNLOAD_TIMEOUT": 8,
        # "LOG_LEVEL": "INFO",
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.found_emails = set()
        self.rows = []

    # Aggressives Blocking: ALLE Scripts, Assets, Fremd-Domains kappen
    async def _smart_block(self, route):
        req = route.request
        url = req.url
        rtype = req.resource_type

        # Wir brauchen nur das reine HTML
        if rtype in {"image", "media", "font", "stylesheet", "websocket", "eventsource", "ping", "script"}:
            return await route.abort()

        host = urlsplit(url).netloc
        frame_url = getattr(req.frame, "url", "") or url
        frame_host = urlsplit(frame_url).netloc
        same_origin = (host == frame_host)

        deny_hosts = (
            "doubleclick.net", "googletagmanager.com", "google-analytics.com",
            "analytics.google.com", "g.doubleclick.net", "bat.bing.com",
            "clarity.ms", "sentry-cdn.com", "cookiebot.com",
            "quantummetric.com", "thehotelsnetwork.com", "hotelchamp.com",
        )
        if any(d in host for d in deny_hosts):
            return await route.abort()

        # Cross-Origin XHR/Fetch auch blocken
        if not same_origin and rtype in {"xhr", "fetch"}:
            return await route.abort()

        return await route.continue_()

    def start_requests(self):
        try:
            with open("urls.txt", "r") as f:
                urls = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            raise ValueError("urls.txt nicht gefunden. Lege eine Datei mit URLs im Projektordner an.")

        for url in urls:
            self.logger.info(f"Starte URL: {url}")
            yield scrapy.Request(
                url=url,
                callback=self.parse_kontakt_impressum_only,
                errback=self.handle_error,
                meta={
                    "playwright": True,
                    "playwright_include_page": True,  # wir schließen den Tab selbst
                    "playwright_page_methods": [
                        PageMethod("add_init_script", self._stealth_script()),
                        PageMethod("route", "**/*", self._smart_block),

                        # kurze Timeouts direkt an der Page
                        PageMethod("set_default_timeout", 5000),              # 5s
                        PageMethod("set_default_navigation_timeout", 5000),   # 5s

                        # Basis-Seite laden
                        PageMethod("goto", url, wait_until="domcontentloaded"),

                        # simple Overlays killen
                        PageMethod(
                            "eval_on_selector_all",
                            "[id*='cookie'], [id*='consent'], [id*='overlay']",
                            "els => els.forEach(el => el.remove())"
                        ),

                        # Internen Kontakt/Impressum-Link klicken (Kontakt bevorzugt)
                        PageMethod("evaluate", """
                        () => {
                          const origin = location.origin;
                          const links = Array.from(document.querySelectorAll('a[href]'));
                          const find = (re) => links.find(a => {
                            const href = a.getAttribute('href') || '';
                            const url = new URL(href, location.href);
                            return url.origin === origin && re.test(url.pathname);
                          });
                          const kontakt = find(/kontakt/i);
                          const impressum = find(/impressum/i);
                          const target = kontakt || impressum;
                          if (target) { target.click(); return true; }
                          return false;
                        }
                        """),

                        # minimales Warten + Stop – verhindert endlose XHR-Schleifen
                        PageMethod("wait_for_load_state", "domcontentloaded"),
                        PageMethod("wait_for_timeout", 600),
                        PageMethod("evaluate", "window.stop();"),
                    ],
                }
            )

    def parse_kontakt_impressum_only(self, response):
        path = urlparse(response.url).path.lower()
        if not ("/kontakt" in path or "/impressum" in path):
            self.logger.info(f"➡️ Keine Kontakt/Impressum-Seite auf {response.request.url} – weiter.")
            page = response.meta.get("playwright_page")
            if page:
                try:
                    asyncio.get_event_loop().create_task(page.close())
                except Exception:
                    pass
            return

        self.logger.info(f"🔎 Analysiere NUR: {response.url}")

        text_nodes = response.xpath('//main//text() | //body//text()').getall()
        text_content = " ".join(t.strip() for t in text_nodes if t.strip())[:200_000]

        raw_emails = re.findall(
            r"[a-zA-Z0-9._%+-]+\s?(?:@|[\[\(]{1}at[\]\)]{1}|\s+at\s+)\s?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            text_content
        )
        cleaned_emails = set(
            re.sub(r"\s?(?:\[\s*at\s*\]|\(at\)|\s+at\s+)", "@", e.replace(" ", ""))
            for e in raw_emails
        )

        new_emails = cleaned_emails - self.found_emails
        if new_emails:
            self.logger.info(f"✅ {len(new_emails)} neue E-Mails auf {response.url} gefunden")
        else:
            self.logger.info(f"❌ Keine E-Mails auf {response.url} gefunden")

        for email in new_emails:
            self.found_emails.add(email)
            self.rows.append({"source": response.url, "email": email})
            yield {"source": response.url, "email": email}

        # Tab sicher schließen (async)
        page = response.meta.get("playwright_page")
        if page:
            try:
                asyncio.get_event_loop().create_task(page.close())
            except Exception:
                pass

    def handle_error(self, failure):
        self.logger.warning(f"❌ Fehler bei {failure.request.url}: {failure.value}")
        # Seite schließen, falls vorhanden
        page = failure.request.meta.get("playwright_page")
        if page:
            try:
                asyncio.get_event_loop().create_task(page.close())
            except Exception:
                pass

    def _stealth_script(self):
        return """
        () => {
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
            Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
            Object.defineProperty(navigator, 'languages', {get: () => ['de-DE', 'de', 'en-US']});
            Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
            Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications'
                    ? Promise.resolve({ state: Notification.permission })
                    : originalQuery(parameters)
            );
            return true;
        }
        """

    def closed(self, reason):
        self.logger.info("🚀 Scraper abgeschlossen, lade direkt als Google Sheet hoch ...")
        try:
            from drive_upload import upload_to_drive
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
            filename = f"emails_{timestamp}.csv"

            with open(filename, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=["source", "email"])
                writer.writeheader()
                for row in self.rows:
                    writer.writerow(row)

            upload_to_drive(filename)
            os.remove(filename)

        except Exception as e:
            self.logger.error(f"❌ Fehler beim Google Drive Upload: {e}")


    

Upload nach Google Drive

Nach dem Scrape wird die CSV automatisch in Google Drive hochgeladen – mit Zeitstempel.
Das funktioniert über PyDrive2 und deine gespeicherten Credentials (client_secrets.json + mycreds.txt).


from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

import os

def upload_to_drive(local_file_path, remote_filename=None):
    print("▶️ Upload gestartet …")

    if not os.path.exists(local_file_path):
        print(f"⚠️ Datei nicht gefunden: {local_file_path}")
        return

    try:
        print("🔐 Authentifiziere mit Google …")

        gauth = GoogleAuth()
        gauth.settings['client_config_file'] = 'client_secrets.json'
        gauth.settings['oauth_scope'] = ['https://www.googleapis.com/auth/drive.file']

        gauth.LoadCredentialsFile("mycreds.txt")

        if gauth.credentials is None:
            print("🌐 Öffne Browser zur Anmeldung …")
            gauth.LocalWebserverAuth()
        elif gauth.access_token_expired:
            print("♻️ Token abgelaufen, erneuere …")
            gauth.Refresh()
        else:
            print("✅ Token gültig, verwende gespeicherte Credentials …")
            gauth.Authorize()

        gauth.SaveCredentialsFile("mycreds.txt")
    except Exception as e:
        print(f"❌ Authentifizierungsfehler: {e}")
        return


    

Bonus: Google Search Scraper

Wenn du gar keine URL-Liste hast, kannst du mit SerpAPI automatisiert bei Google suchen – z. B. nach „Eventagenturen München“ – und die Treffer als CSV speichern.


import scrapy
import requests
import csv
import os

class GoogleSearchSpider(scrapy.Spider):
    name = "google_search"

    def start_requests(self):
        self.visited_urls = []
        query = "Eventagenturen München"
        urls = self.get_google_results(query, pages=3)
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def get_google_results(self, query, pages=3):
        api_key = "HIER"  # 🔑 Deinen API Key eintragen
        results = []
        for page in range(pages):
            start = page * 10
            params = {
                "engine": "google",
                "q": query,
                "start": start,
                "api_key": api_key
            }
            response = requests.get("https://serpapi.com/search", params=params)
            data = response.json()
            for result in data.get("organic_results", []):
                link = result.get("link")
                if link:
                    results.append(link)
        return results

    def parse(self, response):
        url = response.url
        self.logger.info(f"✅ Besucht: {url}")
        self.visited_urls.append(url)
        yield {"url": url}

    def closed(self, reason):
        # Datei speichern beim Beenden
        filename = "google_urls.csv"
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["url"])
            for url in self.visited_urls:
                writer.writerow([url])
        self.logger.info(f"📁 URLs gespeichert in: {os.path.abspath(filename)}")


    

Wie lass ich den Crawler durchlaufen?

  1. URLs in urls.txt eintragen
  2. Spider starten über den folgenden Befehl
  3. Beim ersten Lauf wirst du in deinem Browser zur Google-Freigabe weitergeleitet. Danach ist das Token gespeichert.

 


scrapy crawl email_spider

  

Ablauf der Google-Authentifizierung

Wenn du mit Google Drive arbeitest, solltest du wissen, dass der Authentifizierungsprozess in regelmäßigen Abständen erneuert werden muss – meistens alle paar Tage. Das liegt daran, dass das von Google generierte Zugangstoken (gespeichert in der Datei mycreds.txt) nach einer gewissen Zeit abläuft. Um die Authentifizierung zu erneuern, musst du einfach die Datei mycreds.txt löschen und den Scraper wie gewohnt starten – zum Beispiel mit einer Test-URL. Sobald der Upload-Vorgang aufgerufen wird, öffnet sich automatisch ein Browserfenster, in dem du dich bei deinem Google-Konto anmeldest und den Zugriff bestätigst. Danach wird eine neue mycreds.txt erzeugt und alle künftigen Uploads funktionieren wieder automatisch im Hintergrund.

 

Du brauchst Unterstützung?

Wenn du Unterstützung brauchst oder dein Setup nicht funktioniert, schreib mir einfach.
Ich helfe gerne – oder bau dir den Scraper so, wie du ihn brauchst.

Name *
Telefonnummer *
E-Mail-Adresse *
Deine Nachricht
Picture of Stefan

Stefan

Contentrise Media

About me

Mein Name ist Stefan & ich bin die Person hinter Contentrise. Ganz nach dem Motto „here to create“ produziere ich hochwertige Videos für die unterschiedlichsten Use Cases.

Recent Posts

Recent Projects