Source code for nucypher.config.storages

"""
This file is part of nucypher.

nucypher is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

nucypher is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with nucypher.  If not, see <https://www.gnu.org/licenses/>.
"""

import binascii
import os
import sqlite3
import tempfile
from abc import abstractmethod, ABC
from typing import Callable, Tuple, Union, Set, Any

import OpenSSL
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import Certificate, NameOID
from eth_utils import is_checksum_address
from twisted.logger import Logger

from nucypher.blockchain.eth.decorators import validate_checksum_address
from nucypher.blockchain.eth.registry import BaseContractRegistry
from nucypher.config.constants import DEFAULT_CONFIG_ROOT
from nucypher.crypto.api import read_certificate_pseudonym


[docs]class NodeStorage(ABC): _name = NotImplemented _TYPE_LABEL = 'storage_type' NODE_SERIALIZER = binascii.hexlify NODE_DESERIALIZER = binascii.unhexlify TLS_CERTIFICATE_ENCODING = Encoding.PEM TLS_CERTIFICATE_EXTENSION = '.{}'.format(TLS_CERTIFICATE_ENCODING.name.lower())
[docs] class NodeStorageError(Exception): pass
[docs] class UnknownNode(NodeStorageError): pass
def __init__(self, federated_only: bool, # TODO# 466 character_class=None, serializer: Callable = NODE_SERIALIZER, deserializer: Callable = NODE_DESERIALIZER, registry: BaseContractRegistry = None, ) -> None: from nucypher.characters.lawful import Ursula self.log = Logger(self.__class__.__name__) self.registry = registry self.serializer = serializer self.deserializer = deserializer self.federated_only = federated_only self.character_class = character_class or Ursula def __getitem__(self, item): return self.get(checksum_address=item, federated_only=self.federated_only) def __setitem__(self, key, value): return self.store_node_metadata(node=value) def __delitem__(self, key): self.remove(checksum_address=key) def __iter__(self): return self.all(federated_only=self.federated_only) def _read_common_name(self, certificate: Certificate): x509 = OpenSSL.crypto.X509.from_cryptography(certificate) subject_components = x509.get_subject().get_components() common_name_as_bytes = subject_components[0][1] common_name_from_cert = common_name_as_bytes.decode() return common_name_from_cert def _write_tls_certificate(self, certificate: Certificate, host: str = None, force: bool = True) -> str: # Read x509 = OpenSSL.crypto.X509.from_cryptography(certificate) subject_components = x509.get_subject().get_components() common_name_as_bytes = subject_components[0][1] common_name_on_certificate = common_name_as_bytes.decode() if not host: host = common_name_on_certificate pseudonym = certificate.subject.get_attributes_for_oid(NameOID.PSEUDONYM)[0] checksum_address = pseudonym.value if not is_checksum_address(checksum_address): # TODO: more? raise RuntimeError("Invalid certificate checksum address encountered: {}".format(checksum_address)) # Validate # TODO: It's better for us to have checked this a while ago so that this situation is impossible. #443 if host and (host != common_name_on_certificate): raise ValueError( f"You passed a hostname ('{host}') that does not match the certificate's common name.") certificate_filepath = self.generate_certificate_filepath(checksum_address=checksum_address) certificate_already_exists = os.path.isfile(certificate_filepath) if force is False and certificate_already_exists: raise FileExistsError('A TLS certificate already exists at {}.'.format(certificate_filepath)) # Write os.makedirs(os.path.dirname(certificate_filepath), exist_ok=True) with open(certificate_filepath, 'wb') as certificate_file: public_pem_bytes = certificate.public_bytes(self.TLS_CERTIFICATE_ENCODING) certificate_file.write(public_pem_bytes) self.log.debug(f"Saved TLS certificate for {checksum_address}: {certificate_filepath}") return certificate_filepath @abstractmethod def store_node_certificate(self, certificate: Certificate) -> str: raise NotImplementedError
[docs] @abstractmethod def store_node_metadata(self, node, filepath: str = None) -> str: """Save a single node's metadata and tls certificate""" raise NotImplementedError
@abstractmethod def generate_certificate_filepath(self, checksum_address: str) -> str: raise NotImplementedError @abstractmethod def payload(self) -> dict: raise NotImplementedError
[docs] @classmethod @abstractmethod def from_payload(self, data: dict, *args, **kwargs) -> 'NodeStorage': """Instantiate a storage object from a dictionary""" raise NotImplementedError
[docs] @abstractmethod def initialize(self): """One-time initialization steps to establish a node storage backend""" raise NotImplementedError
[docs] @abstractmethod def all(self, federated_only: bool, certificates_only: bool = False) -> set: """Return s set of all stored nodes""" raise NotImplementedError
[docs] @abstractmethod def get(self, checksum_address: str, federated_only: bool): """Retrieve a single stored node""" raise NotImplementedError
[docs] @abstractmethod def remove(self, checksum_address: str) -> bool: """Remove a single stored node""" raise NotImplementedError
[docs] @abstractmethod def clear(self) -> bool: """Remove all stored nodes""" raise NotImplementedError
[docs]class ForgetfulNodeStorage(NodeStorage): _name = ':memory:' __base_prefix = "nucypher-tmp-certs-" def __init__(self, parent_dir: str = None, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.__metadata = dict() # Certificates self.__certificates = dict() self.__temporary_certificates = list() self._temp_certificates_dir = tempfile.mkdtemp(prefix='nucypher-temp-certs-', dir=parent_dir) # TODO: Pending fix for 1554. # def __del__(self): # shutil.rmtree(self._temp_certificates_dir, ignore_errors=True)
[docs] def all(self, federated_only: bool, certificates_only: bool = False) -> set: return set(self.__metadata.values() if not certificates_only else self.__certificates.values())
[docs] @validate_checksum_address def get(self, federated_only: bool, host: str = None, checksum_address: str = None, certificate_only: bool = False): if not bool(checksum_address) ^ bool(host): message = "Either pass checksum_address or host; Not both. Got ({} {})".format(checksum_address, host) raise ValueError(message) if certificate_only is True: try: return self.__certificates[checksum_address or host] except KeyError: raise self.UnknownNode else: try: return self.__metadata[checksum_address or host] except KeyError: raise self.UnknownNode
def forget(self) -> bool: for temp_certificate in self.__temporary_certificates: os.remove(temp_certificate) return len(self.__temporary_certificates) == 0 def store_node_certificate(self, certificate: Certificate): checksum_address = read_certificate_pseudonym(certificate=certificate) self.__certificates[checksum_address] = certificate filepath = self._write_tls_certificate(certificate=certificate) return filepath
[docs] def store_node_metadata(self, node, filepath: str = None): self.__metadata[node.checksum_address] = node return self.__metadata[node.checksum_address]
@validate_checksum_address def generate_certificate_filepath(self, checksum_address: str) -> str: filename = '{}.pem'.format(checksum_address) filepath = os.path.join(self._temp_certificates_dir, filename) return filepath
[docs] @validate_checksum_address def remove(self, checksum_address: str, metadata: bool = True, certificate: bool = True ) -> Tuple[bool, str]: if metadata is True: del self.__metadata[checksum_address] if certificate is True: del self.__certificates[checksum_address] return True, checksum_address
[docs] def clear(self, metadata: bool = True, certificates: bool = True) -> None: """Forget all stored nodes and certificates""" if metadata is True: self.__metadata = dict() if certificates is True: self.__certificates = dict()
def payload(self) -> dict: payload = {self._TYPE_LABEL: self._name} return payload
[docs] @classmethod def from_payload(cls, payload: dict, *args, **kwargs) -> 'ForgetfulNodeStorage': """Alternate constructor to create a storage instance from JSON-like configuration""" if payload[cls._TYPE_LABEL] != cls._name: raise cls.NodeStorageError return cls(*args, **kwargs)
[docs] def initialize(self) -> bool: """Returns True if initialization was successful""" self.__metadata = dict() self.__certificates = dict() return not bool(self.__metadata or self.__certificates)
[docs]class SQLiteForgetfulNodeStorage(ForgetfulNodeStorage): """ SQLite forgetful storage of node metadata """ _name = 'sqlite' DB_FILE_NAME = 'nodes.sqlite' DEFAULT_DB_FILEPATH = os.path.join(DEFAULT_CONFIG_ROOT, DB_FILE_NAME) NODE_DB_NAME = 'node_info' NODE_DB_SCHEMA = [('staker_address', 'text primary key'), ('rest_url', 'text'), ('nickname', 'text'), ('timestamp', 'text'), ('last_seen', 'text'), ('fleet_state_icon', 'text')] def __init__(self, db_filepath: str = DEFAULT_DB_FILEPATH, *args, **kwargs): super().__init__(*args, **kwargs) self.db_filepath = db_filepath self.db_conn = sqlite3.connect(self.db_filepath) self.init_db_tables() def __del__(self): try: self.db_conn.close() finally: if os.path.exists(self.db_filepath): os.remove(self.db_filepath)
[docs] def store_node_metadata(self, node, filepath: str = None): self.__write_node_metadata(node) return super().store_node_metadata(node=node, filepath=filepath)
[docs] @validate_checksum_address def remove(self, checksum_address: str, metadata: bool = True, certificate: bool = True ) -> Tuple[bool, str]: if metadata is True: with self.db_conn: self.db_conn.execute(f"DELETE FROM {self.NODE_DB_NAME} WHERE staker_address='{checksum_address}'") return super().remove(checksum_address=checksum_address, metadata=metadata, certificate=certificate)
[docs] def clear(self, metadata: bool = True, certificates: bool = True) -> None: if metadata is True: with self.db_conn: self.db_conn.execute(f"DELETE FROM {self.NODE_DB_NAME}") super().clear(metadata=metadata, certificates=certificates)
[docs] def initialize(self) -> bool: if os.path.exists(self.db_filepath): os.remove(self.db_filepath) self.db_conn = sqlite3.connect(self.db_filepath) self.init_db_tables() return super().initialize()
def init_db_tables(self): with self.db_conn: # ensure tables are empty self.db_conn.execute(f"DROP TABLE IF EXISTS {self.NODE_DB_NAME}") # create fresh new node table (same column names as FleetStateTracker.abridged_nodes_details) node_db_schema = ", ".join(f"{schema[0]} {schema[1]}" for schema in self.NODE_DB_SCHEMA) self.db_conn.execute(f"CREATE TABLE {self.NODE_DB_NAME} ({node_db_schema})") def __write_node_metadata(self, node): node.mature() node_dict = node.abridged_node_details() db_row = (node_dict['staker_address'], node_dict['rest_url'], node_dict['nickname'], node_dict['timestamp'], node_dict['last_seen'], node_dict['fleet_state_icon']) with self.db_conn: self.db_conn.execute(f'REPLACE INTO {self.NODE_DB_NAME} VALUES(?,?,?,?,?,?)', db_row)
[docs]class LocalFileBasedNodeStorage(NodeStorage): _name = 'local' __METADATA_FILENAME_TEMPLATE = '{}.node'
[docs] class NoNodeMetadataFileFound(FileNotFoundError, NodeStorage.UnknownNode): pass
def __init__(self, config_root: str = None, storage_root: str = None, metadata_dir: str = None, certificates_dir: str = None, *args, **kwargs ) -> None: super().__init__(*args, **kwargs) self.log = Logger(self.__class__.__name__) self.root_dir = storage_root self.metadata_dir = metadata_dir self.certificates_dir = certificates_dir self._cache_storage_filepaths(config_root=config_root) @staticmethod def _generate_storage_filepaths(config_root: str = None, storage_root: str = None, metadata_dir: str = None, certificates_dir: str = None): storage_root = storage_root or os.path.join(config_root or DEFAULT_CONFIG_ROOT, 'known_nodes') metadata_dir = metadata_dir or os.path.join(storage_root, 'metadata') certificates_dir = certificates_dir or os.path.join(storage_root, 'certificates') payload = {'storage_root': storage_root, 'metadata_dir': metadata_dir, 'certificates_dir': certificates_dir} return payload def _cache_storage_filepaths(self, config_root: str = None): filepaths = self._generate_storage_filepaths(config_root=config_root, storage_root=self.root_dir, metadata_dir=self.metadata_dir, certificates_dir=self.certificates_dir) self.root_dir = filepaths['storage_root'] self.metadata_dir = filepaths['metadata_dir'] self.certificates_dir = filepaths['certificates_dir'] # # Certificates # @validate_checksum_address def __get_certificate_filename(self, checksum_address: str): return '{}.{}'.format(checksum_address, Encoding.PEM.name.lower()) def __get_certificate_filepath(self, certificate_filename: str) -> str: return os.path.join(self.certificates_dir, certificate_filename) @validate_checksum_address def generate_certificate_filepath(self, checksum_address: str) -> str: certificate_filename = self.__get_certificate_filename(checksum_address) certificate_filepath = self.__get_certificate_filepath(certificate_filename=certificate_filename) return certificate_filepath @validate_checksum_address def __read_tls_public_certificate(self, filepath: str = None, checksum_address: str = None) -> Certificate: """Deserialize an X509 certificate from a filepath""" if not bool(filepath) ^ bool(checksum_address): raise ValueError("Either pass filepath or checksum_address; Not both.") if not filepath and checksum_address is not None: filepath = self.generate_certificate_filepath(checksum_address) try: with open(filepath, 'rb') as certificate_file: cert = x509.load_pem_x509_certificate(certificate_file.read(), backend=default_backend()) return cert except FileNotFoundError: raise FileNotFoundError("No SSL certificate found at {}".format(filepath)) # # Metadata # @validate_checksum_address def __generate_metadata_filepath(self, checksum_address: str, metadata_dir: str = None) -> str: metadata_path = os.path.join(metadata_dir or self.metadata_dir, self.__METADATA_FILENAME_TEMPLATE.format(checksum_address)) return metadata_path def __read_metadata(self, filepath: str, federated_only: bool): from nucypher.characters.lawful import Ursula try: with open(filepath, "rb") as seed_file: seed_file.seek(0) node_bytes = self.deserializer(seed_file.read()) node = Ursula.from_bytes(node_bytes) except FileNotFoundError: raise self.UnknownNode return node def __write_metadata(self, filepath: str, node): os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(filepath, "wb") as f: f.write(self.serializer(bytes(node))) self.log.info("Wrote new node metadata to filesystem {}".format(filepath)) return filepath # # API #
[docs] def all(self, federated_only: bool, certificates_only: bool = False) -> Set[Union[Any, Certificate]]: filenames = os.listdir(self.certificates_dir if certificates_only else self.metadata_dir) self.log.info("Found {} known node metadata files at {}".format(len(filenames), self.metadata_dir)) known_certificates = set() if certificates_only: for filename in filenames: certificate = self.__read_tls_public_certificate(os.path.join(self.certificates_dir, filename)) known_certificates.add(certificate) return known_certificates else: known_nodes = set() for filename in filenames: metadata_path = os.path.join(self.metadata_dir, filename) node = self.__read_metadata(filepath=metadata_path, federated_only=federated_only) # TODO: 466 known_nodes.add(node) return known_nodes
[docs] @validate_checksum_address def get(self, checksum_address: str, federated_only: bool, certificate_only: bool = False): if certificate_only is True: certificate = self.__read_tls_public_certificate(checksum_address=checksum_address) return certificate metadata_path = self.__generate_metadata_filepath(checksum_address=checksum_address) node = self.__read_metadata(filepath=metadata_path, federated_only=federated_only) # TODO: 466 return node
def store_node_certificate(self, certificate: Certificate, force: bool = True): certificate_filepath = self._write_tls_certificate(certificate=certificate, force=force) return certificate_filepath
[docs] def store_node_metadata(self, node, filepath: str = None) -> str: address = node.checksum_address filepath = self.__generate_metadata_filepath(checksum_address=address, metadata_dir=filepath) self.__write_metadata(filepath=filepath, node=node) return filepath
def save_node(self, node, force) -> Tuple[str, str]: certificate_filepath = self.store_node_certificate(certificate=node.certificate, force=force) metadata_filepath = self.store_node_metadata(node=node) return metadata_filepath, certificate_filepath
[docs] @validate_checksum_address def remove(self, checksum_address: str, metadata: bool = True, certificate: bool = True) -> None: if metadata is True: metadata_filepath = self.__generate_metadata_filepath(checksum_address=checksum_address) os.remove(metadata_filepath) self.log.debug("Deleted {} from the filesystem".format(checksum_address)) if certificate is True: certificate_filepath = self.generate_certificate_filepath(checksum_address=checksum_address) os.remove(certificate_filepath) self.log.debug("Deleted {} from the filesystem".format(checksum_address)) return
[docs] def clear(self, metadata: bool = True, certificates: bool = True) -> None: """Forget all stored nodes and certificates""" def __destroy_dir_contents(path) -> None: try: paths_to_remove = os.listdir(path) except FileNotFoundError: return else: for file in paths_to_remove: file_path = os.path.join(path, file) if os.path.isfile(file_path): os.unlink(file_path) if metadata is True: __destroy_dir_contents(self.metadata_dir) if certificates is True: __destroy_dir_contents(self.certificates_dir) return
def payload(self) -> dict: payload = { 'storage_type': self._name, 'storage_root': self.root_dir, 'metadata_dir': self.metadata_dir, 'certificates_dir': self.certificates_dir } return payload
[docs] @classmethod def from_payload(cls, payload: dict, *args, **kwargs) -> 'LocalFileBasedNodeStorage': storage_type = payload[cls._TYPE_LABEL] if not storage_type == cls._name: raise cls.NodeStorageError("Wrong storage type. got {}".format(storage_type)) del payload['storage_type'] return cls(*args, **payload, **kwargs)
[docs] def initialize(self) -> bool: storage_dirs = (self.root_dir, self.metadata_dir, self.certificates_dir) for storage_dir in storage_dirs: try: os.mkdir(storage_dir, mode=0o755) except FileExistsError: message = "There are pre-existing files at {}".format(self.root_dir) self.log.info(message) except FileNotFoundError: raise self.NodeStorageError("There is no existing configuration at {}".format(self.root_dir)) return bool(all(map(os.path.isdir, (self.root_dir, self.metadata_dir, self.certificates_dir))))
[docs]class TemporaryFileBasedNodeStorage(LocalFileBasedNodeStorage): _name = 'tmp' def __init__(self, *args, **kwargs): self.__temp_metadata_dir = None self.__temp_certificates_dir = None super().__init__(metadata_dir=self.__temp_metadata_dir, certificates_dir=self.__temp_certificates_dir, *args, **kwargs) # TODO: Pending fix for 1554. # def __del__(self): # if self.__temp_metadata_dir is not None: # shutil.rmtree(self.__temp_metadata_dir, ignore_errors=True) # shutil.rmtree(self.__temp_certificates_dir, ignore_errors=True)
[docs] def initialize(self) -> bool: # Metadata self.__temp_metadata_dir = tempfile.mkdtemp(prefix="nucypher-tmp-nodes-") self.metadata_dir = self.__temp_metadata_dir # Certificates self.__temp_certificates_dir = tempfile.mkdtemp(prefix="nucypher-tmp-certs-") self.certificates_dir = self.__temp_certificates_dir return bool(os.path.isdir(self.metadata_dir) and os.path.isdir(self.certificates_dir))
# # Node Storage Registry # NODE_STORAGES = {storage_class._name: storage_class for storage_class in NodeStorage.__subclasses__()}