Common api reference

class nasdaq_protocols.common.message_queue.DispatchableMessageQueue(session_id, on_msg_coro=None)

Bases: Stoppable

A message queue that dispatches messages to a coro.

Parameters:
  • session_id (Any)

  • on_msg_coro (Callable[[Any], Awaitable[None]])

async put(msg)

put an entry into the queue. :param msg: Any

Parameters:

msg (Any)

Return type:

None

async get()

get an entry from the queue.

if the underlying queue contains entries, the first element is fetched and returned, if the queue contains no entries, a coro is awaited, which will wait until queue contains an entry.

Raises:

StateError - if the dispatcher is already in progress EndOfQueue - if queue is stopped the underlying queue contains no entries.

put_nowait(msg)

put an entry into the queue. :param msg: Any

Parameters:

msg (Any)

get_nowait()

get an entry from the queue. This is a non-blocking call. :return: entry from the queue or None if queue is empty.

Return type:

Any | None

pause_dispatching()

This is a context manager that pauses the dispatcher:

queue = DispatchableMessageQueue(session_id, on_msg_coro)

queue.get()  # will raise an exception

async with queue.pause_dispatching():
     queue.get()  # will not raise an exception
is_dispatching()

Check is message queue is actively dispatching.

Return type:

bool

start_dispatching(on_msg_coro)

Start dispatching messages from the queue to the coro.

Parameters:

on_msg_coro (Callable[[Any], Awaitable[None]])

Return type:

None

async stop()

Stop the queue.

Return type:

None

is_stopped()
Returns:

True if the queue is stopped.

Return type:

bool

class nasdaq_protocols.common.session.HeartbeatMonitor(session_id, interval, on_no_activity_coro, *, stop_when_no_activity=True, tolerate_missed_heartbeats=1)

Bases: Stoppable

Monitor that trips the on_no_activity_coro if no activity is detected.

Currently, activity is externally signalled by calling the ping method.

Parameters:
  • session_id (Any) – The session id.

  • interval (float) – interval in seconds at which the monitor checks for activity.

  • on_no_activity_coro (Callable[[], Coroutine]) – coroutine to be called when no activity is detected.

  • stop_when_no_activity (bool) – If True, the monitor stops when no activity is detected.

  • tolerate_missed_heartbeats (int) – number of missed heartbeats to tolerate.

ping()

Ping the monitor.

Return type:

None

is_running()

Returns True if the monitor is running.

Return type:

bool

async stop()

Stop the monitor.

Return type:

None

class nasdaq_protocols.common.session.Reader(session_id, on_msg_coro, on_close_coro)

Bases: Stoppable

Abstract Base class for readers.

A reader is responsible for parsing the received data from the transport and dispatching it to the on_msg_coro.

Parameters:
  • session_id (Any) – The session id.

  • on_msg_coro (Callable[[Any], Coroutine]) – coroutine to be called for every message parsed.

  • on_close_coro (Callable[[], Coroutine]) – coroutine to be called when the reader detects end of session.

abstract async on_data(data)

Called when data is received from the transport.

Parameters:

data (bytes)

class nasdaq_protocols.common.session.AsyncSession(*, session_id, reader_factory, on_msg_coro=None, on_close_coro=None, dispatch_on_connect=True)

Bases: Protocol, ABC, Generic[T]

Abstract base class for async sessions.

Once the transport is available, the session creates a new reader using the reader_factory and starts parsing the incoming bytes.

By default, the session starts in a dispatching mode, meaning the incoming messages are dispatched to the on_msg_coro. This can be changed by setting dispatch_on_connect=False.

Parameters:
  • session_id (SessionId) – The session id.

  • reader_factory (Callable[[Any, Callable[[Any], Coroutine], Callable[[], Coroutine]], Reader]) – A callable that returns a reader.

  • on_msg_coro (Callable[[Any], Coroutine]) – coroutine to be called when a message is received.

  • on_close_coro (Callable[[], Coroutine]) – coroutine to be called when the session is closed.

  • dispatch_on_connect (bool) – If True, the session starts with dispatching once connected.

async receive_msg()

Receive a message from the peer. This is a blocking call. This call blocks until a new message is available.

If the session is dispatching messages, then this call raises an exception.

Return Any:

The message received.

Return type:

Type[T]

receive_msg_nowait()

Receive a message from the peer. This is a non-blocking call.

Return Any:

The message received.

Return type:

Type[T] | None

is_active()

Returns True if the session is not closed or in closing state.

Return type:

bool

is_closed()

Returns True if the session is closed. :return:

Return type:

bool

initiate_close()

Initiate close of the session. An asynchronous task is created which will close the session and all its associates.

Poll is_closed to check if the session is closed or use the on_close_coro callback to be notified when the session is closed.

Return type:

None

async close()

