# Source code for server

import abc
import os
import socket
import sys
import pathlib
from multiprocessing import Process, Queue, set_start_method
from loguru import logger
from typing import Optional, Tuple, Dict, Callable, Any, Type, Union
from src.beemon.configuration import setup_config, Config
from src.beemon.orchestrator import Orchestrator

# Type alias used in the annotations of the connection handlers below: a (host, port) pair.
IPAddress = Tuple[str, int]
"""
Tuple[str, int]: An IPAddress takes the form ('127.0.0.1', 8080) where the first element is the IP Address and the
second element is the port to connect to.
"""

__author__ = 'Chris Campell <[email protected]>; Alek Ratzloff <[email protected]>'


class Server(abc.ABC):
    """
    A bare-bones, single-threaded TCP command server.

    After :meth:`start` is called the server binds a listening socket and loops, accepting one client at a time.
    Each client is expected to send a single short text command; the server replies on the same connection,
    closes it, and pushes every *recognized* command (decoded, verbatim) onto a :class:`multiprocessing.Queue`
    that is handed to an :class:`~orchestrator.Orchestrator` instance at construction time. The server itself
    never executes sensor commands; it only acknowledges and forwards them.
    """

    # Commands handled directly by the server (as opposed to subclass-registered method hooks).
    _ROOT_COMMANDS = frozenset({'help', 'start', 'stop'})

    # Reply sent to the client for the ``help`` command.
    _HELP_MESSAGE: bytes = (
        b"OK...Running command"
        b'\nstart: Instructs the bmon server to begin functions and enter its \n'
        b'main recording loop'
        b'\nstart <sensor>: Instructs the targeted sensor to begin functions and enter '
        b'its main recording loop\n'
        b'stop: Kills the process for the bmon server. No more commands will be able to \n'
        b'be issued\n'
        b'stop <sensor>: Instructs the targeted sensor to halt if not currently \n'
        b'recording. This will first purge the multiprocessing queue, then abort the \n'
        b'scheduled recording operation at the scheduled time, terminating.\n'
        b'help: prints a list of the valid commands'
    )

    def __init__(self, host_address: Optional[str] = '127.0.0.1', listening_port: Optional[int] = 8080,
                 socket_timeout_seconds: Optional[int] = 3, receive_size_bytes: Optional[int] = 1024):
        """
        Stores the connection settings and creates the command queue and orchestrator. No socket is opened
        until :meth:`start` is called.

        Args:
            host_address (Optional[str]): The IP address this Server binds to and which clients connect to.
            listening_port (Optional[int]): The network port on which this Server listens for incoming requests.
            socket_timeout_seconds (Optional[int]): The number of seconds allowed to elapse without activity
                before a socket operation times out. Applied to the listening socket (so the accept loop can
                re-check its stop flag) and to every accepted client connection.
            receive_size_bytes (Optional[int]): The maximum number of bytes read from a client in one receive.
        """
        self._socket: Optional[socket.socket] = None
        self._host_address: str = host_address
        self._listening_port: int = listening_port
        self._socket_timeout_seconds: int = socket_timeout_seconds
        self._receive_size_bytes: int = receive_size_bytes
        self._is_listening: bool = False
        self._command_method_hooks: Dict[str, Callable] = {}
        self._command_queue: Queue = Queue()
        # NOTE(review): the orchestrator is only constructed here; nothing in this class calls ``start()`` on
        # it. Confirm whether Orchestrator launches itself or whether a start call is missing.
        self._orchestrator_process: Optional[Process] = Orchestrator(command_queue=self._command_queue)

    def start(self) -> None:
        """
        Opens the listening socket, binds it to the configured address and port, and runs the accept loop until
        the server is stopped. The listening socket is always closed before this method returns or raises.

        Raises:
            OSError: If the specified host address and listening port cannot be bound to.
        """
        self._socket = socket.socket()
        try:
            self._socket.settimeout(self._socket_timeout_seconds)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                self._socket.bind((self._host_address, self._listening_port))
                self._socket.listen(1)
            except OSError as err:
                logger.critical(f'Server could not bind to {self._host_address}:{self._listening_port}: {err}')
                raise
            self._is_listening = True
            self._accept_loop()
        finally:
            # Release the port whether we stopped cleanly, failed to bind, or crashed in the loop.
            self._is_listening = False
            self._socket.close()

    def _accept_loop(self):
        """
        Accepts incoming clients until :attr:`is_listening` becomes ``False``, handing each new connection to
        :meth:`on_accept`.

        Socket timeouts are ignored: the listening socket has a timeout precisely so that this loop wakes up
        periodically and re-checks the stop flag. Any other socket error is logged and the loop continues.
        """
        logger.info("Starting server accept loop")
        while self.is_listening:
            try:
                socket_connection, ip_address = self._socket.accept()
                logger.debug(f'Established a connection to {ip_address}')
                self.on_accept(socket_connection, ip_address)
            except socket.timeout:
                # Must precede the socket.error clause: socket.timeout is a subclass of OSError.
                continue
            except socket.error as err:
                logger.error(f'Error during connection attempt: {err}')
        logger.debug("Accept loop ended")

    def stop(self):
        """
        Signals the accept loop to exit. The loop notices the flag the next time ``accept`` returns or times out.

        .. todo:: This method should also ensure that the orchestrator process was halted gracefully, and if
            not force kill it.
        """
        logger.info('Stopping sever accept loop')
        self._is_listening = False

    def on_accept(self, socket_connection: socket.socket, ip_address: 'IPAddress') -> None:
        """
        Handles a newly accepted client connection: reads up to :attr:`receive_size_bytes` bytes, passes the raw
        message to :meth:`on_receive`, and closes the connection.

        The connection is closed even if receiving or handling raises. A client that sends nothing within
        :attr:`socket_timeout_seconds` raises :exc:`socket.timeout`, which the accept loop ignores.

        Args:
            socket_connection (socket.socket): The connected socket returned by ``accept``.
            ip_address (IPAddress): The address tuple returned by ``accept`` for this connection.
        """
        try:
            # Accepted sockets do not inherit the listening socket's timeout; without this a silent client
            # would block the (single-threaded) server indefinitely.
            socket_connection.settimeout(self._socket_timeout_seconds)
            message = socket_connection.recv(self._receive_size_bytes)
            logger.debug('Received message. Now handling...')
            self.on_receive(message=message, socket_connection=socket_connection, ip_address=ip_address)
        finally:
            socket_connection.close()

    @staticmethod
    def _parse_command(message: bytes) -> Optional[Tuple[str, list]]:
        """
        Decodes a raw client message and splits it into whitespace-separated words.

        Args:
            message (bytes): The raw message received from the client.

        Returns:
            Optional[Tuple[str, list]]: ``(decoded_message, words)`` where ``words`` is non-empty, or ``None``
            if the payload is not valid UTF-8 or contains no words.
        """
        try:
            decoded_message = message.decode('utf-8')
        except UnicodeDecodeError:
            return None
        words = decoded_message.split()
        if not words:
            return None
        return decoded_message, words

    def on_receive(self, message: bytes, socket_connection: socket.socket, ip_address: 'IPAddress') -> None:
        """
        Handles a raw message received from a client.

        The message is decoded as UTF-8 and its first word is taken as the command:

        * ``help``, ``start`` and ``stop`` are acknowledged by the server itself (``help`` replies with the
          usage text). A bare ``stop`` with no sensor argument also stops the server's accept loop.
        * Any command registered in :attr:`command_method_hooks` is acknowledged and its hook is invoked.
        * Anything else, including payloads that are not valid UTF-8 or are only whitespace, receives a
          ``Bad command`` reply.

        Every recognized command is then placed, decoded and verbatim, on the command queue that was given to
        the orchestrator. Unrecognized commands are not enqueued.

        Args:
            message (bytes): The raw message received from the client. Empty messages are ignored.
            socket_connection (socket.socket): The connected socket on which the reply is sent.
            ip_address (IPAddress): The address tuple of the client (currently unused).
        """
        # If there is no received data, there is nothing to do:
        if not message:
            return

        parsed = self._parse_command(message)
        if parsed is None:
            logger.info(f'Command received: {message!r}')
            logger.info('Command result: BAD Command')
            socket_connection.sendall(b'Bad command')
            return

        decoded_message, split_message = parsed
        # The first word of the message identifies the command; the rest (if any) is its postfix.
        command = split_message[0]
        logger.info(f'Command received: {decoded_message}')

        if command in self._ROOT_COMMANDS:
            logger.info('Command result: OK')
            reply = self._HELP_MESSAGE if command == 'help' else b"OK...Running command"
            # sendall: a plain send() may transmit only part of the (long) help text.
            socket_connection.sendall(reply)
            if command == 'stop' and len(split_message) == 1:
                # Bare "stop" halts the server; "stop <sensor>" is left to the orchestrator.
                self.stop()
        elif command in self._command_method_hooks:
            # Not a root-level command: delegate to the hook registered by a concrete subclass.
            logger.info('Command result: OK')
            socket_connection.sendall(b"OK...Running command")
            self._command_method_hooks[command]()
        else:
            logger.info('Command result: BAD Command')
            socket_connection.sendall(b'Bad command')
            return

        # Forward the recognized command to the queue handed to the orchestrator.
        self._command_queue.put_nowait(decoded_message)

    # NOTE: keep the ``socket`` property below every method whose signature mentions ``socket.socket``; once
    # it is defined, the bare name ``socket`` inside the class body refers to the property, not the module.
    @property
    def socket(self) -> Optional[socket.socket]:
        """The listening socket, or ``None`` if :meth:`start` has not been called yet."""
        return self._socket

    @property
    def host_address(self) -> str:
        """The IP address this server binds to."""
        return self._host_address

    @property
    def listening_port(self) -> int:
        """The port this server listens on."""
        return self._listening_port

    @property
    def socket_timeout_seconds(self) -> int:
        """The timeout, in seconds, applied to socket operations."""
        return self._socket_timeout_seconds

    @property
    def is_listening(self) -> bool:
        """Whether the accept loop should keep running."""
        return self._is_listening

    @property
    def receive_size_bytes(self) -> int:
        """The maximum number of bytes read from a client in one receive."""
        return self._receive_size_bytes

    @property
    def command_method_hooks(self) -> Dict[str, Callable]:
        """Mapping of command name to the zero-argument callable invoked when that command is received."""
        return self._command_method_hooks
