import warnings
__all__ = ['getBSPfromJPL', 'ephem_kernel', 'ephem_horizons']
[docs]def getBSPfromJPL(identifier, initial_date, final_date, email, directory='./'):
"""Downloads BSP files from JPL database.
BSP files, which have information to generate the ephemeris of the objects,
will be downloaded and named as (without spaces): '[identifier].bsp'.
Important
---------
It is not possible to download BSP files for Planets or Satellites.
Parameters
----------
identifier : `str`, `list`
Identifier of the object(s).
It can be the `name`, `number` or `SPK ID`.
It can also be a list of objects.
Examples: ``'2137295'``, ``'1999 RB216'``, ``'137295'``,
``['2137295', '136199', '1999 RC216', 'Chariklo']``.
initial_date : `str`
Date the bsp file is to begin, within span `1900-2100`.
Examples: ``'2003-02-01'``, ``'2003-3-5'``.
final_date : `str`
Date the bsp file is to end, within span [1900-2100].
Must be more than 32 days later than `initial_date`.
Examples: ``'2006-01-12'``, ``'2006-1-12'``.
email : `str`
User's e-mail contact address.
Required by JPL web service.
Example: ``username@user.domain.name``.
directory : `str`
Directory path to save the bsp files.
"""
import pathlib
import shutil
import requests
from datetime import datetime
date1 = datetime.strptime(initial_date, '%Y-%m-%d')
date2 = datetime.strptime(final_date, '%Y-%m-%d')
diff = date2 - date1
if diff.days <= 32:
raise ValueError('The [final_date] must be more than 32 days later than [initial_date]')
else:
if isinstance(identifier, str):
identifier = [identifier]
url_jpl = 'https://ssd.jpl.nasa.gov/x/smb_spk.cgi?OPTION=Make+SPK'
lim, opt, n = 10, 'y', len(identifier)
if n > lim:
parameters = {'OBJECT': identifier[0], 'START': date1.strftime('%Y-%b-%d'),
'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'}
t0 = datetime.now()
r = requests.get(url_jpl, params=parameters, stream=True)
tf = datetime.now()
bsp_format = r.headers['Content-Type']
if r.status_code == requests.codes.ok and bsp_format == 'application/download':
size0 = int(r.headers["Content-Length"]) / 1024 / 1024 # MB
print('Estimated time to download {} ({:.3f} MB) files: {}'.
format(n, n * size0, n * (tf - t0)))
opt = input('\nAre you sure? (y/n):')
else:
raise ValueError('It was not able to download the bsp file for object {}\n'.
format(identifier[0]))
if opt in ['y', 'Y', 'YES', 'yes']:
if directory != './':
path = pathlib.Path(directory)
if not path.exists():
raise ValueError('The directory {} does not exist!'.format(path))
directory += '/'
print("\nDownloading bsp file(s) ...\n")
m, size = 0, 0.0
failed = []
t0 = datetime.now()
for obj in identifier:
filename = obj.replace(' ', '') + '.bsp'
parameters = {'OBJECT': obj, 'START': date1.strftime('%Y-%b-%d'),
'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'}
r = requests.get(url_jpl, params=parameters, stream=True)
bsp_format = r.headers['Content-Type']
if r.status_code == requests.codes.ok and bsp_format == 'application/download':
size += int(r.headers["Content-Length"]) / 1024 / 1024
m += 1
with open(directory + filename, 'wb') as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
else:
failed.append(obj)
tf = datetime.now()
print("{} ({:.3f} MB) file(s) was/were downloaded".format(m, size))
print("Download time: {}".format(tf - t0))
if len(failed) > 0:
raise ValueError(
'It was not able to download the bsp files for next objects: {}'.format(sorted(failed)))
[docs]def ephem_kernel(time, target, observer, kernels, output='ephemeris'):
"""Calculates the ephemeris from kernel files.
Parameters
----------
time : `str`, `astropy.time.Time`
Reference instant to calculate ephemeris. It can be a string
in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object.
target : `str`
IAU (kernel) code of the target.
observer : `str`
IAU (kernel) code of the observer.
kernels : `list`, `str`
List of paths for all the kernels.
output : `str`
The output of data. ``ephemeris`` will output the observed position,
while ``vector`` will output the Cartesian state vector, without
light time correction.
Returns
-------
coord : `astropy.coordinates.SkyCoord`
ICRS coordinate of the target.
"""
import numpy as np
import astropy.units as u
import astropy.constants as const
import spiceypy as spice
from astropy.coordinates import SkyCoord
from astropy.time import Time
from sora.observer import Observer, Spacecraft
origins = {'geocenter': '399', 'barycenter': '0'}
location = origins.get(observer)
if not location and isinstance(observer, str):
location = observer
if isinstance(observer, (Observer, Spacecraft)):
location = str(getattr(observer, "spkid", None))
if not location:
raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.")
if output not in ['ephemeris', 'vector']:
raise ValueError("output must be 'ephemeris' or 'vector'")
if type(kernels) == str:
kernels = [kernels]
for kern in kernels:
spice.furnsh(kern)
time = Time(time)
t0 = Time('J2000', scale='tdb')
dt = (time - t0)
delt = 0 * u.s
# calculates vector Solar System Barycenter -> Observer
if isinstance(observer, (Observer, Spacecraft)):
spice.kclear() # necessary because observer.get_vector() may load different kernels
position1 = observer.get_vector(time=time, origin='barycenter')
for kern in kernels:
spice.furnsh(kern)
else:
position1 = spice.spkpos(location, dt.sec, 'J2000', 'NONE', '0')[0]
position1 = SkyCoord(*position1.T * u.km, representation_type='cartesian')
while True:
# calculates new time
tempo = dt - delt
# calculates vector Solar System Barycenter -> Object
position2 = spice.spkpos(target, tempo.sec, 'J2000', 'NONE', '0')[0]
position2 = SkyCoord(*position2.T * u.km, representation_type='cartesian')
position = position2.cartesian - position1.cartesian
# calculates new light time
delt = (position.norm() / const.c).decompose()
# if difference between new and previous light time is smaller than 0.001 sec, then continue.
if output == 'vector' or np.all(np.absolute(((dt - tempo) - delt).sec) < 0.001):
break
coord = SkyCoord(position, representation_type='cartesian')
spice.kclear()
if output == 'ephemeris':
coord = SkyCoord(ra=coord.spherical.lon, dec=coord.spherical.lat,
distance=coord.spherical.distance, obstime=time)
if not coord.isscalar and len(coord) == 1:
coord = coord[0]
return coord
[docs]def ephem_horizons(time, target, observer, id_type='smallbody', output='ephemeris'):
"""Calculates the ephemeris from Horizons.
Parameters
----------
time : `str`, `astropy.time.Time`
Reference instant to calculate ephemeris. It can be a string
in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object.
target : `str`
IAU (kernel) code of the target.
observer : `str`
IAU (kernel) code of the observer.
id_type : `str`
Type of target object options: ``smallbody``, ``majorbody`` (planets but
also anything that is not a small body), ``designation``, ``name``,
``asteroid_name``, ``comet_name``, ``id`` (Horizons id number), or
``smallbody`` (find the closest match under any id_type).
output : `str`
The output of data. ``ephemeris`` will output the observed position,
while ``vector`` will output the Cartesian state vector, without
light time correction.
Returns
-------
coord : `astropy.coordinates.SkyCoord`
ICRS coordinate of the target.
Notes
-----
If the interval of time is larger than 30 days or so, a timeout error may be raised.
The maximum interval will depend on the user connection.
"""
import astropy.units as u
from astroquery.jplhorizons import Horizons
from astropy.time import Time
from astropy.coordinates import SkyCoord
from sora.observer import Observer, Spacecraft
from scipy import interpolate
origins = {'geocenter': '@399', 'barycenter': '@0'}
location = origins.get(observer)
if not location and isinstance(observer, str):
location = observer
if isinstance(observer, (Observer, Spacecraft)):
location = f'{getattr(observer, "code", "")}@{getattr(observer, "spkid", "")}'
if not location:
raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.")
if output not in ['ephemeris', 'vector']:
raise ValueError("output must be 'ephemeris' or 'vector'")
time = Time(time)
time1 = getattr(time, {'ephemeris': 'utc', 'vector': 'tdb'}[output]).jd
if not time.isscalar and len(time) > 50:
step = '10m'
if time.max() - time.min() > 30 * u.day:
warnings.warn('Time interval may be too long. A timeout error may be raised.')
if time.max() - time.min() <= 1 * u.day:
step = '1m'
time2 = {'start': (time.min() - 10 * u.min).iso.split('.')[0],
'stop': (time.max() + 10 * u.min).iso.split('.')[0],
'step': step,
}
else:
time2 = time1
if getattr(observer, 'ephem', None) not in ['horizons', None]:
warnings.warn('Ephemeris using kernel for the observer and Horizons for the target is under construction. '
'We will use only Horizons.')
ob = Horizons(id=target, id_type=id_type, location=location, epochs=time2)
if output == 'ephemeris':
eph = ob.ephemerides(extra_precision=True)
obstime = Time(eph['datetime_jd'], format='jd', scale='utc')
pos = SkyCoord(eph['RA'], eph['DEC'], eph['delta'], frame='icrs', obstime=obstime)
else:
vec = ob.vectors(refplane='earth')
obstime = Time(vec['datetime_jd'], format='jd', scale='tdb')
pos = SkyCoord(*[vec[i] for i in ['x', 'y', 'z']] * u.AU, representation_type='cartesian', obstime=obstime)
if isinstance(time2, dict):
spl_x = interpolate.CubicSpline(obstime.jd, pos.cartesian.x.to(u.km))
spl_y = interpolate.CubicSpline(obstime.jd, pos.cartesian.y.to(u.km))
spl_z = interpolate.CubicSpline(obstime.jd, pos.cartesian.z.to(u.km))
pos = SkyCoord(x=spl_x(time1), y=spl_y(time1), z=spl_z(time1), unit=u.km, representation_type='cartesian')
if output == 'ephemeris':
pos = SkyCoord(ra=pos.spherical.lon, dec=pos.spherical.lat, distance=pos.spherical.distance)
if not pos.isscalar and len(pos) == 1:
pos = pos[0]
return pos