# Source code for fgen.f2py

"""
f2py wrapper

Patches the default behaviour of f2py with some custom error handling.
This was inspired by the amazing [f90wrap](https://github.com/jameskermode/f90wrap).
"""
import sys

import numpy.f2py.rules as f2py_rules
from numpy.f2py import main as f2py_main  # type: ignore


def _swap_template_fragment(
    template: str, target: str, replacement: str, applied_marker: str, label: str
) -> str:
    """
    Replace ``target`` with ``replacement`` in an f2py code template, at most once

    Parameters
    ----------
    template
        Current text of the f2py template
    target
        Fragment of the unpatched template that should be replaced
    replacement
        Text to put in place of ``target``
    applied_marker
        Text that only exists in the template after the patch has been applied.
        If it is already present the template is returned untouched, which is
        what makes repeated patching safe.
    label
        Human-readable name of the template, used in the warning message

    Returns
    -------
        The patched template, or the unmodified template if the patch was
        already applied or ``target`` could not be found

    Warns
    -----
    RuntimeWarning
        If ``target`` is not present in an unpatched template
    """
    import warnings

    if applied_marker in template:
        # A previous call already patched this template. Patching again would
        # duplicate the injected C code because the replacement text still
        # contains the placeholder it was substituted for.
        return template

    if target not in template:
        warnings.warn(
            f"Could not patch the f2py {label} template: the expected fragment "
            f"{target!r} was not found. The installed numpy may use a different "
            "f2py template, in which case fgen's error handling is not active.",
            RuntimeWarning,
            stacklevel=3,
        )
        return template

    return template.replace(target, replacement)


def patch_f2py() -> None:
    """
    Patch f2py to raise a RuntimeError if :c:func:`on_error` is called during execution

    Interrupt signals are also captured and handled.

    The patch edits the code templates held in :mod:`numpy.f2py.rules` in place:

    * the module template gains the ``setjmp``/``signal`` includes, a global jump
      buffer and message buffer, the ``on_error_`` handler and a SIGINT handler
    * the routine template wraps the Fortran call in a ``setjmp`` block that
      converts a ``longjmp`` from one of the handlers into a Python ``RuntimeError``

    Calling this function more than once is safe, each template is only patched
    the first time.

    This has only been tested with numpy 1.21.0 and may be flaky if f2py
    has substantial changes.

    Warns
    -----
    RuntimeWarning
        If one of the expected fragments cannot be found in the f2py templates
    """
    # Windows ships the setjmp declarations used here in a differently named header
    setjmp_header = "setjmpex.h" if sys.platform == "win32" else "setjmp.h"

    # The replacement starts with the placeholder itself so that f2py can still
    # substitute its own includes there.
    # NOTE: ``longjmp(env, 0)`` makes ``setjmp`` return 1 (C standard), so the
    # ``setjmp_value != 0`` check in the routine template still triggers.
    includes_inject = (
        "#includes0#\n"
        + f"#include <{setjmp_header}>\n"
        + """
#include <signal.h>
#include <stdlib.h>
#include <string.h>

#define ABORT_BUFFER_SIZE 512
extern jmp_buf environment_buffer;
extern char abort_message[ABORT_BUFFER_SIZE];
void on_error_(char *message, size_t len);
void sigint_handler(int signum);

jmp_buf environment_buffer;
char abort_message[ABORT_BUFFER_SIZE];

// Error handler for fgen
void on_error_(char *message, size_t len_message)
{
    strncpy(abort_message, message, ABORT_BUFFER_SIZE);
    abort_message[ABORT_BUFFER_SIZE-1] = '\\0';
    longjmp(environment_buffer, 0);
}

// Handle sigint signals (ctrl + c) during a f2py calls
void sigint_handler(int signum)
{
    char message[] = "User interrupt occurred";
    on_error_(message, strlen(message));
}
"""
    )

    f2py_rules.module_rules["modulebody"] = _swap_template_fragment(
        f2py_rules.module_rules["modulebody"],
        target="#includes0#\n",
        replacement=includes_inject,
        applied_marker="extern jmp_buf environment_buffer;",
        label="module body",
    )

    # Declare the variable that receives the result of setjmp next to f2py's own
    # declarations. The trailing newline is kept so that whatever follows in the
    # template (the ``#decl#`` placeholder) still starts on its own line.
    routine_body = _swap_template_fragment(
        f2py_rules.routine_rules["body"],  # type: ignore
        target="volatile int f2py_success = 1;\n",
        replacement="volatile int f2py_success = 1; int setjmp_value;\n",
        applied_marker="int setjmp_value;",
        label="routine body (declarations)",
    )

    # Wrap the call into Fortran so that a longjmp from on_error_ or from the
    # SIGINT handler lands back here and is turned into a Python exception.
    # The previous SIGINT handler is restored on both paths.
    routine_body = _swap_template_fragment(
        routine_body,
        target="#callfortranroutine#\n",
        replacement="""
    PyOS_sighandler_t _npy_sig_save;
    // Catch any sigint (ctrl + c) with a call to sigint_handler
    _npy_sig_save = PyOS_setsig(SIGINT, sigint_handler);
    setjmp_value = setjmp(environment_buffer);
    if (setjmp_value != 0) {
        // jumped back as a result of a call to longjmp
        // Raise a RuntimeError
        PyOS_setsig(SIGINT, _npy_sig_save);
        PyErr_SetString(PyExc_RuntimeError, abort_message);
    } else {
        #callfortranroutine#
        PyOS_setsig(SIGINT, _npy_sig_save);
    }
""",
        applied_marker="setjmp_value = setjmp(environment_buffer);",
        label="routine body (Fortran call)",
    )

    f2py_rules.routine_rules["body"] = routine_body  # type: ignore


if __name__ == "__main__":
    # Install the error-handling hooks first so that every wrapper generated
    # by the regular f2py command line entry point includes them
    patch_f2py()
    f2py_main()