2
0
mirror of https://github.com/fhem/fhem-mirror.git synced 2025-03-10 09:16:53 +00:00

49_Arlo.pm: delete old 2FA mails before login

git-svn-id: https://svn.fhem.de/fhem/trunk@28062 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
maluk 2023-10-17 18:10:00 +00:00
parent abc7491b44
commit 1680e2cb17

View File

@ -1,232 +1,235 @@
import base64 import base64
import sys import sys
import time import time
import cloudscraper import cloudscraper
import email import email
import imaplib import imaplib
import re import re
from html.parser import HTMLParser from html.parser import HTMLParser
class Arlo: class Arlo:
def __init__(self, tfa_mail_check) -> None: def __init__(self, tfa_mail_check) -> None:
self._tfa_mail_check = tfa_mail_check self._tfa_mail_check = tfa_mail_check
browser = { browser = {
'browser': 'chrome', 'browser': 'chrome',
'platform': 'linux', 'platform': 'linux',
'mobile': False 'mobile': False
} }
self._session = cloudscraper.create_scraper(browser=browser) self._session = cloudscraper.create_scraper(browser=browser)
self._baseUrl = "https://ocapi-app.arlo.com/api/" self._baseUrl = "https://ocapi-app.arlo.com/api/"
self._headers = { self._headers = {
'Access-Control-Request-Headers': 'content-type,source,x-user-device-id,x-user-device-name,x-user-device-type', 'Access-Control-Request-Headers': 'content-type,source,x-user-device-id,x-user-device-name,x-user-device-type',
'Access-Control-Request-Method': 'POST', 'Access-Control-Request-Method': 'POST',
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"Pragma": "no-cache", "Pragma": "no-cache",
"Referer": "https://my.arlo.com", "Referer": "https://my.arlo.com",
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.58', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.58',
} }
self._session.options(self._baseUrl + "auth", headers=self._headers) self._session.options(self._baseUrl + "auth", headers=self._headers)
self._headers = { self._headers = {
"DNT": "1", "DNT": "1",
"schemaVersion": "1", "schemaVersion": "1",
"Auth-Version": "2", "Auth-Version": "2",
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"Content-Type": "application/json; charset=UTF-8", "Content-Type": "application/json; charset=UTF-8",
"Origin": "https://my.arlo.com", "Origin": "https://my.arlo.com",
"Pragma": "no-cache", "Pragma": "no-cache",
"Referer": "https://my.arlo.com/", "Referer": "https://my.arlo.com/",
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.58', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.58',
"Source": "arloCamWeb" "Source": "arloCamWeb"
} }
self._token = None self._token = None
def login(self, username, password): def login(self, username, password):
status("login") status("login")
json = {"email": username, "password": encode(password), "language": "en", "EnvSource": "prod"} json = {"email": username, "password": encode(password), "language": "en", "EnvSource": "prod"}
attempt = 0 attempt = 0
while attempt < 3: while attempt < 3:
attempt += 1 attempt += 1
try: try:
r = self._session.post(self._baseUrl + "auth", json=json, headers=self._headers) r = self._session.post(self._baseUrl + "auth", json=json, headers=self._headers)
if r.status_code == 200: if r.status_code == 200:
data = _get_data(r) data = _get_data(r)
if data is not None: if data is not None:
self._request_factors(data) self._request_factors(data)
return return
if r.status_code == 400: if r.status_code == 400:
error("Bad auth request - probably the credentials are wrong.") error("Bad auth request - probably the credentials are wrong.")
return return
except Exception as e: except Exception as e:
log(e) log(e)
time.sleep(3) time.sleep(3)
status("loginFailed") status("loginFailed")
def _request_factors(self, data): def _request_factors(self, data):
status("getFactors") status("getFactors")
self._token = data["token"] self._token = data["token"]
self._headers["Authorization"] = encode(self._token) self._headers["Authorization"] = encode(self._token)
r = self._session.get(self._baseUrl + "getFactors?data=" + str(data["authenticated"]), headers=self._headers) r = self._session.get(self._baseUrl + "getFactors?data=" + str(data["authenticated"]), headers=self._headers)
data = _get_data(r) data = _get_data(r)
if data is None: if data is None:
error("getFactors not successful, response code " + str(r.status_code)) error("getFactors not successful, response code " + str(r.status_code))
return return
for factor in data["items"]: for factor in data["items"]:
if factor["factorType"] == "EMAIL": if factor["factorType"] == "EMAIL":
self._auth_tfa(factor["factorId"]) self._auth_tfa(factor["factorId"])
return return
error("email factor not found.") error("email factor not found.")
def _auth_tfa(self, factor_id): def _auth_tfa(self, factor_id):
status("startAuth") status("startAuth")
self._tfa_mail_check.open() self._tfa_mail_check.open()
json = {"factorId": factor_id} json = {"factorId": factor_id}
r = self._session.post(self._baseUrl + "startAuth", json=json, headers=self._headers) r = self._session.post(self._baseUrl + "startAuth", json=json, headers=self._headers)
data = _get_data(r) data = _get_data(r)
if data is None: if data is None:
error("startAuth not successful, response code " + str(r.status_code)) error("startAuth not successful, response code " + str(r.status_code))
return return
factor_auth_code = data["factorAuthCode"] factor_auth_code = data["factorAuthCode"]
status("waitFor2FA") status("waitFor2FA")
code = self._tfa_mail_check.get() code = self._tfa_mail_check.get()
self._tfa_mail_check.close() self._tfa_mail_check.close()
log("Try to login with code " + code) log("Try to login with code " + code)
status("finishAuth") status("finishAuth")
json = {"factorAuthCode": factor_auth_code, "otp": code} json = {"factorAuthCode": factor_auth_code, "otp": code}
r = self._session.post(self._baseUrl + "finishAuth", json=json, headers=self._headers) r = self._session.post(self._baseUrl + "finishAuth", json=json, headers=self._headers)
data = _get_data(r) data = _get_data(r)
if data is None: if data is None:
error("finishAuth not successful, response code " + str(r.status_code)) error("finishAuth not successful, response code " + str(r.status_code))
return return
self._token = data["token"] self._token = data["token"]
self._headers["Authorization"] = encode(self._token) self._headers["Authorization"] = encode(self._token)
r = self._session.get(self._baseUrl + "validateAccessToken?data=" + str(data["authenticated"]), r = self._session.get(self._baseUrl + "validateAccessToken?data=" + str(data["authenticated"]),
headers=self._headers) headers=self._headers)
if r.status_code != 200: if r.status_code != 200:
error("validateAccessToken not successful, response code " + str(r.status_code)) error("validateAccessToken not successful, response code " + str(r.status_code))
return return
print("cookies:", self._get_cookie_header()) print("cookies:", self._get_cookie_header())
print("token:", self._token) print("token:", self._token)
print("userId:", data["userId"]) print("userId:", data["userId"])
print("end") print("end")
def _get_cookie_header(self): def _get_cookie_header(self):
cookie_header = "" cookie_header = ""
for cookie in self._session.cookies: for cookie in self._session.cookies:
if cookie_header != "": if cookie_header != "":
cookie_header += "; " cookie_header += "; "
cookie_header += cookie.name + "=" + cookie.value cookie_header += cookie.name + "=" + cookie.value
return cookie_header return cookie_header
def _get_data(r): def _get_data(r):
if r.status_code != 200: if r.status_code != 200:
return None return None
try: try:
body = r.json() body = r.json()
except Exception as e: except Exception as e:
log(r.content) log(r.content)
error(e) error(e)
return None return None
if "meta" in body: if "meta" in body:
if body["meta"]["code"] == 200: if body["meta"]["code"] == 200:
return body["data"] return body["data"]
elif "success" in body: elif "success" in body:
if body["success"]: if body["success"]:
if "data" in body: if "data" in body:
return body["data"] return body["data"]
log(r.json()) log(r.json())
return None return None
class TfaMailCheck: class TfaMailCheck:
def __init__(self, mail_server, username, password) -> None: def __init__(self, mail_server, username, password) -> None:
self._imap = None self._imap = None
self._mail_server = mail_server self._mail_server = mail_server
self._username = username self._username = username
self._password = password self._password = password
def open(self): def open(self):
self._imap = imaplib.IMAP4_SSL(self._mail_server) self._imap = imaplib.IMAP4_SSL(self._mail_server)
res, status = self._imap.login(self._username, self._password) res, status = self._imap.login(self._username, self._password)
if res.lower() != "ok": if res.lower() != "ok":
return False return False
res, status = self._imap.select() res, status = self._imap.select()
if res.lower() != "ok": if res.lower() != "ok":
return False return False
res, ids = self._imap.search(None, "FROM", "do_not_reply@arlo.com") res, ids = self._imap.search(None, "FROM", "do_not_reply@arlo.com")
for msg_id in ids[0].split(): for msg_id in ids[0].split():
self._determine_code_and_delete_mail(msg_id) self._determine_code_and_delete_mail(msg_id)
if res.lower() != "ok": if res.lower() == "ok" and len(ids) > 0:
return False self._imap.close()
res, status = self._imap.select()
def get(self): if res.lower() != "ok":
timeout = time.time() + 100 return False
while True:
time.sleep(5) def get(self):
if time.time() > timeout: timeout = time.time() + 100
return None while True:
time.sleep(5)
try: if time.time() > timeout:
self._imap.check() return None
res, ids = self._imap.search(None, "FROM", "do_not_reply@arlo.com")
for msg_id in ids[0].split(): try:
code = self._determine_code_and_delete_mail(msg_id) self._imap.check()
if code is not None: res, ids = self._imap.search(None, "FROM", "do_not_reply@arlo.com")
return code for msg_id in ids[0].split():
code = self._determine_code_and_delete_mail(msg_id)
except Exception as e: if code is not None:
return None return code
def _determine_code_and_delete_mail(self, msg_id): except Exception as e:
res, msg = self._imap.fetch(msg_id, "(RFC822)") return None
for part in email.message_from_bytes(msg[0][1]).walk():
if part.get_content_type() == "text/html": def _determine_code_and_delete_mail(self, msg_id):
code_filter = CodeFilter() res, msg = self._imap.fetch(msg_id, "(RFC822)")
code_filter.feed(part.get_payload()) for part in email.message_from_bytes(msg[0][1]).walk():
if code_filter.code: if part.get_content_type() == "text/html":
self._imap.store(msg_id, "+FLAGS", "\\Deleted") code_filter = CodeFilter()
return code_filter.code code_filter.feed(part.get_payload())
if code_filter.code:
def close(self): self._imap.store(msg_id, "+FLAGS", "\\Deleted")
self._imap.close() return code_filter.code
self._imap.logout()
def close(self):
class CodeFilter(HTMLParser): self._imap.close()
code = None self._imap.logout()
def handle_data(self, data):
if self.code: class CodeFilter(HTMLParser):
return code = None
line = data.strip().replace("=09", "") def handle_data(self, data):
match = re.match(r"\d{6}", line) if self.code:
if match: return
self.code = match.group(0) line = data.strip().replace("=09", "")
match = re.match(r"\d{6}", line)
if match:
def status(status): self.code = match.group(0)
print("status:", status, flush=True)
def log(msg): def status(status):
print("log:", msg, flush=True) print("status:", status, flush=True)
def error(msg): def log(msg):
print("error:", msg, flush=True) print("log:", msg, flush=True)
def encode(s): def error(msg):
return base64.b64encode(s.encode()).decode() print("error:", msg, flush=True)
def encode(s):
if __name__ == '__main__': return base64.b64encode(s.encode()).decode()
if len(sys.argv) < 6:
error("5 arguments expected: arlo user, arlo password, imap server, email user, email password")
tfa_mail_check = TfaMailCheck(sys.argv[3], sys.argv[4], sys.argv[5]) if __name__ == '__main__':
arlo = Arlo(tfa_mail_check) if len(sys.argv) < 6:
arlo.login(sys.argv[1], sys.argv[2]) error("5 arguments expected: arlo user, arlo password, imap server, email user, email password")
tfa_mail_check = TfaMailCheck(sys.argv[3], sys.argv[4], sys.argv[5])
arlo = Arlo(tfa_mail_check)
arlo.login(sys.argv[1], sys.argv[2])