Source code for mujoco_warp._src.history

# Copyright 2026 The Newton Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import warp as wp

from mujoco_warp._src.types import MJ_MAXVAL
from mujoco_warp._src.types import MJ_MINVAL
from mujoco_warp._src.types import Data
from mujoco_warp._src.types import Model

wp.set_module_options({"enable_backward": False})


@wp.func
def _history_physical_index(cursor: int, n: int, logical: int) -> int:
  """Convert logical index (0=oldest, n-1=newest) to physical index."""
  return (cursor + 1 + logical) % n


@wp.func
def _history_find_index(
  # In:
  buf: wp.array2d[float],
  worldid: int,
  buf_offset: int,
  n: int,
  cursor: int,
  t: float,
) -> int:
  """Find logical index i such that times[i-1] < t <= times[i].

  Returns 0 if t <= times[oldest], n if t > times[newest].
  Uses circular binary search matching MuJoCo C historyFindIndex.
  """
  times_offset = buf_offset + 2

  oldest_phys = _history_physical_index(cursor, n, 0)
  newest_phys = _history_physical_index(cursor, n, n - 1)
  t_oldest = buf[worldid, times_offset + oldest_phys]
  t_newest = buf[worldid, times_offset + newest_phys]

  # before or at first element
  if t <= t_oldest:
    return 0

  # after last element
  if t > t_newest:
    return n

  # circular binary search: find smallest logical i such that times[phys(i)] >= t
  lo = int(0)
  hi = int(n - 1)
  while hi - lo > 1:
    mid = int((lo + hi) >> 1)
    mid_phys = _history_physical_index(cursor, n, mid)
    if buf[worldid, times_offset + mid_phys] < t:
      lo = mid
    else:
      hi = mid

  return hi


@wp.func
def _history_read_scalar(
  # In:
  buf: wp.array2d[float],
  worldid: int,
  buf_offset: int,
  n: int,
  t: float,
  interp: int,
) -> float:
  """Read a scalar value from history buffer at time t.

  interp: 0=zero-order-hold, 1=linear, 2=cubic (Catmull-Rom spline)
  """
  cursor = int(buf[worldid, buf_offset + 1])
  times_offset = buf_offset + 2
  values_offset = buf_offset + 2 + n

  oldest_phys = _history_physical_index(cursor, n, 0)
  newest_phys = _history_physical_index(cursor, n, n - 1)
  t_oldest = buf[worldid, times_offset + oldest_phys]
  t_newest = buf[worldid, times_offset + newest_phys]

  # extrapolate before oldest
  if t <= t_oldest + 1e-6:
    return buf[worldid, values_offset + oldest_phys]

  # extrapolate after newest
  if t >= t_newest - 1e-6:
    return buf[worldid, values_offset + newest_phys]

  # find bracketing index
  i = _history_find_index(buf, worldid, buf_offset, n, cursor, t)
  phys_i = _history_physical_index(cursor, n, i)

  # exact match
  if wp.abs(t - buf[worldid, times_offset + phys_i]) < 1e-6:
    return buf[worldid, values_offset + phys_i]

  phys_lo = _history_physical_index(cursor, n, i - 1)
  phys_hi = phys_i

  # zero-order hold
  if interp == 0:
    return buf[worldid, values_offset + phys_lo]

  dt = buf[worldid, times_offset + phys_hi] - buf[worldid, times_offset + phys_lo]
  alpha = (t - buf[worldid, times_offset + phys_lo]) / dt
  v_lo = buf[worldid, values_offset + phys_lo]
  v_hi = buf[worldid, values_offset + phys_hi]

  # linear interpolation
  if interp == 1:
    return v_lo + alpha * (v_hi - v_lo)

  # cubic spline interpolation (Catmull-Rom)
  alpha2 = alpha * alpha
  alpha3 = alpha2 * alpha
  h00 = 2.0 * alpha3 - 3.0 * alpha2 + 1.0
  h10 = alpha3 - 2.0 * alpha2 + alpha
  h01 = -2.0 * alpha3 + 3.0 * alpha2
  h11 = alpha3 - alpha2

  # finite-differenced Catmull-Rom slopes, 0 at endpoints
  m_lo = 0.0
  if i > 1:
    phys_lo_prev = _history_physical_index(cursor, n, i - 2)
    dt_lo = buf[worldid, times_offset + phys_hi] - buf[worldid, times_offset + phys_lo_prev]
    m_lo = (v_hi - buf[worldid, values_offset + phys_lo_prev]) / dt_lo

  m_hi = 0.0
  if i < n - 1:
    phys_hi_next = _history_physical_index(cursor, n, i + 1)
    dt_hi = buf[worldid, times_offset + phys_hi_next] - buf[worldid, times_offset + phys_lo]
    m_hi = (buf[worldid, values_offset + phys_hi_next] - v_lo) / dt_hi

  return h00 * v_lo + h10 * dt * m_lo + h01 * v_hi + h11 * dt * m_hi


