import abc
import os
import socket
import sys
import pathlib
from multiprocessing import Process, Queue, set_start_method
from loguru import logger
from typing import Optional, Tuple, Dict, Callable, Any, Type, Union
from src.beemon.configuration import setup_config, Config
from src.beemon.orchestrator import Orchestrator
# Type alias used in this module's annotations for a (host, port) socket address pair.
IPAddress = Tuple[str, int]
"""
Tuple[str, int]: An IPAddress takes the form ('127.0.0.1', 8080) where the first element is the IP Address and the
second element is the port to connect to.
"""
# NOTE(review): the "[email protected]" placeholders below look like the output of an HTML e-mail-obfuscation filter
# rather than real addresses -- presumably introduced when this listing was extracted; restore from the original source.
__author__ = 'Chris Campell <[email protected]>; Alek Ratzloff <[email protected]>'
class Server(abc.ABC):
    """
    Represents a bare-bones/basic TCP command server.

    Once :meth:`start` is called the server accepts one client connection at a time, reads a single message from it,
    replies to the client and closes the connection. Every decoded command is additionally placed on a
    :class:`multiprocessing.Queue` which is handed to the :class:`~orchestrator.Orchestrator` created in
    :meth:`__init__`. Apart from registered command-method hooks and the bare ``stop`` command, the server itself
    does not execute commands.
    """

    # Reply sent to the client for every accepted command.
    _OK_RESPONSE: bytes = b"OK...Running command"
    # Reply sent to the client when the command is neither a root-level command nor a registered hook.
    _BAD_COMMAND_RESPONSE: bytes = b'Bad command'
    # First words of a message which the server itself recognizes as root-level commands.
    _RECOGNIZED_SERVER_COMMANDS = frozenset({'help', 'start', 'stop'})
    # Reply sent to the client for the ``help`` command (the OK reply followed by the command listing).
    # This should be implemented in a better way at some point.
    _HELP_RESPONSE: bytes = (
        b"OK...Running command"
        b'\nstart: Instructs the bmon server to begin functions and enter its \n'
        b'main recording loop'
        b'\nstart <sensor>: Instructs the targeted sensor to begin functions and enter '
        b'its main recording loop\n'
        b'stop: Kills the process for the bmon server. No more commands will be able to \n'
        b'be issued\n'
        b'stop <sensor>: Instructs the targeted sensor to halt if not currently \n'
        b'recording. This will first purge the multiprocessing queue, then abort the \n'
        b'scheduled recording operation at the scheduled time, terminating.\n'
        b'help: prints a list of the valid commands'
    )

    def __init__(self, host_address: Optional[str] = '127.0.0.1',
                 listening_port: Optional[int] = 8080, socket_timeout_seconds: Optional[int] = 3,
                 receive_size_bytes: Optional[int] = 1024):
        """
        Stores the connection settings and creates the command queue and the orchestrator. No socket is opened here;
        the server only begins to listen for clients once :meth:`start` is called.

        All commands received by the server are pushed onto a :class:`multiprocessing.Queue` which is passed to a
        single instance of the :class:`~orchestrator.Orchestrator`.

        Args:
            host_address (Optional[str]): The IP address of this Server which incoming clients will connect to.
            listening_port (Optional[int]): The network port which is open on this Server for incoming requests.
            socket_timeout_seconds (Optional[int]): The number of seconds which will be allowed to elapse without a
                response from the client, before the opened socket is declared to be in a timeout state. Applied to
                the listening socket and to every accepted client connection.
            receive_size_bytes (Optional[int]): The size of the Server's buffer for incoming client packets (in
                bytes). At most this many bytes are read from each client connection.
        """
        self._socket: Optional[socket.socket] = None
        self._host_address: str = host_address
        self._listening_port: int = listening_port
        self._socket_timeout_seconds: int = socket_timeout_seconds
        self._receive_size_bytes: int = receive_size_bytes
        self._is_listening: bool = False
        self._command_method_hooks: Dict[str, Callable] = {}
        self._command_queue: Queue = Queue()
        # NOTE(review): nothing in this class calls ``start()`` on the orchestrator -- presumably it launches itself
        # or is started elsewhere; confirm against the Orchestrator implementation.
        self._orchestrator_process: Optional[Process] = Orchestrator(command_queue=self._command_queue)

    def start(self) -> None:
        """
        Starts the server by opening a socket for incoming client requests, binding to the specified IP address
        and listening port, and then running the accept loop. This call blocks until the accept loop ends, after
        which the listening socket is closed.

        Raises:
            OSError: This method raises an :exc:`OSError` in the event that the specified host address and listening
                port cannot be bound to.
        """
        self._socket = socket.socket()
        self._socket.settimeout(self._socket_timeout_seconds)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self._socket.bind((self._host_address, self._listening_port))
            self._socket.listen(1)
        except OSError as err:
            logger.critical(f'Server could not bind to {self._host_address}:{self._listening_port}: {err}')
            # Do not leak the file descriptor of a socket that will never be used.
            self._socket.close()
            raise
        self._is_listening = True
        try:
            self._accept_loop()
        finally:
            # Release the port whether the loop was stopped on request or aborted by an unexpected exception.
            self._is_listening = False
            self._socket.close()

    def _accept_loop(self):
        """
        Private helper method that accepts incoming clients until :attr:`is_listening` becomes ``False``. This method
        ignores socket timeouts as they are quite common for our use case (they also give the loop the opportunity to
        re-check :attr:`is_listening`). When a new connection is established, the :func:`server.Server.on_accept`
        method is invoked. Other socket errors are logged and the loop carries on.
        """
        logger.info("Starting server accept loop")
        while self.is_listening:
            try:
                socket_connection, ip_address = self._socket.accept()
                logger.debug(f'Established a connection to {ip_address}')
                self.on_accept(socket_connection, ip_address)
            except socket.timeout:
                # Nobody connected (or a client stalled) within the timeout; simply poll again.
                continue
            except socket.error as err:
                logger.error(f'Error during connection attempt: {err}')
        logger.debug("Accept loop ended")

    def stop(self):
        """
        Requests the shut down of the server's loop which listens for incoming client connection requests. The loop
        notices the request the next time it re-evaluates :attr:`is_listening`.
        """
        logger.info('Stopping sever accept loop')
        self._is_listening = False
        # .. todo:: This method should also ensure that the orchestrator process was halted gracefully, and if not
        #   force kill it.

    def on_accept(self, socket_connection: socket.socket, ip_address: 'IPAddress') -> None:
        """
        Handles what happens when a new connection is accepted by the Server: at most ``receive_size_bytes`` bytes are
        read from the connection, the raw (encoded in binary) message is passed to the server's :func:`on_receive`
        method, and the connection is closed afterwards (also when handling the message fails).

        Args:
            socket_connection (socket.socket): A :class:`socket.socket` object which encapsulates the connection
                between the server process and the client.
            ip_address (IPAddress): The address tuple returned by :meth:`socket.socket.accept` for this connection.
        """
        try:
            # A socket returned by accept() does not inherit the listening socket's timeout, so without this a client
            # which connects but never sends anything would block the (single threaded) server forever.
            socket_connection.settimeout(self._socket_timeout_seconds)
            # Receive the message:
            try:
                message = socket_connection.recv(self._receive_size_bytes)
            except socket.timeout:
                logger.debug(f'No message received from {ip_address} before the socket timed out')
                return
            logger.debug('Received message. Now handling...')
            # handle the message:
            self.on_receive(message=message, socket_connection=socket_connection, ip_address=ip_address)
        finally:
            # close the connection:
            socket_connection.close()

    def on_receive(self, message: bytes, socket_connection: socket.socket, ip_address: 'IPAddress') -> None:
        """
        Handles the receipt of an encoded/binary message from a client of the server. Upon receiving a message from a
        client, the server will decode the message as UTF-8 and use its first word as the command. If the command is
        a root-level command (``help``, ``start``, ``stop``) or a registered command-method hook, an affirmative
        response will be issued back to the client (and the hook, if any, is called). Otherwise an error message will
        be dispatched in response to the client. A bare ``stop`` (no sensor postfix) additionally ends the accept
        loop. The decoded message is sent (verbatim) to the :class:`orchestrator.Orchestrator` via an IPC channel
        (:class:`multiprocessing.Queue`).

        Empty and whitespace-only messages are ignored. Messages which are not valid UTF-8 are answered with the
        error response and are not forwarded.

        Notes:
            The :class:`orchestrator.Orchestrator` performs all operations associated with the received command. The
            :class:`server.Server` class performs no executing of commands.

        Args:
            message (bytes): The raw/binary encoded message received from the client.
            socket_connection (socket.socket): An instantiated :class:`socket.socket` object which sent the received
                message.
            ip_address (IPAddress): A size-two tuple containing both: the IP address (and port number for the IP
                address) which dispatched the received message.
        """
        # If there is no received data, there is nothing to do:
        if not message:
            return
        # Parse the input command. The bytes come straight from the network, so they may not be valid UTF-8:
        try:
            decoded_message = message.decode('utf-8')
        except UnicodeDecodeError as err:
            logger.warning(f'Discarding undecodable message from {ip_address}: {err}')
            socket_connection.sendall(self._BAD_COMMAND_RESPONSE)
            return
        split_message = decoded_message.split()
        if not split_message:
            # Whitespace only: there is no command word to look at.
            return
        # get the first word of the message, and use that to figure out what the command was
        command = split_message[0]
        logger.info(f'Command received: {decoded_message}')
        if command in self._RECOGNIZED_SERVER_COMMANDS:
            logger.info('Command result: OK')
            socket_connection.sendall(self._HELP_RESPONSE if command == 'help' else self._OK_RESPONSE)
            # Only a bare "stop" halts the server; "start <sensor>", "stop <sensor>" and "help <sensor>" need no
            # further action from the server itself.
            if command == 'stop' and len(split_message) == 1:
                self._is_listening = False
        elif command in self._command_method_hooks:
            # Not a root-level command for the Server to process, delegate to concrete subclasses method hooks to
            # process:
            logger.info('Command result: OK')
            socket_connection.sendall(self._OK_RESPONSE)
            self._command_method_hooks[command]()
        else:
            logger.info('Command result: BAD Command')
            socket_connection.sendall(self._BAD_COMMAND_RESPONSE)
        # Add the command to the multiprocessing queue that the single orchestrator process reads from.
        # NOTE(review): this also forwards commands which were answered with 'Bad command' -- confirm whether only
        # recognized commands are supposed to reach the orchestrator.
        self._command_queue.put_nowait(decoded_message)

    @property
    def socket(self) -> Optional[socket.socket]:
        """Optional[socket.socket]: The listening socket, or ``None`` if :meth:`start` has not been called yet."""
        return self._socket

    @property
    def host_address(self) -> str:
        """str: The IP address this server binds to."""
        return self._host_address

    @property
    def listening_port(self) -> int:
        """int: The port this server binds to."""
        return self._listening_port

    @property
    def socket_timeout_seconds(self) -> int:
        """int: The timeout (in seconds) applied to the server's sockets."""
        return self._socket_timeout_seconds

    @property
    def is_listening(self) -> bool:
        """bool: Whether the accept loop is (still) supposed to run."""
        return self._is_listening

    @property
    def receive_size_bytes(self) -> int:
        """int: The maximum number of bytes read from a client connection."""
        return self._receive_size_bytes

    @property
    def command_method_hooks(self) -> Dict[str, Callable]:
        """Dict[str, Callable]: Maps a command word to the zero-argument callable invoked when it is received."""
        return self._command_method_hooks
