Skip to content

rs_server_edrs/edrs_connector.md

<< Back to index

EDRS Connector module for secure FTPES communication.

EDRSConnector

EDRS Connector using FTPES (FTP over explicit TLS) for secure file transfers.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class EDRSConnector:
    """EDRS Connector using FTPES (FTP over explicit TLS) for secure file transfers."""

    def __init__(
        self,
        host: str,
        port: int,
        login: str,
        password: str,
        ca_cert: str,
        client_cert: str,
        client_key: str,
        disable_mlsd=True,
    ):
        """
        Initialize EDRS connector with FTPS (FTPES) credentials.
        """
        self.host = host
        self.port = port
        self.login = login
        self.password = password
        self.ca_cert = ca_cert
        self.client_cert = client_cert
        self.client_key = client_key
        self.ftp: FTP | FTP_TLS | None = None
        self.disable_mlsd = disable_mlsd  # Set to True to disable MLSD command usage
        # Read environment variable (defaults to FALSE)
        use_ssl_env = os.getenv("USE_SSL", "FALSE").strip().lower()
        self.use_ssl = use_ssl_env in ["1", "true", "yes"]

    def connect(self):
        """
        Establish an FTP or FTPES (explicit TLS) connection depending on USE_SSL.
        """
        if self.use_ssl:
            logger.debug("Connecting via FTPES (explicit TLS)...")
            # EDRS uses internal certificates; hostname verification intentionally disabled.
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.ca_cert)  # NOSONAR
            if self.client_cert and self.client_key:
                context.load_cert_chain(certfile=self.client_cert, keyfile=self.client_key)
            context.check_hostname = False  # NOSONAR
            context.verify_mode = ssl.CERT_REQUIRED

            self.ftp = FTP_TLS(context=context)
            self.ftp.connect(self.host, self.port, timeout=10)
            self.ftp.auth()  # AUTH TLS (explicit)
            self.ftp.prot_p()  # Encrypt data channel
            self.ftp.login(self.login, self.password)
        else:
            logger.debug("Connecting via plain FTP (no SSL)...")
            self.ftp = FTP()  # nosec B321 # NOSONAR
            self.ftp.connect(self.host, self.port, timeout=10)
            self.ftp.login(self.login, self.password)

        logger.info(f"Connected to {self.host}:{self.port} as {self.login}")

    def walk(self, path: str) -> list[dict[str, Any]]:
        """List files and directories under /NOMINAL/<path>.

        Args:
            path (str): Relative path under the NOMINAL directory
                (e.g., "SAT123/ch_01/").

        Returns:
            list[dict[str, str | int | None]]: A list of dictionaries containing
                information about each file or directory.

        Raises:
            ConnectionError: If the FTP client is not connected.
            RuntimeError: If the FTP listing operation fails.
        """

        if not self.ftp:
            raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

        base_path = f"/NOMINAL/{path.strip('/')}"

        entries = self._list_directory_entries(base_path)

        current_dir = self.ftp.pwd()
        results = []

        for entry in entries:
            info = self._get_entry_info(entry, current_dir)
            results.append(info)

        return results

    def _list_directory_entries(self, base_path: str) -> list[str]:
        """Helper to list directory entries, handling MLSD/NLST fallback."""
        if not self.ftp:
            raise ConnectionError(NOT_CONNECTED_ERROR_MSG)
        if self.disable_mlsd:
            return self.ftp.nlst(base_path)

        try:
            return [name for name, _ in self.ftp.mlsd(base_path)]
        except Exception as e:  # pylint: disable=broad-except
            logger.error(f"MLSD failed for {base_path}: {e}, using NLST instead.")
            if "500" in str(e):
                self.disable_mlsd = True
                return self.ftp.nlst(base_path)
            raise RuntimeError(f"Failed to list {base_path} using MLSD: {e}") from e

    def _get_entry_info(self, entry: str, current_dir: str) -> dict[str, Any]:
        """Helper to determine type and size of an FTP entry."""
        if not self.ftp:
            raise ConnectionError(NOT_CONNECTED_ERROR_MSG)
        info = {"path": entry, "type": "dir", "size": 0}
        try:
            self.ftp.cwd(entry)
            self.ftp.cwd(current_dir)
            return info
        except Exception:  # pylint: disable=broad-except
            info["type"] = "file"
            try:
                info["size"] = self.ftp.size(entry) or 0
            except Exception:  # pylint: disable=broad-except
                info["size"] = 0
            return info

    def download(self, remote_path: str, p_local_path: str = "") -> str:
        """Download a file from the FTP server.

        Args:
            remote_path (str): Remote path to the file (absolute or relative to the current working directory).
            local_path (str, optional): Local filesystem path where the file will be saved.
                If omitted, the remote filename is used in the current working directory.

        Returns:
            str: The local file path where the file was saved.

        Raises:
            ConnectionError: If the FTP client is not connected.
            RuntimeError: If the file cannot be retrieved.
        """

        if not self.ftp:
            raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

        # Determine local target path
        local_path: Path = Path(p_local_path) if p_local_path else Path(Path(remote_path).name)
        if not local_path.name:
            raise ValueError("remote_path has no filename and no local_path provided")

        # Ensure directory exists
        local_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            with local_path.open("wb") as f:
                self.ftp.retrbinary(f"RETR {remote_path}", f.write)
        except Exception as e:
            local_path.unlink(missing_ok=True)  # Remove partial file if exists
            raise RuntimeError(f"Failed to download {remote_path}: {e}") from e

        return str(local_path)

    def read_file(self, remote_path: str) -> Any:
        """
        Read a file from the FTP server directly into memory.

        If the file is XML, it is parsed into a Python dictionary.
        Otherwise, the raw bytes content is returned.

        Args:
            remote_path (str): Path to the file on the FTP server.

        Returns:
            dict | bytes: A dictionary if the file is XML, otherwise raw bytes.

        Raises:
            ConnectionError: If the FTP client is not connected.
            RuntimeError: If the file cannot be retrieved or parsed.
        """

        if not self.ftp:
            raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

        buffer = io.BytesIO()

        try:
            # Retrieve the remote file into memory
            self.ftp.retrbinary(f"RETR {remote_path}", buffer.write)
        except Exception as e:
            error_msg = str(e)
            if "550" in error_msg or "Not a plain file" in error_msg:
                logger.error(f"Remote path '{remote_path}' is a directory, not a file.")
                raise RuntimeError(f"Remote path '{remote_path}' appears to be a directory, not a file.") from e
            logger.error(f"Failed to read remote file '{remote_path}': {e}")
            raise RuntimeError(f"Failed to read remote file '{remote_path}': {e}") from e

        buffer.seek(0)

        # Check if file is XML based on extension
        if remote_path.lower().endswith(".xml"):
            try:
                # Parse XML into dict
                content = xmltodict.parse(buffer.getvalue())
                return content
            except Exception as e:
                raise RuntimeError(f"Failed to parse XML file {remote_path}: {e}") from e
        else:
            # Return raw bytes for non-XML files
            return buffer.getvalue()

    def close(self):
        """Close the FTP connection."""
        if self.ftp:
            try:
                self.ftp.quit()
            except Exception:  # pylint: disable=broad-except
                self.ftp.close()
            logger.info("Connection closed.")