@wp.func
def _history_read_vector(
  # In:
  adr: int,
  buf: wp.array2d[float],
  worldid: int,
  buf_offset: int,
  n: int,
  dim: int,
  t: float,
  interp: int,
  # Data out:
  sensordata_out: wp.array2d[float],
) -> int:
  """Read a vector value from history buffer at time t into sensordata.

  Returns 1 on success (value written to sensordata).
  interp: 0=zero-order-hold, 1=linear, 2=cubic (Catmull-Rom spline)
  """
  cursor = int(buf[worldid, buf_offset + 1])
  times_offset = buf_offset + 2
  values_offset = buf_offset + 2 + n

  oldest_phys = _history_physical_index(cursor, n, 0)
  newest_phys = _history_physical_index(cursor, n, n - 1)
  t_oldest = buf[worldid, times_offset + oldest_phys]
  t_newest = buf[worldid, times_offset + newest_phys]

  # extrapolate before oldest: copy oldest
  if t <= t_oldest + 1e-6:
    for d in range(dim):
      sensordata_out[worldid, adr + d] = buf[worldid, values_offset + oldest_phys * dim + d]
    return 1

  # extrapolate after newest: copy newest
  if t >= t_newest - 1e-6:
    for d in range(dim):
      sensordata_out[worldid, adr + d] = buf[worldid, values_offset + newest_phys * dim + d]
    return 1

  # find bracketing index
  i = _history_find_index(buf, worldid, buf_offset, n, cursor, t)
  phys_i = _history_physical_index(cursor, n, i)

  # exact match
  if wp.abs(t - buf[worldid, times_offset + phys_i]) < 1e-6:
    for d in range(dim):
      sensordata_out[worldid, adr + d] = buf[worldid, values_offset + phys_i * dim + d]
    return 1

  phys_lo = _history_physical_index(cursor, n, i - 1)
  phys_hi = phys_i

  # zero-order hold
  if interp == 0:
    for d in range(dim):
      sensordata_out[worldid, adr + d] = buf[worldid, values_offset + phys_lo * dim + d]
    return 1

  dt = buf[worldid, times_offset + phys_hi] - buf[worldid, times_offset + phys_lo]
  alpha = (t - buf[worldid, times_offset + phys_lo]) / dt

  # linear interpolation
  if interp == 1:
    for d in range(dim):
      v_lo = buf[worldid, values_offset + phys_lo * dim + d]
      v_hi = buf[worldid, values_offset + phys_hi * dim + d]
      sensordata_out[worldid, adr + d] = v_lo + alpha * (v_hi - v_lo)
    return 1

  # cubic spline interpolation (Catmull-Rom)
  alpha2 = alpha * alpha
  alpha3 = alpha2 * alpha
  h00 = 2.0 * alpha3 - 3.0 * alpha2 + 1.0
  h10 = alpha3 - 2.0 * alpha2 + alpha
  h01 = -2.0 * alpha3 + 3.0 * alpha2
  h11 = alpha3 - alpha2

  for d in range(dim):
    v_lo = buf[worldid, values_offset + phys_lo * dim + d]
    v_hi = buf[worldid, values_offset + phys_hi * dim + d]

    # finite-differenced Catmull-Rom slopes, 0 at endpoints
    m_lo = 0.0
    if i > 1:
      phys_lo_prev = _history_physical_index(cursor, n, i - 2)
      dt_lo = buf[worldid, times_offset + phys_hi] - buf[worldid, times_offset + phys_lo_prev]
      m_lo = (v_hi - buf[worldid, values_offset + phys_lo_prev * dim + d]) / dt_lo

    m_hi = 0.0
    if i < n - 1:
      phys_hi_next = _history_physical_index(cursor, n, i + 1)
      dt_hi = buf[worldid, times_offset + phys_hi_next] - buf[worldid, times_offset + phys_lo]
      m_hi = (buf[worldid, values_offset + phys_hi_next * dim + d] - v_lo) / dt_hi

    sensordata_out[worldid, adr + d] = h00 * v_lo + h10 * dt * m_lo + h01 * v_hi + h11 * dt * m_hi
  return 1


