Du willst automatisiert E-Mail-Adressen von Websites extrahieren – z. B. von Kontaktseiten – und die Ergebnisse direkt in Google Drive speichern?
Dann ist dieses Setup genau das Richtige für dich. In diesem Tutorial zeige ich dir, wie du mit Python, Scrapy und Playwright einen vollautomatischen Email Scraper baust – inklusive Google Drive Upload.
Was bringt ein Email Scraper?
- Du sparst dir das manuelle Durchforsten von Websites.
- Du bekommst saubere Listen mit E-Mail-Adressen als CSV.
- Alles läuft automatisch: Von der URL-Liste bis zur Speicherung in Google Drive.
- Perfekt für Sales, Recruiting, Outreach oder interne Datenaufbereitung.
Um die passenden Webadressen für den Scraper zu bekommen, verwende ich das Tool Scraptable. Damit kannst du gezielt nach Unternehmen in einem bestimmten Umkreis suchen – zum Beispiel alle Restaurants im Umkreis von 10 km um Berlin. Scraptable greift dabei auf die öffentlich sichtbaren Informationen aus der Google-Suche bzw. Google Maps zu und liefert dir strukturierte Daten wie Website-URL, Telefonnummer, Adresse, Öffnungszeiten & Branchenkategorie.
Was in den Ergebnissen jedoch nicht enthalten ist, sind E-Mail-Adressen – und genau hier kommt mein E-Mail-Scraper ins Spiel: Er nimmt die gesammelten Websites aus Scraptable (z. B. per urls.txt) und durchsucht jede davon automatisch nach öffentlich sichtbaren E-Mails. So entsteht eine vollständige CSV-Liste, perfekt für gezieltes Outreach oder interne Auswertungen.
Was brauch ich dafür?
- Python 3
- VS Code oder beliebigen Editor
- scrapy + scrapy-playwright
- pydrive2 für den Google Drive Upload
- Eine Google API mit client_secrets.json
- Optional: SerpAPI für automatisierte Google-Suchen
Vorbereitung: Pakete installieren
Öffne dein Terminal im Projektordner und führe Folgendes aus:
pip install scrapy scrapy-playwright pydrive2 python-dotenv
playwright install
Du brauchst außerdem eine Datei urls.txt
, in der du alle Ziel-Websites einträgst (eine URL pro Zeile).
Der Email Scraper mit Playwright
Das ist der Kern: Ein Scrapy-Spider mit Playwright, der E-Mails erkennt – auch verschleierte Schreibweisen wie max (at) firma.de
.
Er öffnet automatisch Seiten, klickt z. B. auf „Kontakt“-Links und versucht, E-Mails zu extrahieren.
import scrapy
import re
import os
import datetime
import csv
import asyncio
from urllib.parse import urlparse, urlsplit
from scrapy_playwright.page import PageMethod
class EmailSpider(scrapy.Spider):
name = "email_spider"
custom_settings = {
"ROBOTSTXT_OBEY": False,
"DOWNLOAD_DELAY": 0.3,
"USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"DEFAULT_REQUEST_HEADERS": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "de-DE,de;q=0.9,en-US;q=0.8,en;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
},
"PLAYWRIGHT_BROWSER_TYPE": "chromium",
"PLAYWRIGHT_LAUNCH_OPTIONS": {
"headless": False,
"args": [
"--no-sandbox",
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--disable-extensions",
"--disable-dev-shm-usage",
"--disable-gpu",
"--window-size=1920,1080",
],
},
"DOWNLOAD_HANDLERS": {
"http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
"https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
},
"TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
# harte Obergrenzen – wir wollen lieber überspringen
"PLAYWRIGHT_DEFAULT_NAVIGATION_TIMEOUT": 6000, # 6s
"DOWNLOAD_TIMEOUT": 8,
# "LOG_LEVEL": "INFO",
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.found_emails = set()
self.rows = []
# Aggressives Blocking: ALLE Scripts, Assets, Fremd-Domains kappen
async def _smart_block(self, route):
req = route.request
url = req.url
rtype = req.resource_type
# Wir brauchen nur das reine HTML
if rtype in {"image", "media", "font", "stylesheet", "websocket", "eventsource", "ping", "script"}:
return await route.abort()
host = urlsplit(url).netloc
frame_url = getattr(req.frame, "url", "") or url
frame_host = urlsplit(frame_url).netloc
same_origin = (host == frame_host)
deny_hosts = (
"doubleclick.net", "googletagmanager.com", "google-analytics.com",
"analytics.google.com", "g.doubleclick.net", "bat.bing.com",
"clarity.ms", "sentry-cdn.com", "cookiebot.com",
"quantummetric.com", "thehotelsnetwork.com", "hotelchamp.com",
)
if any(d in host for d in deny_hosts):
return await route.abort()
# Cross-Origin XHR/Fetch auch blocken
if not same_origin and rtype in {"xhr", "fetch"}:
return await route.abort()
return await route.continue_()
def start_requests(self):
try:
with open("urls.txt", "r") as f:
urls = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
raise ValueError("urls.txt nicht gefunden. Lege eine Datei mit URLs im Projektordner an.")
for url in urls:
self.logger.info(f"Starte URL: {url}")
yield scrapy.Request(
url=url,
callback=self.parse_kontakt_impressum_only,
errback=self.handle_error,
meta={
"playwright": True,
"playwright_include_page": True, # wir schließen den Tab selbst
"playwright_page_methods": [
PageMethod("add_init_script", self._stealth_script()),
PageMethod("route", "**/*", self._smart_block),
# kurze Timeouts direkt an der Page
PageMethod("set_default_timeout", 5000), # 5s
PageMethod("set_default_navigation_timeout", 5000), # 5s
# Basis-Seite laden
PageMethod("goto", url, wait_until="domcontentloaded"),
# simple Overlays killen
PageMethod(
"eval_on_selector_all",
"[id*='cookie'], [id*='consent'], [id*='overlay']",
"els => els.forEach(el => el.remove())"
),
# Internen Kontakt/Impressum-Link klicken (Kontakt bevorzugt)
PageMethod("evaluate", """
() => {
const origin = location.origin;
const links = Array.from(document.querySelectorAll('a[href]'));
const find = (re) => links.find(a => {
const href = a.getAttribute('href') || '';
const url = new URL(href, location.href);
return url.origin === origin && re.test(url.pathname);
});
const kontakt = find(/kontakt/i);
const impressum = find(/impressum/i);
const target = kontakt || impressum;
if (target) { target.click(); return true; }
return false;
}
"""),
# minimales Warten + Stop – verhindert endlose XHR-Schleifen
PageMethod("wait_for_load_state", "domcontentloaded"),
PageMethod("wait_for_timeout", 600),
PageMethod("evaluate", "window.stop();"),
],
}
)
def parse_kontakt_impressum_only(self, response):
path = urlparse(response.url).path.lower()
if not ("/kontakt" in path or "/impressum" in path):
self.logger.info(f"➡️ Keine Kontakt/Impressum-Seite auf {response.request.url} – weiter.")
page = response.meta.get("playwright_page")
if page:
try:
asyncio.get_event_loop().create_task(page.close())
except Exception:
pass
return
self.logger.info(f"🔎 Analysiere NUR: {response.url}")
text_nodes = response.xpath('//main//text() | //body//text()').getall()
text_content = " ".join(t.strip() for t in text_nodes if t.strip())[:200_000]
raw_emails = re.findall(
r"[a-zA-Z0-9._%+-]+\s?(?:@|[\[\(]{1}at[\]\)]{1}|\s+at\s+)\s?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
text_content
)
cleaned_emails = set(
re.sub(r"\s?(?:\[\s*at\s*\]|\(at\)|\s+at\s+)", "@", e.replace(" ", ""))
for e in raw_emails
)
new_emails = cleaned_emails - self.found_emails
if new_emails:
self.logger.info(f"✅ {len(new_emails)} neue E-Mails auf {response.url} gefunden")
else:
self.logger.info(f"❌ Keine E-Mails auf {response.url} gefunden")
for email in new_emails:
self.found_emails.add(email)
self.rows.append({"source": response.url, "email": email})
yield {"source": response.url, "email": email}
# Tab sicher schließen (async)
page = response.meta.get("playwright_page")
if page:
try:
asyncio.get_event_loop().create_task(page.close())
except Exception:
pass
def handle_error(self, failure):
self.logger.warning(f"❌ Fehler bei {failure.request.url}: {failure.value}")
# Seite schließen, falls vorhanden
page = failure.request.meta.get("playwright_page")
if page:
try:
asyncio.get_event_loop().create_task(page.close())
except Exception:
pass
def _stealth_script(self):
return """
() => {
Object.defineProperty(navigator, 'webdriver', {get: () => false});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
Object.defineProperty(navigator, 'languages', {get: () => ['de-DE', 'de', 'en-US']});
Object.defineProperty(navigator, 'platform', {get: () => 'Win32'});
Object.defineProperty(navigator, 'vendor', {get: () => 'Google Inc.'});
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters)
);
return true;
}
"""
def closed(self, reason):
self.logger.info("🚀 Scraper abgeschlossen, lade direkt als Google Sheet hoch ...")
try:
from drive_upload import upload_to_drive
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
filename = f"emails_{timestamp}.csv"
with open(filename, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["source", "email"])
writer.writeheader()
for row in self.rows:
writer.writerow(row)
upload_to_drive(filename)
os.remove(filename)
except Exception as e:
self.logger.error(f"❌ Fehler beim Google Drive Upload: {e}")
Upload nach Google Drive
Nach dem Scrape wird die CSV automatisch in Google Drive hochgeladen – mit Zeitstempel.
Das funktioniert über PyDrive2 und deine gespeicherten Credentials (client_secrets.json
+ mycreds.txt
).
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import os
def upload_to_drive(local_file_path, remote_filename=None):
print("▶️ Upload gestartet …")
if not os.path.exists(local_file_path):
print(f"⚠️ Datei nicht gefunden: {local_file_path}")
return
try:
print("🔐 Authentifiziere mit Google …")
gauth = GoogleAuth()
gauth.settings['client_config_file'] = 'client_secrets.json'
gauth.settings['oauth_scope'] = ['https://www.googleapis.com/auth/drive.file']
gauth.LoadCredentialsFile("mycreds.txt")
if gauth.credentials is None:
print("🌐 Öffne Browser zur Anmeldung …")
gauth.LocalWebserverAuth()
elif gauth.access_token_expired:
print("♻️ Token abgelaufen, erneuere …")
gauth.Refresh()
else:
print("✅ Token gültig, verwende gespeicherte Credentials …")
gauth.Authorize()
gauth.SaveCredentialsFile("mycreds.txt")
except Exception as e:
print(f"❌ Authentifizierungsfehler: {e}")
return
Bonus: Google Search Scraper
Wenn du gar keine URL-Liste hast, kannst du mit SerpAPI automatisiert bei Google suchen – z. B. nach „Eventagenturen München“ – und die Treffer als CSV speichern.
import scrapy
import requests
import csv
import os
class GoogleSearchSpider(scrapy.Spider):
name = "google_search"
def start_requests(self):
self.visited_urls = []
query = "Eventagenturen München"
urls = self.get_google_results(query, pages=3)
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def get_google_results(self, query, pages=3):
api_key = "HIER" # 🔑 Deinen API Key eintragen
results = []
for page in range(pages):
start = page * 10
params = {
"engine": "google",
"q": query,
"start": start,
"api_key": api_key
}
response = requests.get("https://serpapi.com/search", params=params)
data = response.json()
for result in data.get("organic_results", []):
link = result.get("link")
if link:
results.append(link)
return results
def parse(self, response):
url = response.url
self.logger.info(f"✅ Besucht: {url}")
self.visited_urls.append(url)
yield {"url": url}
def closed(self, reason):
# Datei speichern beim Beenden
filename = "google_urls.csv"
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["url"])
for url in self.visited_urls:
writer.writerow([url])
self.logger.info(f"📁 URLs gespeichert in: {os.path.abspath(filename)}")
Wie lass ich den Crawler durchlaufen?
- URLs in
urls.txt
eintragen - Spider starten über den folgenden Befehl
- Beim ersten Lauf wirst du in deinem Browser zur Google-Freigabe weitergeleitet. Danach ist das Token gespeichert.
scrapy crawl email_spider
Ablauf der Google-Authentifizierung
Wenn du mit Google Drive arbeitest, solltest du wissen, dass der Authentifizierungsprozess in regelmäßigen Abständen erneuert werden muss – meistens alle paar Tage. Das liegt daran, dass das von Google generierte Zugangstoken (gespeichert in der Datei mycreds.txt
) nach einer gewissen Zeit abläuft. Um die Authentifizierung zu erneuern, musst du einfach die Datei mycreds.txt
löschen und den Scraper wie gewohnt starten – zum Beispiel mit einer Test-URL. Sobald der Upload-Vorgang aufgerufen wird, öffnet sich automatisch ein Browserfenster, in dem du dich bei deinem Google-Konto anmeldest und den Zugriff bestätigst. Danach wird eine neue mycreds.txt
erzeugt und alle künftigen Uploads funktionieren wieder automatisch im Hintergrund.
Du brauchst Unterstützung?
Wenn du Unterstützung brauchst oder dein Setup nicht funktioniert, schreib mir einfach.
Ich helfe gerne – oder bau dir den Scraper so, wie du ihn brauchst.