Close the session, the session cannot be used after this call.

start_heartbeats(local_hb_interval, remote_hb_interval)

Starts the heartbeats for the session.

  • if the remote failed heartbeats, then the session is closed.

  • if the local heartbeat timer expires, then send_heartbeat is called.

Parameters:
  • local_hb_interval (int | float)

  • remote_hb_interval (int | float)

start_dispatching()

By default, the session starts with dispatching switched-on.

If, the session is created with dispatching-off, then at any point in time during the lifetime of this session, dispatching can be switched-on by calling this method.

abstract send_msg(msg)

Send a message to the peer. :param msg: Any message that is serializable.

Parameters:

msg (Serializable[T])

Return type:

None

abstract async send_heartbeat()

Callback to send a heartbeat to the peer. :meta private:

class nasdaq_protocols.common.session.SessionId(host='nohost', port=0)

Bases: object

A basic session id.

Parameters:
  • host (str)

  • port (int)

set_transport(transport)

Once the transport is available, the host and port are updated.

Parameters:

transport (Transport)

Return type:

None

class nasdaq_protocols.common.types.Serializable

Bases: ABC, Generic[T]

Abstract Base class for serializable objects.

abstract to_bytes()

pack the object to binary format.

Return type:

tuple[int, bytes]

abstract classmethod from_bytes(bytes_)

unpack the object from binary format.

Parameters:

bytes_ (bytes)

Return type:

tuple[int, Type[T]]

class nasdaq_protocols.common.types.Stoppable

Bases: ABC

exception nasdaq_protocols.common.types.StateError

Bases: RuntimeError

Raised when an operation is attempted in an invalid state.

exception nasdaq_protocols.common.types.EndOfQueue

Bases: EOFError

Raised when the end of the queue is reached.

class nasdaq_protocols.common.types.TypeDefinition

Bases: object

Class to hold all type definitions

Parameters:
  • to_str (Callable[[Any], str]) – Function to convert value to string

  • from_str (Callable[[str], Any]) – Function to convert string to value

  • to_bytes (Callable[[Any], bytes]) – Function to convert value to bytes

  • from_bytes (Callable[[bytes], Tuple[int, Any]]) – Function to convert bytes to value

Returns:

str

Returns:

Any

Returns:

Tuple[int, bytes]

Returns:

Tuple[int, Any]

nasdaq_protocols.common.utils.logable(target)

decorator that adds a log object to the class.

async nasdaq_protocols.common.utils.stop_task(tasks)

Cancel a task and wait for it to finish

Parameters:

tasks (Task | Stoppable | list[Task | Stoppable])

Return type:

Task | Stoppable | None

async nasdaq_protocols.common.utils.start_server(remote, session_factory, spin_timeout=0.001, *, name='server:serve', **kwargs)

start a socket server and return the server and the task that runs it

Return type:

Tuple[Server, Task]

class nasdaq_protocols.common.message.types.Boolean

Bases: TypeDefinition

type_cls

alias of bool

class nasdaq_protocols.common.message.types.Int

Bases: TypeDefinition

to_str

alias of str

from_str

alias of int

type_cls

alias of int

class nasdaq_protocols.common.message.types.IntBE

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedInt

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedIntBE

Bases: UnsignedInt

class nasdaq_protocols.common.message.types.Byte

Bases: Int

class nasdaq_protocols.common.message.types.Short

Bases: Int

class nasdaq_protocols.common.message.types.ShortBE

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedShort

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedShortBE

Bases: Int

class nasdaq_protocols.common.message.types.Long

Bases: Int

class nasdaq_protocols.common.message.types.LongBE

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedLong

Bases: Int

class nasdaq_protocols.common.message.types.UnsignedLongBE

Bases: Int

class nasdaq_protocols.common.message.types.CharAscii

Bases: TypeDefinition

to_str

alias of str

from_str

alias of str

type_cls

alias of str

class nasdaq_protocols.common.message.types.CharIso8599

Bases: CharAscii

class nasdaq_protocols.common.message.types.AsciiString

Bases: CharAscii

Parameters:
  • to_bytes (Callable[[str], Tuple[int, bytes]]) – encodes the string to bytes and prepends the 2 digit size of the string

  • from_bytes (Callable[[bytes], Tuple[int, str]]) – decodes the bytes to string and returns the size of the string and the string

class nasdaq_protocols.common.message.types.Iso8859String

Bases: CharAscii

class nasdaq_protocols.common.message.types.FixedAsciiString(length, right_justified=False)

Bases: TypeDefinition

to_str

alias of str

from_str

alias of str

type_cls

alias of str

class nasdaq_protocols.common.message.types.FixedIsoString(length, right_justified=False)

Bases: TypeDefinition

to_str

alias of str

from_str

alias of str

type_cls

alias of str