@wp.func
def _history_insert_scalar(
  # In:
  worldid: int,
  buf_offset: int,
  n: int,
  t: float,
  value: float,
  # Out:
  buf_out: wp.array2d[float],
):
  """Insert a scalar value into history buffer at time t."""
  cursor = int(buf_out[worldid, buf_offset + 1])
  times_offset = buf_offset + 2
  values_offset = buf_offset + 2 + n

  i = _history_find_index(buf_out, worldid, buf_offset, n, cursor, t)

  # exact match
  if i < n:
    phys_i = _history_physical_index(cursor, n, i)
    if wp.abs(t - buf_out[worldid, times_offset + phys_i]) < 1e-6:
      buf_out[worldid, values_offset + phys_i] = value
      return

  # older than oldest: replace oldest
  if i == 0:
    oldest_phys = _history_physical_index(cursor, n, 0)
    buf_out[worldid, times_offset + oldest_phys] = t
    buf_out[worldid, values_offset + oldest_phys] = value
    return

  # newer than newest: advance cursor
  if i == n:
    cursor = (cursor + 1) % n
    buf_out[worldid, buf_offset + 1] = float(cursor)
    buf_out[worldid, times_offset + cursor] = t
    buf_out[worldid, values_offset + cursor] = value
    return

  # out-of-order: shift [1, i-1] left, insert at i-1
  for j in range(i - 1):
    src_phys = _history_physical_index(cursor, n, j + 1)
    dst_phys = _history_physical_index(cursor, n, j)
    buf_out[worldid, times_offset + dst_phys] = buf_out[worldid, times_offset + src_phys]
    buf_out[worldid, values_offset + dst_phys] = buf_out[worldid, values_offset + src_phys]
  insert_phys = _history_physical_index(cursor, n, i - 1)
  buf_out[worldid, times_offset + insert_phys] = t
  buf_out[worldid, values_offset + insert_phys] = value


@wp.func
def _history_insert_vector(
  # In:
  worldid: int,
  buf_offset: int,
  n: int,
  dim: int,
  t: float,
  src: wp.array2d[float],
  src_adr: int,
  # Out:
  buf_out: wp.array2d[float],
):
  """Insert a vector value from src[worldid, src_adr:src_adr+dim] into history buffer at time t."""
  cursor = int(buf_out[worldid, buf_offset + 1])
  times_offset = buf_offset + 2
  values_offset = buf_offset + 2 + n

  i = _history_find_index(buf_out, worldid, buf_offset, n, cursor, t)

  slot_phys = -1

  # exact match
  if i < n:
    phys_i = _history_physical_index(cursor, n, i)
    if wp.abs(t - buf_out[worldid, times_offset + phys_i]) < 1e-6:
      slot_phys = phys_i

  if slot_phys < 0:
    if i == 0:
      # older than oldest: replace oldest
      slot_phys = _history_physical_index(cursor, n, 0)
      buf_out[worldid, times_offset + slot_phys] = t
    elif i == n:
      # newer than newest: advance cursor
      cursor = (cursor + 1) % n
      buf_out[worldid, buf_offset + 1] = float(cursor)
      slot_phys = cursor
      buf_out[worldid, times_offset + slot_phys] = t
    else:
      # out-of-order: shift [1, i-1] left, insert at i-1
      for j in range(i - 1):
        src_phys = _history_physical_index(cursor, n, j + 1)
        dst_phys = _history_physical_index(cursor, n, j)
        buf_out[worldid, times_offset + dst_phys] = buf_out[worldid, times_offset + src_phys]
        for d in range(dim):
          buf_out[worldid, values_offset + dst_phys * dim + d] = buf_out[worldid, values_offset + src_phys * dim + d]
      slot_phys = _history_physical_index(cursor, n, i - 1)
      buf_out[worldid, times_offset + slot_phys] = t

  # copy values
  for d in range(dim):
    buf_out[worldid, values_offset + slot_phys * dim + d] = src[worldid, src_adr + d]