class BeemonServer(Server):
    """
    The Beemon flavour of :class:`Server`: logs a start-up banner carrying the version string and starts with
    an empty command-hook table.
    """
    __version = '3.0.0-beta'

    def __init__(self, host_address: Optional[str] = '127.0.0.1', listening_port: Optional[int] = 65000):
        """
        Args:
            host_address (Optional[str]): The IP address to bind to.
            listening_port (Optional[int]): The port to listen on.
        """
        super().__init__(host_address=host_address, listening_port=listening_port)
        # NOTE(review): the banner literal is reproduced exactly as it appears in the available source, where
        # any line breaks inside it may have been lost; confirm against the upstream file.
        banner = """ =============================================================================== = Starting Beemon server %s =============================================================================== """
        logger.info(banner % self.__version)
        # TODO: accept a ``sensors: Dict[str, Sensor]`` collection and merge each sensor's
        #  ``command_method_hooks()`` into this table so sensor commands can be dispatched by the server.
        self._command_method_hooks: Dict[str, Callable] = {}
        # TODO: override ``start()`` to spin up every sensor whose autostart property is set.


def main():
    """
    Loads ``beemon-config.ini`` (located two directories above this module), then creates a
    :class:`BeemonServer` on 127.0.0.1:65000 and starts it. Exits with status 1 if the configuration cannot
    be set up.
    """
    # TODO: get the configuration path (and a ``--tail`` flag) from command-line arguments.
    module_directory = pathlib.Path(__file__).parent.resolve()
    config_file_path = os.path.abspath(os.path.join(module_directory, '../../beemon-config.ini'))

    loaded_config = setup_config(config_file_path)
    if loaded_config is None:
        # setup_config reports the failure itself; nothing more to say here.
        sys.exit(1)

    # TODO: read 'local_output_directory', 'capture_duration_seconds', 'capture_window_start_time' and
    #  'capture_window_end_time' (24-hour HHMM) from Config, then instantiate the sensors (e.g. an Audio
    #  sensor writing to '<local_output_directory>/audio') and register them with the server.
    server = BeemonServer(host_address='127.0.0.1', listening_port=65000)
    server.start()


if __name__ == '__main__':
    executable_name = sys.argv[0]
    # Beemon *must* be run as root in order for the program to move files from /tmp/ to /home/bee/bee_tmp/.
    # If files are not moved to this location, the uploader process will not find them. Hence we exit if the
    # user is detected to be a non-root user.
    running_as_root = os.getuid() == 0
    if not running_as_root:
        logger.critical(f"{executable_name} must be run as root")
        sys.exit(1)
    # Set the multiprocessing start method to 'fork' before anything creates a process:
    set_start_method('fork')
    main()