import re
import sys
import threading
import time
import warnings
import weakref
from collections import OrderedDict
from collections.abc import Mapping
from datetime import datetime
from functools import partial
from urllib.parse import quote
import attr
import httpx
from logbook import Logger
from represent import ReprHelper, ReprHelperMixin
from rush.limiters.periodic import PeriodicLimiter
from rush.quota import Quota
from rush.stores.dictionary import DictionaryStore as RushDictionaryStore
from rush.throttle import Throttle
from .operators import _stringify_predicate_value
if sys.version_info >= (3, 11):
isoparse = datetime.fromisoformat
else:
from dateutil.parser import isoparse
logger = Logger("spacetrack")
type_re = re.compile(r"(\w+)")
enum_re = re.compile(
r"""
enum\(
'(\w+)' # First value
(?:, # Subsequent values optional
'(\w+)' # Capture string
)*
\)
""",
re.VERBOSE,
)
BASE_URL = "https://www.space-track.org/"
[docs]
class AuthenticationError(Exception):
"""Space-Track authentication error."""
[docs]
class UnknownPredicateTypeWarning(RuntimeWarning):
"""Used to warn when a predicate type is unknown."""
class Event:
pass
@attr.s(slots=True)
class NormalRequest(Event):
request = attr.ib()
stream = attr.ib(default=False)
follow_redirects = attr.ib(default=False)
@attr.s(slots=True)
class ReadResponse(Event):
response = attr.ib()
@attr.s(slots=True)
class IterLines(Event):
response = attr.ib()
@attr.s(slots=True)
class IterContent(Event):
response = attr.ib()
decode = attr.ib()
@attr.s(slots=True)
class RateLimitWait(Event):
duration = attr.ib()
[docs]
class Predicate(ReprHelperMixin):
"""Hold Space-Track predicate information.
The current goal of this class is to print the repr for the user.
"""
def __init__(self, name, type_, nullable=False, default=None, values=None):
self.name = name
self.type_ = type_
self.nullable = nullable
self.default = default
# Values can be set e.g. for enum predicates
self.values = values
def _repr_helper_(self, r):
r.keyword_from_attr("name")
r.keyword_from_attr("type_")
r.keyword_from_attr("nullable")
r.keyword_from_attr("default")
if self.values is not None:
r.keyword_from_attr("values")
def parse(self, value):
if value is None:
return value
if self.type_ == "float":
return float(value)
elif self.type_ == "int":
return int(value)
elif self.type_ == "datetime":
return isoparse(value)
elif self.type_ == "date":
return isoparse(value).date()
else:
return value
[docs]
class SpaceTrackClient:
"""SpaceTrack client class.
Parameters:
identity: Space-Track username.
password: Space-Track password.
base_url: May be overridden to use e.g. https://testing.space-track.org/
rush_store: A :mod:`rush` storage backend. By default, a
:class:`~rush.stores.dictionary.DictionaryStore` is used. You may
wish to use :class:`~rush.stores.dictionary.RedisStore` to
follow rate limits from multiple instances.
rush_key_prefix: You may choose a prefix for the keys that will be
stored in `rush_store`, e.g. to avoid conflicts in a redis db.
httpx_client: Provide a custom ``httpx.Client` instance.
``SpaceTrackClient`` takes ownership of the httpx client. You should
only provide your own client if you need to configure it first (e.g.
for a proxy).
additional_rate_limit: Optionally, a :class:`rush.quota.Quota` if you
want to restrict the rate limit further than the defaults.
For more information, refer to the `Space-Track documentation`_.
.. _`Space-Track documentation`: https://www.space-track.org/documentation
#api-requestClasses
.. data:: request_controllers
Ordered dictionary of request controllers and their request classes in
the following order.
- `basicspacedata`
- `expandedspacedata`
- `fileshare`
- `spephemeris`
For example, if the ``spacetrack.file`` method is used without
specifying which controller, the client will choose the `fileshare`
controller (which comes before `spephemeris`).
.. note::
If new request classes and/or controllers are added to the
Space-Track API but not yet to this library, you can safely
subclass :class:`SpaceTrackClient` with a copy of this ordered
dictionary to add them.
That said, please open an issue on `GitHub`_ for me to add them to
the library.
.. _`GitHub`: https://github.com/python-astrodynamics/spacetrack
"""
# "request class" methods will be looked up by request controller in this
# order
request_controllers = OrderedDict.fromkeys(
[
"basicspacedata",
"expandedspacedata",
"fileshare",
"spephemeris",
"publicfiles",
]
)
request_controllers["basicspacedata"] = {
"announcement",
"boxscore",
"cdm_public",
"decay",
"gp",
"gp_history",
"launch_site",
"omm",
"satcat",
"satcat_change",
"satcat_debut",
"tip",
"tle",
"tle_latest",
"tle_publish",
}
request_controllers["expandedspacedata"] = {
"car",
"cdm",
"maneuver",
"maneuver_history",
"organization",
"satellite",
}
request_controllers["fileshare"] = {
"delete",
"download",
"file",
"folder",
"upload",
}
request_controllers["spephemeris"] = {
"download",
"file",
"file_history",
}
request_controllers["publicfiles"] = {
"dirs",
"download",
}
# List of (class, controller) tuples for
# requests which do not return a modeldef
offline_predicates = {
("download", "fileshare"): {"file_id", "folder_id", "recursive"},
("upload", "fileshare"): {"folder_id", "file"},
("download", "spephemeris"): set(),
("dirs", "publicfiles"): set(),
("download", "publicfiles"): set(),
}
param_fields = {
("download", "publicfiles"): {"name"},
}
# These predicates are available for every request class.
rest_predicates = {
Predicate("predicates", "str"),
Predicate("metadata", "enum", values=("true", "false")),
Predicate("limit", "str"),
Predicate("orderby", "str"),
Predicate("distinct", "enum", values=("true", "false")),
Predicate(
"format",
"enum",
values=("json", "xml", "html", "csv", "tle", "3le", "kvn", "stream"),
),
Predicate("emptyresult", "enum", values=("show",)),
Predicate("favorites", "str"),
}
def __init__(
self,
identity,
password,
base_url=BASE_URL,
rush_store=None,
rush_key_prefix="",
httpx_client=None,
additional_rate_limit=None,
):
if httpx_client is None:
httpx_client = httpx.Client()
self.client = httpx_client
self.identity = identity
self.password = password
self.client.base_url = base_url
# If set, this will be called when we sleep for the rate limit.
self.callback = None
self._authenticated = False
self._predicates = dict()
self._controller_proxies = dict()
# From https://www.space-track.org/documentation#/api:
# Space-track throttles API use in order to maintain consistent
# performance for all users. To avoid error messages, please limit
# your query frequency.
# Limit API queries to less than 30 requests per minute / 300 requests
# per hour
if rush_store is None:
rush_store = RushDictionaryStore()
limiter = PeriodicLimiter(rush_store)
self._per_minute_throttle = Throttle(
limiter=limiter,
rate=Quota.per_minute(30),
)
self._per_hour_throttle = Throttle(
limiter=limiter,
rate=Quota.per_hour(300),
)
self._per_minute_key = rush_key_prefix + "st_req_min"
self._per_hour_key = rush_key_prefix + "st_req_hr"
self._additional_key = rush_key_prefix + "st_req_custom"
if isinstance(additional_rate_limit, Quota):
self._additional_throttle = Throttle(
limiter=limiter, rate=additional_rate_limit
)
elif additional_rate_limit is None:
self._additional_throttle = None
else:
raise TypeError("additional_rate_limit must be a rush.quota.Quota")
@property
def base_url(self):
return str(self.client.base_url)
@base_url.setter
def base_url(self, url):
self.client.base_url = url
def _handle_event(self, event):
if isinstance(event, NormalRequest):
return self.client.send(
event.request,
follow_redirects=event.follow_redirects,
stream=event.stream,
)
elif isinstance(event, ReadResponse):
return event.response.read()
elif isinstance(event, IterLines):
return _iter_lines_generator(event.response)
elif isinstance(event, IterContent):
return _iter_content_generator(event.response, event.decode)
elif isinstance(event, RateLimitWait):
self._ratelimit_wait(event.duration)
else:
raise RuntimeError(f"Unknown event type: {type(event)}")
def _run_event_generator(self, g):
# Start generator by sending in None
ret = None
while True:
try:
event = g.send(ret)
except StopIteration as exc:
if isinstance(exc.value, Event):
return self._handle_event(exc.value)
return exc.value
ret = self._handle_event(event)
def _auth_generator(self):
if not self._authenticated:
data = {"identity": self.identity, "password": self.password}
resp = yield NormalRequest(
self.client.build_request("POST", "ajaxauth/login", data=data)
)
_raise_for_status(resp)
# If login failed, we get a JSON response with {'Login': 'Failed'}
resp_data = resp.json()
if isinstance(resp_data, Mapping):
if resp_data.get("Login", None) == "Failed":
raise AuthenticationError()
self._authenticated = True
[docs]
def authenticate(self):
"""Authenticate with Space-Track.
Raises:
spacetrack.base.AuthenticationError: Incorrect login details.
.. note::
This method is called automatically when required.
"""
self._run_event_generator(self._auth_generator())
def _generic_request_generator(
self,
class_,
iter_lines=False,
iter_content=False,
controller=None,
parse_types=False,
**kwargs,
):
if iter_lines and iter_content:
raise ValueError("iter_lines and iter_content cannot both be True")
if "format" in kwargs and parse_types:
raise ValueError("parse_types can only be used if format is unset.")
if controller is None:
controller = self._find_controller(class_)
else:
classes = self.request_controllers.get(controller, None)
if classes is None:
raise ValueError(f"Unknown request controller {controller!r}")
if class_ not in classes:
raise ValueError(
f"Unknown request class {class_!r} for controller {controller!r}"
)
# Decode unicode unless class == download, including conversion of
# CRLF newlines to LF.
decode = class_ != "download"
if not decode and iter_lines:
error = (
"iter_lines disabled for binary data, since CRLF newlines "
"split over chunk boundaries would yield extra blank lines. "
"Use iter_content=True instead."
)
raise ValueError(error)
offline_check = (class_, controller) in self.offline_predicates
valid_fields = {p.name for p in self.rest_predicates}
predicates = None
yield from self._auth_generator()
if not offline_check:
# Validate keyword argument names by querying valid predicates from
# Space-Track
predicates = yield from self._get_predicates_generator(class_, controller)
predicate_fields = {p.name for p in predicates}
valid_fields |= predicate_fields
else:
valid_fields |= self.offline_predicates[(class_, controller)]
valid_params = self.param_fields.get((class_, controller), set())
params = dict()
url = f"{controller}/query/class/{class_}"
for key, value in kwargs.items():
if key not in valid_fields | valid_params:
raise TypeError(f"'{class_}' got an unexpected argument '{key}'")
if class_ == "upload" and key == "file":
continue
if key in valid_params:
params[key] = value
continue
value = quote(_stringify_predicate_value(value), safe="")
url += f"/{key}/{value}"
if class_ == "upload":
if "file" not in kwargs:
raise TypeError("missing keyword argument: 'file'")
request = self.client.build_request(
"POST",
url,
files={"file": kwargs["file"]},
params=params,
)
logger.debug(request.url)
resp = yield from self._ratelimited_send_generator(request)
else:
request = self.client.build_request("GET", url, params=params)
logger.debug(request.url)
resp = yield from self._ratelimited_send_generator(
request, stream=iter_lines or iter_content
)
if httpx.codes.is_error(resp.status_code):
# If we're about to raise an error, fetch the full response in case
# we're streaming
yield ReadResponse(resp)
_raise_for_status(resp)
if resp.encoding is None:
resp.encoding = "UTF-8"
if iter_lines:
return IterLines(resp)
elif iter_content:
return IterContent(resp, decode)
else:
# If format is specified, return that format unparsed. Otherwise,
# parse the default JSON response.
if "format" in kwargs:
if decode:
data = resp.text
# Replace CRLF newlines with LF, Python will handle platform
# specific newlines if written to file.
data = data.replace("\r\n", "\n")
else:
data = resp.content
return data
else:
data = resp.json()
if predicates is None or not parse_types:
return data
else:
return self._parse_types(data, predicates)
[docs]
def generic_request(
self,
class_,
iter_lines=False,
iter_content=False,
controller=None,
parse_types=False,
**kwargs,
):
r"""Generic Space-Track query.
The request class methods use this method internally; the public
API is as follows:
.. code-block:: python
st.tle_publish(*args, **kw)
st.basicspacedata.tle_publish(*args, **kw)
st.file(*args, **kw)
st.fileshare.file(*args, **kw)
st.spephemeris.file(*args, **kw)
They resolve to the following calls respectively:
.. code-block:: python
st.generic_request('tle_publish', *args, **kw)
st.generic_request('tle_publish', *args, controller='basicspacedata', **kw)
st.generic_request('file', *args, **kw)
st.generic_request('file', *args, controller='fileshare', **kw)
st.generic_request('file', *args, controller='spephemeris', **kw)
Parameters:
class\_: Space-Track request class name
iter_lines: Yield result line by line
iter_content: Yield result in 100 KiB chunks.
controller: Optionally specify request controller to use.
parse_types: Parse string values in response according to type given
in predicate information, e.g. ``'2017-01-01'`` ->
``datetime.date(2017, 1, 1)``.
**kwargs: These keywords must match the predicate fields on
Space-Track. You may check valid keywords with the following
snippet:
.. code-block:: python
spacetrack = SpaceTrackClient(...)
spacetrack.tle.get_predicates()
# or
spacetrack.get_predicates('tle')
See :func:`~spacetrack.operators._stringify_predicate_value` for
which Python objects are converted appropriately.
Yields:
Lines—stripped of newline characters—if ``iter_lines=True``
Yields:
100 KiB chunks if ``iter_content=True``
Returns:
Parsed JSON object, unless ``format`` keyword argument is passed.
.. warning::
Passing ``format='json'`` will return the JSON **unparsed**. Do
not set ``format`` if you want the parsed JSON object returned!
"""
return self._run_event_generator(
self._generic_request_generator(
class_=class_,
iter_lines=iter_lines,
iter_content=iter_content,
controller=controller,
parse_types=parse_types,
**kwargs,
)
)
@staticmethod
def _parse_types(data, predicates):
predicate_map = {p.name: p for p in predicates}
for obj in data:
for key, value in obj.items():
if key.lower() in predicate_map:
obj[key] = predicate_map[key.lower()].parse(value)
return data
def _ratelimited_send_generator(self, request, *, stream=False):
"""Send a request, handling rate limiting."""
minute_limit = self._per_minute_throttle.check(self._per_minute_key, 1)
hour_limit = self._per_hour_throttle.check(self._per_hour_key, 1)
sleep_time = 0
if minute_limit.limited:
sleep_time = minute_limit.retry_after.total_seconds()
if hour_limit.limited:
sleep_time = max(sleep_time, hour_limit.retry_after.total_seconds())
if self._additional_throttle is not None:
additional_limit = self._additional_throttle.check(self._additional_key, 1)
sleep_time = max(sleep_time, additional_limit.retry_after.total_seconds())
if sleep_time > 0:
yield RateLimitWait(sleep_time)
req_event = NormalRequest(request, stream=stream, follow_redirects=True)
resp = yield req_event
# It's possible that Space-Track will return HTTP status 500 with a
# query rate limit violation. This can happen if a script is cancelled
# before it has finished sleeping to satisfy the rate limit and it is
# started again.
#
# Let's catch this specific instance and retry once if it happens.
if resp.status_code == 500:
# Let's only retry if the error page tells us it's a rate limit
# violation.
yield ReadResponse(resp)
if "violated your query rate limit" in resp.text:
# It seems that only the per-minute rate limit causes an HTTP
# 500 error. Breaking the per-hour limit seems to result in an
# email from Space-Track instead.
yield RateLimitWait(
self._per_minute_throttle.rate.period.total_seconds()
)
resp = yield req_event
return resp
def _ratelimit_callback(self, until):
duration = int(round(until - time.monotonic()))
logger.info("Rate limit reached. Sleeping for {:d} seconds.", duration)
if self.callback is not None:
self.callback(until)
def _ratelimit_wait(self, duration):
until = time.monotonic() + duration
t = threading.Thread(target=self._ratelimit_callback, args=(until,))
t.daemon = True
t.start()
time.sleep(duration)
def __getattr__(self, attr):
if attr in self.request_controllers:
controller_proxy = self._controller_proxies.get(attr)
if controller_proxy is None:
controller_proxy = _ControllerProxy(self, attr)
self._controller_proxies[attr] = controller_proxy
return controller_proxy
try:
controller = self._find_controller(attr)
except ValueError:
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{attr}'"
)
# generic_request can resolve the controller itself, but we
# pass it because we have to check if the class_ is owned
# by a controller here anyway.
function = partial(self.generic_request, class_=attr, controller=controller)
function.get_predicates = partial(
self.get_predicates, class_=attr, controller=controller
)
return function
def __dir__(self):
"""Include request controllers and request classes."""
attrs = list(self.__dict__)
attrs.append("base_url") # property
request_classes = {
class_
for classes in self.request_controllers.values()
for class_ in classes
}
attrs += list(request_classes)
attrs += list(self.request_controllers)
return sorted(attrs)
def _find_controller(self, class_):
"""Find first controller that matches given request class.
Order is specified by the keys of
``SpaceTrackClient.request_controllers``
(:class:`~collections.OrderedDict`)
"""
for controller, classes in self.request_controllers.items():
if class_ in classes:
return controller
else:
raise ValueError(f"Unknown request class {class_!r}")
def _download_predicate_data_generator(self, class_, controller):
yield from self._auth_generator()
url = f"{controller}/modeldef/class/{class_}"
req = self.client.build_request("GET", url)
logger.debug(req.url)
resp = yield NormalRequest(req)
_raise_for_status(resp)
return resp.json()["data"]
def _get_predicates_generator(self, class_, controller):
if class_ not in self._predicates:
if controller is None:
controller = self._find_controller(class_)
else:
classes = self.request_controllers.get(controller, None)
if classes is None:
raise ValueError(f"Unknown request controller {controller!r}")
if class_ not in classes:
raise ValueError(f"Unknown request class {class_!r}")
download = self._download_predicate_data_generator(class_, controller)
predicates_data = yield from download
predicate_objects = self._parse_predicates_data(predicates_data)
self._predicates[class_] = predicate_objects
return self._predicates[class_]
[docs]
def get_predicates(self, class_, controller=None):
"""Get full predicate information for given request class, and cache
for subsequent calls.
"""
return self._run_event_generator(
self._get_predicates_generator(class_, controller)
)
def _parse_predicates_data(self, predicates_data):
predicate_objects = []
for field in predicates_data:
full_type = field["Type"]
type_match = type_re.match(full_type)
if not type_match:
raise ValueError(f"Couldn't parse field type '{full_type}'")
type_name = type_match.group(1)
field_name = field["Field"].lower()
nullable = field["Null"] == "YES"
default = field["Default"]
types = {
# Strings
"char": "str",
"varchar": "str",
"longtext": "str",
"text": "str",
# varbinary only used for 'file' request class, for the
# 'file_link' predicate.
"varbinary": "str",
# Integers
"bigint": "int",
"int": "int",
"tinyint": "int",
"smallint": "int",
"mediumint": "int",
# Floats
"decimal": "float",
"float": "float",
"double": "float",
# Date/Times
"date": "date",
"timestamp": "datetime",
"datetime": "datetime",
# Enum
"enum": "enum",
# Bytes
"longblob": "bytes",
}
if type_name not in types:
warnings.warn(
f"Unknown predicate type {type_name!r}",
UnknownPredicateTypeWarning,
)
predicate = Predicate(
name=field_name,
type_=types.get(type_name, type_name),
nullable=nullable,
default=default,
)
if type_name == "enum":
enum_match = enum_re.match(full_type)
if not enum_match:
raise ValueError(f"Couldn't parse enum type '{full_type}'")
# match.groups() doesn't work for repeating groups, use findall
predicate.values = tuple(re.findall(r"'(\w+)'", full_type))
predicate_objects.append(predicate)
return predicate_objects
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def close(self):
self.client.close()
def __repr__(self):
r = ReprHelper(self)
r.parantheses = ("<", ">")
r.keyword_from_attr("identity")
return str(r)
class _ControllerProxy:
"""Proxies request class methods with a preset request controller."""
def __init__(self, client, controller):
# The client will cache _ControllerProxy instances, so only store
# a weak reference to it.
self.client = weakref.proxy(client)
self.controller = controller
def __getattr__(self, attr):
if attr not in self.client.request_controllers[self.controller]:
raise AttributeError(f"'{self!r}' object has no attribute '{attr}'")
function = partial(
self.client.generic_request, class_=attr, controller=self.controller
)
function.get_predicates = partial(
self.client.get_predicates, class_=attr, controller=self.controller
)
return function
def __repr__(self):
r = ReprHelper(self)
r.parantheses = ("<", ">")
r.keyword_from_attr("controller")
return str(r)
def get_predicates(self, class_):
"""Proxy ``get_predicates`` to client with stored request
controller.
"""
return self.client.get_predicates(class_=class_, controller=self.controller)
def _iter_lines_generator(response):
for line in response.iter_lines():
yield line.rstrip("\n")
def _iter_content_generator(response, decode_unicode):
"""Generator used to yield chunks for a streamed response."""
if decode_unicode:
it = response.iter_text()
else:
it = response.iter_bytes()
for chunk in it:
if decode_unicode:
# Replace CRLF newlines with LF, Python will handle
# platform specific newlines if written to file.
chunk = chunk.replace("\r\n", "\n")
# Chunk could be ['...\r', '\n...'], strip trailing \r
chunk = chunk.rstrip("\r")
yield chunk
def _raise_for_status(response):
"""Raise the `HTTPStatusError` if one occurred.
This is the :meth:`httpx.Response.raise_for_status` method, modified to add
the response from Space-Track, if given.
"""
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
message = exc.args[0]
spacetrack_error_msg = None
try:
json = response.json()
if isinstance(json, Mapping):
spacetrack_error_msg = json["error"]
except (ValueError, KeyError, httpx.ResponseNotRead):
pass
if not spacetrack_error_msg:
try:
spacetrack_error_msg = response.text
except httpx.ResponseNotRead:
pass
if spacetrack_error_msg:
message += "\nSpace-Track response:\n" + spacetrack_error_msg
raise httpx.HTTPStatusError(
message, request=exc.request, response=exc.response
) from exc