@wp.kernel
def _read_ctrl_delayed_kernel(
  # Model:
  actuator_history: wp.array[wp.vec2i],
  actuator_historyadr: wp.array[int],
  actuator_delay: wp.array[float],
  # Data in:
  time_in: wp.array[float],
  history_in: wp.array2d[float],
  ctrl_in: wp.array2d[float],
  # Data out:
  ctrl_out: wp.array2d[float],
):
  """Read delayed ctrl for each actuator."""
  worldid, uid = wp.tid()

  hist = actuator_history[uid]
  nsample = hist[0]
  delay = actuator_delay[uid]

  if nsample == 0 or delay == 0.0:
    # no delay: direct copy
    ctrl_out[worldid, uid] = ctrl_in[worldid, uid]
  else:
    interp = hist[1]
    buf_offset = actuator_historyadr[uid]
    t = time_in[worldid] - delay
    ctrl_out[worldid, uid] = _history_read_scalar(history_in, worldid, buf_offset, nsample, t, interp)


@wp.kernel
def _insert_ctrl_history_kernel(
  # Model:
  actuator_history: wp.array[wp.vec2i],
  actuator_historyadr: wp.array[int],
  # Data in:
  time_in: wp.array[float],
  ctrl_in: wp.array2d[float],
  # Data out:
  history_out: wp.array2d[float],
):
  """Insert current ctrl into history buffers."""
  worldid, uid = wp.tid()

  hist = actuator_history[uid]
  nsample = hist[0]
  if nsample == 0:
    return

  buf_offset = actuator_historyadr[uid]
  t = time_in[worldid]
  value = ctrl_in[worldid, uid]
  _history_insert_scalar(worldid, buf_offset, nsample, t, value, history_out)


@wp.kernel
def _insert_sensor_history_stage(
  # Model:
  sensor_dim: wp.array[int],
  sensor_adr: wp.array[int],
  sensor_history: wp.array[wp.vec2i],
  sensor_historyadr: wp.array[int],
  sensor_delay: wp.array[float],
  sensor_interval: wp.array[wp.vec2],
  # Data in:
  time_in: wp.array[float],
  sensordata_in: wp.array2d[float],
  # In:
  sensor_ids: wp.array[int],
  # Data out:
  history_out: wp.array2d[float],
):
  """Insert current sensor values into history buffers for specific sensor IDs."""
  worldid, idx = wp.tid()
  sid = sensor_ids[idx]

  hist = sensor_history[sid]
  nsample = hist[0]
  if nsample == 0:
    return

  buf_offset = sensor_historyadr[sid]
  dim = sensor_dim[sid]
  interval_val = sensor_interval[sid]
  period = interval_val[0]
  t = time_in[worldid]

  if period > 0.0:
    # interval mode: check if condition is satisfied
    time_prev = history_out[worldid, buf_offset]  # user slot stores time_prev
    if time_prev + period <= t:
      # advance time_prev by exact period
      history_out[worldid, buf_offset] = time_prev + period
      # insert sensor value
      _history_insert_vector(worldid, buf_offset, nsample, dim, t, sensordata_in, sensor_adr[sid], history_out)
  else:
    _history_insert_vector(worldid, buf_offset, nsample, dim, t, sensordata_in, sensor_adr[sid], history_out)


