Common api reference¶
- class nasdaq_protocols.common.message_queue.DispatchableMessageQueue(session_id, on_msg_coro=None)¶
Bases:
Stoppable
A message queue that dispatches messages to a coro.
- Parameters:
session_id (Any)
on_msg_coro (Callable[[Any], Awaitable[None]])
- async put(msg)¶
put an entry into the queue. :param msg: Any
- Parameters:
msg (Any)
- Return type:
None
- async get()¶
get an entry from the queue.
if the underlying queue contains entries, the first element is fetched and returned, if the queue contains no entries, a coro is awaited, which will wait until queue contains an entry.
- Raises:
StateError - if the dispatcher is already in progress EndOfQueue - if queue is stopped the underlying queue contains no entries.
- put_nowait(msg)¶
put an entry into the queue. :param msg: Any
- Parameters:
msg (Any)
- get_nowait()¶
get an entry from the queue. This is a non-blocking call. :return: entry from the queue or None if queue is empty.
- Return type:
Any | None
- pause_dispatching()¶
This is a context manager that pauses the dispatcher:
queue = DispatchableMessageQueue(session_id, on_msg_coro) queue.get() # will raise an exception async with queue.pause_dispatching(): queue.get() # will not raise an exception
- is_dispatching()¶
Check is message queue is actively dispatching.
- Return type:
bool
- start_dispatching(on_msg_coro)¶
Start dispatching messages from the queue to the coro.
- Parameters:
on_msg_coro (Callable[[Any], Awaitable[None]])
- Return type:
None
- async stop()¶
Stop the queue.
- Return type:
None
- is_stopped()¶
- Returns:
True if the queue is stopped.
- Return type:
bool
- class nasdaq_protocols.common.session.HeartbeatMonitor(session_id, interval, on_no_activity_coro, *, stop_when_no_activity=True, tolerate_missed_heartbeats=1, name='monitor')¶
Bases:
Stoppable
Monitor that trips the on_no_activity_coro if no activity is detected.
Currently, activity is externally signalled by calling the ping method.
- Parameters:
session_id (Any) – The session id.
interval (float) – interval in seconds at which the monitor checks for activity.
on_no_activity_coro (Callable[[], Coroutine]) – coroutine to be called when no activity is detected.
stop_when_no_activity (bool) – If True, the monitor stops when no activity is detected.
tolerate_missed_heartbeats (int) – number of missed heartbeats to tolerate.
name (str)
- ping()¶
Ping the monitor.
- Return type:
None
- is_running()¶
Returns True if the monitor is running.
- Return type:
bool
- async stop()¶
Stop the monitor.
- Return type:
None
- class nasdaq_protocols.common.session.Reader(session_id, on_msg_coro, on_close_coro)¶
Bases:
Stoppable
Abstract Base class for readers.
A reader is responsible for parsing the received data from the transport and dispatching it to the on_msg_coro.
- Parameters:
session_id (Any) – The session id.
on_msg_coro (Callable[[Any], Coroutine]) – coroutine to be called for every message parsed.
on_close_coro (Callable[[], Coroutine]) – coroutine to be called when the reader detects end of session.
- abstract async on_data(data)¶
Called when data is received from the transport.
- Parameters:
data (bytes)
- class nasdaq_protocols.common.session.AsyncSession(*, session_id, reader_factory, on_msg_coro=None, on_close_coro=None, dispatch_on_connect=True)¶
Bases:
Protocol
,ABC
,Generic
[T
]Abstract base class for async sessions.
Once the transport is available, the session creates a new reader using the reader_factory and starts parsing the incoming bytes.
By default, the session starts in a dispatching mode, meaning the incoming messages are dispatched to the on_msg_coro. This can be changed by setting dispatch_on_connect=False.
- Parameters:
session_id (SessionId) – The session id.
reader_factory (Callable[[Any, Callable[[Any], Coroutine], Callable[[], Coroutine]], Reader]) – A callable that returns a reader.
on_msg_coro (Callable[[Any], Coroutine]) – coroutine to be called when a message is received.
on_close_coro (Callable[[], Coroutine]) – coroutine to be called when the session is closed.
dispatch_on_connect (bool) – If True, the session starts with dispatching once connected.
- async receive_msg()¶
Receive a message from the peer. This is a blocking call. This call blocks until a new message is available.
If the session is dispatching messages, then this call raises an exception.
- Return Any:
The message received.
- Return type:
Type[T]
- receive_msg_nowait()¶
Receive a message from the peer. This is a non-blocking call.
- Return Any:
The message received.
- Return type:
Type[T] | None
- is_active()¶
Returns True if the session is not closed or in closing state.
- Return type:
bool
- is_closed()¶
Returns True if the session is closed. :return:
- Return type:
bool
- initiate_close()¶
Initiate close of the session. An asynchronous task is created which will close the session and all its associates.
Poll is_closed to check if the session is closed or use the on_close_coro callback to be notified when the session is closed.
- Return type:
None
- async close()¶
Close the session, the session cannot be used after this call.
- start_heartbeats(local_hb_interval, remote_hb_interval)¶
Starts the heartbeats for the session.
if the remote failed heartbeats, then the session is closed.
if the local heartbeat timer expires, then send_heartbeat is called.
- Parameters:
local_hb_interval (int | float)
remote_hb_interval (int | float)
- start_dispatching()¶
By default, the session starts with dispatching switched-on.
If, the session is created with dispatching-off, then at any point in time during the lifetime of this session, dispatching can be switched-on by calling this method.
- abstract send_msg(msg)¶
Send a message to the peer. :param msg: Any message that is serializable.
- Parameters:
msg (Serializable[T])
- Return type:
None
- abstract async send_heartbeat()¶
Callback to send a heartbeat to the peer. :meta private:
- class nasdaq_protocols.common.session.SessionId(host='nohost', port=0)¶
Bases:
object
A basic session id.
- Parameters:
host (str)
port (int)
- set_transport(transport)¶
Once the transport is available, the host and port are updated.
- Parameters:
transport (Transport)
- Return type:
None
- class nasdaq_protocols.common.types.Serializable¶
Bases:
ABC
,Generic
[T
]Abstract Base class for serializable objects.
- abstract to_bytes()¶
pack the object to binary format.
- Return type:
tuple[int, bytes]
- abstract classmethod from_bytes(bytes_)¶
unpack the object from binary format.
- Parameters:
bytes_ (bytes)
- Return type:
tuple[int, Type[T]]
- class nasdaq_protocols.common.types.Stoppable¶
Bases:
ABC
- exception nasdaq_protocols.common.types.StateError¶
Bases:
RuntimeError
Raised when an operation is attempted in an invalid state.
- exception nasdaq_protocols.common.types.EndOfQueue¶
Bases:
EOFError
Raised when the end of the queue is reached.
- class nasdaq_protocols.common.types.TypeDefinition¶
Bases:
object
Class to hold all type definitions
- Parameters:
to_str (Callable[[Any], str]) – Function to convert value to string
from_str (Callable[[str], Any]) – Function to convert string to value
to_bytes (Callable[[Any], bytes]) – Function to convert value to bytes
from_bytes (Callable[[bytes], Tuple[int, Any]]) – Function to convert bytes to value
- Returns:
str
- Returns:
Any
- Returns:
Tuple[int, bytes]
- Returns:
Tuple[int, Any]
- nasdaq_protocols.common.utils.logable(target)¶
decorator that adds a log object to the class.
- async nasdaq_protocols.common.utils.stop_task(tasks)¶
Cancel a task and wait for it to finish
- async nasdaq_protocols.common.utils.start_server(remote, session_factory, spin_timeout=0.001, *, name='server:serve', **kwargs)¶
start a socket server and return the server and the task that runs it
- Return type:
Tuple[Server, Task]
- class nasdaq_protocols.common.message.types.Boolean¶
Bases:
TypeDefinition
- type_cls¶
alias of
bool
- class nasdaq_protocols.common.message.types.Int¶
Bases:
TypeDefinition
- to_str¶
alias of
str
- from_str¶
alias of
int
- type_cls¶
alias of
int
- class nasdaq_protocols.common.message.types.UnsignedIntBE¶
Bases:
UnsignedInt
- class nasdaq_protocols.common.message.types.CharAscii¶
Bases:
TypeDefinition
- to_str¶
alias of
str
- from_str¶
alias of
str
- type_cls¶
alias of
str
- class nasdaq_protocols.common.message.types.AsciiString¶
Bases:
CharAscii
- Parameters:
to_bytes (Callable[[str], Tuple[int, bytes]]) – encodes the string to bytes and prepends the 2 digit size of the string
from_bytes (Callable[[bytes], Tuple[int, str]]) – decodes the bytes to string and returns the size of the string and the string
- class nasdaq_protocols.common.message.types.FixedAsciiString(length, right_justified=False)¶
Bases:
TypeDefinition
- to_str¶
alias of
str
- from_str¶
alias of
str
- type_cls¶
alias of
str
- class nasdaq_protocols.common.message.types.FixedIsoString(length, right_justified=False)¶
Bases:
TypeDefinition
- to_str¶
alias of
str
- from_str¶
alias of
str
- type_cls¶
alias of
str