__init__(host, port, login, password, ca_cert, client_cert, client_key, disable_mlsd=True)

Initialize EDRS connector with FTPS (FTPES) credentials.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    host: str,
    port: int,
    login: str,
    password: str,
    ca_cert: str,
    client_cert: str,
    client_key: str,
    disable_mlsd=True,
):
    """
    Initialize EDRS connector with FTPS (FTPES) credentials.
    """
    self.host = host
    self.port = port
    self.login = login
    self.password = password
    self.ca_cert = ca_cert
    self.client_cert = client_cert
    self.client_key = client_key
    self.ftp: FTP | FTP_TLS | None = None
    self.disable_mlsd = disable_mlsd  # Set to True to disable MLSD command usage
    # Read environment variable (defaults to FALSE)
    use_ssl_env = os.getenv("USE_SSL", "FALSE").strip().lower()
    self.use_ssl = use_ssl_env in ["1", "true", "yes"]

close()

Close the FTP connection.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
237
238
239
240
241
242
243
244
def close(self):
    """Close the FTP connection."""
    if self.ftp:
        try:
            self.ftp.quit()
        except Exception:  # pylint: disable=broad-except
            self.ftp.close()
        logger.info("Connection closed.")

connect()

Establish an FTP or FTPES (explicit TLS) connection depending on USE_SSL.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def connect(self):
    """
    Establish an FTP or FTPES (explicit TLS) connection depending on USE_SSL.
    """
    if self.use_ssl:
        logger.debug("Connecting via FTPES (explicit TLS)...")
        # EDRS uses internal certificates; hostname verification intentionally disabled.
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self.ca_cert)  # NOSONAR
        if self.client_cert and self.client_key:
            context.load_cert_chain(certfile=self.client_cert, keyfile=self.client_key)
        context.check_hostname = False  # NOSONAR
        context.verify_mode = ssl.CERT_REQUIRED

        self.ftp = FTP_TLS(context=context)
        self.ftp.connect(self.host, self.port, timeout=10)
        self.ftp.auth()  # AUTH TLS (explicit)
        self.ftp.prot_p()  # Encrypt data channel
        self.ftp.login(self.login, self.password)
    else:
        logger.debug("Connecting via plain FTP (no SSL)...")
        self.ftp = FTP()  # nosec B321 # NOSONAR
        self.ftp.connect(self.host, self.port, timeout=10)
        self.ftp.login(self.login, self.password)

    logger.info(f"Connected to {self.host}:{self.port} as {self.login}")