class BeemonServer(Server):
    """
    The Beemon flavour of :class:`Server`: logs a start-up banner carrying the server version and starts out with an
    empty table of command-method hooks.
    """
    __version = '3.0.0-beta'

    def __init__(self, host_address: Optional[str] = '127.0.0.1',
                 listening_port: Optional[int] = 65000):
        """
        Args:
            host_address (Optional[str]): The IP address handed to :class:`Server` to bind to.
            listening_port (Optional[int]): The port handed to :class:`Server` to bind to.
        """
        super().__init__(host_address=host_address, listening_port=listening_port)
        # Start-up banner; the message begins and ends with a newline.
        logger.info(
            '\n'
            '===============================================================================\n'
            f'= Starting Beemon server {self.__version}\n'
            '===============================================================================\n'
        )
        # The hook table starts out empty. Registering the hooks of the sensors is not implemented yet; the sketch
        # left by the original author is kept below:
        #   self._sensors: Dict[str, Sensor] = sensors
        #   for sensor_name, sensor in self._sensors.items():
        #       sensor_commands_and_method_hooks: Dict[str, Callable] = sensor.command_method_hooks()
        #       self._command_method_hooks.update(sensor_commands_and_method_hooks)
        self._command_method_hooks: Dict[str, Callable] = {}

    # Sketch of a start() override left by the original author (not active):
    # def start(self):
    #     super().start()
    #     # .. todo: Spin up threads for all sensors in the collection whose autostart property is set to true.
    #     for sensor_name, sensor in self.sensors.items():
    #         sensor.run()
