class PlayScraper:
    """
    Scraper for Google Play Store.

    Fetches public store pages (app details, search, collections,
    categories) and the ``batchexecute`` RPC endpoint (reviews,
    permissions, data safety). Can be used as a context manager, which
    closes the shared HTTP client on exit.

    Attributes:
        hl: UI language code sent with every request (e.g. "en").
        gl: country/geo code sent with every request (e.g. "us").
        client: shared httpx.Client reused for all requests.
    """
    def __init__(self, hl: str = "en", gl: str = "us") -> None:
        """Store locale settings and open the shared HTTP client."""
        self.hl = hl
        self.gl = gl
        # One pooled client for the scraper's lifetime; 15 s timeout on
        # every request. Released via close() / context-manager exit.
        self.client = httpx.Client(headers=DEFAULT_HEADERS, timeout=15.0)
def get_app_details(self, app_id: str) -> AppDetails:
"""
Fetches full details for a specific Google Play app.
"""
params = {"id": app_id, "hl": self.hl, "gl": self.gl}
try:
response = self.client.get(f"{BASE_URL}/details", params=params)
if response.status_code == 404:
raise AppNotFoundError(f"App with ID {app_id} not found.")
response.raise_for_status()
except httpx.HTTPError as e:
raise ScraperError(f"Network error: {str(e)}") from e
data = self._extract_init_data(response.text)
if not data:
raise ScraperError("Could not extract data from the page.")
return self._map_to_app_details(app_id, data)
def search(self, query: str) -> List[SearchResult]:
"""
Searches for apps on Google Play.
"""
params = {"q": query, "hl": self.hl, "gl": self.gl, "c": "apps"}
try:
response = self.client.get("https://play.google.com/store/search", params=params)
response.raise_for_status()
except httpx.HTTPError as e:
raise ScraperError(f"Network error during search: {str(e)}") from e
return self._extract_apps_from_html(response.text)
def developer(self, dev_id: str) -> List[SearchResult]:
"""
Returns list of applications by the given developer name/ID.
"""
return self.search(f"pub:{dev_id}")
def suggest(self, term: str) -> List[str]:
"""
Returns up to five suggestions to complete a search query term.
"""
params = {"q": term, "hl": self.hl, "gl": self.gl}
try:
response = self.client.get(
"https://market.android.com/suggest/SuggRetriever", params=params
)
response.raise_for_status()
data = response.json()
return [str(suggestion["s"]) for suggestion in data if "s" in suggestion][:5]
except (httpx.HTTPError, ValueError, KeyError):
return []
def categories(self) -> List[Category]:
"""
Retrieve a full list of categories present from Google Play.
"""
try:
response = self.client.get(f"{BASE_URL}", params={"hl": self.hl, "gl": self.gl})
response.raise_for_status()
parser = LexborHTMLParser(response.text)
categories: List[Category] = []
for node in parser.css('a[href*="/store/apps/category/"]'):
href = str(node.attributes.get("href", ""))
name = node.text().strip()
if not name or "/store/apps/category/GAME" in href:
continue
cat_id = href.rsplit("/", maxsplit=1)[-1].split("?")[0]
if cat_id not in [c.category_id for c in categories]:
categories.append(Category(name=name, category_id=cat_id))
return categories
except (httpx.HTTPError, ValueError, KeyError, IndexError, TypeError) as e:
raise ScraperError(f"Failed to fetch categories: {str(e)}") from e
def list(
self, collection: str = "topselling_free", category: Optional[str] = None, num: int = 50
) -> List[SearchResult]:
"""
Retrieves a list of applications from a collection.
"""
url = f"{BASE_URL}"
if category:
url += f"/category/{category}"
if collection:
url += f"/collection/{collection}"
params = {"hl": self.hl, "gl": self.gl}
try:
response = self.client.get(url, params=params)
response.raise_for_status()
except httpx.HTTPError as e:
raise ScraperError(f"Failed to fetch list: {str(e)}") from e
return self._extract_apps_from_html(response.text)[:num]
def reviews(self, app_id: str, count: int = 100) -> List[Review]:
"""
Retrieves a page of reviews for a specific application.
"""
rpc_id = "UsvVh"
review_params = f"[null,null,[2,1,{count}],null,null,[null,null,null],null,[]]"
payload = f'[[["{rpc_id}","{review_params}","{app_id}","generic"]]]'
data = self._batch_execute(payload)
reviews: List[Review] = []
try:
reviews_raw = self._get_val(data, [0, 0])
if isinstance(reviews_raw, list):
for r in reviews_raw:
reviews.append(
Review(
review_id=str(r[0]),
username=str(r[1][0]),
user_image=str(r[1][1][3][2]),
content=str(r[4]),
score=int(r[2]),
thumbs_up_count=int(r[15]),
at=datetime.fromtimestamp(float(r[5][0])),
reply_content=str(r[7][1]) if r[7] else None,
reply_at=datetime.fromtimestamp(float(r[7][2][0])) if r[7] else None,
)
)
except (IndexError, TypeError, KeyError):
pass
return reviews
def permissions(self, app_id: str) -> List[Permission]:
"""
Returns the list of permissions an app has access to.
"""
payload = f'[[["xdS0ae","[null,\\"{app_id}\\",1]",null,"generic"]]]'
data = self._batch_execute(payload)
permissions: List[Permission] = []
try:
perms_raw = data[0]
if isinstance(perms_raw, list):
for category_group in perms_raw:
category_name = str(category_group[0])
for perm in category_group[2]:
permissions.append(
Permission(
permission=f"{category_name}: {str(perm[1])}",
description=str(perm[2]),
)
)
except (IndexError, TypeError, KeyError):
pass
return permissions
    def datasafety(self, app_id: str) -> DataSafety:
        """
        Returns the data safety information of an app.

        NOTE(review): the RPC response is fetched but never parsed --
        this always returns an empty DataSafety object while still
        paying for the network round-trip (errors are swallowed inside
        _batch_execute). Confirm whether parsing is pending or the call
        can be dropped.
        """
        payload = f'[[["Z6uR3c","[null,\\"{app_id}\\",1]",null,"generic"]]]'
        self._batch_execute(payload)
        return DataSafety()