@wp.kernel
def _apply_sensor_delay_kernel(
  # Model:
  sensor_dim: wp.array[int],
  sensor_adr: wp.array[int],
  sensor_history: wp.array[wp.vec2i],
  sensor_historyadr: wp.array[int],
  sensor_delay: wp.array[float],
  sensor_interval: wp.array[wp.vec2],
  # Data in:
  time_in: wp.array[float],
  history_in: wp.array2d[float],
  # In:
  sensor_ids: wp.array[int],
  # Data out:
  sensordata_out: wp.array2d[float],
):
  """Apply delay/interval logic for sensors after computation.

  TODO(team): Revisit always-compute decision for computationally expensive sensors
  with interval/period (e.g., raytracers)
  """
  worldid, idx = wp.tid()
  sid = sensor_ids[idx]

  hist = sensor_history[sid]
  nsample = hist[0]
  if nsample <= 0:
    return

  delay = sensor_delay[sid]
  dim = sensor_dim[sid]
  interp = hist[1]
  buf_offset = sensor_historyadr[sid]
  t = time_in[worldid]

  if delay > 0.0:
    # delay > 0: read delayed value from buffer
    _history_read_vector(sensor_adr[sid], history_in, worldid, buf_offset, nsample, dim, t - delay, interp, sensordata_out)
  else:
    # interval-only (delay == 0, interval > 0): check interval condition
    interval_val = sensor_interval[sid]
    period = interval_val[0]
    if period > 0.0:
      time_prev = history_in[worldid, buf_offset]  # user slot
      if time_prev + period > t:
        # interval condition not satisfied: read from buffer
        _history_read_vector(sensor_adr[sid], history_in, worldid, buf_offset, nsample, dim, t, interp, sensordata_out)
      # else: interval condition satisfied, keep computed value


def read_ctrl_delayed(m: Model, d: Data, ctrl: wp.array2d[float]):
  """Read delayed ctrl values for all actuators."""
  if m.nhistory == 0:
    wp.copy(ctrl, d.ctrl)
    return

  wp.launch(
    _read_ctrl_delayed_kernel,
    dim=(d.nworld, m.nu),
    inputs=[
      m.actuator_history,
      m.actuator_historyadr,
      m.actuator_delay,
      d.time,
      d.history,
      d.ctrl,
    ],
    outputs=[ctrl],
  )


def insert_ctrl_history(m: Model, d: Data):
  """Insert current ctrl values into history buffers."""
  if m.nhistory == 0 or m.nu == 0:
    return

  wp.launch(
    _insert_ctrl_history_kernel,
    dim=(d.nworld, m.nu),
    inputs=[
      m.actuator_history,
      m.actuator_historyadr,
      d.time,
      d.ctrl,
    ],
    outputs=[d.history],
  )


def apply_sensor_delay(m: Model, d: Data, sensorid: wp.array[int]):
  """Apply delay/interval logic for given sensors after computation.

  Matches MuJoCo C architecture where the delayed read (mj_sensorPos) occurs
  before the fresh value insert (mj_advance). We save fresh sensordata,
  overwrite with delayed values, then insert the saved fresh values.
  """
  if m.nhistory == 0 or sensorid.shape[0] == 0:
    return

  # Save fresh sensordata before delay overwrite
  fresh_sensordata = wp.empty_like(d.sensordata)
  wp.copy(fresh_sensordata, d.sensordata)

  # Read delayed values from buffer → overwrite sensordata
  wp.launch(
    _apply_sensor_delay_kernel,
    dim=(d.nworld, sensorid.shape[0]),
    inputs=[
      m.sensor_dim,
      m.sensor_adr,
      m.sensor_history,
      m.sensor_historyadr,
      m.sensor_delay,
      m.sensor_interval,
      d.time,
      d.history,
      sensorid,
    ],
    outputs=[d.sensordata],
  )

  # Insert saved fresh sensor values into history buffers
  wp.launch(
    _insert_sensor_history_stage,
    dim=(d.nworld, sensorid.shape[0]),
    inputs=[
      m.sensor_dim,
      m.sensor_adr,
      m.sensor_history,
      m.sensor_historyadr,
      m.sensor_delay,
      m.sensor_interval,
      d.time,
      fresh_sensordata,
      sensorid,
    ],
    outputs=[d.history],
  )