download(remote_path, p_local_path='')

Download a file from the FTP server.

Parameters:

Name Type Description Default
remote_path str

Remote path to the file (absolute or relative to the current working directory).

required
local_path str

Local filesystem path where the file will be saved. If omitted, the remote filename is used in the current working directory.

required

Returns:

Name Type Description
str str

The local file path where the file was saved.

Raises:

Type Description
ConnectionError

If the FTP client is not connected.

RuntimeError

If the file cannot be retrieved.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def download(self, remote_path: str, p_local_path: str = "") -> str:
    """Download a file from the FTP server.

    Args:
        remote_path (str): Remote path to the file (absolute or relative to the current working directory).
        local_path (str, optional): Local filesystem path where the file will be saved.
            If omitted, the remote filename is used in the current working directory.

    Returns:
        str: The local file path where the file was saved.

    Raises:
        ConnectionError: If the FTP client is not connected.
        RuntimeError: If the file cannot be retrieved.
    """

    if not self.ftp:
        raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

    # Determine local target path
    local_path: Path = Path(p_local_path) if p_local_path else Path(Path(remote_path).name)
    if not local_path.name:
        raise ValueError("remote_path has no filename and no local_path provided")

    # Ensure directory exists
    local_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        with local_path.open("wb") as f:
            self.ftp.retrbinary(f"RETR {remote_path}", f.write)
    except Exception as e:
        local_path.unlink(missing_ok=True)  # Remove partial file if exists
        raise RuntimeError(f"Failed to download {remote_path}: {e}") from e

    return str(local_path)

read_file(remote_path)

Read a file from the FTP server directly into memory.

If the file is XML, it is parsed into a Python dictionary. Otherwise, the raw bytes content is returned.

Parameters:

Name Type Description Default
remote_path str

Path to the file on the FTP server.

required

Returns:

Type Description
Any

dict | bytes: A dictionary if the file is XML, otherwise raw bytes.

Raises:

Type Description
ConnectionError

If the FTP client is not connected.

RuntimeError

If the file cannot be retrieved or parsed.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def read_file(self, remote_path: str) -> Any:
    """
    Read a file from the FTP server directly into memory.

    If the file is XML, it is parsed into a Python dictionary.
    Otherwise, the raw bytes content is returned.

    Args:
        remote_path (str): Path to the file on the FTP server.

    Returns:
        dict | bytes: A dictionary if the file is XML, otherwise raw bytes.

    Raises:
        ConnectionError: If the FTP client is not connected.
        RuntimeError: If the file cannot be retrieved or parsed.
    """

    if not self.ftp:
        raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

    buffer = io.BytesIO()

    try:
        # Retrieve the remote file into memory
        self.ftp.retrbinary(f"RETR {remote_path}", buffer.write)
    except Exception as e:
        error_msg = str(e)
        if "550" in error_msg or "Not a plain file" in error_msg:
            logger.error(f"Remote path '{remote_path}' is a directory, not a file.")
            raise RuntimeError(f"Remote path '{remote_path}' appears to be a directory, not a file.") from e
        logger.error(f"Failed to read remote file '{remote_path}': {e}")
        raise RuntimeError(f"Failed to read remote file '{remote_path}': {e}") from e

    buffer.seek(0)

    # Check if file is XML based on extension
    if remote_path.lower().endswith(".xml"):
        try:
            # Parse XML into dict
            content = xmltodict.parse(buffer.getvalue())
            return content
        except Exception as e:
            raise RuntimeError(f"Failed to parse XML file {remote_path}: {e}") from e
    else:
        # Return raw bytes for non-XML files
        return buffer.getvalue()

walk(path)

List files and directories under /NOMINAL/.

Parameters:

Name Type Description Default
path str

Relative path under the NOMINAL directory (e.g., "SAT123/ch_01/").

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, str | int | None]]: A list of dictionaries containing information about each file or directory.

Raises:

Type Description
ConnectionError

If the FTP client is not connected.

RuntimeError

If the FTP listing operation fails.

Source code in docs/rs-server/services/edrs/rs_server_edrs/edrs_connector.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def walk(self, path: str) -> list[dict[str, Any]]:
    """List files and directories under /NOMINAL/<path>.

    Args:
        path (str): Relative path under the NOMINAL directory
            (e.g., "SAT123/ch_01/").

    Returns:
        list[dict[str, str | int | None]]: A list of dictionaries containing
            information about each file or directory.

    Raises:
        ConnectionError: If the FTP client is not connected.
        RuntimeError: If the FTP listing operation fails.
    """

    if not self.ftp:
        raise ConnectionError(NOT_CONNECTED_ERROR_MSG)

    base_path = f"/NOMINAL/{path.strip('/')}"

    entries = self._list_directory_entries(base_path)

    current_dir = self.ftp.pwd()
    results = []

    for entry in entries:
        info = self._get_entry_info(entry, current_dir)
        results.append(info)

    return results