def main():
    """
    Loads the Beemon configuration and runs a :class:`BeemonServer` on ``127.0.0.1:65000``.

    The configuration file ``beemon-config.ini`` is looked up two directories above the directory which contains this
    module. When :func:`setup_config` returns ``None`` for that path the process exits with status ``1``; otherwise
    the server is created and :meth:`BeemonServer.start` is called.
    """
    # TODO : get configuration path from parsing arguments
    module_directory = pathlib.Path(__file__).parent.resolve()
    config_file_path = str(module_directory.parent.parent / 'beemon-config.ini')
    if setup_config(config_file_path) is None:
        # error message is printed in the setup_config method; it's the only place that makes any sense
        sys.exit(1)

    # Not implemented yet (sketched as commented-out code by the original author):
    #   * tail/hang functionality driven by a ``tail`` command line argument;
    #   * parsing the config file via ``Config()``: ``local_output_directory``, ``capture_duration_seconds`` and the
    #     ``capture_window_start_time`` / ``capture_window_end_time`` values (24 hour military time HHMM format),
    #     the latter two converted with ``datetimeutils.military_time_to_edt_datetime_aware``;
    #   * instantiating the sensors from those values, e.g. an ``Audio`` sensor writing to
    #     ``os.path.join(local_output_directory, 'audio')`` and registered as ``sensors['audio']``.

    BeemonServer(host_address='127.0.0.1', listening_port=65000).start()
if __name__ == '__main__':
    # Notes:
    #   Beemon *must* be run as root in order for the program to move files from /tmp/ to /home/bee/bee_tmp/. If
    #   files are not moved to this location, the uploader process will not find them. Hence we exit if the user is
    #   detected to be a non-root user.
    program_executable_name = sys.argv[0]
    running_as_root = os.getuid() == 0
    if not running_as_root:
        logger.critical(f"{program_executable_name} must be run as root")
        sys.exit(1)

    # Command line parsing is not wired up yet; the original author's sketch:
    #   parser = argparse.ArgumentParser(description='Beemon server argument parser.')
    #   parser.add_argument('-t', '--tail', action='store_true', dest='tail', required=False, default=False,
    #                       help='Whether or not to tail the process.')
    #   command_line_args = parser.parse_args()

    # Set the :module:`multiprocessing.Process` start method type to 'fork' before anything is spawned:
    set_start_method('fork')
    main()