@wp.kernel
def _read_ctrl_kernel(
  # Model:
  actuator_history: wp.array[wp.vec2i],
  actuator_historyadr: wp.array[int],
  actuator_delay: wp.array[float],
  # Data in:
  time_in: wp.array[float],
  history_in: wp.array2d[float],
  ctrl_in: wp.array2d[float],
  # In:
  uid: int,
  interp: int,
  # Out:
  result_out: wp.array[float],
):
  """Read delayed ctrl for 1 actuator across all worlds."""
  worldid = wp.tid()

  hist = actuator_history[uid]
  nsample = hist[0]

  if nsample == 0:
    result_out[worldid] = ctrl_in[worldid, uid]
  else:
    interp_val = interp
    if interp_val < 0:
      interp_val = hist[1]
    delay = actuator_delay[uid]
    buf_offset = actuator_historyadr[uid]
    t = time_in[worldid] - delay
    result_out[worldid] = _history_read_scalar(history_in, worldid, buf_offset, nsample, t, interp_val)


[docs] def read_ctrl( m: Model, d: Data, ctrlid: int, time: wp.array[float], interp: int, result: wp.array2d[float], ): """Read delayed ctrl for 1 actuator across all worlds. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. ctrlid: actuator index. time: query time per world (nworld,). interp: interpolation mode (-1=model default, 0=ZOH, 1=linear, 2=cubic). result: output buffer (nworld,). """ wp.launch( _read_ctrl_kernel, dim=(d.nworld,), inputs=[ m.actuator_history, m.actuator_historyadr, m.actuator_delay, time, d.history, d.ctrl, ctrlid, interp, ], outputs=[result], )
@wp.kernel def _read_sensor_kernel( # Model: sensor_dim: wp.array[int], sensor_adr: wp.array[int], sensor_history: wp.array[wp.vec2i], sensor_historyadr: wp.array[int], sensor_delay: wp.array[float], # Data in: time_in: wp.array[float], history_in: wp.array2d[float], sensordata_in: wp.array2d[float], # In: sid: int, interp: int, # Out: result_out: wp.array2d[float], ): """Read delayed sensor for 1 sensor across all worlds.""" worldid = wp.tid() hist = sensor_history[sid] nsample = hist[0] dim = sensor_dim[sid] adr = sensor_adr[sid] if nsample == 0: for i in range(dim): result_out[worldid, i] = sensordata_in[worldid, adr + i] else: interp_val = interp if interp_val < 0: interp_val = hist[1] delay = sensor_delay[sid] buf_offset = sensor_historyadr[sid] t = time_in[worldid] - delay _history_read_vector( 0, # write to result_out starting at index 0 (not global sensor adr) history_in, worldid, buf_offset, nsample, dim, t, interp_val, result_out, )
[docs] def read_sensor( m: Model, d: Data, sensorid: int, time: wp.array[float], interp: int, result: wp.array2d[float], ): """Read delayed sensor for 1 sensor across all worlds. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. sensorid: sensor index. time: query time per world (nworld,). interp: interpolation mode (-1=model default, 0=ZOH, 1=linear, 2=cubic). result: output buffer (nworld, dim). """ wp.launch( _read_sensor_kernel, dim=(d.nworld,), inputs=[ m.sensor_dim, m.sensor_adr, m.sensor_history, m.sensor_historyadr, m.sensor_delay, time, d.history, d.sensordata, sensorid, interp, ], outputs=[result], )
@wp.kernel def _init_ctrl_history_kernel( # kernel_analyzer: off # Model: actuator_history: wp.array[wp.vec2i], actuator_historyadr: wp.array[int], # In: ctrlid: int, times: wp.array[float], values: wp.array2d[float], has_times: int, # Data out: history_out: wp.array2d[float], # kernel_analyzer: on ): """Initialize history buffer for 1 actuator across all worlds.""" worldid = wp.tid() nsample = actuator_history[ctrlid][0] buf_offset = actuator_historyadr[ctrlid] # preserve user slot user = history_out[worldid, buf_offset] # cursor = 0 (samples in order, newest at index nsample-1) history_out[worldid, buf_offset + 1] = float(nsample - 1) times_offset = buf_offset + 2 values_offset = buf_offset + 2 + nsample for i in range(nsample): if has_times != 0: history_out[worldid, times_offset + i] = times[i] else: history_out[worldid, times_offset + i] = -MJ_MAXVAL history_out[worldid, values_offset + i] = values[worldid, i] # restore user slot history_out[worldid, buf_offset] = user
[docs] def init_ctrl_history( m: Model, d: Data, ctrlid: int, times: wp.array[float], values: wp.array2d[float], ): """Initialize history buffer for 1 actuator across all worlds. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. ctrlid: actuator index. times: timestamps or None (nsample,). values: ctrl values (nworld, nsample). Raises: ValueError: If times are not strictly increasing. """ has_times = 0 if times is None else 1 if times is not None: t_np = times.numpy() for i in range(len(t_np) - 1): if t_np[i + 1] - t_np[i] < MJ_MINVAL: raise ValueError(f"times must be strictly increasing, got times[{i}]={t_np[i]} >= times[{i + 1}]={t_np[i + 1]}") if times is None: times = wp.empty(0, dtype=float) wp.launch( _init_ctrl_history_kernel, dim=(d.nworld,), inputs=[ m.actuator_history, m.actuator_historyadr, ctrlid, times, values, has_times, ], outputs=[d.history], )
# kernel_analyzer: off @wp.kernel def _init_sensor_history_kernel( # Model: sensor_history: wp.array[wp.vec2i], sensor_historyadr: wp.array[int], sensor_dim_arr: wp.array[int], # In: sensorid: int, times: wp.array[float], values: wp.array2d[float], phase: wp.array[float], has_times: int, # Data out: history_out: wp.array2d[float], ): # kernel_analyzer: on """Initialize history buffer for 1 sensor across all worlds.""" worldid = wp.tid() nsample = sensor_history[sensorid][0] dim = sensor_dim_arr[sensorid] buf_offset = sensor_historyadr[sensorid] # set user slot (phase = last computation time for interval sensors) history_out[worldid, buf_offset] = phase[worldid] # cursor = 0 (samples in order, newest at index nsample-1) history_out[worldid, buf_offset + 1] = float(nsample - 1) times_offset = buf_offset + 2 values_offset = buf_offset + 2 + nsample for i in range(nsample): if has_times != 0: history_out[worldid, times_offset + i] = times[i] else: history_out[worldid, times_offset + i] = -MJ_MAXVAL for j in range(dim): history_out[worldid, values_offset + i * dim + j] = values[worldid, i * dim + j]
[docs] def init_sensor_history( m: Model, d: Data, sensorid: int, times: wp.array[float], values: wp.array2d[float], phase: wp.array[float], ): """Initialize history buffer for 1 sensor across all worlds. Args: m: The model containing kinematic and dynamic information. d: The data object containing the current state and output arrays. sensorid: sensor index. times: timestamps or None (nsample,). values: sensor values (nworld, nsample * dim). phase: user slot value per world (nworld,). Raises: ValueError: If times are not strictly increasing. """ has_times = 0 if times is None else 1 if times is not None: t_np = times.numpy() for i in range(len(t_np) - 1): if t_np[i + 1] - t_np[i] < MJ_MINVAL: raise ValueError(f"times must be strictly increasing, got times[{i}]={t_np[i]} >= times[{i + 1}]={t_np[i + 1]}") if times is None: times = wp.empty(0, dtype=float) wp.launch( _init_sensor_history_kernel, dim=(d.nworld,), inputs=[ m.sensor_history, m.sensor_historyadr, m.sensor_dim, sensorid, times, values, phase, has_times, ], outputs=[d.history], )