
GE CMS FTP

Sensors

Echoenergia 1.X wind turbines are equipped with the Bently Nevada 3701 condition monitoring system. It has 6 accelerometers positioned as shown in the image below.

GE CMS sensor locations

For a more detailed view of the sensors, the following images show them installed in a real wind turbine. These images were extracted from the Bently Nevada 3701 operations and maintenance manual saved in the Performance SharePoint.

  • Main Bearing

    GE CMS Main Bear Sensor

  • Gearbox Planetary Stage

    GE CMS Planetary Sensor

  • Gearbox Intermediate Stage

    GE CMS IMS Sensor

  • Gearbox High Speed Stage

    GE CMS HSS Sensor

  • Generator Drive End

    GE CMS Gen DE Sensor

  • Generator Non-Drive End

    GE CMS Gen NDE Sensor

Data

The Bently CMS equipment in our GE wind farms collects Static and Dynamic data: Dynamic data is collected every four hours, and Static data every 10 minutes. This data is stored on the server using GE's proprietary Proficy Historian.

Waveforms are acquired in two different ways: synchronously or asynchronously.

  • Synchronous: The acquisition tracks the rotation speed of the shaft, so every waveform collected represents a whole number of shaft rotations (1, 2, 3...), regardless of the speed.
  • Asynchronous: The waveform is collected at a fixed sampling frequency at all times. This results in waveforms that may cover partial rotation cycles.

Below, the number of samples and resolutions for each sensor are described; a short worked example relating these numbers follows the list:

  • Asynchronous Waveform: 8192 samples, 320 ms
  • Synchronous Waveform: 8192 samples
    • 4 revolutions, 2048 samples/rev
    • 8 revolutions, 1024 samples/rev
    • 16 revolutions, 512 samples/rev
    • 32 revolutions, 256 samples/rev
    • 64 revolutions, 128 samples/rev
  • Spectra:
    • Main Bearing:
      • 8 revolutions, 1024 samples/rev
      • 3200 lines Sync Enveloped
    • Gearbox stages:
      • 3200 lines Sync Enveloped
      • 3200 lines Sync High Res
      • Number of revolutions and samples per revolution vary based on the selected asset
    • Generator
      • 64 revolutions, 128 samples/rev
      • 3200 lines Sync Enveloped
      • 3200 lines Sync High Res
    • Tower Sway
      • 15.625 Hz, 200 lines
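
To relate these numbers, the short sketch below computes the effective sampling rate implied by the asynchronous configuration and by one of the synchronous configurations. The 1500 rpm shaft speed in the synchronous case is only an assumed example value.

# Illustrative arithmetic only; the 1500 rpm shaft speed is an assumed example value.

# Asynchronous waveform: fixed number of samples over a fixed time span.
async_samples = 8192
async_span_s = 0.320                        # 320 ms
async_rate_hz = async_samples / async_span_s
print(f"Asynchronous sampling rate: {async_rate_hz:.0f} Hz")  # 25600 Hz

# Synchronous waveform: fixed number of samples per shaft revolution, so the
# time span (and the sampling rate in Hz) depends on the shaft speed.
sync_samples_per_rev = 2048                 # 4-revolution configuration
shaft_speed_rpm = 1500                      # assumed example speed
shaft_speed_hz = shaft_speed_rpm / 60       # 25 Hz
sync_rate_hz = sync_samples_per_rev * shaft_speed_hz
sync_span_s = 4 / shaft_speed_hz            # time to complete 4 revolutions
print(f"Synchronous sampling rate at {shaft_speed_rpm} rpm: {sync_rate_hz:.0f} Hz")  # 51200 Hz
print(f"Synchronous waveform span: {sync_span_s:.3f} s")  # 0.160 s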

Data Access

OPC DA

The Farm Client exposes an OPC DA server that can be used to retrieve data from it. It is available under the name Bently Nevada OPC DA Server V1 and makes all static data available.
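
For reference, reading one of these static points from Python looks roughly like the sketch below. This is a minimal sketch assuming the third-party OpenOPC library and a placeholder tag name; as described next, this route was ultimately discarded.

# Minimal sketch using the OpenOPC library (Windows/DCOM only).
# The tag name is a placeholder, not a real point name.
import OpenOPC

opc = OpenOPC.client()
opc.connect("Bently Nevada OPC DA Server V1")  # server exposed by the Farm Client
value, quality, timestamp = opc.read("SomeTurbine.SomePoint")  # placeholder tag
print(value, quality, timestamp)
opc.close()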

There are two main problems with using this OPC DA server:

  1. The data available is not real time but at 10-minute resolution. Using OPC for this type of data is far from ideal: if we lose the connection, we cannot backfill the missing historical data.
  2. The OPC server exposes a large number of points, and tests made with Bazefield showed that it is not reliable. When trying to get data for all the turbines, the server simply did not respond, resulting in many data gaps.

Considering the above, OPC DA was discarded as a reliable option to retrieve historical data.

SQL

Proficy Historian has a SQL-like query interface that allows us to retrieve data from it. The documentation of this interface can be found here, and on the CMS server test queries can be executed using the Historian Interactive SQL application, which is simply a console for running SQL commands against the historian.

Initial tests using this SQL interface showed that we can reliably retrieve all historical data available with it. Unfortunately, there were two main points that stopped us from using it:

  1. We could not connect to the historian using Python or any other SQL administration program. The only way the connection worked was through the Historian Interactive SQL application, which does not solve our problem. With further investigation this could perhaps be solved, but not much time was spent here.
  2. When trying to get data for many tags or assets, the queries simply hung, remaining stuck for several minutes. As the goal was to retrieve all data, this was a blocking problem.

Again, considering the points above, the SQL interface was discarded.

Farm Client Export

The most straightforward option to reliably get CMS data was to directly open the Farm Client on the CMS server and use its built-in export functionality, which creates a CSV file with all the requested data.

The problem with this solution is that the export functionality is not available through any API, only through the GUI of the Farm Client. This imposes a couple of problems:

  1. Interacting with GUI programs is hard, as you have to mimic mouse movement or key presses to achieve the desired goal.
  2. GUI programs can only be interacted with on Windows when a user session is active, which is not the case when using a remote desktop connection that logs off when it is closed.

Considering that all other possibilities were discarded, we invested some time in solving the two problems above. For the first one, we found a chain of keyboard commands that leads to the export window; from there it is easy to fill in the dates, select the turbines and export the data. For the second one, we found that installing rdpwrap increases the number of simultaneous connections to 15 and allows localhost connections, so we can open a remote connection to the local server itself, which forces the GUI session to stay open.
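
The actual key sequence is implemented in the GE CMS Exporter script on the CMS server; the fragment below is only a hedged illustration of the pyautogui approach, with placeholder hotkeys and dates.

# Illustration only: the real key sequence lives in f:/echo_cms/ge_cms_exporter.py.
# Hotkeys and dates below are placeholders.
import time

import pyautogui

pyautogui.FAILSAFE = True        # moving the mouse to a screen corner aborts the automation

# Open the export dialog through the Farm Client menus (placeholder hotkeys).
pyautogui.hotkey("alt", "f")
pyautogui.press("e")
time.sleep(2)                    # give the GUI time to open the dialog

# Fill in the export period (placeholder dates) and confirm.
pyautogui.write("01/01/2024", interval=0.05)
pyautogui.press("tab")
pyautogui.write("01/02/2024", interval=0.05)
pyautogui.press("enter")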

Considering that rdpwrap is needed, please make sure it is installed following the steps described in its GitHub repository. In short, the steps are:

  1. Download the latest installer of rdpwrap from the release page. Please download the zip file.
  2. Extract the contents of the zip file to a folder on the desktop.
  3. Install the program using the install.bat file.
  4. Check if it is working using RDPConf.exe.

After rdpwrap is installed, we need to open a remote connection from the server to itself, but for another user. This way, the second connection's GUI session is kept alive even if the first connection is closed. Usually an RDP icon to connect using user bazefield2 is saved on the desktop; if not, connect to localhost with user bazefield2 and password bazefield. Once this is done, just leave that window open; you can then close the first RDP session to the server and the scheduled job will run as expected.

One important point is to make sure the Group Policy settings do not force disconnected sessions to log off. To verify this, open the path below and confirm that no session time limits are set.

  • Run gpedit.msc → Navigate to Computer Configuration → Administrative Templates → Windows Components → Remote Desktop Services → Remote Desktop Session Host → Session Time Limits.

Debug

In case data is not being exported correctly (the Airflow DAG is failing for all wind turbines of one site), check the following:

  1. Access the server remotely and check if the RDP window to localhost using user bazefield2 is open. If it is not, just reconnect using the RDP icon on the desktop and wait for the next day.
  2. Are all the items below configured correctly? If not, the server was probably formatted and everything needs to be configured again.
    1. Can you manually export data from the Farm Client?
    2. Is the export task configured in Task Scheduler to run every day with program/script C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe and arguments -command python f:/echo_cms/ge_cms_exporter.py?
    3. Is Python installed?
    4. Is the pyautogui library installed in Python?
    5. Does the folder f:\echo_cms contain all the scripts?
    6. Does the folder f:\echo_cms\_vibration exist?
    7. Is the FileZilla server installed and running?

FTP Server

An FTP server was configured on each GE CMS server to allow remote access to the exported files and folders. This was done using FileZilla Server, which is an open-source FTP server.

To configure the FTP server, you need to open the FileZilla administration interface on the server and connect using the credentials below.

  • Port for admin interface: 14148
  • Admin password: pErformance@

Users allowed for remote access are defined in the Rights Management -> Users section. By default, we created a performance user with the same password as above and mapped the native path F:/echo_cms to the virtual path /echo_cms. This makes the echo_cms folder available at the root when connecting to the FTP server with the performance user.
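
With this configuration in place, the exported files can be listed from Python using the standard library ftplib, roughly as in the sketch below; the host address is a placeholder, while the user and paths follow the configuration described above.

# Minimal sketch using the standard library ftplib; the host address is a placeholder.
from ftplib import FTP

ftp = FTP()
ftp.connect("10.0.0.1", 21)      # placeholder CMS server address
ftp.login(user="performance", passwd="pErformance@")
ftp.cwd("/echo_cms")             # virtual path mapped to F:/echo_cms
print(ftp.nlst())                # exported contents, e.g. last_run.txt and the _vibration folder
ftp.quit()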

Scripts

The script used to import data from the FTP server is located in this package, and its description and source code can be found below.

Note

The importer_ge_cms_ftp method is called periodically by an Airflow DAG to import data from the FTP server automatically. Check Airflow for more information on the task and its schedule.
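
A minimal sketch of what such a DAG could look like is shown below; the dag_id, schedule and argument values are assumptions, and the actual definition lives in Airflow.

# Hedged sketch of an Airflow DAG wrapping the importer; dag_id, schedule and
# op_kwargs are assumptions, not the actual Airflow definition.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from echo_cms.ge_cms_ftp import importer_ge_cms_ftp

with DAG(
    dag_id="ge_cms_ftp_import",   # assumed name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",            # assumed schedule (use schedule_interval on older Airflow)
    catchup=False,
) as dag:
    PythonOperator(
        task_id="importer_ge_cms_ftp",
        python_callable=importer_ge_cms_ftp,
        op_kwargs={"objects": None, "max_tries": 3},
    )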

importer_ge_cms_ftp(objects=None, max_tries=3, **kwargs)

Method used to import CMS files from the FTP server running on the GE CMS server. This also assumes that the files are being correctly exported from the Bently Nevada Farm Client using the GE CMS Exporter script.

This script requires the following definitions in the performance database:

  1. Data source of type "ftp_ge_cms" with the following attributes:

    1.1 host_address: IP address of the FTP server

    1.2 user: username to connect to the FTP server

    1.3 password: password to connect to the FTP server

    1.4 ftp_folder: folder in the FTP server where the files are located

  2. Object with the following attributes:

    2.1 manufacturer_name: manufacturer name of the object

  3. Connection between the data source and the object

  4. Raw data definitions for each type of raw data that will be imported. The name of the raw data definition must be in the format "ge_cms_{raw_data_name}" where raw_data_name is the name of the raw data as it appears in the file name.

  5. Features definitions for the model of each object that will be imported. The data_source_type_name must be "ftp_ge_cms" and the name_in_data_source must be the name of the column in the static data file that will be imported.

The script will do the following:

  1. Connect to the performance database and get the list of data sources of type "ftp_ge_cms".

  2. For each data source, it will connect to the FTP server and get the list of files in the folder.

  3. For each object connected to the data source, it will get the list of files that start with the manufacturer_name of the object.

  4. For the files of the object, it will get the periods that define an export.

  5. For each period, it will get the static data and raw data files.

  6. It will read the static data and upload it to Bazefield.

  7. It will read the raw data and upload it to the performance database.

  8. It will delete the files from the FTP server.

  9. Errors will be raised and saved to the ErrorSummary object that will be returned.

Note: All files in the FTP server will be processed. If everything is correct, at the end of the process no files will be left in the FTP server.

Parameters:

  • objects

    (list[str] | None, default: None ) –

    List of desired objects to import data. Set to None to import all. By default None

  • max_tries

    (int, default: 3 ) –

    Number of tries when connecting to data source and copying files, by default 3

Returns:

  • ErrorSummary

    Object containing summary of all errors that occurred during the import process.
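
As a usage example (the object names below are placeholders), the importer can be called directly and the returned ErrorSummary inspected:

# Hypothetical call; object names are placeholders.
from echo_cms.ge_cms_ftp import importer_ge_cms_ftp

# import only two specific turbines
errors = importer_ge_cms_ftp(objects=["WTG-01", "WTG-02"], max_tries=3)

# or import everything connected to "ftp_ge_cms" data sources
errors = importer_ge_cms_ftp()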

Source code in echo_cms/ge_cms_ftp.py
def importer_ge_cms_ftp(
    objects: list[str] | None = None,
    max_tries: int = 3,
    **kwargs,  # noqa
) -> ErrorSummary:
    """Method used to import CMS files from FTP server running in GE CMS server. This also assumes that the files are being correctly exported from the Bently Nevada Farm Client using the GE CMS Exporter script.

    This script requires the following definitions in the performance database:

    1. Data source of type "ftp_ge_cms" with the following attributes:

        1.1 host_address: IP address of the FTP server

        1.2 user: username to connect to the FTP server

        1.3 password: password to connect to the FTP server

        1.4 ftp_folder: folder in the FTP server where the files are located

    2. Object with the following attributes:

        2.1 manufacturer_name: manufacturer name of the object

    3. Connection between the data source and the object

    4. Raw data definitions for each type of raw data that will be imported. The name of the raw data definition must be in the format "ge_cms_{raw_data_name}" where raw_data_name is the name of the raw data as it appears in the file name.

    5. Features definitions for the model of each object that will be imported. The data_source_type_name must be "ftp_ge_cms" and the name_in_data_source must be the name of the column in the static data file that will be imported.

    The script will do the following:

    1. Connect to the performance database and get the list of data sources of type "ftp_ge_cms".

    2. For each data source, it will connect to the FTP server and get the list of files in the folder.

    3. For each object connected to the data source, it will get the list of files that start with the manufacturer_name of the object.

    4. For the files of the object, it will get the periods that define an export.

    5. For each period, it will get the static data and raw data files.

    6. It will read the static data and upload it to Bazefield.

    7. It will read the raw data and upload it to the performance database.

    8. It will delete the files from the FTP server.

    9. Errors will be raised and saved to the ErrorSummary object that will be returned.

    Note: All files in the FTP server will be processed. If everything is correct, at the end of the process no files will be left in the FTP server.

    Parameters
    ----------
    objects : list[str] | None, optional
        List of desired objects to import data. Set to None to import all. By default None
    max_tries : int, optional
        Number of tries when connecting to data source and copying files, by default 3

    Returns
    -------
    ErrorSummary
        Object containing summary of all errors that occurred during the import process.

    """
    # checking inputs
    if objects is None:
        objects = []

    if not isinstance(objects, list):
        raise TypeError(f"objects must be a list of strings, not {type(objects)}")
    if not all(isinstance(obj, str) for obj in objects):
        raise TypeError("objects must be a list of strings")
    if not isinstance(max_tries, int):
        raise TypeError(f"max_tries must be an integer, not {type(max_tries)}")

    # creating ErrorSummary
    errs_summary = ErrorSummary(name="importer_ge_cms_ftp")

    # creating connection to performance_db
    perfdb = PerfDB(application_name="importer_ge_cms_ftp")

    # creating connection to Bazefield
    baze = Baze()

    # creating a temporary directory to store the files
    Path("./temp/").mkdir(exist_ok=True, parents=True)

    # deleting all folders inside temp_dir that are more than 2 days old
    for folder in Path("./temp/").iterdir():
        if folder.is_dir() and datetime.fromtimestamp(folder.stat().st_ctime) < datetime.now() - timedelta(days=2):
            shutil.rmtree(folder, ignore_errors=True)
    with TemporaryDirectory(dir="./temp/") as temp_dir:
        logger.info(f"Created temporary directory {temp_dir}")

        # getting the list of data sources
        data_sources: dict[str, dict[str, Any]] = perfdb.datasources.instances.get(
            data_source_types_names=["ftp_ge_cms"],
            get_attributes=True,
        )

        logger.info(f"Found data sources: {list(data_sources.keys())}")

        # looping through the data sources
        for ds_name, ds_attributes in data_sources.items():
            logger.info(f"Processing data source {ds_name}")

            # creating ErrorDataSource
            errs_ds = ErrorDataSource(name=ds_name)
            errs_summary = errs_summary.add_child(errs_ds)

            try:
                # checking if all the necessary attributes are present
                required_attrs = ["host_address", "user", "password", "ftp_folder"]

                if any(attr not in ds_attributes for attr in required_attrs):
                    missing_attrs = [attr for attr in required_attrs if attr not in ds_attributes]
                    raise ValueError(f"Data source {ds_name} is missing attributes: {missing_attrs}")

                # checking if there are connected objects
                if ds_attributes["object_names"] is None or (
                    isinstance(ds_attributes["object_names"], list) and len(ds_attributes["object_names"]) == 0
                ):
                    continue

                # if we specified objects, check if any of them are connected to the data source
                if len(objects) > 0 and all(obj not in objects for obj in ds_attributes["object_names"]):
                    logger.info(
                        f"No objects to import for data source {ds_name} as of it's connected objects are in the list of desired objects",
                    )
                    continue

                # connecting to the FTP server
                ftp_conn_properties = FtpConnProperties(
                    host=ds_attributes["host_address"],
                    user=ds_attributes["user"],
                    password=ds_attributes["password"],
                    timeout=180,
                )
                # defining number of seconds as a random number from 3 to 10 seconds
                retry_wait = 3 + secrets.randbelow(8)
                ftp_handler = FtpHandler(ftp_conn_properties, max_retries=max_tries, retry_wait_time=retry_wait)

                # change_directory
                ftp_handler.change_directory(ds_attributes["ftp_folder"])
                base_path = f"{ftp_handler.current_directory()}/"

                # checking if last_run.txt exists
                files_list = ftp_handler.list_contents()
                if "last_run.txt" not in files_list:
                    raise ValueError("last_run.txt not found in FTP folder, the exporter script may not be running correctly.")

                # copying last_run.txt to temp_dir
                ftp_handler.get_file(filename="last_run.txt", dest_directory=temp_dir)

                # reading last_run.txt to determine the last time the script was run
                with Path(temp_dir, "last_run.txt").open("r") as f:
                    lines = f.readlines()
                    last_run = datetime.strptime(lines[0].split(":", maxsplit=1)[1].strip(), "%Y-%m-%d %H:%M:%S")
                    last_start_day = datetime.strptime(lines[1].split(":", maxsplit=1)[1].strip(), "%Y-%m-%d %H:%M:%S")
                    last_end_day = datetime.strptime(lines[2].split(":", maxsplit=1)[1].strip(), "%Y-%m-%d %H:%M:%S")
                    logger.info(f"last_run.txt found. {last_run=}, {last_start_day=}, {last_end_day=}")
                if last_run < datetime.now() - timedelta(days=2):
                    err_message = f"The last run of the exporter script was more than 2 days ago ({last_run:%Y-%m-%d %H:%M:%S}). The script will try to import the data, but it may be incomplete. Check the CMS server ASAP."
                    logger.error(err_message)
                    errs_ds = errs_ds.add_exception(RuntimeError(err_message))

                ftp_handler.change_directory(f"{base_path}_vibration/")

                # checking if there are files to be copied
                files_list = ftp_handler.list_contents()
                files_list.sort()
                logger.info(f"Found {len(files_list)} files in FTP folder")

                if len(files_list) == 0:
                    # checking if data was already imported
                    # reading CmsActivePower from Bazefield to check if there is data for the last day
                    check_period = DateTimeRange(last_end_day - timedelta(days=1), last_end_day)
                    check_data = baze.points.values.series.get(
                        points={ds_attributes["object_names"][0]: ["CmsActivePower"]},
                        period=check_period,
                    )
                    if len(check_data) > 0:
                        logger.info("No files found in FTP folder. Data was already imported.")
                        continue

                    raise ValueError(
                        "No files found in FTP folder. The exporter script may not be running correctly. Check the CMS server ASAP.",
                    )

                # iterating objects
                for obj_name in ds_attributes["object_names"]:
                    # skipping objects that are not in the list
                    if len(objects) > 0 and obj_name not in objects:
                        continue

                    # deleting object folder in temp_dir if it exists
                    shutil.rmtree(Path(temp_dir) / obj_name, ignore_errors=True)

                    logger.info(f"Processing object {obj_name} of data source {ds_name}")

                    # creating ErrorObject
                    errs_obj = ErrorObject(name=obj_name)
                    errs_ds = errs_ds.add_child(errs_obj)

                    try:
                        # getting object attributes
                        obj_def: dict[str, dict[str, Any]] = perfdb.objects.instances.get(
                            object_names=[obj_name],
                            get_attributes=True,
                            attribute_names=["manufacturer_name"],
                        )[obj_name]

                        # checking if object contains manufacturer_name
                        if "manufacturer_name" not in obj_def:
                            raise ValueError(f"Object {obj_name} is missing attribute 'manufacturer_name'")

                        # checking if there are files to be copied for this object (files starting with the manufacturer_name)
                        obj_files = [file for file in files_list if file.startswith(obj_def["manufacturer_name"])]

                        if not obj_files:
                            # checking if data was already imported
                            # reading CmsActivePower from Bazefield to check if there is data for the last day
                            check_period = DateTimeRange(last_end_day - timedelta(days=1), last_end_day)
                            check_data = baze.points.values.series.get(
                                points={obj_name: ["CmsActivePower"]},
                                period=check_period,
                            )
                            if len(check_data) > 0:
                                logger.info("No files found in FTP folder. Data was already imported.")
                                continue

                            raise ValueError(f"No files found for object {obj_name}")

                        logger.info(f"Found {len(obj_files)} files for object {obj_name}")

                        # creating directory for the object
                        obj_dir = Path(temp_dir) / obj_name
                        obj_dir.mkdir(exist_ok=True, parents=True)

                        # getting raw data def for this object
                        raw_data_def = perfdb.rawdata.definitions.get(
                            object_names=[obj_name],
                            data_source_types=["ftp_ge_cms"],
                            output_type="dict",
                        )[obj_def["object_model_name"]]

                        # getting all features for this turbine so we can later upload to Bazefield
                        features_def: pd.DataFrame = perfdb.features.definitions.get(object_names=[obj_name], output_type="DataFrame")
                        # filtering only to features with data_source_type_name = "ftp_ge_cms"
                        features_def = features_def[features_def["data_source_type_name"] == "ftp_ge_cms"].copy()
                        # removing first level of the index
                        features_def.index = features_def.index.droplevel(0)

                        # first getting all static data files to get the dates available
                        static_files = [file for file in obj_files if "static" in file.lower()]

                        # getting the periods available (split by _ in -3 and -2 that will be start and end dates)
                        periods_available = [f"{file.split('_')[-2]}_{file.split('_')[-1].split('.')[0]}" for file in static_files]

                        # iterating each period available
                        for period in periods_available:
                            # getting start and end dates
                            start_date, end_date = period.split("_")
                            # dates are in the format MMDDYYYYHHMM
                            period_range = DateTimeRange(start_date, end_date, date_format="%m%d%Y%H%M")

                            logger.info(f"{obj_name}: Processing period {period_range}")

                            # getting the files for this period
                            period_files = [file for file in obj_files if period in file]

                            # deleting all "waveform" or "DEI" files
                            not_wanted_files = [file for file in period_files if "waveform" in file.lower() or "dei" in file.lower()]
                            for not_wanted_file in not_wanted_files:
                                ftp_handler.delete_file(not_wanted_file)

                            # first reading static data as it will be used to get metadata as well
                            static_files = [file for file in period_files if "static" in file.lower()]
                            raw_files = [
                                file
                                for file in period_files
                                if "static" not in file.lower() and "waveform" not in file.lower() and "dei" not in file.lower()
                            ]

                            if not static_files or not raw_files:
                                # deleting all files for this period
                                for period_file in period_files:
                                    ftp_handler.delete_file(period_file)
                                continue

                            if not static_files or len(static_files) > 1:
                                err_message = f"{obj_name}: Found {len(static_files)} static files for period {period_range}. Expected 1."
                                logger.error(err_message)
                                errs_obj = errs_obj.add_exception(ValueError(err_message))

                            # copying static files to temp_dir
                            for file in static_files:
                                ftp_handler.get_file(filename=file, dest_directory=obj_dir)

                            # checking if file has more than two lines
                            more_than_2_lines = False
                            with Path(obj_dir, static_files[0]).open("r") as f:
                                line_count = 0
                                for _ in f:
                                    line_count += 1  # noqa
                                    if more_than_2_lines:
                                        break
                                    if line_count > 2:
                                        more_than_2_lines = True

                            if not more_than_2_lines:
                                logger.warning(f"{obj_name}: Static file {static_files[0]} has only one line. Deleting file and raw files.")
                                ftp_handler.delete_file(static_files[0])
                                for raw_file in raw_files:
                                    ftp_handler.delete_file(raw_file)
                                continue

                            # reading static_files
                            # dates are in the format 4/29/2024 1:55:48 AM
                            static_df = pd.read_csv(
                                Path(obj_dir, static_files[0]),
                                sep=",",
                                decimal=".",
                                header=0,
                                engine="pyarrow",
                                dtype_backend="pyarrow",
                                parse_dates=True,
                                index_col=0,
                                date_format="%m/%d/%Y %I:%M:%S %p",
                            )
                            # removing name of turbine from columns
                            static_df.columns = [col.split("\\", maxsplit=1)[-1] for col in static_df.columns]

                            # saving static data to Bazefield
                            baze_static_df = static_df.copy()
                            baze_static_df.index = baze_static_df.index.astype("datetime64[s]")
                            # adjusting timestamps to nearest 10 minutes
                            baze_static_df.index = baze_static_df.index.round("10min")
                            baze_static_df = baze_static_df.loc[~baze_static_df.index.duplicated(keep="first")]
                            baze_static_df.columns = [col.replace("\\", " - ") for col in baze_static_df.columns]
                            if not_wanted_cols := [
                                col for col in baze_static_df.columns if col not in features_def["name_in_data_source"].to_list()
                            ]:
                                baze_static_df = baze_static_df.drop(columns=not_wanted_cols)
                                err_message = f"{obj_name}: Removed columns {not_wanted_cols} from static data as they are not present in the features table."
                                logger.error(err_message)
                                errs_obj = errs_obj.add_exception(ValueError(err_message))
                            baze_static_df = baze_static_df.astype("double[pyarrow]")
                            # renaming from name_in_data_source to name of feature (index of features_def)
                            rename_dict = {v: k for k, v in features_def["name_in_data_source"].to_dict().items()}
                            baze_static_df = baze_static_df.rename(columns=rename_dict)
                            baze_static_df.index.name = "time"
                            baze_static_df.columns = pd.MultiIndex.from_product(
                                [[obj_name], baze_static_df.columns],
                                names=["object_name", "feature_name"],
                            )

                            # uploading to postgres and Bazefield
                            perfdb.features.values.series.insert(df=baze_static_df)

                            # getting only Power, Speed, Mode and Torque
                            static_df = static_df[[col for col in static_df.columns if "IONet" in col]].copy()
                            static_df = static_df.rename(
                                columns={col: col.split("\\", maxsplit=1)[1].replace("External", "") for col in static_df.columns},
                            )
                            static_df = static_df.dropna(how="all")

                            # copying raw data files to temp_dir
                            # we are ignoring files that contain "static", "waveform" or "DEI" in the name
                            for file in raw_files:
                                ftp_handler.get_file(filename=file, dest_directory=obj_dir)
                                logger.info(f"{obj_name}: Copied file {file} to temp_dir")

                            if not raw_files:
                                continue

                            # finding the file that contains less lines in all raw files
                            # this file will be the basis for selecting the moment of the day that will be representative of each day as all timestamps in this file are present in all other files
                            smaller_file = None
                            smallest_nrows = None
                            for raw_file in raw_files:
                                with Path(obj_dir, raw_file).open("r") as f:
                                    row_count = sum(1 for _ in f)
                                    if row_count > 1 and (smaller_file is None or row_count < smallest_nrows):
                                        smaller_file = raw_file
                                        smallest_nrows = row_count

                            if smaller_file is None:
                                # no files with data found
                                # deleting all raw files for this period and moving to the next period
                                logger.warning(
                                    f"{obj_name}: No raw files with data found for period {period_range}. Deleting all files for this period.",
                                )
                                for raw_file in raw_files:
                                    ftp_handler.delete_file(raw_file)
                                continue

                            # getting the timestamps in the smaller file
                            small_raw_df = pd.read_csv(
                                Path(obj_dir, smaller_file),
                                sep=",",
                                decimal=".",
                                header=None,
                                engine="pyarrow",
                                dtype_backend="pyarrow",
                                parse_dates=True,
                                index_col=0,
                                date_format="%m/%d/%Y %I:%M:%S %p",
                            )

                            # getting list of timestamps
                            raw_timestamps = small_raw_df.index.tolist()

                            # getting list of days present in the raw data
                            raw_days = [ts.date() for ts in raw_timestamps]
                            raw_days = dict.fromkeys(set(raw_days))

                            # getting the moment of the day that will be representative of each day
                            # we will use active power as the metric to select the moment, the moment with the highest active power will be selected
                            for day in raw_days:
                                # getting all timestamps for this day
                                day_timestamps = [ts for ts in raw_timestamps if ts.date() == day]
                                # getting the active power for each timestamp in this day using nearest lookup
                                selected_timestamp = None
                                selected_power = None
                                # getting a copy of static_df with rows that contain Power
                                power_df = static_df[["Power"]].dropna().copy()

                                # checking if there is any Power data in the static data
                                if power_df.empty:
                                    logger.warning(f"{obj_name}: No Power data found in static data for day {day}. Skipping day.")
                                    raw_days[day] = None
                                    continue

                                for ts in day_timestamps:
                                    # getting the closest timestamp in the static data
                                    closest_ts = power_df.index[power_df.index.get_indexer([ts], method="nearest")[0]]
                                    # getting the active power for this timestamp
                                    power = power_df.loc[closest_ts, "Power"]
                                    if selected_power is None or power > selected_power:
                                        selected_power = power
                                        selected_timestamp = ts

                                if selected_timestamp is None:
                                    err_message = f"{obj_name}: Could not find a representative moment for day {day}."
                                    logger.error(err_message)
                                    errs_obj = errs_obj.add_exception(ValueError(err_message))
                                    continue

                                raw_days[day] = {
                                    "Power": static_df.loc[selected_timestamp, "Power"],
                                    "Speed": static_df.loc[selected_timestamp, "Speed"],
                                    "Mode": static_df.loc[selected_timestamp, "Mode"],
                                    "Torque": static_df.loc[selected_timestamp, "Torque"],
                                    "Timestamp": selected_timestamp,
                                }
                                logger.debug(
                                    f"{obj_name}: Selected moment for day {day}: {raw_days[day]} with active power {selected_power}",
                                )

                            # reading each raw file and keeping only the data for the selected moments
                            for raw_file in raw_files:
                                # getting name of raw data
                                raw_data_name = raw_file.split("_", maxsplit=3)[1] + "_" + raw_file.split("_", maxsplit=3)[2]
                                # converting name to snake case
                                raw_data_name = raw_data_name.replace(" ", "_").lower()
                                # getting the raw data def for this raw data
                                raw_data_name_db = f"ge_cms_{raw_data_name}"
                                if raw_data_name_db not in raw_data_def:
                                    err_message = f"{obj_name}: Raw data {raw_data_name_db} not found in database."
                                    logger.error(err_message)
                                    errs_obj = errs_obj.add_exception(ValueError(err_message))
                                    continue

                                # checking if raw_file is empty
                                if Path(obj_dir, raw_file).stat().st_size == 0:
                                    logger.warning(f"{obj_name}: Raw file {raw_file} is empty. Deleting file.")
                                    ftp_handler.delete_file(raw_file)
                                    continue

                                # reading raw file
                                raw_df = pd.read_csv(
                                    Path(obj_dir, raw_file),
                                    sep=",",
                                    decimal=".",
                                    header=None,
                                    engine="pyarrow",
                                    dtype_backend="pyarrow",
                                    parse_dates=True,
                                    index_col=0,
                                    date_format="%m/%d/%Y %I:%M:%S %p",
                                )
                                # dropping rows that are not in the selected moments
                                raw_df = raw_df[raw_df.index.isin([x["Timestamp"] for x in raw_days.values() if x is not None])].copy()
                                raw_df = raw_df.T
                                # first dropping rows with all NaNs
                                raw_df = raw_df.dropna(how="all")
                                # now dropping columns that have more than 10% of NaNs
                                cols_before = len(raw_df.columns)
                                raw_df = raw_df.dropna(axis=1, thresh=int(0.9 * len(raw_df)))
                                cols_after = len(raw_df.columns)
                                if cols_before != cols_after:
                                    logger.warning(
                                        f"{obj_name}: Raw file {raw_file} had {cols_before - cols_after} timestamps with more than 10% of NaNs. They were removed",
                                    )
                                # converting any remaining NaNs to 0 to avoid errors in the next steps
                                raw_df = raw_df.fillna(0)
                                # converting to float 32
                                raw_df = raw_df.astype("float32")

                                # creating dict in the format to be inserted in the database
                                raw_data_dict = {obj_name: {raw_data_name_db: {}}}
                                for timestamp in raw_df.columns:
                                    # converting column values to a numpy array
                                    raw_data_dict[obj_name][raw_data_name_db][timestamp] = {
                                        "value": raw_df[timestamp].to_numpy(),
                                        "metadata": {
                                            k: None if pd.isna(v) else v for k, v in raw_days[timestamp.date()].items() if k != "Timestamp"
                                        },
                                    }
                                # inserting data into the database
                                perfdb.rawdata.values.insert(data=raw_data_dict, on_conflict="update", value_as_path=False)

                                # deleting raw file
                                ftp_handler.delete_file(raw_file)

                            # deleting static file
                            ftp_handler.delete_file(static_files[0])

                    except Exception as e:
                        logger.exception(f"Error processing object {obj_name}")
                        # adding error to the ErrorObject
                        errs_obj = errs_obj.add_exception(e)

            except Exception as e:
                logger.exception(f"Error processing data source {ds_name}")
                # adding error to the ErrorDataSource
                errs_ds = errs_ds.add_exception(e)

    return errs_summary

Glossary

The following terms are used in the variables exported by the Farm Client. These definitions were extracted from the software help and reproduced here for easier access.

  • Assembly Phase Passage Frequency: When a pinion and gear mesh, specific teeth contact. After rotations, these same teeth meet again, creating this frequency.
  • Ball Spin 1X Frequency: An individual rolling element's rotational speed. Defects cause a one-per-turn disturbance.
  • Ball Spin 2X Frequency: Twice an individual rolling element's rotational speed. Defects cause disturbances on both races, resulting in a 2X ball spin frequency component.
  • Bias: The transducer bias voltage in volts DC.
  • Cage Frequency: The frequency corresponding to one complete bearing cage revolution relative to a fixed reference.
  • Corner Clip: The highest alarm indication in the corner of the Alarm icons.
  • Crest Factor: The ratio of Direct and Direct RMS values. Changes indicate signal modifications like spikes or sideband harmonics.
  • Cumulative Event Rate: The total number of impulse events detected over time.
  • Cumulative Impulse Count: The total number of impulses detected.
  • Cumulative Impulse Energy: The total energy of the detected impulses.
  • Cumulative Impulse Measurement (CIM): A GE algorithm for detecting metal particles between gear teeth by monitoring impacts and their cumulative count and energy.
  • Direct: Data representing the overall zero-to-peak acceleration amplitude (0.5 Hz to 10,000 Hz).
  • Direct RMS: Similar to Direct but uses a root-mean-square (RMS) algorithm for signal magnitude (less sensitive to spikes).
  • Dynamic Energy Index (DEI): A GE algorithm to determine vibration energy within set frequency bands, normalized for operating conditions for trending machine performance.
  • Enveloping Spectrum Plot: Displays the repetition frequency of impulse events causing high-frequency machine vibration. Useful for early detection of bearing problems.
  • Gearmesh 1X Mesh frequency: The rate at which tooth pairs contact as they mesh.
  • Gearmesh 2X, 3X, etc. frequencies: Harmonics of the fundamental gear mesh frequency (multiples of 1X).
  • High Pass Direct: Data representing the overall zero-to-peak acceleration amplitude (5 kHz to 10 kHz).
  • Inner Race Ball Pass Frequency (IRBP): The frequency that rolling elements pass a point on the inner race.
  • Kurtosis: A measure of whether data is peaked or flat compared to a normal distribution. Changes indicate signal waveform shape modifications.
  • Outer Race Ball Pass Frequency (ORBP): The frequency that rolling elements pass a point on the outer race.
  • Tower Sway: The bending and movement of the wind turbine tower due to wind loads. Measured in axial and transverse directions.
  • WTG: Wind Turbine Generator