def similar(self, app_id: str) -> List[SearchResult]:
"""
Returns a list of similar apps to the one specified.
"""
params = {"id": app_id, "hl": self.hl, "gl": self.gl}
try:
response = self.client.get(f"{BASE_URL}/details", params=params)
response.raise_for_status()
return self._extract_apps_from_html(response.text)
except httpx.HTTPError as e:
raise ScraperError(f"Network error: {str(e)}") from e
def _batch_execute(self, payload: str) -> Any:
"""
Internal helper for Google Play BatchExecute RPC calls.
"""
data = {
"f.req": payload,
"at": "some_token",
}
params = {"hl": self.hl, "gl": self.gl}
try:
response = self.client.post(BATCH_EXECUTE_URL, data=data, params=params)
response.raise_for_status()
content = response.text[6:]
parsed = json.loads(content)
result_json = parsed[0][2]
return json.loads(result_json)
except (httpx.HTTPError, json.JSONDecodeError, IndexError, TypeError, KeyError):
return []
def _extract_apps_from_html(self, html: str) -> List[SearchResult]:
"""
Helper to extract SearchResult objects from any HTML page with AF_initDataCallback.
"""
pattern = re.compile(
r"AF_initDataCallback\({key: '(?P<key>ds:.*?)', hash: '.*?', "
r"data:(?P<data>.*?), sideChannel: {}}\);"
)
results: List[SearchResult] = []
for match in re.finditer(pattern, html):
data_str = match.group("data")
try:
data = json.loads(data_str)
self._find_apps_recursive(data, results)
except json.JSONDecodeError:
continue
seen = set()
unique_results = []
for result in results:
if result.app_id not in seen:
unique_results.append(result)
seen.add(result.app_id)
return unique_results
def _find_apps_recursive(self, obj: Any, results: List[SearchResult]) -> None:
"""
Recursively searches for app-like structures in JSON data.
"""
if isinstance(obj, list):
if (
len(obj) >= 13
and isinstance(obj[12], list)
and len(obj[12]) > 0
and isinstance(obj[12][0], str)
and obj[12][0].count(".") >= 1
):
try:
results.append(
SearchResult(
app_id=str(obj[12][0]),
title=str(self._get_val(obj, [2], "")),
developer=str(self._get_val(obj, [4, 0, 0], "")),
icon=str(
self._get_val(obj, [1, 1, 0, 3, 2])
or "https://play-lh.googleusercontent.com/placeholder"
),
score=float(self._get_val(obj, [6, 0, 2, 1, 1], 0.0)),
price=0.0,
free=True,
)
)
except (IndexError, TypeError, KeyError, ValueError):
pass
else:
for item in obj:
self._find_apps_recursive(item, results)
elif isinstance(obj, dict):
for item in obj.values():
self._find_apps_recursive(item, results)
def _extract_init_data(self, html: str) -> Optional[List[Any]]:
"""
Extracts JSON data from AF_initDataCallback script tags.
"""
pattern = (
r"AF_initDataCallback\({key: '(?P<key>ds:.*?)', hash: '.*?', "
r"data:(?P<data>.*?), sideChannel: {}}\);"
)
matches = re.finditer(pattern, html)
for match in matches:
key = match.group("key")
data_str = match.group("data")
try:
data = json.loads(data_str)
if key in ["ds:5", "ds:4"] and isinstance(data, list) and len(data) > 0:
return list(data)
except json.JSONDecodeError:
continue
return None
def _get_val(self, data: Any, path: Sequence[Union[int, str]], default: Any = None) -> Any:
"""
Safely extracts a value from a nested data structure.
"""
curr = data
try:
for key in path:
curr = curr[key]
return curr
except (IndexError, TypeError, KeyError):
return default
def _clean_int(self, value: Any) -> int:
"""
Cleans and converts a value to integer.
Handles strings with commas, plus signs, etc.
"""
if value is None:
return 0
if isinstance(value, int):
return value
try:
# Remove commas, plus signs, etc.
s = str(value).replace(",", "").replace("+", "").replace("M", "000000").split()[0]
# Handle float strings '1.0'
if "." in s:
return int(float(s))
return int(s)
except (ValueError, IndexError, TypeError):
return 0
def _parse_v1_histogram(self, histogram: List[Any]) -> List[int]:
"""Parses histogram format [None, [?, 123], [?, 456], ...]."""
hist_fixed = [0, 0, 0, 0, 0]
for i in range(1, 6):
item = histogram[i]
if isinstance(item, list) and len(item) >= 2:
hist_fixed[i - 1] = self._clean_int(item[1])
return hist_fixed
def _parse_v2_histogram(self, histogram: List[Any]) -> List[int]:
"""Parses histogram format [[1, x], [2, y], ...]."""
hist_fixed = [0, 0, 0, 0, 0]
for item in histogram:
if not isinstance(item, list) or len(item) < 2:
continue
try:
idx = int(item[0]) - 1
if 0 <= idx < 5:
hist_fixed[idx] = self._clean_int(item[1])
except (ValueError, TypeError):
continue
return hist_fixed
def _get_histogram(self, data: List[Any]) -> List[int]:
"""
Extracts and fixes the rating histogram.
"""
raw_hist = self._get_val(data, [1, 2, 51, 1], [0, 0, 0, 0, 0])
histogram = cast(List[Any], raw_hist)
if not histogram or not isinstance(histogram, list):
return [0, 0, 0, 0, 0]
# Case 1: [None, [?, 123], [?, 456], ...] (Common in ds:5)
if len(histogram) >= 6 and histogram[1] is not None:
return self._parse_v1_histogram(histogram)
# Case 2: [[1, x], [2, y], ...]
return self._parse_v2_histogram(histogram)
def _get_developer_info(self, data: List[Any]) -> Dict[str, str]:
"""
Extracts developer related information.
"""
return {
"name": str(self._get_val(data, [1, 2, 68, 0], "")),
"id": str(self._get_val(data, [1, 2, 68, 1, 4, 2], "").split("id=")[-1] or "unknown"),
"email": str(self._get_val(data, [1, 2, 69, 1, 0], "")),
"website": str(self._get_val(data, [1, 2, 69, 0, 5, 2], "")),
"address": str(self._get_val(data, [1, 2, 69, 2, 0], "")),
}
def _get_price_info(self, data: List[Any]) -> Dict[str, Any]:
"""
Extracts price and IAP information.
"""
raw_price = self._get_val(data, [1, 2, 57, 0, 0, 0, 0, 1, 0, 0])
price = float(raw_price) / 1000000 if raw_price else 0.0
return {
"raw": raw_price,
"price": price,
"free": raw_price == 0 if raw_price is not None else True,
"currency": str(self._get_val(data, [1, 2, 57, 0, 0, 0, 0, 1, 0, 1], "USD")),
"sale": self._get_val(data, [1, 2, 57, 0, 0, 0, 0, 1, 11]) is not None,
"offers_iap": self._get_val(data, [1, 2, 19, 0]) is not None,
"iap_price": str(self._get_val(data, [1, 2, 19, 0]) or ""),
}
def _map_category(self, cat_data: Any) -> Optional[Dict[str, str]]:
"""
Maps a single category data list to a Dictionary.
"""
if not isinstance(cat_data, list) or not cat_data:
return None
# Name is at index 0, ID is at index 2
name = str(cat_data[0]) if cat_data[0] else ""
cid = str(cat_data[2]) if len(cat_data) > 2 and cat_data[2] else ""
return {"name": name, "id": cid}
def _get_categories(self, data: List[Any]) -> List[Dict[str, str]]:
"""
Extracts all categories.
"""
cats_raw = self._get_val(data, [1, 2, 118, 0])
if not isinstance(cats_raw, list) or not cats_raw:
return []
# Handle double nesting [[['Cat1', ...], ['Cat2', ...]]]
inner_cats = cats_raw[0] if isinstance(cats_raw[0], list) else cats_raw
if not isinstance(inner_cats, list):
return []
categories = []
for c in inner_cats:
cat_dict = self._map_category(c)
if cat_dict:
categories.append(cat_dict)
return categories
def _get_media_info(self, data: List[Any]) -> Dict[str, Any]:
"""
Extracts icon, screenshots, and video info.
"""
icon = (
self._get_val(data, [1, 2, 95, 0, 0, 2])
or self._get_val(data, [1, 2, 95, 0, 3, 2])
or self._get_val(data, [1, 2, 103, "137", 1, 0, 0, 3, 2])
or self._get_val(data, [1, 1, 0, 3, 2])
)
raw_screenshots = self._get_val(data, [1, 2, 78, 0], [])
screenshots = [
str(item[3][2]) for item in raw_screenshots if isinstance(item, list) and len(item) > 3
]
return {
"icon": str(icon or "https://play-lh.googleusercontent.com/placeholder"),
"screenshots": screenshots or ["https://play-lh.googleusercontent.com/placeholder"],
"header_image": str(self._get_val(data, [1, 2, 96, 0, 3, 2]) or ""),
"video": str(self._get_val(data, [1, 2, 100, 0, 0, 3, 2]) or ""),
"video_image": str(self._get_val(data, [1, 2, 100, 1, 0, 3, 2]) or ""),
}
def _get_version_info(self, data: List[Any]) -> Dict[str, Any]:
"""
Extracts version, released, and updated information.
"""
version = (
self._get_val(data, [1, 2, 140, 0, 0, 0])
or self._get_val(data, [1, 2, 103, "141", 0, 0, 0])
or ""
)
updated = (
self._get_val(data, [1, 2, 145, 0, 1, 0])
or self._get_val(data, [1, 2, 103, "146", 0, 1, 0])
or 0
)
released = (
self._get_val(data, [1, 2, 10, 0]) or self._get_val(data, [1, 2, 103, "10", 0]) or ""
)
return {
"version": str(version),
"updated": self._clean_int(updated),
"released": str(released),
}
def _get_comments(self, data: List[Any]) -> List[str]:
"""
Extracts preview comments (reviews).
"""
comments = []
comments_paths = [[1, 2, 152, 0], [1, 2, 152, 0, 0], [1, 3, 1, 0]]
for path in comments_paths:
comments_raw = self._get_val(data, path, [])
if isinstance(comments_raw, list) and comments_raw:
for comm in comments_raw:
if isinstance(comm, list) and len(comm) > 4 and comm[4]:
comments.append(str(comm[4]))
if comments:
break
return comments[:30]
    def _map_to_app_details(self, app_id: str, data: List[Any]) -> AppDetails:
        """
        Maps raw JSON data to AppDetails model.

        Args:
            app_id: package name requested by the caller; echoed into
                the model and the canonical store URL.
            data: decoded AF_initDataCallback payload (see
                _extract_init_data); field positions follow the
                ds:4/ds:5 page layout.

        Raises:
            ScraperError: when any index/type assumption about the
                payload fails during mapping.
        """
        try:
            # Use helpers for major components
            media = self._get_media_info(data)
            pr = self._get_price_info(data)
            v_info = self._get_version_info(data)
            categories = self._get_categories(data)
            dev = self._get_developer_info(data)
            # Preview Comments (Reviews)
            comments = self._get_comments(data)
            return AppDetails(
                app_id=app_id,
                title=str(self._get_val(data, [1, 2, 0, 0], "")),
                # NOTE(review): plain and HTML descriptions read the same
                # path -- the HTML variant is not extracted separately.
                description=str(self._get_val(data, [1, 2, 72, 0, 1], "")),
                description_html=str(self._get_val(data, [1, 2, 72, 0, 1], "")),
                summary=str(self._get_val(data, [1, 2, 73, 0, 1], "")),
                developer=dev["name"],
                developer_id=dev["id"],
                developer_email=dev["email"],
                developer_website=dev["website"],
                developer_address=dev["address"],
                privacy_policy=str(self._get_val(data, [1, 2, 99, 0, 5, 2], "")),
                category=str(self._get_val(data, [1, 2, 79, 0, 0, 0], "")),
                category_id=str(self._get_val(data, [1, 2, 79, 0, 0, 2], "")),
                categories=categories,
                icon=media["icon"],
                header_image=media["header_image"],
                screenshots=media["screenshots"],
                video=media["video"],
                video_image=media["video_image"],
                score=float(self._get_val(data, [1, 2, 51, 0, 1], 0.0)),
                ratings=self._clean_int(self._get_val(data, [1, 2, 51, 2, 1], 0)),
                reviews=self._clean_int(self._get_val(data, [1, 2, 51, 3, 1], 0)),
                histogram=self._get_histogram(data),
                installs=str(self._get_val(data, [1, 2, 13, 0], "")),
                min_installs=self._clean_int(self._get_val(data, [1, 2, 13, 1], 0)),
                real_installs=self._clean_int(self._get_val(data, [1, 2, 13, 2], 0)),
                price=pr["price"],
                free=pr["free"],
                currency=pr["currency"],
                sale=pr["sale"],
                offers_iap=pr["offers_iap"],
                in_app_product_price=pr["iap_price"],
                version=v_info["version"],
                released=v_info["released"],
                updated=v_info["updated"],
                content_rating=str(self._get_val(data, [1, 2, 9, 0], "")),
                content_rating_description=str(self._get_val(data, [1, 2, 9, 2, 1], "")),
                # NOTE(review): both ad flags read the same payload flag,
                # so they can never differ -- confirm that is intended.
                ad_supported=self._get_val(data, [1, 2, 48], 0) == 1,
                contains_ads=self._get_val(data, [1, 2, 48], 0) == 1,
                comments=comments[:30],  # Keep reasonable amount of preview comments
                url=f"{BASE_URL}/details?id={app_id}&hl={self.hl}&gl={self.gl}",
            )
        except (KeyError, IndexError, TypeError, ValueError) as e:
            raise ScraperError(f"Mapping error: {str(e)}") from e
def close(self) -> None:
"""
Closes the HTTP client.
"""
self.client.close()
def __enter__(self) -> "PlayScraper":
return self
def __exit__(self, *args: Any) -> None:
self.close()