Source code for gala.dynamics.mockstream.core
import astropy.units as u
import numpy as np
from scipy.spatial.transform import Rotation
from ...io import quantity_from_hdf5, quantity_to_hdf5
from .. import PhaseSpacePosition
__all__ = ["MockStream"]
[docs]
class MockStream(PhaseSpacePosition):
@u.quantity_input(release_time=u.Myr)
def __init__(
self, pos, vel=None, frame=None, release_time=None, lead_trail=None, copy=True
):
super().__init__(pos=pos, vel=vel, frame=frame, copy=copy)
if release_time is not None:
release_time = u.Quantity(release_time)
if len(release_time) != self.pos.shape[0]:
msg = (
"shape mismatch: input release time array "
"must have the same shape as the input "
"phase-space data, minus the component "
f"dimension. expected {self.pos.shape[0]}, got {len(release_time)}"
)
raise ValueError(msg)
self.release_time = release_time
if lead_trail is not None:
lead_trail = np.array(lead_trail)
if len(lead_trail) != self.pos.shape[0]:
msg = (
"shape mismatch: input leading/trailing array "
"must have the same shape as the input "
"phase-space data, minus the component "
f"dimension. expected {self.pos.shape[0]}, got {len(lead_trail)}"
)
raise ValueError(msg)
self.lead_trail = lead_trail
[docs]
def rotate_to_progenitor_plane(self, prog_w):
"""Rotate the mock stream to align with the progenitor's orbital plane
This method transforms the mock stream into a new coordinate system where the
progenitor's orbital plane is aligned with the xy-plane, the stream and
progenitor are centered at (0, 0), and the stream primarily extends in the x
direction (leading tail at positive x and trailing tail at negative x). This is
useful for visualizing streams in their natural orbital plane.
Parameters
----------
prog_w : `~gala.dynamics.PhaseSpacePosition`
The phase-space position of the progenitor at the same time as the stream.
This defines the center and orientation of the rotated coordinate system.
Returns
-------
rotated_stream : `~gala.dynamics.MockStream`
A new MockStream instance with positions and velocities transformed to the
rotated coordinate system. The progenitor is at the origin with velocity
aligned along the positive x-axis. The release times and lead/trail flags
are preserved from the original stream.
"""
if prog_w.shape == ():
pass # scalar, good
elif prog_w.shape == (1,):
prog_w = prog_w[0]
else:
raise ValueError(
"prog_w must be a single phase-space position, not an array of "
"positions"
)
lon = prog_w.spherical.lon.to_value(u.rad)
lat = prog_w.spherical.lat.to_value(u.rad)
R1 = Rotation.from_euler("z", -lon)
R2 = Rotation.from_euler("y", lat)
Rtmp = R2.as_matrix() @ R1.as_matrix()
vtmp = Rtmp @ prog_w.v_xyz
R3 = Rotation.from_euler("x", -np.arctan2(vtmp[2], vtmp[1]).value)
R4 = Rotation.from_euler("z", -np.pi / 2)
R = R4.as_matrix() @ R3.as_matrix() @ Rtmp
prog_rot = PhaseSpacePosition(prog_w.data.transform(R))
R_final = Rotation.from_euler(
"z", -np.arctan2(prog_rot.v_y, prog_rot.v_x)
).as_matrix()
tmp = PhaseSpacePosition(self.data.transform(R))
return MockStream(
pos=R_final @ (tmp.xyz - prog_rot.xyz[:, None]),
vel=R_final @ tmp.v_xyz,
release_time=self.release_time,
lead_trail=self.lead_trail,
)
# ------------------------------------------------------------------------
# Input / output
#
[docs]
def to_hdf5(self, f):
"""Serialize this object to an HDF5 file.
Requires ``h5py``.
Parameters
----------
f : str, :class:`h5py.File`
Either the filename or an open HDF5 file.
"""
f = super().to_hdf5(f)
# if self.potential is not None:
# import yaml
# from ..potential.potential.io import to_dict
# f['potential'] = yaml.dump(to_dict(self.potential)).encode('utf-8')
if self.release_time:
quantity_to_hdf5(f, "release_time", self.release_time)
if self.lead_trail is not None:
f["lead_trail"] = self.lead_trail.astype("S1") # TODO HACK
return f
[docs]
@classmethod
def from_hdf5(cls, f):
"""Load an object from an HDF5 file.
Requires ``h5py``.
Parameters
----------
f : str, :class:`h5py.File`
Either the filename or an open HDF5 file.
"""
# TODO: this is duplicated code from PhaseSpacePosition
if isinstance(f, str):
import h5py
f = h5py.File(f, mode="r")
obj = PhaseSpacePosition.from_hdf5(f)
t = quantity_from_hdf5(f["release_time"]) if "release_time" in f else None
lt = f["lead_trail"][:] if "lead_trail" in f else None
return cls(
pos=obj.pos, vel=obj.vel, release_time=t, lead_trail=lt, frame=obj.frame
)