Earthquake Detection Workflow using QuakeFlow¶
Outline¶
Here we show an example of the current modules in QuakeFlow
- Download data using Obpsy: 
- PhaseNet for picking P/S phases - Find more details in PhaseNet github page 
- GaMMA for associating picking and estimate approximate location and magnitude - Find more details in GaMMA github page 
- Earthquake location, magnitude estimation, etc. 
git clone https://github.com/wayneweiqiang/PhaseNet.git
git clone https://github.com/wayneweiqiang/GaMMA.git
conda env update -f=env.yml -n baseSecond option: install to quakeflow environment, but need to select jupyter notebook kernel to quakeflow
conda env create -f=env.yml -n quakeflow
python -m ipykernel install --user --name=quakeflowIn [19]:
                Copied!
                
                
            import warnings
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
warnings.filterwarnings("ignore")
import warnings
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
warnings.filterwarnings("ignore")
    
        1. Set configurations¶
In [20]:
                Copied!
                
                
            import os
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("agg")
%matplotlib inline
region_name = "Demo"
# region_name = "Ridgecrest"
# region_name = "SaltonSea"
# region_name = "Ridgecrest"
# region_name = "SanSimeon"
# region_name = "Italy"
# region_name = "PNSN"
# region_name = "Hawaii2"
# region_name = "SierraNegra"
# region_name = "PuertoRico"
# region_name = "SmithValley"
# region_name = "Antilles"
# region_name = "LongValley"
dir_name = region_name
if not os.path.exists(dir_name):
    os.mkdir(dir_name)
root_dir = lambda x: os.path.join(dir_name, x)
run_local = True
import os
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("agg")
%matplotlib inline
region_name = "Demo"
# region_name = "Ridgecrest"
# region_name = "SaltonSea"
# region_name = "Ridgecrest"
# region_name = "SanSimeon"
# region_name = "Italy"
# region_name = "PNSN"
# region_name = "Hawaii2"
# region_name = "SierraNegra"
# region_name = "PuertoRico"
# region_name = "SmithValley"
# region_name = "Antilles"
# region_name = "LongValley"
dir_name = region_name
if not os.path.exists(dir_name):
    os.mkdir(dir_name)
root_dir = lambda x: os.path.join(dir_name, x)
run_local = True
    
        In [21]:
                Copied!
                
                
            def set_config(
    region_name,
    index_json: OutputPath("json"),
    config_json: OutputPath("json"),
    datetime_json: OutputPath("json"),
    num_parallel: int = 1,
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> list:
    import datetime
    import json
    import os
    import pickle
    import numpy as np
    import obspy
    degree2km = np.pi * 6371 / 180
    if region_name == "Demo":
        center = (-117.504, 35.705)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2019-07-04T17")
        endtime = obspy.UTCDateTime("2019-07-04T19")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Ridgecrest":
        center = (-117.504, 35.705)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2019-07-04T00")
        endtime = obspy.UTCDateTime("2019-07-10T00")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Hawaii":
        center = (-155.32, 19.39)
        horizontal_degree = 2.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2018-01-01T00")
        endtime = obspy.UTCDateTime("2022-08-12T00")
        client = "IRIS"
        network_list = ["HV", "PT"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Hawaii2":
        center = (-155.32, 19.39)
        horizontal_degree = 2.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2022-08-01T00")
        endtime = obspy.UTCDateTime("2022-08-16T00")
        client = "IRIS"
        network_list = ["HV", "PT"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "PuertoRico":
        center = (-66.5, 18)
        horizontal_degree = 3.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2018-05-01T00")
        endtime = obspy.UTCDateTime("2021-11-01T00")
        client = "IRIS"
        network_list = ["*"]
        channel_list = "HH*,BH*,HN*"
    if region_name == "SaltonSea":
        center = (-115.53, 32.98)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2020-10-01T00")
        endtime = obspy.UTCDateTime("2020-10-01T02")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "2003SanSimeon":
        center = (-121.101, 35.701)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2003-12-22T00")
        endtime = obspy.UTCDateTime("2003-12-24T00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Italy":
        center = (13.188, 42.723)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2016-08-24T00")
        endtime = obspy.UTCDateTime("2016-08-26T00")
        client = "INGV"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "SmithValley":
        center = (-119.5, 38.51)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2021-07-08T00:00")
        endtime = obspy.UTCDateTime("2021-07-16T00:00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Antilles":
        center = (-61.14867, 14.79683)
        horizontal_degree = 0.2
        vertical_degree = 0.2
        starttime = obspy.UTCDateTime("2021-04-10T00")
        endtime = obspy.UTCDateTime("2021-04-15T00")
        client = "RESIF"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "LongValley":
        # center = (-118.8, 37.6+0.3)
        # horizontal_degree = 3.0
        # vertical_degree = 3.0
        center = (-118.8 - 0.1, 37.6)
        horizontal_degree = 1.5
        vertical_degree = 1.5
        starttime = obspy.UTCDateTime("2020-01-01T00")
        endtime = obspy.UTCDateTime("2022-08-11T00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,EH*"
    if region_name == "SierraNegra":
        center = (-91.13, -0.81)
        horizontal_degree = 1.5
        vertical_degree = 1.5
        starttime = obspy.UTCDateTime("2009-07-22T00")
        endtime = obspy.UTCDateTime("2011-06-20T00")
        client = "IRIS"
        network_list = ["*"]
        channel_list = "BH*,HH*,EH*"
    ####### save config ########
    config = {}
    config["region"] = region_name
    config["center"] = center
    config["xlim_degree"] = [
        center[0] - horizontal_degree / 2,
        center[0] + horizontal_degree / 2,
    ]
    config["ylim_degree"] = [
        center[1] - vertical_degree / 2,
        center[1] + vertical_degree / 2,
    ]
    config["min_longitude"] = center[0] - horizontal_degree / 2
    config["max_longitude"] = center[0] + horizontal_degree / 2
    config["min_latitude"] = center[1] - vertical_degree / 2
    config["max_latitude"] = center[1] + vertical_degree / 2
    config["degree2km"] = degree2km
    config["starttime"] = starttime.datetime.isoformat(timespec="milliseconds")
    config["endtime"] = endtime.datetime.isoformat(timespec="milliseconds")
    config["networks"] = network_list
    config["channels"] = channel_list
    config["client"] = client
    ## PhaseNet
    config["phasenet"] = {}
    ## GaMMA
    config["gamma"] = {}
    ## HypoDD
    config["hypodd"] = {"MAXEVENT": 1e4}
    with open(config_json, "w") as fp:
        json.dump(config, fp, indent=2)
    print(json.dumps(config, indent=4))
    ####### set paraell for cloud ########
    ## split data by hours
    # one_day = datetime.timedelta(days=1)
    one_hour = datetime.timedelta(hours=1)
    starttimes = []
    tmp_start = starttime
    while tmp_start < endtime:
        starttimes.append(tmp_start.datetime.isoformat(timespec="milliseconds"))
        tmp_start += one_hour
    with open(datetime_json, "w") as fp:
        json.dump(
            {"starttimes": starttimes, "interval": one_hour.total_seconds()},
            fp,
            indent=2,
        )
    ## split stattimes into N parallel process
    if num_parallel == 0:
        num_parallel = min(60, int((len(starttimes) - 1) // 6 + 1))
        # num_parallel = min(30, int((len(starttimes)-1)//6+1))
        # num_parallel = min(24, len(starttimes))
    idx = [x.tolist() for x in np.array_split(np.arange(len(starttimes)), num_parallel)]
    with open(index_json, "w") as fp:
        json.dump(idx, fp, indent=2)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/config.json",
            config_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/datetime.json",
            datetime_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/index.json",
            index_json,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    return list(range(num_parallel))
def set_config(
    region_name,
    index_json: OutputPath("json"),
    config_json: OutputPath("json"),
    datetime_json: OutputPath("json"),
    num_parallel: int = 1,
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> list:
    import datetime
    import json
    import os
    import pickle
    import numpy as np
    import obspy
    degree2km = np.pi * 6371 / 180
    if region_name == "Demo":
        center = (-117.504, 35.705)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2019-07-04T17")
        endtime = obspy.UTCDateTime("2019-07-04T19")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Ridgecrest":
        center = (-117.504, 35.705)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2019-07-04T00")
        endtime = obspy.UTCDateTime("2019-07-10T00")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Hawaii":
        center = (-155.32, 19.39)
        horizontal_degree = 2.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2018-01-01T00")
        endtime = obspy.UTCDateTime("2022-08-12T00")
        client = "IRIS"
        network_list = ["HV", "PT"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Hawaii2":
        center = (-155.32, 19.39)
        horizontal_degree = 2.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2022-08-01T00")
        endtime = obspy.UTCDateTime("2022-08-16T00")
        client = "IRIS"
        network_list = ["HV", "PT"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "PuertoRico":
        center = (-66.5, 18)
        horizontal_degree = 3.0
        vertical_degree = 2.0
        starttime = obspy.UTCDateTime("2018-05-01T00")
        endtime = obspy.UTCDateTime("2021-11-01T00")
        client = "IRIS"
        network_list = ["*"]
        channel_list = "HH*,BH*,HN*"
    if region_name == "SaltonSea":
        center = (-115.53, 32.98)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2020-10-01T00")
        endtime = obspy.UTCDateTime("2020-10-01T02")
        client = "SCEDC"
        network_list = ["CI"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "2003SanSimeon":
        center = (-121.101, 35.701)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2003-12-22T00")
        endtime = obspy.UTCDateTime("2003-12-24T00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Italy":
        center = (13.188, 42.723)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2016-08-24T00")
        endtime = obspy.UTCDateTime("2016-08-26T00")
        client = "INGV"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "SmithValley":
        center = (-119.5, 38.51)
        horizontal_degree = 1.0
        vertical_degree = 1.0
        starttime = obspy.UTCDateTime("2021-07-08T00:00")
        endtime = obspy.UTCDateTime("2021-07-16T00:00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "Antilles":
        center = (-61.14867, 14.79683)
        horizontal_degree = 0.2
        vertical_degree = 0.2
        starttime = obspy.UTCDateTime("2021-04-10T00")
        endtime = obspy.UTCDateTime("2021-04-15T00")
        client = "RESIF"
        network_list = ["*"]
        channel_list = "HH*,BH*,EH*,HN*"
    if region_name == "LongValley":
        # center = (-118.8, 37.6+0.3)
        # horizontal_degree = 3.0
        # vertical_degree = 3.0
        center = (-118.8 - 0.1, 37.6)
        horizontal_degree = 1.5
        vertical_degree = 1.5
        starttime = obspy.UTCDateTime("2020-01-01T00")
        endtime = obspy.UTCDateTime("2022-08-11T00")
        client = "NCEDC"
        network_list = ["*"]
        channel_list = "HH*,EH*"
    if region_name == "SierraNegra":
        center = (-91.13, -0.81)
        horizontal_degree = 1.5
        vertical_degree = 1.5
        starttime = obspy.UTCDateTime("2009-07-22T00")
        endtime = obspy.UTCDateTime("2011-06-20T00")
        client = "IRIS"
        network_list = ["*"]
        channel_list = "BH*,HH*,EH*"
    ####### save config ########
    config = {}
    config["region"] = region_name
    config["center"] = center
    config["xlim_degree"] = [
        center[0] - horizontal_degree / 2,
        center[0] + horizontal_degree / 2,
    ]
    config["ylim_degree"] = [
        center[1] - vertical_degree / 2,
        center[1] + vertical_degree / 2,
    ]
    config["min_longitude"] = center[0] - horizontal_degree / 2
    config["max_longitude"] = center[0] + horizontal_degree / 2
    config["min_latitude"] = center[1] - vertical_degree / 2
    config["max_latitude"] = center[1] + vertical_degree / 2
    config["degree2km"] = degree2km
    config["starttime"] = starttime.datetime.isoformat(timespec="milliseconds")
    config["endtime"] = endtime.datetime.isoformat(timespec="milliseconds")
    config["networks"] = network_list
    config["channels"] = channel_list
    config["client"] = client
    ## PhaseNet
    config["phasenet"] = {}
    ## GaMMA
    config["gamma"] = {}
    ## HypoDD
    config["hypodd"] = {"MAXEVENT": 1e4}
    with open(config_json, "w") as fp:
        json.dump(config, fp, indent=2)
    print(json.dumps(config, indent=4))
    ####### set paraell for cloud ########
    ## split data by hours
    # one_day = datetime.timedelta(days=1)
    one_hour = datetime.timedelta(hours=1)
    starttimes = []
    tmp_start = starttime
    while tmp_start < endtime:
        starttimes.append(tmp_start.datetime.isoformat(timespec="milliseconds"))
        tmp_start += one_hour
    with open(datetime_json, "w") as fp:
        json.dump(
            {"starttimes": starttimes, "interval": one_hour.total_seconds()},
            fp,
            indent=2,
        )
    ## split stattimes into N parallel process
    if num_parallel == 0:
        num_parallel = min(60, int((len(starttimes) - 1) // 6 + 1))
        # num_parallel = min(30, int((len(starttimes)-1)//6+1))
        # num_parallel = min(24, len(starttimes))
    idx = [x.tolist() for x in np.array_split(np.arange(len(starttimes)), num_parallel)]
    with open(index_json, "w") as fp:
        json.dump(idx, fp, indent=2)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/config.json",
            config_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/datetime.json",
            datetime_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/index.json",
            index_json,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    return list(range(num_parallel))
    
        In [22]:
                Copied!
                
                
            if run_local:
    idx = set_config(
        region_name,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("datetimes.json"),
        num_parallel=1,
    )
if run_local:
    idx = set_config(
        region_name,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("datetimes.json"),
        num_parallel=1,
    )
    
        {
    "region": "Demo",
    "center": [
        -117.504,
        35.705
    ],
    "xlim_degree": [
        -118.004,
        -117.004
    ],
    "ylim_degree": [
        35.205,
        36.205
    ],
    "min_longitude": -118.004,
    "max_longitude": -117.004,
    "min_latitude": 35.205,
    "max_latitude": 36.205,
    "degree2km": 111.19492664455873,
    "starttime": "2019-07-04T17:00:00.000",
    "endtime": "2019-07-04T19:00:00.000",
    "networks": [
        "CI"
    ],
    "channels": "HH*,BH*,EH*,HN*",
    "client": "SCEDC",
    "phasenet": {},
    "gamma": {},
    "hypodd": {
        "MAXEVENT": 10000.0
    }
}
ERROR: can not access minio service! 
No module named 'minio'
In [23]:
                Copied!
                
                
            config_op = comp.func_to_container_op(
    set_config,
    base_image="python:3.8",
    packages_to_install=["numpy", "obspy", "minio"],
)
config_op = comp.func_to_container_op(
    set_config,
    base_image="python:3.8",
    packages_to_install=["numpy", "obspy", "minio"],
)
    
        2. Download events of the standard catalog¶
This catalog is not used by QuakeFolow. It is only used for comparing detection results.
In [24]:
                Copied!
                
                
            def download_events(
    config_json: InputPath("json"), 
    standard_catalog: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    import pickle
    from collections import defaultdict
    import matplotlib
    import matplotlib.pyplot as plt
    import obspy
    import pandas as pd
    from obspy.clients.fdsn import Client
    # matplotlib.use("agg")
    # %matplotlib inline
    with open(config_json, "r") as fp:
        config = json.load(fp)
    ####### IRIS catalog ########
    try:
        events = Client(config["client"]).get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
            # filename='events.xml',
        )
    except:
        events = Client("iris").get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
            # filename='events.xml',
        )
    #     events = obspy.read_events('events.xml')
    print(f"Number of events: {len(events)}")
    #     events.plot('local', outfile="events.png")
    #     events.plot('local')
    ####### Save catalog ########
    catalog = defaultdict(list)
    for event in events:
        if len(event.magnitudes) > 0:
            catalog["time"].append(event.origins[0].time.datetime)
            catalog["magnitude"].append(event.magnitudes[0].mag)
            catalog["longitude"].append(event.origins[0].longitude)
            catalog["latitude"].append(event.origins[0].latitude)
            catalog["depth(m)"].append(event.origins[0].depth)
    catalog = pd.DataFrame.from_dict(catalog).sort_values(["time"])
    catalog.to_csv(
        standard_catalog,
        # sep="\t",
        index=False,
        float_format="%.3f",
        date_format="%Y-%m-%dT%H:%M:%S.%f",
        columns=["time", "magnitude", "longitude", "latitude", "depth(m)"],
    )
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/standard_catalog.csv",
            standard_catalog,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    ####### Plot catalog ########
    plt.figure()
    plt.plot(catalog["longitude"], catalog["latitude"], ".", markersize=1)
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    # plt.savefig(os.path.join(data_path, "events_loc.png"))
    plt.show()
    plt.figure()
    plt.plot_date(catalog["time"], catalog["magnitude"], ".", markersize=1)
    plt.gcf().autofmt_xdate()
    plt.ylabel("Magnitude")
    plt.title(f"Number of events: {len(events)}")
    # plt.savefig(os.path.join(data_path, "events_mag_time.png"))
    plt.show()
def download_events(
    config_json: InputPath("json"), 
    standard_catalog: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    import pickle
    from collections import defaultdict
    import matplotlib
    import matplotlib.pyplot as plt
    import obspy
    import pandas as pd
    from obspy.clients.fdsn import Client
    # matplotlib.use("agg")
    # %matplotlib inline
    with open(config_json, "r") as fp:
        config = json.load(fp)
    ####### IRIS catalog ########
    try:
        events = Client(config["client"]).get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
            # filename='events.xml',
        )
    except:
        events = Client("iris").get_events(
            starttime=config["starttime"],
            endtime=config["endtime"],
            minlongitude=config["xlim_degree"][0],
            maxlongitude=config["xlim_degree"][1],
            minlatitude=config["ylim_degree"][0],
            maxlatitude=config["ylim_degree"][1],
            # filename='events.xml',
        )
    #     events = obspy.read_events('events.xml')
    print(f"Number of events: {len(events)}")
    #     events.plot('local', outfile="events.png")
    #     events.plot('local')
    ####### Save catalog ########
    catalog = defaultdict(list)
    for event in events:
        if len(event.magnitudes) > 0:
            catalog["time"].append(event.origins[0].time.datetime)
            catalog["magnitude"].append(event.magnitudes[0].mag)
            catalog["longitude"].append(event.origins[0].longitude)
            catalog["latitude"].append(event.origins[0].latitude)
            catalog["depth(m)"].append(event.origins[0].depth)
    catalog = pd.DataFrame.from_dict(catalog).sort_values(["time"])
    catalog.to_csv(
        standard_catalog,
        # sep="\t",
        index=False,
        float_format="%.3f",
        date_format="%Y-%m-%dT%H:%M:%S.%f",
        columns=["time", "magnitude", "longitude", "latitude", "depth(m)"],
    )
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/standard_catalog.csv",
            standard_catalog,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    ####### Plot catalog ########
    plt.figure()
    plt.plot(catalog["longitude"], catalog["latitude"], ".", markersize=1)
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    # plt.savefig(os.path.join(data_path, "events_loc.png"))
    plt.show()
    plt.figure()
    plt.plot_date(catalog["time"], catalog["magnitude"], ".", markersize=1)
    plt.gcf().autofmt_xdate()
    plt.ylabel("Magnitude")
    plt.title(f"Number of events: {len(events)}")
    # plt.savefig(os.path.join(data_path, "events_mag_time.png"))
    plt.show()
    
        In [25]:
                Copied!
                
                
            if run_local:
    download_events(root_dir("config.json"), root_dir("standard_catalog.csv"))
if run_local:
    download_events(root_dir("config.json"), root_dir("standard_catalog.csv"))
    
        Number of events: 252 ERROR: can not access minio service! No module named 'minio'
In [26]:
                Copied!
                
                
            download_events_op = comp.func_to_container_op(
    download_events,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
        "minio",
    ],
)
download_events_op = comp.func_to_container_op(
    download_events,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
        "minio",
    ],
)
    
        3. Download stations¶
In [27]:
                Copied!
                
                
            def download_stations(
    config_json: InputPath("json"),
    station_json: OutputPath("json"),
    station_pkl: OutputPath("pickle"),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    import pickle
    from collections import defaultdict
    import matplotlib
    import matplotlib.pyplot as plt
    import obspy
    import pandas as pd
    from obspy.clients.fdsn import Client
    # matplotlib.use("agg")
    # %matplotlib inline
    with open(config_json, "r") as fp:
        config = json.load(fp)
    print("Network:", ",".join(config["networks"]))
    ####### Download stations ########
    stations = Client(config["client"]).get_stations(
        network=",".join(config["networks"]),
        station="*",
        starttime=config["starttime"],
        endtime=config["endtime"],
        minlongitude=config["xlim_degree"][0],
        maxlongitude=config["xlim_degree"][1],
        minlatitude=config["ylim_degree"][0],
        maxlatitude=config["ylim_degree"][1],
        channel=config["channels"],
        level="response",
        # filename="stations.xml"
    )
    #     stations = obspy.read_inventory("stations.xml")
    print("Number of stations: {}".format(sum([len(x) for x in stations])))
    # stations.plot('local', outfile="stations.png")
    #     stations.plot('local')
    ####### Save stations ########
    station_locs = defaultdict(dict)
    for network in stations:
        for station in network:
            for chn in station:
                sid = f"{network.code}.{station.code}.{chn.location_code}.{chn.code[:-1]}"
                if sid in station_locs:
                    if chn.code[-1] not in station_locs[sid]["component"]:
                        station_locs[sid]["component"].append(chn.code[-1])
                        station_locs[sid]["response"].append(round(chn.response.instrument_sensitivity.value, 2))
                else:
                    tmp_dict = {
                        "longitude": chn.longitude,
                        "latitude": chn.latitude,
                        "elevation(m)": chn.elevation,
                        "component": [
                            chn.code[-1],
                        ],
                        "response": [
                            round(chn.response.instrument_sensitivity.value, 2),
                        ],
                        "unit": chn.response.instrument_sensitivity.input_units.lower(),
                    }
                    station_locs[sid] = tmp_dict
    with open(station_json, "w") as fp:
        json.dump(station_locs, fp, indent=2)
    with open(station_pkl, "wb") as fp:
        pickle.dump(stations, fp)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/stations.json",
            station_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/stations.pkl",
            station_pkl,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    
    ######## Plot stations ########
    station_locs = pd.DataFrame.from_dict(station_locs, orient="index")
    plt.figure()
    plt.plot(station_locs["longitude"], station_locs["latitude"], "^", label="Stations")
    plt.xlabel("X (km)")
    plt.ylabel("Y (km)")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    plt.legend()
    plt.title(f"Number of stations: {len(station_locs)}")
    #     plt.savefig(os.path.join(data_path, "stations_loc.png"))
    plt.show()
def download_stations(
    config_json: InputPath("json"),
    station_json: OutputPath("json"),
    station_pkl: OutputPath("pickle"),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    import pickle
    from collections import defaultdict
    import matplotlib
    import matplotlib.pyplot as plt
    import obspy
    import pandas as pd
    from obspy.clients.fdsn import Client
    # matplotlib.use("agg")
    # %matplotlib inline
    with open(config_json, "r") as fp:
        config = json.load(fp)
    print("Network:", ",".join(config["networks"]))
    ####### Download stations ########
    stations = Client(config["client"]).get_stations(
        network=",".join(config["networks"]),
        station="*",
        starttime=config["starttime"],
        endtime=config["endtime"],
        minlongitude=config["xlim_degree"][0],
        maxlongitude=config["xlim_degree"][1],
        minlatitude=config["ylim_degree"][0],
        maxlatitude=config["ylim_degree"][1],
        channel=config["channels"],
        level="response",
        # filename="stations.xml"
    )
    #     stations = obspy.read_inventory("stations.xml")
    print("Number of stations: {}".format(sum([len(x) for x in stations])))
    # stations.plot('local', outfile="stations.png")
    #     stations.plot('local')
    ####### Save stations ########
    station_locs = defaultdict(dict)
    for network in stations:
        for station in network:
            for chn in station:
                sid = f"{network.code}.{station.code}.{chn.location_code}.{chn.code[:-1]}"
                if sid in station_locs:
                    if chn.code[-1] not in station_locs[sid]["component"]:
                        station_locs[sid]["component"].append(chn.code[-1])
                        station_locs[sid]["response"].append(round(chn.response.instrument_sensitivity.value, 2))
                else:
                    tmp_dict = {
                        "longitude": chn.longitude,
                        "latitude": chn.latitude,
                        "elevation(m)": chn.elevation,
                        "component": [
                            chn.code[-1],
                        ],
                        "response": [
                            round(chn.response.instrument_sensitivity.value, 2),
                        ],
                        "unit": chn.response.instrument_sensitivity.input_units.lower(),
                    }
                    station_locs[sid] = tmp_dict
    with open(station_json, "w") as fp:
        json.dump(station_locs, fp, indent=2)
    with open(station_pkl, "wb") as fp:
        pickle.dump(stations, fp)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/stations.json",
            station_json,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/stations.pkl",
            station_pkl,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    
    ######## Plot stations ########
    station_locs = pd.DataFrame.from_dict(station_locs, orient="index")
    plt.figure()
    plt.plot(station_locs["longitude"], station_locs["latitude"], "^", label="Stations")
    plt.xlabel("X (km)")
    plt.ylabel("Y (km)")
    plt.axis("scaled")
    plt.xlim(config["xlim_degree"])
    plt.ylim(config["ylim_degree"])
    plt.legend()
    plt.title(f"Number of stations: {len(station_locs)}")
    #     plt.savefig(os.path.join(data_path, "stations_loc.png"))
    plt.show()
    
        In [28]:
                Copied!
                
                
            if run_local:
    download_stations(root_dir("config.json"), root_dir("stations.json"), root_dir("stations.pkl"))
if run_local:
    download_stations(root_dir("config.json"), root_dir("stations.json"), root_dir("stations.pkl"))
    
        Network: CI Number of stations: 17 ERROR: can not access minio service! No module named 'minio'
In [29]:
                Copied!
                
                
            download_stations_op = comp.func_to_container_op(
    download_stations,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
        "minio",
    ],
)
download_stations_op = comp.func_to_container_op(
    download_stations,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "obspy",
        "pandas",
        "matplotlib",
        "minio",
    ],
)
    
        4. Download waveform data¶
In [30]:
                Copied!
                
                
            def download_waveform(
    node_i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    datetime_json: InputPath("json"),
    station_pkl: InputPath("pickle"),
    fname_csv: OutputPath(str),
    data_path: str,
    bucket_name: str = "waveforms",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> str:
    import json
    import os
    import pickle
    import random
    import threading
    import time
    import obspy
    from obspy.clients.fdsn import Client
    lock = threading.Lock()
    upload_minio = False
    # try:
    #     from minio import Minio
    #     minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    #     if not minioClient.bucket_exists(bucket_name):
    #         minioClient.make_bucket(bucket_name)
    #     upload_minio = True
    # except Exception as err:
    #     # print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[node_i]
    with open(config_json, "r") as fp:
        config = json.load(fp)
    with open(datetime_json, "r") as fp:
        tmp = json.load(fp)
        starttimes = tmp["starttimes"]
        interval = tmp["interval"]
    with open(station_pkl, "rb") as fp:
        stations = pickle.load(fp)
    waveform_dir = os.path.join(data_path, config["region"], "waveforms")
    if not os.path.exists(waveform_dir):
        os.makedirs(waveform_dir)
    ####### Download data ########
    client = Client(config["client"])
    fname_list = ["fname"]
    def download(i):
        #     for i in idx:
        starttime = obspy.UTCDateTime(starttimes[i])
        endtime = starttime + interval
        fname = "{}.mseed".format(starttime.datetime.strftime("%Y-%m-%dT%H:%M:%S"))
        if not upload_minio:
            if os.path.exists(os.path.join(waveform_dir, fname)):
                print(f"{fname} exists")
                fname_list.append(fname)
                return
        else:
            try:
                minioClient.fget_object(
                    bucket_name,
                    os.path.join(config["region"], fname),
                    os.path.join(waveform_dir, fname),
                )
                print(
                    f"{bucket_name}/{os.path.join(config['region'], fname)} download to {os.path.join(waveform_dir, fname)}"
                )
                fname_list.append(fname)
                return
            except Exception as err:
                print(err)
        max_retry = 10
        stream = obspy.Stream()
        print(f"{fname} download starts")
        num_sta = 0
        for network in stations:
            for station in network:
                print(f"********{network.code}.{station.code}********")
                retry = 0
                while retry < max_retry:
                    try:
                        tmp = client.get_waveforms(
                            network.code,
                            station.code,
                            "*",
                            config["channels"],
                            starttime,
                            endtime,
                        )
                        #  for trace in tmp:
                        #      if trace.stats.sampling_rate != 100:
                        #          print(trace)
                        #          trace = trace.interpolate(100, method="linear")
                        #      trace = trace.detrend("spline", order=2, dspline=5*trace.stats.sampling_rate)
                        #      stream.append(trace)
                        stream += tmp
                        num_sta += len(tmp)
                        break
                    except Exception as err:
                        print("Error {}.{}: {}".format(network.code, station.code, err))
                        message = "No data available for request."
                        if str(err)[: len(message)] == message:
                            break
                        retry += 1
                        time.sleep(5)
                        continue
                if retry == max_retry:
                    print(f"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code}")
        if len(stream) > 0:
            # stream = stream.merge(fill_value=0)
            # stream = stream.trim(starttime, endtime, pad=True, fill_value=0)
            stream.write(os.path.join(waveform_dir, fname))
            print(f"{fname} download succeeds")
            # if upload_minio:
            #     minioClient.fput_object(bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))
            #     print(f"{fname} upload to minio {os.path.join(config['region'], fname)}")
        else:
            print(f"{fname} empty data")
        lock.acquire()
        fname_list.append(fname)
        lock.release()
    threads = []
    MAX_THREADS = 2
    # MAX_THREADS = 1
    for ii, i in enumerate(idx):
        t = threading.Thread(target=download, args=(i,))
        t.start()
        time.sleep(1)
        threads.append(t)
        if ii % MAX_THREADS == MAX_THREADS - 1:
            for t in threads:
                t.join()
            threads = []
    for t in threads:
        t.join()
    with open(fname_csv, "w") as fp:
        fp.write("\n".join(fname_list))
    return waveform_dir
def download_waveform(
    node_i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    datetime_json: InputPath("json"),
    station_pkl: InputPath("pickle"),
    fname_csv: OutputPath(str),
    data_path: str,
    bucket_name: str = "waveforms",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> str:
    import json
    import os
    import pickle
    import random
    import threading
    import time
    import obspy
    from obspy.clients.fdsn import Client
    lock = threading.Lock()
    upload_minio = False
    # try:
    #     from minio import Minio
    #     minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    #     if not minioClient.bucket_exists(bucket_name):
    #         minioClient.make_bucket(bucket_name)
    #     upload_minio = True
    # except Exception as err:
    #     # print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[node_i]
    with open(config_json, "r") as fp:
        config = json.load(fp)
    with open(datetime_json, "r") as fp:
        tmp = json.load(fp)
        starttimes = tmp["starttimes"]
        interval = tmp["interval"]
    with open(station_pkl, "rb") as fp:
        stations = pickle.load(fp)
    waveform_dir = os.path.join(data_path, config["region"], "waveforms")
    if not os.path.exists(waveform_dir):
        os.makedirs(waveform_dir)
    ####### Download data ########
    client = Client(config["client"])
    fname_list = ["fname"]
    def download(i):
        #     for i in idx:
        starttime = obspy.UTCDateTime(starttimes[i])
        endtime = starttime + interval
        fname = "{}.mseed".format(starttime.datetime.strftime("%Y-%m-%dT%H:%M:%S"))
        if not upload_minio:
            if os.path.exists(os.path.join(waveform_dir, fname)):
                print(f"{fname} exists")
                fname_list.append(fname)
                return
        else:
            try:
                minioClient.fget_object(
                    bucket_name,
                    os.path.join(config["region"], fname),
                    os.path.join(waveform_dir, fname),
                )
                print(
                    f"{bucket_name}/{os.path.join(config['region'], fname)} download to {os.path.join(waveform_dir, fname)}"
                )
                fname_list.append(fname)
                return
            except Exception as err:
                print(err)
        max_retry = 10
        stream = obspy.Stream()
        print(f"{fname} download starts")
        num_sta = 0
        for network in stations:
            for station in network:
                print(f"********{network.code}.{station.code}********")
                retry = 0
                while retry < max_retry:
                    try:
                        tmp = client.get_waveforms(
                            network.code,
                            station.code,
                            "*",
                            config["channels"],
                            starttime,
                            endtime,
                        )
                        #  for trace in tmp:
                        #      if trace.stats.sampling_rate != 100:
                        #          print(trace)
                        #          trace = trace.interpolate(100, method="linear")
                        #      trace = trace.detrend("spline", order=2, dspline=5*trace.stats.sampling_rate)
                        #      stream.append(trace)
                        stream += tmp
                        num_sta += len(tmp)
                        break
                    except Exception as err:
                        print("Error {}.{}: {}".format(network.code, station.code, err))
                        message = "No data available for request."
                        if str(err)[: len(message)] == message:
                            break
                        retry += 1
                        time.sleep(5)
                        continue
                if retry == max_retry:
                    print(f"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code}")
        if len(stream) > 0:
            # stream = stream.merge(fill_value=0)
            # stream = stream.trim(starttime, endtime, pad=True, fill_value=0)
            stream.write(os.path.join(waveform_dir, fname))
            print(f"{fname} download succeeds")
            # if upload_minio:
            #     minioClient.fput_object(bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))
            #     print(f"{fname} upload to minio {os.path.join(config['region'], fname)}")
        else:
            print(f"{fname} empty data")
        lock.acquire()
        fname_list.append(fname)
        lock.release()
    threads = []
    MAX_THREADS = 2
    # MAX_THREADS = 1
    for ii, i in enumerate(idx):
        t = threading.Thread(target=download, args=(i,))
        t.start()
        time.sleep(1)
        threads.append(t)
        if ii % MAX_THREADS == MAX_THREADS - 1:
            for t in threads:
                t.join()
            threads = []
    for t in threads:
        t.join()
    with open(fname_csv, "w") as fp:
        fp.write("\n".join(fname_list))
    return waveform_dir
    
        In [31]:
                Copied!
                
                
            if run_local:
    waveform_path = download_waveform(
        0,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("datetimes.json"),
        root_dir("stations.pkl"),
        root_dir("fname.csv"),
        data_path=root_dir(""),
    )
if run_local:
    waveform_path = download_waveform(
        0,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("datetimes.json"),
        root_dir("stations.pkl"),
        root_dir("fname.csv"),
        data_path=root_dir(""),
    )
    
        2019-07-04T17:00:00.mseed exists 2019-07-04T18:00:00.mseed exists
In [32]:
                Copied!
                
                
            download_waveform_op = comp.func_to_container_op(
    download_waveform,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=["obspy", "minio"],
)
download_waveform_op = comp.func_to_container_op(
    download_waveform,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=["obspy", "minio"],
)
    
        5. Run PhaseNet to pick P/S picks¶
In [33]:
                Copied!
                
                
            def phasenet_op(data_path: str, data_list: str, stations: str):
    return dsl.ContainerOp(
        name="PhaseNet Picking",
        image="zhuwq0/phasenet-api:1.0",
        command=["python"],
        arguments=[
            "phasenet/predict.py",
            "--model",
            "model/190703-214543",
            "--data_dir",
            data_path,
            "--data_list",
            dsl.InputArgumentPath(data_list),
            "--stations",
            dsl.InputArgumentPath(stations),
            # '--result_dir', "results",
            "--format",
            "mseed_array",
            "--amplitude",
        ],
        file_outputs={"picks": "/opt/results/picks.csv"},
    )
def phasenet_op(data_path: str, data_list: str, stations: str):
    return dsl.ContainerOp(
        name="PhaseNet Picking",
        image="zhuwq0/phasenet-api:1.0",
        command=["python"],
        arguments=[
            "phasenet/predict.py",
            "--model",
            "model/190703-214543",
            "--data_dir",
            data_path,
            "--data_list",
            dsl.InputArgumentPath(data_list),
            "--stations",
            dsl.InputArgumentPath(stations),
            # '--result_dir', "results",
            "--format",
            "mseed_array",
            "--amplitude",
        ],
        file_outputs={"picks": "/opt/results/picks.csv"},
    )
    
        In [37]:
                Copied!
                
                
            # %%capture
if run_local:
    command = f"python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir={root_dir(root_dir('waveforms'))} --data_list={root_dir('fname.csv')} --stations={root_dir('stations.json')} --result_dir={root_dir('phasenet')} --format=mseed_array --amplitude"# --upload_waveform"
    print(command)
    !{command}
# %%capture
if run_local:
    command = f"python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir={root_dir(root_dir('waveforms'))} --data_list={root_dir('fname.csv')} --stations={root_dir('stations.json')} --result_dir={root_dir('phasenet')} --format=mseed_array --amplitude"# --upload_waveform"
    print(command)
    !{command}
    
        python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir=Demo/Demo/waveforms --data_list=Demo/fname.csv --stations=Demo/stations.json --result_dir=Demo/phasenet --format=mseed_array --amplitude
2023-03-15 14:40:16.834011: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
                 longitude   latitude  elevation(m)  component                                 response    unit
CI.CCC..BH     -117.364530  35.524950         670.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.CCC..HH     -117.364530  35.524950         670.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.CCC..HN     -117.364530  35.524950         670.0  [E, N, Z]           [213979.0, 214322.0, 213808.0]  m/s**2
CI.CLC..BH     -117.597510  35.815740         775.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.CLC..HH     -117.597510  35.815740         775.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.CLC..HN     -117.597510  35.815740         775.0  [E, N, Z]           [213945.0, 213808.0, 213740.0]  m/s**2
CI.DTP..BH     -117.845810  35.267420         908.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.DTP..HH     -117.845810  35.267420         908.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.DTP..HN     -117.845810  35.267420         908.0  [E, N, Z]           [214399.0, 213971.0, 214484.0]  m/s**2
CI.JRC2..BH    -117.808850  35.982490        1469.0  [E, N, Z]  [784866000.0, 784866000.0, 790478000.0]     m/s
CI.JRC2..HH    -117.808850  35.982490        1469.0  [E, N, Z]  [784866000.0, 784866000.0, 790478000.0]     m/s
CI.JRC2..HN    -117.808850  35.982490        1469.0  [E, N, Z]           [213808.0, 213945.0, 214185.0]  m/s**2
CI.LRL..BH     -117.682120  35.479540        1340.0  [E, N, Z]  [628306000.0, 629984000.0, 627467000.0]     m/s
CI.LRL..HH     -117.682120  35.479540        1340.0  [E, N, Z]  [628306000.0, 629984000.0, 627467000.0]     m/s
CI.LRL..HN     -117.682120  35.479540        1340.0  [E, N, Z]           [213757.0, 213671.0, 213201.0]  m/s**2
CI.LRL.2C.HN   -117.682120  35.479540        1340.0  [E, N, Z]           [213757.0, 213671.0, 213201.0]  m/s**2
CI.MPM..BH     -117.489010  36.057990        1839.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.MPM..HH     -117.489010  36.057990        1839.0  [E, N, Z]  [627368000.0, 627368000.0, 627368000.0]     m/s
CI.MPM..HN     -117.489010  36.057990        1839.0  [E, N, Z]           [213911.0, 214219.0, 213911.0]  m/s**2
CI.Q0072.01.HN -117.666721  35.609617         695.0  [E, N, Z]           [256354.0, 256354.0, 256354.0]  m/s**2
CI.SLA..BH     -117.283320  35.890950        1174.0  [E, N, Z]  [622338000.0, 618992000.0, 616482000.0]     m/s
CI.SLA..HH     -117.283320  35.890950        1174.0  [E, N, Z]  [622338000.0, 618992000.0, 616482000.0]     m/s
CI.SLA..HN     -117.283320  35.890950        1174.0  [E, N, Z]           [214253.0, 213671.0, 213979.0]  m/s**2
CI.SRT..BH     -117.750510  35.692350         667.0  [E, N, Z]  [633341000.0, 633341000.0, 633341000.0]     m/s
CI.SRT..HH     -117.750510  35.692350         667.0  [E, N, Z]  [629865000.0, 629865000.0, 629865000.0]     m/s
CI.SRT..HN     -117.750510  35.692350         667.0  [E, N, Z]           [214301.0, 213873.0, 214087.0]  m/s**2
CI.TOW2..BH    -117.764880  35.808560         685.0  [E, N, Z]  [626915000.0, 626915000.0, 626843000.0]     m/s
CI.TOW2..HH    -117.764880  35.808560         685.0  [E, N, Z]  [626911000.0, 626911000.0, 626839000.0]     m/s
CI.TOW2..HN    -117.764880  35.808560         685.0  [E, N, Z]           [213800.0, 214142.0, 214356.0]  m/s**2
CI.WBM..BH     -117.890490  35.608390         892.0  [E, N, Z]  [314573000.0, 314573000.0, 314573000.0]     m/s
CI.WBM..HH     -117.890490  35.608390         892.0  [E, N, Z]  [314573000.0, 314573000.0, 314573000.0]     m/s
CI.WBM..HN     -117.890490  35.608390         892.0  [E, N, Z]           [213550.0, 214064.0, 213550.0]  m/s**2
CI.WBM.2C.HN   -117.890490  35.608390         892.0  [E, N, Z]           [213550.0, 214064.0, 213550.0]  m/s**2
CI.WCS2..BH    -117.765260  36.025210        1143.0  [E, N, Z]  [626910000.0, 626910000.0, 626838000.0]     m/s
CI.WCS2..HH    -117.765260  36.025210        1143.0  [E, N, Z]  [626910000.0, 626910000.0, 626838000.0]     m/s
CI.WCS2..HN    -117.765260  36.025210        1143.0  [E, N, Z]           [213757.0, 213329.0, 213415.0]  m/s**2
CI.WMF..BH     -117.854860  36.117580        1537.4  [E, N, Z]  [629963000.0, 631652000.0, 629963000.0]     m/s
CI.WMF..HH     -117.854860  36.117580        1537.4  [E, N, Z]  [626506000.0, 628185000.0, 626506000.0]     m/s
CI.WMF..HN     -117.854860  36.117580        1537.4  [E, N, Z]           [214087.0, 214087.0, 214087.0]  m/s**2
CI.WMF.2C.HN   -117.854860  36.117580        1537.4  [E, N, Z]           [213952.0, 213952.0, 213952.0]  m/s**2
CI.WNM..EH     -117.906160  35.842200         974.3        [Z]                             [69306200.0]     m/s
CI.WNM..HN     -117.906160  35.842200         974.3  [E, N, Z]           [214054.0, 213926.0, 214054.0]  m/s**2
CI.WNM.2C.HN   -117.906160  35.842200         974.3  [E, N, Z]           [214048.0, 213920.0, 214048.0]  m/s**2
CI.WRC2..BH    -117.650380  35.947900         943.0  [E, N, Z]  [629145000.0, 629145000.0, 629145000.0]     m/s
CI.WRC2..HH    -117.650380  35.947900         943.0  [E, N, Z]  [629145000.0, 629145000.0, 629145000.0]     m/s
CI.WRC2..HN    -117.650380  35.947900         943.0  [E, N, Z]           [214227.0, 213970.0, 214056.0]  m/s**2
CI.WRV2..EH    -117.890400  36.007740        1070.0        [Z]                             [71427500.0]     m/s
CI.WRV2..HN    -117.890400  36.007740        1070.0  [E, N, Z]           [213883.0, 235224.0, 235139.0]  m/s**2
CI.WRV2.2C.HN  -117.890400  36.007740        1070.0  [E, N, Z]           [213877.0, 235218.0, 235132.0]  m/s**2
CI.WVP2..EH    -117.817690  35.949390        1465.0        [Z]                             [68041300.0]     m/s
CI.WVP2..HN    -117.817690  35.949390        1465.0  [E, N, Z]           [213764.0, 213550.0, 213721.0]  m/s**2
CI.WVP2.2C.HN  -117.817690  35.949390        1465.0  [E, N, Z]           [213782.0, 213569.0, 213740.0]  m/s**2
2023-03-15 14:40:21,519 Resampling CI.CCC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,618 Resampling CI.CCC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,626 Resampling CI.CCC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,633 Resampling CI.CLC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,640 Resampling CI.CLC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,646 Resampling CI.CLC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,653 Resampling CI.DTP..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,660 Resampling CI.DTP..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,667 Resampling CI.DTP..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,674 Resampling CI.JRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,681 Resampling CI.JRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,687 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,694 Resampling CI.LRL..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,699 Resampling CI.LRL..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,705 Resampling CI.LRL..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,711 Resampling CI.MPM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,717 Resampling CI.MPM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,722 Resampling CI.MPM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,728 Resampling CI.SLA..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,735 Resampling CI.SLA..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,741 Resampling CI.SLA..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,748 Resampling CI.SRT..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,755 Resampling CI.SRT..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,762 Resampling CI.SRT..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,768 Resampling CI.TOW2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,774 Resampling CI.TOW2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,780 Resampling CI.TOW2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,787 Resampling CI.WBM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,793 Resampling CI.WBM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,798 Resampling CI.WBM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,804 Resampling CI.WCS2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,811 Resampling CI.WCS2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,817 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,823 Resampling CI.WMF..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,830 Resampling CI.WMF..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,836 Resampling CI.WMF..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:21,842 Resampling CI.WRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:21,847 Resampling CI.WRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:21,853 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz
Empty trace: CI.LRL.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.LRL.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.LRL.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNZ 2019-07-04T16:59:59.994500Z
2023-03-15 14:40:32,078 Pred log: Demo/phasenet
2023-03-15 14:40:32,078 Dataset size: 2
2023-03-15 14:40:32.210711: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-15 14:40:32.219257: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 80. Tune using inter_op_parallelism_threads for best performance.
2023-03-15 14:40:32,265 Model: depths 5, filters 8, filter size 7x1, pool size: 4x1, dilation rate: 1x1
2023-03-15 14:40:34.157725: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 80. Tune using inter_op_parallelism_threads for best performance.
2023-03-15 14:40:34.286939: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:357] MLIR V1 optimization pass is not enabled
2023-03-15 14:40:34,616 restoring model ../PhaseNet/model/190703-214543/model_95.ckpt
Pred:   0%|                                               | 0/2 [00:00<?, ?it/s]2023-03-15 14:40:35,609 Resampling CI.CCC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,618 Resampling CI.CCC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,624 Resampling CI.CCC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,630 Resampling CI.CLC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,637 Resampling CI.CLC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,643 Resampling CI.CLC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,649 Resampling CI.DTP..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,655 Resampling CI.DTP..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,661 Resampling CI.DTP..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,668 Resampling CI.JRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,674 Resampling CI.JRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,680 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,686 Resampling CI.LRL..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,692 Resampling CI.LRL..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,698 Resampling CI.LRL..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,705 Resampling CI.MPM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,711 Resampling CI.MPM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,717 Resampling CI.MPM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,724 Resampling CI.SLA..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,730 Resampling CI.SLA..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,737 Resampling CI.SLA..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,743 Resampling CI.SRT..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,750 Resampling CI.SRT..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,756 Resampling CI.SRT..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,763 Resampling CI.TOW2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,769 Resampling CI.TOW2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,776 Resampling CI.TOW2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,782 Resampling CI.WBM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,789 Resampling CI.WBM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,795 Resampling CI.WBM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,802 Resampling CI.WCS2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,808 Resampling CI.WCS2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,815 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,822 Resampling CI.WMF..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,829 Resampling CI.WMF..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,836 Resampling CI.WMF..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:35,843 Resampling CI.WRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:35,849 Resampling CI.WRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:35,856 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz
Empty trace: CI.LRL.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.LRL.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.LRL.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.Q0072.01.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WBM.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WMF.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WNM.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WRV2.2C.HNZ 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNE 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNN 2019-07-04T16:59:59.994500Z
Empty trace: CI.WVP2.2C.HNZ 2019-07-04T16:59:59.994500Z
2023-03-15 14:40:52,818 Resampling CI.CCC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,828 Resampling CI.CCC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,834 Resampling CI.CCC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,841 Resampling CI.CLC..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,847 Resampling CI.CLC..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,853 Resampling CI.CLC..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,860 Resampling CI.DTP..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,866 Resampling CI.DTP..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,873 Resampling CI.DTP..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,879 Resampling CI.JRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,885 Resampling CI.JRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,892 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,898 Resampling CI.LRL..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,904 Resampling CI.LRL..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,910 Resampling CI.LRL..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,916 Resampling CI.MPM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,923 Resampling CI.MPM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,929 Resampling CI.MPM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,935 Resampling CI.SLA..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,942 Resampling CI.SLA..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,948 Resampling CI.SLA..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,954 Resampling CI.SRT..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,961 Resampling CI.SRT..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,967 Resampling CI.SRT..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,974 Resampling CI.TOW2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:52,980 Resampling CI.TOW2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:52,987 Resampling CI.TOW2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:52,993 Resampling CI.WBM..BHE from 40.0 to 100 Hz
2023-03-15 14:40:53,000 Resampling CI.WBM..BHN from 40.0 to 100 Hz
2023-03-15 14:40:53,006 Resampling CI.WBM..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:53,013 Resampling CI.WCS2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:53,019 Resampling CI.WCS2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:53,026 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:53,032 Resampling CI.WMF..BHE from 40.0 to 100 Hz
2023-03-15 14:40:53,038 Resampling CI.WMF..BHN from 40.0 to 100 Hz
2023-03-15 14:40:53,045 Resampling CI.WMF..BHZ from 40.0 to 100 Hz
2023-03-15 14:40:53,051 Resampling CI.WRC2..BHE from 40.0 to 100 Hz
2023-03-15 14:40:53,057 Resampling CI.WRC2..BHN from 40.0 to 100 Hz
2023-03-15 14:40:53,063 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz
Empty trace: CI.LRL.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.LRL.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.LRL.2C.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.Q0072.01.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.Q0072.01.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.Q0072.01.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.WBM.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.WBM.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.WBM.2C.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.WMF.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.WMF.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.WMF.2C.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.WNM.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.WNM.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.WNM.2C.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.WRV2.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.WRV2.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.WRV2.2C.HNZ 2019-07-04T17:59:59.994500Z
Empty trace: CI.WVP2.2C.HNE 2019-07-04T17:59:59.994500Z
Empty trace: CI.WVP2.2C.HNN 2019-07-04T17:59:59.994500Z
Empty trace: CI.WVP2.2C.HNZ 2019-07-04T17:59:59.994500Z
Pred: 100%|███████████████████████████████████████| 2/2 [01:06<00:00, 33.49s/it]
Done with 8856 P-picks and 9294 S-picks
6. Run GaMMA to associate P/S picks¶
In [38]:
                Copied!
                
                
            def gamma(
    node_i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    pick_csv: InputPath("csv"),
    station_json: InputPath("json"),
    gamma_catalog_csv: OutputPath(str),
    gamma_pick_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> str:
    import json
    import os
    import pickle
    from datetime import datetime, timedelta
    import numpy as np
    import pandas as pd
    from gamma.utils import association, convert_picks_csv, from_seconds
    from pyproj import Proj
    from tqdm import tqdm
    catalog_dir = os.path.join("/tmp/", bucket_name)
    if not os.path.exists(catalog_dir):
        os.makedirs(catalog_dir)
    ## read config
    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[node_i]
    with open(config_json, "r") as fp:
        config = json.load(fp)
    ## read picks
    # picks = pd.read_json(pick_json)
    picks = pd.read_csv(pick_csv, parse_dates=["phase_time"])
    picks["id"] = picks["station_id"]
    picks["timestamp"] = picks["phase_time"]
    picks["amp"] = picks["phase_amp"]
    picks["type"] = picks["phase_type"]
    picks["prob"] = picks["phase_score"]
    ## read stations
    # stations = pd.read_csv(station_csv, delimiter="\t")
    with open(station_json, "r") as fp:
        stations = json.load(fp)
    stations = pd.DataFrame.from_dict(stations, orient="index")
    # stations = stations.rename(columns={"station": "id"})
    stations["id"] = stations.index
    proj = Proj(f"+proj=aeqd +lon_0={config['center'][0]} +lat_0={config['center'][1]} +units=km")
    stations[["x(km)", "y(km)"]] = stations.apply(
        lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1
    )
    stations["z(km)"] = stations["elevation(m)"].apply(lambda x: -x / 1e3)
    ## setting GMMA configs
    config["use_dbscan"] = True
    config["use_amplitude"] = True
    config["method"] = "BGMM"
    if config["method"] == "BGMM":  ## BayesianGaussianMixture
        config["oversample_factor"] = 4
    if config["method"] == "GMM":  ## GaussianMixture
        config["oversample_factor"] = 1
    # Earthquake location
    config["dims"] = ["x(km)", "y(km)", "z(km)"]
    config["vel"] = {"p": 6.0, "s": 6.0 / 1.73}
    config["x(km)"] = (np.array(config["xlim_degree"]) - np.array(config["center"][0])) * config["degree2km"]
    config["y(km)"] = (np.array(config["ylim_degree"]) - np.array(config["center"][1])) * config["degree2km"]
    config["z(km)"] = (0, 60)
    config["bfgs_bounds"] = (
        (config["x(km)"][0] - 1, config["x(km)"][1] + 1),  # x
        (config["y(km)"][0] - 1, config["y(km)"][1] + 1),  # y
        (0, config["z(km)"][1] + 1),  # z
        (None, None),  # t
    )
    # DBSCAN
    config["dbscan_eps"] = 10  # second
    config["dbscan_min_samples"] = 3  ## see DBSCAN
    # Filtering
    config["min_picks_per_eq"] = min(10, len(stations) // 2)
    config["min_p_picks_per_eq"] = 0
    config["min_s_picks_per_eq"] = 0
    config["max_sigma11"] = 2.0  # s
    config["max_sigma22"] = 2.0  # m/s
    config["max_sigma12"] = 1.0  # covariance
    # if use amplitude
    if config["use_amplitude"]:
        picks = picks[picks["amp"] != -1]
    # print(config)
    for k, v in config.items():
        print(f"{k}: {v}")
    ## run GMMA association
    event_idx0 = 1
    assignments = []
    catalogs, assignments = association(picks, stations, config, event_idx0, method=config["method"])
    event_idx0 += len(catalogs)
    ## create catalog
    catalogs = pd.DataFrame(
        catalogs,
        columns=["time"]
        + config["dims"]
        + [
            "magnitude",
            "sigma_time",
            "sigma_amp",
            "cov_time_amp",
            "event_index",
            "gamma_score",
        ],
    )
    catalogs[["longitude", "latitude"]] = catalogs.apply(
        lambda x: pd.Series(proj(longitude=x["x(km)"], latitude=x["y(km)"], inverse=True)),
        axis=1,
    )
    catalogs["depth(m)"] = catalogs["z(km)"].apply(lambda x: x * 1e3)
    catalogs.sort_values(by=["time"], inplace=True)
    with open(gamma_catalog_csv, "w") as fp:
        catalogs.to_csv(
            fp,
            # sep="\t",
            index=False,
            float_format="%.3f",
            date_format="%Y-%m-%dT%H:%M:%S.%f",
            columns=[
                "time",
                "magnitude",
                "longitude",
                "latitude",
                "depth(m)",
                "sigma_time",
                "sigma_amp",
                "cov_time_amp",
                "gamma_score",
                "event_index",
            ],
        )
    # catalogs = catalogs[
    #     ['time', 'magnitude', 'longitude', 'latitude', 'depth(m)', 'sigma_time', 'sigma_amp']
    # ]
    ## add assignment to picks
    assignments = pd.DataFrame(assignments, columns=["pick_index", "event_index", "gamma_score"])
    picks = picks.join(assignments.set_index("pick_index")).fillna(-1).astype({"event_index": int})
    picks.sort_values(by=["timestamp"], inplace=True)
    with open(gamma_pick_csv, "w") as fp:
        picks.to_csv(
            fp,
            # sep="\t",
            index=False,
            date_format="%Y-%m-%dT%H:%M:%S.%f",
            columns=[
                "station_id",
                "phase_time",
                "phase_type",
                "phase_score",
                "phase_amp",
                "gamma_score",
                "event_index",
            ],
        )
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma/catalog_{node_i:03d}.csv",
            gamma_catalog_csv,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma/picks_{node_i:03d}.csv",
            gamma_pick_csv,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    ## upload to mongodb
    try:
        from pymongo import MongoClient
        username = "root"
        password = "quakeflow123"
        client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
        db = client["quakeflow"]
        collection = db["waveform"]
        for i, p in tqdm(picks.iterrows(), desc="Uploading to mongodb"):
            collection.update(
                {"_id": f"{p['station_id']}_{p['timestamp'].isoformat(timespec='milliseconds')}_{p['type']}"},
                {"$set": {"event_index": p["event_index"]}},
            )
    except Exception as err:
        print(f"ERROR: can not access mongodb service! \n{err}")
        pass
    return f"catalog_{node_i:03d}.csv"
def gamma(
    node_i: int,
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    pick_csv: InputPath("csv"),
    station_json: InputPath("json"),
    gamma_catalog_csv: OutputPath(str),
    gamma_pick_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
) -> str:
    import json
    import os
    import pickle
    from datetime import datetime, timedelta
    import numpy as np
    import pandas as pd
    from gamma.utils import association, convert_picks_csv, from_seconds
    from pyproj import Proj
    from tqdm import tqdm
    catalog_dir = os.path.join("/tmp/", bucket_name)
    if not os.path.exists(catalog_dir):
        os.makedirs(catalog_dir)
    ## read config
    with open(index_json, "r") as fp:
        index = json.load(fp)
    idx = index[node_i]
    with open(config_json, "r") as fp:
        config = json.load(fp)
    ## read picks
    # picks = pd.read_json(pick_json)
    picks = pd.read_csv(pick_csv, parse_dates=["phase_time"])
    picks["id"] = picks["station_id"]
    picks["timestamp"] = picks["phase_time"]
    picks["amp"] = picks["phase_amp"]
    picks["type"] = picks["phase_type"]
    picks["prob"] = picks["phase_score"]
    ## read stations
    # stations = pd.read_csv(station_csv, delimiter="\t")
    with open(station_json, "r") as fp:
        stations = json.load(fp)
    stations = pd.DataFrame.from_dict(stations, orient="index")
    # stations = stations.rename(columns={"station": "id"})
    stations["id"] = stations.index
    proj = Proj(f"+proj=aeqd +lon_0={config['center'][0]} +lat_0={config['center'][1]} +units=km")
    stations[["x(km)", "y(km)"]] = stations.apply(
        lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1
    )
    stations["z(km)"] = stations["elevation(m)"].apply(lambda x: -x / 1e3)
    ## setting GMMA configs
    config["use_dbscan"] = True
    config["use_amplitude"] = True
    config["method"] = "BGMM"
    if config["method"] == "BGMM":  ## BayesianGaussianMixture
        config["oversample_factor"] = 4
    if config["method"] == "GMM":  ## GaussianMixture
        config["oversample_factor"] = 1
    # Earthquake location
    config["dims"] = ["x(km)", "y(km)", "z(km)"]
    config["vel"] = {"p": 6.0, "s": 6.0 / 1.73}
    config["x(km)"] = (np.array(config["xlim_degree"]) - np.array(config["center"][0])) * config["degree2km"]
    config["y(km)"] = (np.array(config["ylim_degree"]) - np.array(config["center"][1])) * config["degree2km"]
    config["z(km)"] = (0, 60)
    config["bfgs_bounds"] = (
        (config["x(km)"][0] - 1, config["x(km)"][1] + 1),  # x
        (config["y(km)"][0] - 1, config["y(km)"][1] + 1),  # y
        (0, config["z(km)"][1] + 1),  # z
        (None, None),  # t
    )
    # DBSCAN
    config["dbscan_eps"] = 10  # second
    config["dbscan_min_samples"] = 3  ## see DBSCAN
    # Filtering
    config["min_picks_per_eq"] = min(10, len(stations) // 2)
    config["min_p_picks_per_eq"] = 0
    config["min_s_picks_per_eq"] = 0
    config["max_sigma11"] = 2.0  # s
    config["max_sigma22"] = 2.0  # m/s
    config["max_sigma12"] = 1.0  # covariance
    # if use amplitude
    if config["use_amplitude"]:
        picks = picks[picks["amp"] != -1]
    # print(config)
    for k, v in config.items():
        print(f"{k}: {v}")
    ## run GMMA association
    event_idx0 = 1
    assignments = []
    catalogs, assignments = association(picks, stations, config, event_idx0, method=config["method"])
    event_idx0 += len(catalogs)
    ## create catalog
    catalogs = pd.DataFrame(
        catalogs,
        columns=["time"]
        + config["dims"]
        + [
            "magnitude",
            "sigma_time",
            "sigma_amp",
            "cov_time_amp",
            "event_index",
            "gamma_score",
        ],
    )
    catalogs[["longitude", "latitude"]] = catalogs.apply(
        lambda x: pd.Series(proj(longitude=x["x(km)"], latitude=x["y(km)"], inverse=True)),
        axis=1,
    )
    catalogs["depth(m)"] = catalogs["z(km)"].apply(lambda x: x * 1e3)
    catalogs.sort_values(by=["time"], inplace=True)
    with open(gamma_catalog_csv, "w") as fp:
        catalogs.to_csv(
            fp,
            # sep="\t",
            index=False,
            float_format="%.3f",
            date_format="%Y-%m-%dT%H:%M:%S.%f",
            columns=[
                "time",
                "magnitude",
                "longitude",
                "latitude",
                "depth(m)",
                "sigma_time",
                "sigma_amp",
                "cov_time_amp",
                "gamma_score",
                "event_index",
            ],
        )
    # catalogs = catalogs[
    #     ['time', 'magnitude', 'longitude', 'latitude', 'depth(m)', 'sigma_time', 'sigma_amp']
    # ]
    ## add assignment to picks
    assignments = pd.DataFrame(assignments, columns=["pick_index", "event_index", "gamma_score"])
    picks = picks.join(assignments.set_index("pick_index")).fillna(-1).astype({"event_index": int})
    picks.sort_values(by=["timestamp"], inplace=True)
    with open(gamma_pick_csv, "w") as fp:
        picks.to_csv(
            fp,
            # sep="\t",
            index=False,
            date_format="%Y-%m-%dT%H:%M:%S.%f",
            columns=[
                "station_id",
                "phase_time",
                "phase_type",
                "phase_score",
                "phase_amp",
                "gamma_score",
                "event_index",
            ],
        )
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma/catalog_{node_i:03d}.csv",
            gamma_catalog_csv,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma/picks_{node_i:03d}.csv",
            gamma_pick_csv,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    ## upload to mongodb
    try:
        from pymongo import MongoClient
        username = "root"
        password = "quakeflow123"
        client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
        db = client["quakeflow"]
        collection = db["waveform"]
        for i, p in tqdm(picks.iterrows(), desc="Uploading to mongodb"):
            collection.update(
                {"_id": f"{p['station_id']}_{p['timestamp'].isoformat(timespec='milliseconds')}_{p['type']}"},
                {"$set": {"event_index": p["event_index"]}},
            )
    except Exception as err:
        print(f"ERROR: can not access mongodb service! \n{err}")
        pass
    return f"catalog_{node_i:03d}.csv"
    
        In [39]:
                Copied!
                
                
            if run_local:
    catalog = gamma(
        0,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("phasenet/picks.csv"),
        root_dir("stations.json"),
        root_dir("gamma_catalog.csv"),
        root_dir("gamma_picks.csv"),
        bucket_name="catalogs",
        s3_url="localhost:9000",
        secure=False,
    )
if run_local:
    catalog = gamma(
        0,
        root_dir("index.json"),
        root_dir("config.json"),
        root_dir("phasenet/picks.csv"),
        root_dir("stations.json"),
        root_dir("gamma_catalog.csv"),
        root_dir("gamma_picks.csv"),
        bucket_name="catalogs",
        s3_url="localhost:9000",
        secure=False,
    )
    
        region: Demo
center: [-117.504, 35.705]
xlim_degree: [-118.004, -117.004]
ylim_degree: [35.205, 36.205]
min_longitude: -118.004
max_longitude: -117.004
min_latitude: 35.205
max_latitude: 36.205
degree2km: 111.19492664455873
starttime: 2019-07-04T17:00:00.000
endtime: 2019-07-04T19:00:00.000
networks: ['CI']
channels: HH*,BH*,EH*,HN*
client: SCEDC
phasenet: {}
gamma: {}
hypodd: {'MAXEVENT': 10000.0}
use_dbscan: True
use_amplitude: True
method: BGMM
oversample_factor: 4
dims: ['x(km)', 'y(km)', 'z(km)']
vel: {'p': 6.0, 's': 3.468208092485549}
x(km): [-55.59746332  55.59746332]
y(km): [-55.59746332  55.59746332]
z(km): (0, 60)
bfgs_bounds: ((-56.597463322279367, 56.597463322279367), (-56.597463322279367, 56.597463322279367), (0, 61), (None, None))
dbscan_eps: 10
dbscan_min_samples: 3
min_picks_per_eq: 10
min_p_picks_per_eq: 0
min_s_picks_per_eq: 0
max_sigma11: 2.0
max_sigma22: 2.0
max_sigma12: 1.0
Associating 81 clusters with 79 CPUs
.................................................................................
Finish 99 events
Finish 199 events
Finish 299 events
ERROR: can not access minio service! 
No module named 'minio'
Uploading to mongodb: 0it [00:30, ?it/s]
ERROR: can not access mongodb service! 
127.0.0.1:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 64123c7172f5f960d0d7c589, topology_type: Single, servers: [<ServerDescription ('127.0.0.1', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('127.0.0.1:27017: [Errno 111] Connection refused')>]>
In [40]:
                Copied!
                
                
            gamma_op = comp.func_to_container_op(
    gamma,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "pandas",
        "numpy",
        "scikit-learn",
        "tqdm",
        "minio",
        "gmma",
        "pyproj",
    ],
)
gamma_op = comp.func_to_container_op(
    gamma,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=[
        "pandas",
        "numpy",
        "scikit-learn",
        "tqdm",
        "minio",
        "gmma",
        "pyproj",
    ],
)
    
        Plot catalogs¶
In [41]:
                Copied!
                
                
            if run_local:
    %run plot_catalog.ipynb
if run_local:
    %run plot_catalog.ipynb
    
        Merge parallel processing on cloud¶
In [42]:
                Copied!
                
                
            def merge_catalog(
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    catalog_csv: OutputPath(str),
    picks_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from glob import glob
    import pandas as pd
    from minio import Minio
    minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    with open(index_json, "r") as fp:
        index = json.load(fp)
        
    with open(config_json, "r") as fp:
        config = json.load(fp)
    objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/gamma", recursive=True)
    tmp_path = lambda x: os.path.join("/tmp/", x)
    for obj in objects:
        print(obj._object_name)
        minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
    # files_catalog = sorted(glob(tmp_path("catalog_*.csv")))
    # files_picks = sorted(glob(tmp_path("picks_*.csv")))
    files_catalog = [tmp_path(f"catalog_{node_i:03d}.csv") for node_i in range(len(index))]
    files_picks = [tmp_path(f"picks_{node_i:03d}.csv") for node_i in range(len(index))]
    print(f"Merge catalog: {files_catalog}")
    print(f"Merge picks: {files_picks}")
    if len(files_catalog) > 0:
        catalog_list = []
        for f in files_catalog:
            tmp = pd.read_csv(f, dtype=str)
            tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
            catalog_list.append(tmp)
        merged_catalog = pd.concat(catalog_list).sort_values(by="time")
        merged_catalog["match_id"] = merged_catalog.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
        merged_catalog.sort_values(by="time", inplace=True, ignore_index=True)
        merged_catalog.drop(columns=["event_index", "file_index"], inplace=True)
        merged_catalog["event_index"] = merged_catalog.index.values + 1
        mapping = dict(zip(merged_catalog["match_id"], merged_catalog["event_index"]))
        merged_catalog.drop(columns=["match_id"], inplace=True)
        merged_catalog.to_csv(catalog_csv, index=False)
        del merged_catalog
        pick_list = []
        for f in files_picks:
            tmp = pd.read_csv(f, dtype=str)
            tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
            pick_list.append(tmp)
        merged_picks = pd.concat(pick_list).sort_values(by="phase_time")
        merged_picks["match_id"] = merged_picks.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
        merged_picks.drop(columns=["event_index", "file_index"], inplace=True)
        merged_picks["event_index"] = merged_picks["match_id"].apply(lambda x: mapping[x] if x in mapping else -1)
        merged_picks.drop(columns=["match_id"], inplace=True)
        merged_picks.to_csv(picks_csv, index=False)
        del merged_picks
        
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_catalog.csv",
            catalog_csv,
        )
        
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_picks.csv",
            picks_csv,
        )
    # else:
    #     with open(catalog_csv, "w") as fout:
    #         pass
    #     print("No catalog.csv found!")
    #     with open(picks_csv, "w") as fout:
    #         pass
    #     print("No picks.csv found!")
 def merge_catalog(
    index_json: InputPath("json"),
    config_json: InputPath("json"),
    catalog_csv: OutputPath(str),
    picks_csv: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from glob import glob
    import pandas as pd
    from minio import Minio
    minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    with open(index_json, "r") as fp:
        index = json.load(fp)
        
    with open(config_json, "r") as fp:
        config = json.load(fp)
    objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/gamma", recursive=True)
    tmp_path = lambda x: os.path.join("/tmp/", x)
    for obj in objects:
        print(obj._object_name)
        minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
    # files_catalog = sorted(glob(tmp_path("catalog_*.csv")))
    # files_picks = sorted(glob(tmp_path("picks_*.csv")))
    files_catalog = [tmp_path(f"catalog_{node_i:03d}.csv") for node_i in range(len(index))]
    files_picks = [tmp_path(f"picks_{node_i:03d}.csv") for node_i in range(len(index))]
    print(f"Merge catalog: {files_catalog}")
    print(f"Merge picks: {files_picks}")
    if len(files_catalog) > 0:
        catalog_list = []
        for f in files_catalog:
            tmp = pd.read_csv(f, dtype=str)
            tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
            catalog_list.append(tmp)
        merged_catalog = pd.concat(catalog_list).sort_values(by="time")
        merged_catalog["match_id"] = merged_catalog.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
        merged_catalog.sort_values(by="time", inplace=True, ignore_index=True)
        merged_catalog.drop(columns=["event_index", "file_index"], inplace=True)
        merged_catalog["event_index"] = merged_catalog.index.values + 1
        mapping = dict(zip(merged_catalog["match_id"], merged_catalog["event_index"]))
        merged_catalog.drop(columns=["match_id"], inplace=True)
        merged_catalog.to_csv(catalog_csv, index=False)
        del merged_catalog
        pick_list = []
        for f in files_picks:
            tmp = pd.read_csv(f, dtype=str)
            tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
            pick_list.append(tmp)
        merged_picks = pd.concat(pick_list).sort_values(by="phase_time")
        merged_picks["match_id"] = merged_picks.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
        merged_picks.drop(columns=["event_index", "file_index"], inplace=True)
        merged_picks["event_index"] = merged_picks["match_id"].apply(lambda x: mapping[x] if x in mapping else -1)
        merged_picks.drop(columns=["match_id"], inplace=True)
        merged_picks.to_csv(picks_csv, index=False)
        del merged_picks
        
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_catalog.csv",
            catalog_csv,
        )
        
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_picks.csv",
            picks_csv,
        )
    # else:
    #     with open(catalog_csv, "w") as fout:
    #         pass
    #     print("No catalog.csv found!")
    #     with open(picks_csv, "w") as fout:
    #         pass
    #     print("No picks.csv found!")
    
        In [43]:
                Copied!
                
                
            merge_op = comp.func_to_container_op(
    merge_catalog,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=["pandas", "minio"],
)
merge_op = comp.func_to_container_op(
    merge_catalog,
    # base_image='zhuwq0/quakeflow-env:latest',
    base_image="python:3.8",
    packages_to_install=["pandas", "minio"],
)
    
        7. HypoDD earthquake relocation¶
Download and compile HypoDD¶
In [44]:
                Copied!
                
                
            if run_local:
    if not os.path.exists("HYPODD"):
        os.system("wget -O HYPODD_1.3.tar.gz http://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz")
        os.system("tar -xf HYPODD_1.3.tar.gz")
        os.system("ln -s $(which gfortran) ./HYPODD/f77")
        os.system("ln -s $(which gfortran) ./HYPODD/g77")
        os.environ["PATH"] += os.pathsep + os.getcwd() + "/HYPODD/"
        os.system("make -C HYPODD/src")
    if not os.path.exists(root_dir("hypodd")):
        os.mkdir(root_dir("hypodd"))
    os.system(f"cp HYPODD/src/ph2dt/ph2dt {root_dir('hypodd')}/")
    os.system(f"cp HYPODD/src/hypoDD/hypoDD {root_dir('hypodd')}/")
if run_local:
    if not os.path.exists("HYPODD"):
        os.system("wget -O HYPODD_1.3.tar.gz http://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz")
        os.system("tar -xf HYPODD_1.3.tar.gz")
        os.system("ln -s $(which gfortran) ./HYPODD/f77")
        os.system("ln -s $(which gfortran) ./HYPODD/g77")
        os.environ["PATH"] += os.pathsep + os.getcwd() + "/HYPODD/"
        os.system("make -C HYPODD/src")
    if not os.path.exists(root_dir("hypodd")):
        os.mkdir(root_dir("hypodd"))
    os.system(f"cp HYPODD/src/ph2dt/ph2dt {root_dir('hypodd')}/")
    os.system(f"cp HYPODD/src/hypoDD/hypoDD {root_dir('hypodd')}/")
    
        --2023-03-15 14:46:00--  http://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz
Resolving www.ldeo.columbia.edu (www.ldeo.columbia.edu)... 129.236.14.15
Connecting to www.ldeo.columbia.edu (www.ldeo.columbia.edu)|129.236.14.15|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz [following]
--2023-03-15 14:46:00--  https://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz
Connecting to www.ldeo.columbia.edu (www.ldeo.columbia.edu)|129.236.14.15|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1393039 (1.3M) [application/x-gzip]
Saving to: ‘HYPODD_1.3.tar.gz’
     0K .......... .......... .......... .......... ..........  3%  384K 3s
    50K .......... .......... .......... .......... ..........  7%  774K 2s
   100K .......... .......... .......... .......... .......... 11% 47.0M 2s
   150K .......... .......... .......... .......... .......... 14% 65.5M 1s
   200K .......... .......... .......... .......... .......... 18%  783K 1s
   250K .......... .......... .......... .......... .......... 22% 68.0M 1s
   300K .......... .......... .......... .......... .......... 25% 41.3M 1s
   350K .......... .......... .......... .......... .......... 29%  213M 1s
   400K .......... .......... .......... .......... .......... 33%  799K 1s
   450K .......... .......... .......... .......... .......... 36% 56.2M 1s
   500K .......... .......... .......... .......... .......... 40% 64.0M 0s
   550K .......... .......... .......... .......... .......... 44%  110M 0s
   600K .......... .......... .......... .......... .......... 47% 86.9M 0s
   650K .......... .......... .......... .......... .......... 51% 80.3M 0s
   700K .......... .......... .......... .......... .......... 55%  102M 0s
   750K .......... .......... .......... .......... .......... 58%  161M 0s
   800K .......... .......... .......... .......... .......... 62%  186M 0s
   850K .......... .......... .......... .......... .......... 66%  823K 0s
   900K .......... .......... .......... .......... .......... 69% 96.5M 0s
   950K .......... .......... .......... .......... .......... 73% 62.2M 0s
  1000K .......... .......... .......... .......... .......... 77% 43.8M 0s
  1050K .......... .......... .......... .......... .......... 80% 50.9M 0s
  1100K .......... .......... .......... .......... .......... 84% 76.9M 0s
  1150K .......... .......... .......... .......... .......... 88%  209M 0s
  1200K .......... .......... .......... .......... .......... 91%  178M 0s
  1250K .......... .......... .......... .......... .......... 95%  210M 0s
  1300K .......... .......... .......... .......... .......... 99%  202M 0s
  1350K ..........                                            100%  135M=0.4s
2023-03-15 14:46:01 (3.36 MB/s) - ‘HYPODD_1.3.tar.gz’ saved [1393039/1393039]
make: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src' for d in hypoDD ncsn2pha ph2dt hista2ddsta; do (cd $d; echo $d; make all); done hypoDD make[1]: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/hypoDD' g77 -I../../include -c hypoDD.f -o hypoDD.o g77 -I../../include -c aprod.f -o aprod.o g77 -I../../include -c cluster1.f -o cluster1.o
hypoDD.f:654:29:
  654 |      & f7.1,f7.1,f7.1,f7.1)'),xav,yav,zav,tav
      |                             1
Warning: Legacy Extension: Comma before i/o item list at (1)
g77 -I../../include -c covar.f -o covar.o g77 -I../../include -c datum.f -o datum.o g77 -I../../include -c delaz.f -o delaz.o g77 -I../../include -c delaz2.f -o delaz2.o g77 -I../../include -c direct1.f -o direct1.o g77 -I../../include -c dist.f -o dist.o g77 -I../../include -c dtres.f -o dtres.o g77 -I../../include -c exist.f -o exist.o g77 -I../../include -c freeunit.f -o freeunit.o g77 -I../../include -c getdata.f -o getdata.o g77 -I../../include -c getinp.f -o getinp.o g77 -I../../include -c ifindi.f -o ifindi.o g77 -I../../include -c indexxi.f -o indexxi.o g77 -I../../include -c juliam.f -o juliam.o g77 -I../../include -c lsfit_lsqr.f -o lsfit_lsqr.o g77 -I../../include -c lsfit_svd.f -o lsfit_svd.o g77 -I../../include -c lsqr.f -o lsqr.o g77 -I../../include -c matmult1.f -o matmult1.o g77 -I../../include -c matmult2.f -o matmult2.o g77 -I../../include -c matmult3.f -o matmult3.o g77 -I../../include -c mdian1.f -o mdian1.o g77 -I../../include -c normlz.f -o normlz.o g77 -I../../include -c partials.f -o partials.o g77 -I../../include -c ran.f -o ran.o g77 -I../../include -c redist.f -o redist.o g77 -I../../include -c refract.f -o refract.o g77 -I../../include -c resstat.f -o resstat.o g77 -I../../include -c scopy.f -o scopy.o
refract.f:84:16:
   84 |       do 23151m=j1,nl
      |                1
Warning: Deleted feature: Start expression in DO loop at (1) must be integer
refract.f:126:36:
  126 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                                    1
Warning: Legacy Extension: REAL array index at (1)
refract.f:126:24:
  126 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                        1
Warning: Legacy Extension: REAL array index at (1)
refract.f:126:18:
  126 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                  1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:53:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                                                     1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:47:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                                               1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:40:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                                        1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:34:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                                  1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:27:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                           1
Warning: Legacy Extension: REAL array index at (1)
refract.f:155:19:
  155 |       xovmax=(tinj(lx)-tid(jx))*v(lx)*v(jx)/(v(lx)-v(jx))
      |                   1
Warning: Legacy Extension: REAL array index at (1)
refract.f:161:36:
  161 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                                    1
Warning: Legacy Extension: REAL array index at (1)
refract.f:161:24:
  161 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                        1
Warning: Legacy Extension: REAL array index at (1)
refract.f:161:18:
  161 |       xovmax=tinj(lx)*v(lx)*v(1)/(v(lx)-v(1))
      |                  1
Warning: Legacy Extension: REAL array index at (1)
scopy.f:25:72:
   25 |       if(incx.eq.incy) if(incx-1) 5,20,60
      |                                                                        1
Warning: Fortran 2018 deleted feature: Arithmetic IF statement at (1)
g77 -I../../include -c sdc2.f -o sdc2.o g77 -I../../include -c setorg.f -o setorg.o g77 -I../../include -c skip.f -o skip.o g77 -I../../include -c snrm2.f -o snrm2.o
snrm2.f:67:72:
   67 |    10 assign 30 to next
      |                                                                        1
Warning: Deleted feature: ASSIGN statement at (1)
snrm2.f:72:19:
   72 |    20    go to next,(30, 50, 70, 110)
      |                   1
Warning: Deleted feature: Assigned GOTO statement at (1)
snrm2.f:74:72:
   74 |       assign 50 to next
      |                                                                        1
Warning: Deleted feature: ASSIGN statement at (1)
snrm2.f:83:72:
   83 |       assign 70 to next
      |                                                                        1
Warning: Deleted feature: ASSIGN statement at (1)
snrm2.f:89:72:
   89 |       assign 110 to next
      |                                                                        1
Warning: Deleted feature: ASSIGN statement at (1)
snrm2.f:125:72:
  125 |    95    sum = sum + sx(j)**2
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 95 at (1)
g77 -I../../include -c sort.f -o sort.o g77 -I../../include -c sorti.f -o sorti.o g77 -I../../include -c sscal.f -o sscal.o g77 -I../../include -c svd.f -o svd.o g77 -I../../include -c tiddid.f -o tiddid.o g77 -I../../include -c trialsrc.f -o trialsrc.o g77 -I../../include -c trimlen.f -o trimlen.o g77 -I../../include -c ttime.f -o ttime.o
svd.f:49:72:
   49 |       DO 1100 J=1,N
      |                                                                        1
Warning: Fortran 2018 deleted feature: Shared DO termination label 1100 at (1)
svd.f:50:72:
   50 |  1100 U(I,J)=A(I,J)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 1100 at (1)
svd.f:59:72:
   59 |  2100 S=U(J,I)**2 + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2100 at (1)
svd.f:69:72:
   69 |  2200 S=U(K,I)*U(K,J) + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2200 at (1)
svd.f:72:72:
   72 |  2300 U(K,J)=U(K,J) + F*U(K,I)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2300 at (1)
svd.f:82:72:
   82 |  2600 S=U(I,J)**2 + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2600 at (1)
svd.f:90:72:
   90 |  2650 E(J)=U(I,J)/H
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2650 at (1)
svd.f:97:72:
   97 |  2670 S=U(J,K)*U(I,K) + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2670 at (1)
svd.f:99:72:
   99 |  2690 U(J,K)=U(J,K) + S*E(K)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 2690 at (1)
svd.f:117:72:
  117 |  3100 V(J,I)=U(I,J)/H
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 3100 at (1)
svd.f:121:72:
  121 |  3200 S=U(I,K)*V(K,J) + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 3200 at (1)
svd.f:123:72:
  123 |  3300 V(K,J)=V(K,J) + S*V(K,I)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 3300 at (1)
svd.f:129:72:
  129 |  3600 V(I,J)=0.0
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 3600 at (1)
svd.f:145:72:
  145 |  4100 U(I,J)=0.0
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4100 at (1)
svd.f:152:72:
  152 |  4200 S=U(K,I)*U(K,J) + S
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4200 at (1)
svd.f:155:72:
  155 |  4300 U(K,J)=U(K,J) + F*U(K,I)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4300 at (1)
svd.f:159:72:
  159 |  4550 U(J,I)=U(J,I)/G
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4550 at (1)
svd.f:163:72:
  163 |  4600 U(J,I)=0.0
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4600 at (1)
svd.f:164:72:
  164 |  4700 U(I,I)=U(I,I) + 1.0
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 4700 at (1)
svd.f:258:72:
  258 |  8200 V(J,K)=-V(J,K)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 8200 at (1)
trimlen.f:11:72:
   11 |     1    if(t(trimlen:trimlen).ne.' ')RETURN
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 1 at (1)
vmodel.f:34:72:
   34 |    10 vsq(i)=v(i)*v(i)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 10 at (1)
vmodel.f:53:72:
   53 |    30 thk(i)=top(i+1)-top(i)
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 30 at (1)
g77 -I../../include -c vmodel.f -o vmodel.o g77 -I../../include -c weighting.f -o weighting.o gcc -O -I../../include -c -o atoangle_.o atoangle_.c
atoangle_.c:3:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif lint
        ^~~~
In file included from atoangle_.c:6:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
gcc -O -I../../include -c -o atoangle.o atoangle.c gcc -O -I../../include -c -o datetime_.o datetime_.c gcc -O -I../../include -c -o hypot_.o hypot_.c gcc -O -I../../include -c -o rpad_.o rpad_.c gcc -O -I../../include -c -o sscanf3_.o sscanf3_.c g77 hypoDD.o aprod.o cluster1.o covar.o datum.o delaz.o delaz2.o direct1.o dist.o dtres.o exist.o freeunit.o getdata.o getinp.o ifindi.o indexxi.o juliam.o lsfit_lsqr.o lsfit_svd.o lsqr.o matmult1.o matmult2.o matmult3.o mdian1.o normlz.o partials.o ran.o redist.o refract.o resstat.o scopy.o sdc2.o setorg.o skip.o snrm2.o sort.o sorti.o sscal.o svd.o tiddid.o trialsrc.o trimlen.o ttime.o vmodel.o weighting.o atoangle_.o atoangle.o datetime_.o hypot_.o rpad_.o sscanf3_.o -o hypoDD make[1]: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/hypoDD' ncsn2pha make[1]: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/ncsn2pha' g77 -O -c ncsn2pha.f -o ncsn2pha.o
atoangle.c:3:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif lint
        ^~~~
atoangle.c:9:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif NULL
        ^~~~
In file included from datetime_.c:7:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
In file included from rpad_.c:6:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
In file included from sscanf3_.c:7:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
ncsn2pha.f:279:72:
  279 |     1    if(t(trimlen:trimlen).ne.' ')RETURN
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 1 at (1)
ncsn2pha.f:62:0:
   62 |       fn0= str30 (1:trimlen(str30))
      | 
Warning: ‘__builtin_memcpy’ reading 80 bytes from a region of size 30 [-Wstringop-overflow=]
g77 ncsn2pha.o -o ncsn2pha make[1]: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/ncsn2pha' ph2dt make[1]: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/ph2dt' g77 -O -I../../include -c ph2dt.f -o ph2dt.o g77 -O -I../../include -c cluster.f -o cluster.o g77 -O -I../../include -c datetime.f -o datetime.o g77 -O -I../../include -c delaz.f -o delaz.o g77 -O -I../../include -c ifindi.f -o ifindi.o g77 -O -I../../include -c indexx.f -o indexx.o g77 -O -I../../include -c indexxi.f -o indexxi.o g77 -O -I../../include -c sorti.f -o sorti.o g77 -O -I../../include -c trimlen.f -o trimlen.o gcc -I../../include -O -c -o atoangle_.o atoangle_.c gcc -I../../include -O -c -o atoangle.o atoangle.c gcc -I../../include -O -c -o rpad_.o rpad_.c gcc -I../../include -O -c -o sscanf3_.o sscanf3_.c g77 ph2dt.o cluster.o datetime.o delaz.o ifindi.o indexx.o indexxi.o sorti.o trimlen.o atoangle_.o atoangle.o rpad_.o sscanf3_.o -o ph2dt
trimlen.f:11:72:
   11 |     1    if(t(trimlen:trimlen).ne.' ')RETURN
      |                                                                        1
Warning: Fortran 2018 deleted feature: DO termination statement which is not END DO or CONTINUE with label 1 at (1)
atoangle_.c:3:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif lint
        ^~~~
In file included from atoangle_.c:6:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
atoangle.c:3:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif lint
        ^~~~
atoangle.c:9:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif NULL
        ^~~~
In file included from rpad_.c:6:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
In file included from sscanf3_.c:7:0:
../../include/f77types.h:10:8: warning: extra tokens at end of #endif directive [-Wendif-labels]
 #endif F77TYPES_H
        ^~~~~~~~~~
make[1]: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/ph2dt' hista2ddsta make[1]: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/hista2ddsta' f77 -O -I../../include -c hista2ddsta.f -o hista2ddsta.o f77 hista2ddsta.o -o hista2ddsta make[1]: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src/hista2ddsta' make: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src'
7.1. Convert station format¶
In [45]:
                Copied!
                
                
            def convert_station(
    config_json: InputPath("json"),
    station_json: InputPath("json"),
    hypoinverse_station: OutputPath(str),
    hypodd_station: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import os
    import json
    import pandas as pd
    from tqdm import tqdm
    
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    hypoinv_path = os.path.join(data_path, "hypoinv")
    if not os.path.exists(hypoinv_path):
        os.mkdir(hypoinv_path)
    stations = pd.read_json(station_json, orient="index")
    converted_hypoinverse = []
    converted_hypodd = {}
    for sta, row in tqdm(stations.iterrows()):
        network_code, station_code, comp_code, channel_code = sta.split(".")
        station_weight = " "
        lat_degree = int(row["latitude"])
        lat_minute = (row["latitude"] - lat_degree) * 60
        north = "N" if lat_degree >= 0 else "S"
        lng_degree = int(row["longitude"])
        lng_minute = (row["longitude"] - lng_degree) * 60
        west = "W" if lng_degree <= 0 else "E"
        elevation = row["elevation(m)"]
        line_hypoinverse = f"{station_code:<5} {network_code:<2} {comp_code[:-1]:<1}{channel_code:<3} {station_weight}{abs(lat_degree):2.0f} {abs(lat_minute):7.4f}{north}{abs(lng_degree):3.0f} {abs(lng_minute):7.4f}{west}{elevation:4.0f}\n"
        converted_hypoinverse.append(line_hypoinverse)
        tmp_code = f"{station_code}{channel_code}"
        converted_hypodd[
            f"{station_code}{channel_code}"
        ] = f"{tmp_code:<8s} {row['latitude']:.3f} {row['longitude']:.3f}\n"
    with open(hypoinverse_station, "w") as f:
        f.writelines(converted_hypoinverse)
    with open(hypodd_station, "w") as f:
        for k, v in converted_hypodd.items():
            f.write(v)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/stations.dat",
            hypodd_station,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
def convert_station(
    config_json: InputPath("json"),
    station_json: InputPath("json"),
    hypoinverse_station: OutputPath(str),
    hypodd_station: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import os
    import json
    import pandas as pd
    from tqdm import tqdm
    
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    hypoinv_path = os.path.join(data_path, "hypoinv")
    if not os.path.exists(hypoinv_path):
        os.mkdir(hypoinv_path)
    stations = pd.read_json(station_json, orient="index")
    converted_hypoinverse = []
    converted_hypodd = {}
    for sta, row in tqdm(stations.iterrows()):
        network_code, station_code, comp_code, channel_code = sta.split(".")
        station_weight = " "
        lat_degree = int(row["latitude"])
        lat_minute = (row["latitude"] - lat_degree) * 60
        north = "N" if lat_degree >= 0 else "S"
        lng_degree = int(row["longitude"])
        lng_minute = (row["longitude"] - lng_degree) * 60
        west = "W" if lng_degree <= 0 else "E"
        elevation = row["elevation(m)"]
        line_hypoinverse = f"{station_code:<5} {network_code:<2} {comp_code[:-1]:<1}{channel_code:<3} {station_weight}{abs(lat_degree):2.0f} {abs(lat_minute):7.4f}{north}{abs(lng_degree):3.0f} {abs(lng_minute):7.4f}{west}{elevation:4.0f}\n"
        converted_hypoinverse.append(line_hypoinverse)
        tmp_code = f"{station_code}{channel_code}"
        converted_hypodd[
            f"{station_code}{channel_code}"
        ] = f"{tmp_code:<8s} {row['latitude']:.3f} {row['longitude']:.3f}\n"
    with open(hypoinverse_station, "w") as f:
        f.writelines(converted_hypoinverse)
    with open(hypodd_station, "w") as f:
        for k, v in converted_hypodd.items():
            f.write(v)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/stations.dat",
            hypodd_station,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    
    
        In [46]:
                Copied!
                
                
            if run_local:
    convert_station(
        root_dir("config.json"),
        root_dir("stations.json"),
        root_dir("hypoinv/stations.dat"),
        root_dir("hypodd/stations.dat"),
        root_dir(""),
    )
if run_local:
    convert_station(
        root_dir("config.json"),
        root_dir("stations.json"),
        root_dir("hypoinv/stations.dat"),
        root_dir("hypodd/stations.dat"),
        root_dir(""),
    )
    
        52it [00:00, 11423.83it/s]
ERROR: can not access minio service! No module named 'minio'
In [47]:
                Copied!
                
                
            convert_station_op = comp.func_to_container_op(
    convert_station,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
convert_station_op = comp.func_to_container_op(
    convert_station,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
    
        Split large catalog due to memory and time¶
In [48]:
                Copied!
                
                
            def split_hypodd(
    config_json: InputPath("json"),
    catalog_csv: InputPath(str),
) -> list:
    import json
    import pandas as pd
    with open(config_json, "r") as fp:
        config = json.load(fp)
    events = pd.read_csv(catalog_csv)
    if "MAXEVENT" in config["hypodd"]:
        MAXEVENT = config["hypodd"]["MAXEVENT"]
    else:
        MAXEVENT = 1e4  ## segment by time
    MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
    num_parallel = int((len(events) - 1) // MAXEVENT + 1)
    return list(range(num_parallel))
def split_hypodd(
    config_json: InputPath("json"),
    catalog_csv: InputPath(str),
) -> list:
    import json
    import pandas as pd
    with open(config_json, "r") as fp:
        config = json.load(fp)
    events = pd.read_csv(catalog_csv)
    if "MAXEVENT" in config["hypodd"]:
        MAXEVENT = config["hypodd"]["MAXEVENT"]
    else:
        MAXEVENT = 1e4  ## segment by time
    MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
    num_parallel = int((len(events) - 1) // MAXEVENT + 1)
    return list(range(num_parallel))
    
        In [49]:
                Copied!
                
                
            if run_local:
    nodes = split_hypodd(
        root_dir("config.json"),
        root_dir("gamma_catalog.csv"),
    )
if run_local:
    nodes = split_hypodd(
        root_dir("config.json"),
        root_dir("gamma_catalog.csv"),
    )
    
        In [50]:
                Copied!
                
                
            split_hypodd_op = comp.func_to_container_op(
    split_hypodd,
    base_image="python:3.8",
    packages_to_install=[
        "pandas",
    ],
)
split_hypodd_op = comp.func_to_container_op(
    split_hypodd,
    base_image="python:3.8",
    packages_to_install=[
        "pandas",
    ],
)
    
        7.2. Convert phase format¶
In [51]:
                Copied!
                
                
            def convert_phase(
    node_i: int,
    config_json: InputPath("json"),
    picks_csv: InputPath(str),
    catalog_csv: InputPath(str),
    hypodd_phase: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    import pandas as pd
    from tqdm import tqdm
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    picks = pd.read_csv(picks_csv)
    events = pd.read_csv(catalog_csv)
    if "MAXEVENT" in config["hypodd"]:
        MAXEVENT = config["hypodd"]["MAXEVENT"]
    else:
        MAXEVENT = 1e4  ## segment by time
    MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
    num_parallel = int((len(events) - 1) // MAXEVENT + 1)
    events.sort_values("time", inplace=True)
    events = events.iloc[node_i::num_parallel]
    picks = picks.loc[picks["event_index"].isin(events["event_index"])]
    # output_lines = []
    output_file = open(hypodd_phase, "w")
    picks_by_event = picks.groupby("event_index").groups
    # for i in tqdm(range(node_i, len(events), num_parallel)):
    #     event = events.iloc[i]
    for i, event in events.iterrows():
        event_time = datetime.strptime(event["time"], "%Y-%m-%dT%H:%M:%S.%f")
        lat = event["latitude"]
        lng = event["longitude"]
        dep = event["depth(m)"] / 1e3
        mag = event["magnitude"]
        EH = 0
        EZ = 0
        RMS = event["sigma_time"]
        year, month, day, hour, min, sec = (
            event_time.year,
            event_time.month,
            event_time.day,
            event_time.hour,
            event_time.minute,
            float(event_time.strftime("%S.%f")),
        )
        event_line = f"# {year:4d} {month:2d} {day:2d} {hour:2d} {min:2d} {sec:5.2f}  {lat:7.4f} {lng:9.4f}   {dep:5.2f} {mag:5.2f} {EH:5.2f} {EZ:5.2f} {RMS:5.2f} {event['event_index']:9d}\n"
        # output_lines.append(event_line)
        output_file.write(event_line)
        picks_idx = picks_by_event[event["event_index"]]
        for j in picks_idx:
            # pick = picks.iloc[j]
            pick = picks.loc[j]
            network_code, station_code, comp_code, channel_code = pick["station_id"].split(".")
            phase_type = pick["phase_type"].upper()
            phase_score = pick["phase_score"]
            pick_time = (datetime.strptime(pick["phase_time"], "%Y-%m-%dT%H:%M:%S.%f") - event_time).total_seconds()
            tmp_code = f"{station_code}{channel_code}"
            pick_line = f"{tmp_code:<7s}   {pick_time:6.3f}   {phase_score:5.4f}   {phase_type}\n"
            # output_lines.append(pick_line)
            output_file.write(pick_line)
    # with open(hypodd_phase, "w") as fp:
    #     fp.writelines(output_lines)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/phase_{node_i:03d}.pha",
            hypodd_phase,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    return hypodd_phase
def convert_phase(
    node_i: int,
    config_json: InputPath("json"),
    picks_csv: InputPath(str),
    catalog_csv: InputPath(str),
    hypodd_phase: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    import pandas as pd
    from tqdm import tqdm
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    picks = pd.read_csv(picks_csv)
    events = pd.read_csv(catalog_csv)
    if "MAXEVENT" in config["hypodd"]:
        MAXEVENT = config["hypodd"]["MAXEVENT"]
    else:
        MAXEVENT = 1e4  ## segment by time
    MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
    num_parallel = int((len(events) - 1) // MAXEVENT + 1)
    events.sort_values("time", inplace=True)
    events = events.iloc[node_i::num_parallel]
    picks = picks.loc[picks["event_index"].isin(events["event_index"])]
    # output_lines = []
    output_file = open(hypodd_phase, "w")
    picks_by_event = picks.groupby("event_index").groups
    # for i in tqdm(range(node_i, len(events), num_parallel)):
    #     event = events.iloc[i]
    for i, event in events.iterrows():
        event_time = datetime.strptime(event["time"], "%Y-%m-%dT%H:%M:%S.%f")
        lat = event["latitude"]
        lng = event["longitude"]
        dep = event["depth(m)"] / 1e3
        mag = event["magnitude"]
        EH = 0
        EZ = 0
        RMS = event["sigma_time"]
        year, month, day, hour, min, sec = (
            event_time.year,
            event_time.month,
            event_time.day,
            event_time.hour,
            event_time.minute,
            float(event_time.strftime("%S.%f")),
        )
        event_line = f"# {year:4d} {month:2d} {day:2d} {hour:2d} {min:2d} {sec:5.2f}  {lat:7.4f} {lng:9.4f}   {dep:5.2f} {mag:5.2f} {EH:5.2f} {EZ:5.2f} {RMS:5.2f} {event['event_index']:9d}\n"
        # output_lines.append(event_line)
        output_file.write(event_line)
        picks_idx = picks_by_event[event["event_index"]]
        for j in picks_idx:
            # pick = picks.iloc[j]
            pick = picks.loc[j]
            network_code, station_code, comp_code, channel_code = pick["station_id"].split(".")
            phase_type = pick["phase_type"].upper()
            phase_score = pick["phase_score"]
            pick_time = (datetime.strptime(pick["phase_time"], "%Y-%m-%dT%H:%M:%S.%f") - event_time).total_seconds()
            tmp_code = f"{station_code}{channel_code}"
            pick_line = f"{tmp_code:<7s}   {pick_time:6.3f}   {phase_score:5.4f}   {phase_type}\n"
            # output_lines.append(pick_line)
            output_file.write(pick_line)
    # with open(hypodd_phase, "w") as fp:
    #     fp.writelines(output_lines)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/phase_{node_i:03d}.pha",
            hypodd_phase,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    return hypodd_phase
    
        In [52]:
                Copied!
                
                
            if run_local:
    for node_i in nodes:
        convert_phase(
            node_i,
            root_dir("config.json"),
            root_dir("gamma_picks.csv"),
            root_dir("gamma_catalog.csv"),
            root_dir("hypodd/hypodd_phase.pha"),
            root_dir(""),
        )
if run_local:
    for node_i in nodes:
        convert_phase(
            node_i,
            root_dir("config.json"),
            root_dir("gamma_picks.csv"),
            root_dir("gamma_catalog.csv"),
            root_dir("hypodd/hypodd_phase.pha"),
            root_dir(""),
        )
    
        ERROR: can not access minio service! No module named 'minio'
In [53]:
                Copied!
                
                
            convert_phase_op = comp.func_to_container_op(
    convert_phase,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
convert_phase_op = comp.func_to_container_op(
    convert_phase,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
    
        7.3. Run ph2dt to calculate differential time between phases¶
In [56]:
                Copied!
                
                
            def ph2dt(
    node_i: int,
    config_json: InputPath("json"),
    hypodd_phase: InputPath(str),
    station_dat: InputPath(str),
    ct_file: OutputPath(str),
    hypodd_event: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    # try:
    #     minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    #     minioClient.fget_object(bucket_name, f"{config['region']}/hypodd_{node_i:03d}.pha", os.path.join(hypodd_path, f"hypodd_{node_i:03d}.pha"))
    # except Exception as err:
    #     print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    ph2dt = """* ph2dt.inp - input control file for program ph2dt
* Input station file:
stations.dat
* Input phase file:
hypodd.pha
*MINWGHT: min. pick weight allowed [0]
*MAXDIST: max. distance in km between event pair and stations [200]
*MAXSEP: max. hypocentral separation in km [10]
*MAXNGH: max. number of neighbors per event [10]
*MINLNK: min. number of links required to define a neighbor [8]
*MINOBS: min. number of links per pair saved [8]
*MAXOBS: max. number of links per pair saved [20]
*MINWGHT MAXDIST MAXSEP MAXNGH MINLNK MINOBS MAXOBS
   0      120     10     50     8      8     100
"""
    with open(os.path.join(hypodd_path, "ph2dt.inp"), "w") as fp:
        fp.writelines(ph2dt)
    def copy_file(fp_from, fp_to):
        with open(fp_from, "r") as fp:
            lines = fp.readlines()
        with open(fp_to, "w") as fp:
            fp.writelines(lines)
    copy_file(hypodd_phase, os.path.join(hypodd_path, "hypodd.pha"))
    copy_file(station_dat, os.path.join(hypodd_path, "stations.dat"))
    PH2DT_CMD = f"cd {hypodd_path} && ./ph2dt ph2dt.inp"
    print(PH2DT_CMD)
    if os.system(PH2DT_CMD) != 0:
        raise ("{PH2DT_CMD}" + " failed!")
    copy_file(os.path.join(hypodd_path, "dt.ct"), ct_file)
    copy_file(os.path.join(hypodd_path, "event.sel"), hypodd_event)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/dt_{node_i:03d}.ct",
            ct_file,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/event_{node_i:03d}.sel",
            hypodd_event,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    # try:
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/dt_{node_i:03d}.ct",
    #         f"{os.path.join(hypodd_path, 'dt.ct')}",
    #     )
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/event_{node_i:03d}.dat",
    #         f"{os.path.join(hypodd_path, 'event.dat')}",
    #     )
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/event_{node_i:03d}.sel",
    #         f"{os.path.join(hypodd_path, 'event.sel')}",
    #     )
    # except Exception as err:
    #     print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    return 0
def ph2dt(
    node_i: int,
    config_json: InputPath("json"),
    hypodd_phase: InputPath(str),
    station_dat: InputPath(str),
    ct_file: OutputPath(str),
    hypodd_event: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    if not os.path.exists(hypodd_path):
        os.mkdir(hypodd_path)
    # try:
    #     minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
    #     minioClient.fget_object(bucket_name, f"{config['region']}/hypodd_{node_i:03d}.pha", os.path.join(hypodd_path, f"hypodd_{node_i:03d}.pha"))
    # except Exception as err:
    #     print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    ph2dt = """* ph2dt.inp - input control file for program ph2dt
* Input station file:
stations.dat
* Input phase file:
hypodd.pha
*MINWGHT: min. pick weight allowed [0]
*MAXDIST: max. distance in km between event pair and stations [200]
*MAXSEP: max. hypocentral separation in km [10]
*MAXNGH: max. number of neighbors per event [10]
*MINLNK: min. number of links required to define a neighbor [8]
*MINOBS: min. number of links per pair saved [8]
*MAXOBS: max. number of links per pair saved [20]
*MINWGHT MAXDIST MAXSEP MAXNGH MINLNK MINOBS MAXOBS
   0      120     10     50     8      8     100
"""
    with open(os.path.join(hypodd_path, "ph2dt.inp"), "w") as fp:
        fp.writelines(ph2dt)
    def copy_file(fp_from, fp_to):
        with open(fp_from, "r") as fp:
            lines = fp.readlines()
        with open(fp_to, "w") as fp:
            fp.writelines(lines)
    copy_file(hypodd_phase, os.path.join(hypodd_path, "hypodd.pha"))
    copy_file(station_dat, os.path.join(hypodd_path, "stations.dat"))
    PH2DT_CMD = f"cd {hypodd_path} && ./ph2dt ph2dt.inp"
    print(PH2DT_CMD)
    if os.system(PH2DT_CMD) != 0:
        raise ("{PH2DT_CMD}" + " failed!")
    copy_file(os.path.join(hypodd_path, "dt.ct"), ct_file)
    copy_file(os.path.join(hypodd_path, "event.sel"), hypodd_event)
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/dt_{node_i:03d}.ct",
            ct_file,
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/event_{node_i:03d}.sel",
            hypodd_event,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    # try:
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/dt_{node_i:03d}.ct",
    #         f"{os.path.join(hypodd_path, 'dt.ct')}",
    #     )
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/event_{node_i:03d}.dat",
    #         f"{os.path.join(hypodd_path, 'event.dat')}",
    #     )
    #     minioClient.fput_object(
    #         bucket_name,
    #         f"{config['region']}/event_{node_i:03d}.sel",
    #         f"{os.path.join(hypodd_path, 'event.sel')}",
    #     )
    # except Exception as err:
    #     print(f"ERROR: can not access minio service! \n{err}")
    #     pass
    return 0
    
        In [57]:
                Copied!
                
                
            if run_local:
    ph2dt(
        0,
        root_dir("config.json"),
        root_dir("hypodd/hypodd_phase.pha"),
        root_dir("hypodd/stations.dat"),
        root_dir("hypodd/dt.ct"),
        root_dir("hypodd/event.sel"),
        root_dir(""),
    )
if run_local:
    ph2dt(
        0,
        root_dir("config.json"),
        root_dir("hypodd/hypodd_phase.pha"),
        root_dir("hypodd/stations.dat"),
        root_dir("hypodd/dt.ct"),
        root_dir("hypodd/event.sel"),
        root_dir(""),
    )
    
        cd Demo/hypodd && ./ph2dt ph2dt.inp starting ph2dt (v1.3 - 08/2010)... reading data ... > stations = 46 > events total = 363 > events selected = 363 > phases = 16187 forming dtimes... > P-phase pairs total = 145432 > S-phase pairs total = 173413 > outliers = 3467 ( 1 %) > phases at stations not in station list = 0 > phases at distances larger than MAXDIST = 0 > P-phase pairs selected = 138381 ( 95 %) > S-phase pairs selected = 166806 ( 96 %) > weakly linked events = 86 ( 23 %) > linked event pairs = 9869 > average links per pair = 30 > average offset (km) betw. linked events = 4.81116629 > average offset (km) betw. strongly linked events = 4.81116629 > maximum offset (km) betw. strongly linked events = 9.99739456 Done. Output files: dt.ct; event.dat; event.sel; ph2dt.log ph2dt parameters were: (minwght,maxdist,maxsep,maxngh,minlnk,minobs,maxobs) 0.00000000 120.000000 10.0000000 50 8 8 100 ERROR: can not access minio service! No module named 'minio'
In [58]:
                Copied!
                
                
            ph2dt_op = comp.func_to_container_op(ph2dt, base_image="zhuwq0/hypodd-api:1.0")
ph2dt_op = comp.func_to_container_op(ph2dt, base_image="zhuwq0/hypodd-api:1.0")
    
        7.4. Run HypoDD re-location¶
In [59]:
                Copied!
                
                
            def hypodd_ct(
    node_i: int,
    config_json: InputPath("json"),
    ct_file: InputPath(str),
    event: InputPath(str),
    station: InputPath(str),
    catalog_txt: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    ct_inp = """* RELOC.INP:
*--- input file selection
* cross correlation diff times:
*
*catalog P diff times:
dt.ct
*
* event file:
event.sel
*
* station file:
stations.dat
*
*--- output file selection
* original locations:
hypodd.loc
* relocations:
hypodd.reloc
* station information:
hypodd.sta
* residual information:
hypodd.res
* source paramater information:
hypodd.src
*
*--- data type selection: 
* IDAT:  0 = synthetics; 1= cross corr; 2= catalog; 3= cross & cat 
* IPHA: 1= P; 2= S; 3= P&S
* DIST:max dist [km] between cluster centroid and station 
* IDAT   IPHA   DIST
    2     3     120
*
*--- event clustering:
* OBSCC:    min # of obs/pair for crosstime data (0= no clustering)
* OBSCT:    min # of obs/pair for network data (0= no clustering)
* OBSCC  OBSCT    
     0     8        
*
*--- solution control:
* ISTART:  	1 = from single source; 2 = from network sources
* ISOLV:	1 = SVD, 2=lsqr
* NSET:      	number of sets of iteration with specifications following
*  ISTART  ISOLV  NSET
    2        2      4
*
*--- data weighting and re-weighting: 
* NITER: 		last iteration to used the following weights
* WTCCP, WTCCS:		weight cross P, S 
* WTCTP, WTCTS:		weight catalog P, S 
* WRCC, WRCT:		residual threshold in sec for cross, catalog data 
* WDCC, WDCT:  		max dist [km] between cross, catalog linked pairs
* DAMP:    		damping (for lsqr only) 
*       ---  CROSS DATA ----- ----CATALOG DATA ----
* NITER WTCCP WTCCS WRCC WDCC WTCTP WTCTS WRCT WDCT DAMP
   4     -9     -9   -9    -9   1     1      8   -9  70 
   4     -9     -9   -9    -9   1     1      6    4  70 
   4     -9     -9   -9    -9   1    0.8     4    2  70 
   4     -9     -9   -9    -9   1    0.8     3    2  70 
*
*--- 1D model:
* NLAY:		number of model layers  
* RATIO:	vp/vs ratio 
* TOP:		depths of top of layer (km) 
* VEL: 		layer velocities (km/s)
* NLAY  RATIO 
   12     1.82
* TOP 
0.0 1.0 3.0 5.0 7.0 9.0 11.0 13.0 17.0 21.0 31.00 31.10
* VEL
5.30 5.65 5.93 6.20 6.20 6.20 6.20 6.20 6.20 6.20 7.50 8.11
*
*--- event selection:
* CID: 	cluster to be relocated (0 = all)
* ID:	cuspids of event to be relocated (8 per line)
* CID    
    0      
* ID
"""
    with open(os.path.join(hypodd_path, "ct.inp"), "w") as fp:
        fp.writelines(ct_inp)
    def copy_file(fp_from, fp_to):
        with open(fp_from, "r") as fp:
            lines = fp.readlines()
        with open(fp_to, "w") as fp:
            fp.writelines(lines)
    copy_file(ct_file, os.path.join(hypodd_path, "dt.ct"))
    copy_file(event, os.path.join(hypodd_path, "event.sel"))
    copy_file(station, os.path.join(hypodd_path, "stations.dat"))
    # os.system(f"cat {ct_file}")
    # os.system(f"cat {event}")
    # os.system(f"cat {station}")
    HYPODD_CMD = f"cd {hypodd_path} && ./hypoDD ct.inp"
    print(HYPODD_CMD)
    if os.system(HYPODD_CMD) != 0:
        raise ("{HYPODD_CMD}" + " failed!")
    copy_file(os.path.join(hypodd_path, "hypodd.reloc"), catalog_txt)
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/hypodd_ct_{node_i:03d}.reloc",
            catalog_txt,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
def hypodd_ct(
    node_i: int,
    config_json: InputPath("json"),
    ct_file: InputPath(str),
    event: InputPath(str),
    station: InputPath(str),
    catalog_txt: OutputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from datetime import datetime
    with open(config_json, "r") as fp:
        config = json.load(fp)
    hypodd_path = os.path.join(data_path, "hypodd")
    ct_inp = """* RELOC.INP:
*--- input file selection
* cross correlation diff times:
*
*catalog P diff times:
dt.ct
*
* event file:
event.sel
*
* station file:
stations.dat
*
*--- output file selection
* original locations:
hypodd.loc
* relocations:
hypodd.reloc
* station information:
hypodd.sta
* residual information:
hypodd.res
* source paramater information:
hypodd.src
*
*--- data type selection: 
* IDAT:  0 = synthetics; 1= cross corr; 2= catalog; 3= cross & cat 
* IPHA: 1= P; 2= S; 3= P&S
* DIST:max dist [km] between cluster centroid and station 
* IDAT   IPHA   DIST
    2     3     120
*
*--- event clustering:
* OBSCC:    min # of obs/pair for crosstime data (0= no clustering)
* OBSCT:    min # of obs/pair for network data (0= no clustering)
* OBSCC  OBSCT    
     0     8        
*
*--- solution control:
* ISTART:  	1 = from single source; 2 = from network sources
* ISOLV:	1 = SVD, 2=lsqr
* NSET:      	number of sets of iteration with specifications following
*  ISTART  ISOLV  NSET
    2        2      4
*
*--- data weighting and re-weighting: 
* NITER: 		last iteration to used the following weights
* WTCCP, WTCCS:		weight cross P, S 
* WTCTP, WTCTS:		weight catalog P, S 
* WRCC, WRCT:		residual threshold in sec for cross, catalog data 
* WDCC, WDCT:  		max dist [km] between cross, catalog linked pairs
* DAMP:    		damping (for lsqr only) 
*       ---  CROSS DATA ----- ----CATALOG DATA ----
* NITER WTCCP WTCCS WRCC WDCC WTCTP WTCTS WRCT WDCT DAMP
   4     -9     -9   -9    -9   1     1      8   -9  70 
   4     -9     -9   -9    -9   1     1      6    4  70 
   4     -9     -9   -9    -9   1    0.8     4    2  70 
   4     -9     -9   -9    -9   1    0.8     3    2  70 
*
*--- 1D model:
* NLAY:		number of model layers  
* RATIO:	vp/vs ratio 
* TOP:		depths of top of layer (km) 
* VEL: 		layer velocities (km/s)
* NLAY  RATIO 
   12     1.82
* TOP 
0.0 1.0 3.0 5.0 7.0 9.0 11.0 13.0 17.0 21.0 31.00 31.10
* VEL
5.30 5.65 5.93 6.20 6.20 6.20 6.20 6.20 6.20 6.20 7.50 8.11
*
*--- event selection:
* CID: 	cluster to be relocated (0 = all)
* ID:	cuspids of event to be relocated (8 per line)
* CID    
    0      
* ID
"""
    with open(os.path.join(hypodd_path, "ct.inp"), "w") as fp:
        fp.writelines(ct_inp)
    def copy_file(fp_from, fp_to):
        with open(fp_from, "r") as fp:
            lines = fp.readlines()
        with open(fp_to, "w") as fp:
            fp.writelines(lines)
    copy_file(ct_file, os.path.join(hypodd_path, "dt.ct"))
    copy_file(event, os.path.join(hypodd_path, "event.sel"))
    copy_file(station, os.path.join(hypodd_path, "stations.dat"))
    # os.system(f"cat {ct_file}")
    # os.system(f"cat {event}")
    # os.system(f"cat {station}")
    HYPODD_CMD = f"cd {hypodd_path} && ./hypoDD ct.inp"
    print(HYPODD_CMD)
    if os.system(HYPODD_CMD) != 0:
        raise ("{HYPODD_CMD}" + " failed!")
    copy_file(os.path.join(hypodd_path, "hypodd.reloc"), catalog_txt)
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd/hypodd_ct_{node_i:03d}.reloc",
            catalog_txt,
        )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    
        In [60]:
                Copied!
                
                
            if run_local:
    hypodd_ct(
        0,
        root_dir("config.json"),
        root_dir("hypodd/dt.ct"),
        root_dir("hypodd/event.sel"),
        root_dir("hypodd/stations.dat"),
        root_dir("hypodd_ct_catalog.txt"),
        root_dir(""),
    )
if run_local:
    hypodd_ct(
        0,
        root_dir("config.json"),
        root_dir("hypodd/dt.ct"),
        root_dir("hypodd/event.sel"),
        root_dir("hypodd/stations.dat"),
        root_dir("hypodd_ct_catalog.txt"),
        root_dir(""),
    )
    
        cd Demo/hypodd && ./hypoDD ct.inp
starting hypoDD (v1.3 - 11/2010)...          Wed Mar 15 14:47:00 2023 
INPUT FILES:
cross dtime data:  
catalog dtime data: dt.ct
events: event.sel
stations: stations.dat
OUTPUT FILES:
initial locations: hypodd.loc
relocated events: hypodd.reloc
event pair residuals: hypodd.res
station residuals: hypodd.sta
source parameters: hypodd.src
 Relocate all clusters
 Relocate all events
Reading data ...   Wed Mar 15 14:47:00 2023 
# events =   363
# stations < maxdist =     46
# catalog P dtimes =  138381
# catalog S dtimes =  166806
# dtimes total =   305187
# events after dtime match =        351
# stations =     45
clustering ...  
Clustered events:   351
Isolated events:     0
# clusters:    1
Cluster   1:   351 events
RELOCATION OF CLUSTER: 1     Wed Mar 15 14:47:01 2023 
----------------------
Initial trial sources =   351
  IT   EV  CT    RMSCT   RMSST   DX   DY   DZ   DT   OS  AQ  CND
        %   %   ms     %    ms    m    m    m   ms    m 
 1    100 100  230 -21.4     0  798  660 2348  177    0   8  205
 2     98  99  229  -0.4     0  773  631 2302  168    0   1  202
 3  1  97  99  228  -0.4   410  770  629 2301  167 1128   0  202
 4     97  98  187 -18.0   410  407  252 2664   77 1128  13  192
 5     93  97  185  -1.5   410  392  230 1314   71 1128   1  189
 6  2  93  97  184  -0.5   383  389  229 1287   71  958   0  189
 7     93  96  163 -11.1   383  254  135  666   44  958   3  180
 8  3  92  95  162  -1.0   335  248  134  645   43  551   0  177
 9     92  95  152  -6.0   335  188   89  912   28  551   2  175
10  4  91  94  151  -0.5   327  189   90  526   27  697   0  174
11  5  87  49  135 -10.4   269  167  116  379   30  735   0  135
12  6  86  46  115 -14.9   236   92   64  245   17  771   0  131
13  7  85  45  109  -5.8   222   60   42  158   12  750   0  129
14  8  85  44  106  -2.7   219   39   26   99    9  728   0  121
15  9  79  14   76 -27.8   146   43   35   83    7  568   0   66
16 10  78  13   64 -15.9   120   30   24   59    5  573   0   64
17 11  78  13   60  -7.0   112   22   18   47    4  563   0   60
18 12  77  12   57  -4.2   108   16   14   34    3  561   0   60
19 13  77  12   47 -17.6    86   17   15   33    3  558   0   59
20 14  77  11   43  -7.6    78   16   13   28    2  557   0   57
21 15  77  11   41  -5.1    75   14   11   26    2  560   0   54
22 16  77  11   40  -3.5    72   12    9   21    2  557   0   55
writing out results ...
ERROR: can not access minio service! 
No module named 'minio'
In [61]:
                Copied!
                
                
            hypodd_ct_op = comp.func_to_container_op(hypodd_ct, base_image="zhuwq0/hypodd-api:1.0")
hypodd_ct_op = comp.func_to_container_op(hypodd_ct, base_image="zhuwq0/hypodd-api:1.0")
    
        In [62]:
                Copied!
                
                
            if run_local:
    from datetime import datetime
    import matplotlib.pyplot as plt
    import pandas as pd
    catalog_hypodd = pd.read_csv(
        root_dir(f"hypodd/hypodd.reloc"),
        sep="\s+",
        names=[
            "ID",
            "LAT",
            "LON",
            "DEPTH",
            "X",
            "Y",
            "Z",
            "EX",
            "EY",
            "EZ",
            "YR",
            "MO",
            "DY",
            "HR",
            "MI",
            "SC",
            "MAG",
            "NCCP",
            "NCCS",
            "NCTP",
            "NCTS",
            "RCC",
            "RCT",
            "CID",
        ],
    )
    catalog_hypodd["time"] = catalog_hypodd.apply(
        lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
        axis=1,
    )
    catalog_hypodd["time"] = catalog_hypodd["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
    plt.figure()
    plt.plot(catalog_hypodd["LON"], catalog_hypodd["LAT"], ".")
    plt.show()
if run_local:
    from datetime import datetime
    import matplotlib.pyplot as plt
    import pandas as pd
    catalog_hypodd = pd.read_csv(
        root_dir(f"hypodd/hypodd.reloc"),
        sep="\s+",
        names=[
            "ID",
            "LAT",
            "LON",
            "DEPTH",
            "X",
            "Y",
            "Z",
            "EX",
            "EY",
            "EZ",
            "YR",
            "MO",
            "DY",
            "HR",
            "MI",
            "SC",
            "MAG",
            "NCCP",
            "NCCS",
            "NCTP",
            "NCTS",
            "RCC",
            "RCT",
            "CID",
        ],
    )
    catalog_hypodd["time"] = catalog_hypodd.apply(
        lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
        axis=1,
    )
    catalog_hypodd["time"] = catalog_hypodd["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
    plt.figure()
    plt.plot(catalog_hypodd["LON"], catalog_hypodd["LAT"], ".")
    plt.show()
    
        In [ ]:
                Copied!
                
                
            def cross_correlation(
    ct_file: InputPath(str),
    catalog_file: InputPath(str),
    picks_file: InputPath(str),
    cc_file: OutputPath(str),
):
    import time
    from multiprocessing import Manager, Process
    import numpy as np
    import pandas as pd
    from pymongo import MongoClient
    from tqdm import tqdm
    catalog = pd.read_csv(
        catalog_file,
        sep="\t",
        parse_dates=["time"],
        index_col=["event_idx"],
        dtype={"event_idx": str},
    )
    picks = pd.read_csv(picks_file, sep="\t", parse_dates=["timestamp"], dtype={"event_idx": str})
    picks["station"] = picks["id"].apply(lambda x: x.split(".")[1] + x.split(".")[3])
    picks = picks.set_index(["event_idx", "station", "type"])
    picks = picks.sort_index()
    pick_index = 100
    lo = pick_index - 50
    hi = pick_index + 100
    dt = 0.01
    ct_dict = Manager().dict()
    cc_dict = Manager().dict()
    with open(ct_file) as fct:
        meta = fct.readlines()
        for i, line in enumerate(meta):
            if line[0] == "#":
                if i > 0:
                    ct_dict[key] = value
                key = line
                value = []
                continue
            value.append(line)
        ct_dict[key] = value
    keys = sorted(list(ct_dict.keys()))
    def calc_cross_correlation(keys, ct_dict, cc_dict):
        username = "root"
        password = "quakeflow123"
        # client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
        client = MongoClient(f"mongodb://{username}:{password}@quakeflow-mongodb.default.svc.cluster.local:27017")
        db = client["quakeflow"]
        collection = db["waveform"]
        # normalize = lambda x: (x - np.mean(x, axis=0, keepdims=True)) / np.std(x, axis=0, keepdims=True)
        for key in keys:
            tmp = key.split()
            ID1, ID2 = tmp[1], tmp[2]
            key_cc = f"#    {ID1}    {ID2}    0.0\n"
            lines_cc = []
            for line in ct_dict[key]:
                tmp = line.split()
                STA, TT1, TT2, WGT, PHA = (
                    tmp[0],
                    tmp[1],
                    tmp[2],
                    tmp[3],
                    tmp[4],
                )  ##HypoDD format
                for i, row1 in picks.loc[(ID1, STA, PHA)].iterrows():
                    data1 = collection.find_one(
                        {"_id": f"{row1['id']}_{row1['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
                    )
                    for j, row2 in picks.loc[(ID2, STA, PHA)].iterrows():
                        data2 = collection.find_one(
                            {"_id": f"{row2['id']}_{row2['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
                        )
                        # if PHA == "P":  # Z
                        #     waveform1 = np.array(data1["waveform"])[lo:hi, -1:]
                        #     waveform2 = np.array(data2["waveform"])[lo:hi, -1:]
                        # elif PHA == "S":  # E, N
                        #     waveform1 = np.array(data1["waveform"])[lo:hi, :-1]
                        #     waveform2 = np.array(data2["waveform"])[lo:hi, :-1]
                        # else:
                        #     raise (Exception("PHA must be P or S"))
                        waveform1 = np.array(data1["waveform"])[lo:hi, :]
                        waveform2 = np.array(data2["waveform"])[lo:hi, :]
                        cc = np.zeros(waveform1.shape[0])
                        for k in range(waveform1.shape[1]):
                            cc += np.correlate(waveform1[:, k], waveform2[:, k], mode="same")
                        norm = np.sqrt(np.sum(waveform1**2) * np.sum(waveform2**2))
                        if norm == 0:
                            continue
                        else:
                            cc /= norm
                        shift = (np.argmax(np.abs(cc)) - waveform1.shape[0] // 2) * dt + float(TT1) - float(TT2)
                        coeff = np.max(np.abs(cc))
                        if not np.isnan(coeff):
                            lines_cc.append(f"{STA:<7s}    {shift:.5f}    {coeff:.3f}    {PHA}\n")
                cc_dict[key_cc] = lines_cc
        return 0
    t0 = time.time()
    processes = []
    num_process = 16
    for i in range(num_process):
        p = Process(target=calc_cross_correlation, args=(keys[i::num_process], ct_dict, cc_dict))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(f"{num_process} process: time = {time.time()-t0:.1f}")
    with open(cc_file, "w") as fcc:
        for key in cc_dict:
            fcc.write(key)
            for line in cc_dict[key]:
                fcc.write(line)
def cross_correlation(
    ct_file: InputPath(str),
    catalog_file: InputPath(str),
    picks_file: InputPath(str),
    cc_file: OutputPath(str),
):
    import time
    from multiprocessing import Manager, Process
    import numpy as np
    import pandas as pd
    from pymongo import MongoClient
    from tqdm import tqdm
    catalog = pd.read_csv(
        catalog_file,
        sep="\t",
        parse_dates=["time"],
        index_col=["event_idx"],
        dtype={"event_idx": str},
    )
    picks = pd.read_csv(picks_file, sep="\t", parse_dates=["timestamp"], dtype={"event_idx": str})
    picks["station"] = picks["id"].apply(lambda x: x.split(".")[1] + x.split(".")[3])
    picks = picks.set_index(["event_idx", "station", "type"])
    picks = picks.sort_index()
    pick_index = 100
    lo = pick_index - 50
    hi = pick_index + 100
    dt = 0.01
    ct_dict = Manager().dict()
    cc_dict = Manager().dict()
    with open(ct_file) as fct:
        meta = fct.readlines()
        for i, line in enumerate(meta):
            if line[0] == "#":
                if i > 0:
                    ct_dict[key] = value
                key = line
                value = []
                continue
            value.append(line)
        ct_dict[key] = value
    keys = sorted(list(ct_dict.keys()))
    def calc_cross_correlation(keys, ct_dict, cc_dict):
        username = "root"
        password = "quakeflow123"
        # client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
        client = MongoClient(f"mongodb://{username}:{password}@quakeflow-mongodb.default.svc.cluster.local:27017")
        db = client["quakeflow"]
        collection = db["waveform"]
        # normalize = lambda x: (x - np.mean(x, axis=0, keepdims=True)) / np.std(x, axis=0, keepdims=True)
        for key in keys:
            tmp = key.split()
            ID1, ID2 = tmp[1], tmp[2]
            key_cc = f"#    {ID1}    {ID2}    0.0\n"
            lines_cc = []
            for line in ct_dict[key]:
                tmp = line.split()
                STA, TT1, TT2, WGT, PHA = (
                    tmp[0],
                    tmp[1],
                    tmp[2],
                    tmp[3],
                    tmp[4],
                )  ##HypoDD format
                for i, row1 in picks.loc[(ID1, STA, PHA)].iterrows():
                    data1 = collection.find_one(
                        {"_id": f"{row1['id']}_{row1['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
                    )
                    for j, row2 in picks.loc[(ID2, STA, PHA)].iterrows():
                        data2 = collection.find_one(
                            {"_id": f"{row2['id']}_{row2['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
                        )
                        # if PHA == "P":  # Z
                        #     waveform1 = np.array(data1["waveform"])[lo:hi, -1:]
                        #     waveform2 = np.array(data2["waveform"])[lo:hi, -1:]
                        # elif PHA == "S":  # E, N
                        #     waveform1 = np.array(data1["waveform"])[lo:hi, :-1]
                        #     waveform2 = np.array(data2["waveform"])[lo:hi, :-1]
                        # else:
                        #     raise (Exception("PHA must be P or S"))
                        waveform1 = np.array(data1["waveform"])[lo:hi, :]
                        waveform2 = np.array(data2["waveform"])[lo:hi, :]
                        cc = np.zeros(waveform1.shape[0])
                        for k in range(waveform1.shape[1]):
                            cc += np.correlate(waveform1[:, k], waveform2[:, k], mode="same")
                        norm = np.sqrt(np.sum(waveform1**2) * np.sum(waveform2**2))
                        if norm == 0:
                            continue
                        else:
                            cc /= norm
                        shift = (np.argmax(np.abs(cc)) - waveform1.shape[0] // 2) * dt + float(TT1) - float(TT2)
                        coeff = np.max(np.abs(cc))
                        if not np.isnan(coeff):
                            lines_cc.append(f"{STA:<7s}    {shift:.5f}    {coeff:.3f}    {PHA}\n")
                cc_dict[key_cc] = lines_cc
        return 0
    t0 = time.time()
    processes = []
    num_process = 16
    for i in range(num_process):
        p = Process(target=calc_cross_correlation, args=(keys[i::num_process], ct_dict, cc_dict))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(f"{num_process} process: time = {time.time()-t0:.1f}")
    with open(cc_file, "w") as fcc:
        for key in cc_dict:
            fcc.write(key)
            for line in cc_dict[key]:
                fcc.write(line)
    
        In [ ]:
                Copied!
                
                
            cc_op = comp.func_to_container_op(
    cross_correlation,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio", "pymongo"],
)
cc_op = comp.func_to_container_op(
    cross_correlation,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio", "pymongo"],
)
    
        In [ ]:
                Copied!
                
                
            def hypodd_cc(
    i: int,
    config_json: InputPath("json"),
    ct_file: InputPath(str),
    cc_file: InputPath(str),
    event: InputPath(str),
    station: InputPath(str),
    inp_file: str = "hypodd_cc.inp",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from minio import Minio
    with open(config_json, "r") as fp:
        config = json.load(fp)
    minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
    os.system(f"cat {ct_file} > dt.ct")
    os.system(f"cat {cc_file} > dt.cc")
    os.system(f"cat {event} > event.sel")
    os.system(f"cat {station} > stations_hypodd.dat ")
    HYPODD_CMD = f"HYPODD/src/hypodd/hypodd {inp_file}"
    if os.system(HYPODD_CMD) != 0:
        raise ("{HYPODD_CMD}" + " failed!")
    os.system(f"mv hypodd.reloc hypodd_cc_{i:03d}.reloc")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_cc_{i:03d}.reloc",
        f"hypodd_cc_{i:03d}.reloc",
    )
def hypodd_cc(
    i: int,
    config_json: InputPath("json"),
    ct_file: InputPath(str),
    cc_file: InputPath(str),
    event: InputPath(str),
    station: InputPath(str),
    inp_file: str = "hypodd_cc.inp",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from minio import Minio
    with open(config_json, "r") as fp:
        config = json.load(fp)
    minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
    os.system(f"cat {ct_file} > dt.ct")
    os.system(f"cat {cc_file} > dt.cc")
    os.system(f"cat {event} > event.sel")
    os.system(f"cat {station} > stations_hypodd.dat ")
    HYPODD_CMD = f"HYPODD/src/hypodd/hypodd {inp_file}"
    if os.system(HYPODD_CMD) != 0:
        raise ("{HYPODD_CMD}" + " failed!")
    os.system(f"mv hypodd.reloc hypodd_cc_{i:03d}.reloc")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_cc_{i:03d}.reloc",
        f"hypodd_cc_{i:03d}.reloc",
    )
    
        In [ ]:
                Copied!
                
                
            hypodd_cc_op = comp.func_to_container_op(hypodd_cc, base_image="zhuwq0/hypodd-api:1.0")
hypodd_cc_op = comp.func_to_container_op(hypodd_cc, base_image="zhuwq0/hypodd-api:1.0")
    
        Merge hypodd results¶
In [ ]:
                Copied!
                
                
            def merge_hypodd(
    index: list,
    config_json: InputPath("json"),
    catalog_ct: OutputPath(str),
    catalog_cc: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from glob import glob
    from minio import Minio
    minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
    with open(config_json, "r") as fp:
        config = json.load(fp)
    objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/hypodd/hypodd_", recursive=True)
    tmp_path = lambda x: os.path.join("/tmp/", x)
    for obj in objects:
        print(obj._object_name)
        minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
    # tmp_ct_catalogs = sorted(glob(tmp_path("hypodd_ct_*.reloc")))
    hypodd_ct_catalogs = [tmp_path(f"hypodd_ct_{i:03d}.reloc") for i in index]
    print(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
    os.system(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_ct_catalog.txt",
        tmp_path("hypodd_ct_catalog.txt"),
    )
    os.system(f"cat {tmp_path('hypodd_ct_catalog.txt')} > {catalog_ct}")
    # hypodd_cc_catalogs = sorted(glob(tmp_path("hypodd_cc_*.reloc")))
    hypodd_cc_catalogs = [tmp_path(f"hypodd_cc_{i:03d}.reloc") for i in index]
    print(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
    os.system(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_cc_catalog.txt",
        tmp_path("hypodd_cc_catalog.txt"),
    )
    os.system(f"cat {tmp_path('hypodd_cc_catalog.txt')} > {catalog_cc}")
    return 0
def merge_hypodd(
    index: list,
    config_json: InputPath("json"),
    catalog_ct: OutputPath(str),
    catalog_cc: OutputPath(str),
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import json
    import os
    from glob import glob
    from minio import Minio
    minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
    with open(config_json, "r") as fp:
        config = json.load(fp)
    objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/hypodd/hypodd_", recursive=True)
    tmp_path = lambda x: os.path.join("/tmp/", x)
    for obj in objects:
        print(obj._object_name)
        minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
    # tmp_ct_catalogs = sorted(glob(tmp_path("hypodd_ct_*.reloc")))
    hypodd_ct_catalogs = [tmp_path(f"hypodd_ct_{i:03d}.reloc") for i in index]
    print(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
    os.system(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_ct_catalog.txt",
        tmp_path("hypodd_ct_catalog.txt"),
    )
    os.system(f"cat {tmp_path('hypodd_ct_catalog.txt')} > {catalog_ct}")
    # hypodd_cc_catalogs = sorted(glob(tmp_path("hypodd_cc_*.reloc")))
    hypodd_cc_catalogs = [tmp_path(f"hypodd_cc_{i:03d}.reloc") for i in index]
    print(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
    os.system(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
    minioClient.fput_object(
        bucket_name,
        f"{config['region']}/hypodd_cc_catalog.txt",
        tmp_path("hypodd_cc_catalog.txt"),
    )
    os.system(f"cat {tmp_path('hypodd_cc_catalog.txt')} > {catalog_cc}")
    return 0
    
        In [ ]:
                Copied!
                
                
            merge_hypodd_op = comp.func_to_container_op(
    merge_hypodd,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
merge_hypodd_op = comp.func_to_container_op(
    merge_hypodd,
    base_image="python:3.8",
    packages_to_install=["pandas", "tqdm", "minio"],
)
    
        8. Visulization¶
In [ ]:
                Copied!
                
                
            def visulization(
    config_json: InputPath("json"),
    hypodd_catalog_ct: InputPath(str),
    hypodd_catalog_cc: InputPath(str),
    gamma_catalog: InputPath(str),
    standard_catalog: InputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import pandas as pd
    import plotly.graph_objects as go
    import numpy as np
    import json
    import os
    from datetime import datetime
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    with open(config_json, "r") as fp:
        config = json.load(fp)
    def plot3d(x, y, z, fig_name):
        fig = go.Figure(
            data=[
                go.Scatter3d(
                    x=x,
                    y=y,
                    z=z,
                    mode="markers",
                    marker=dict(size=3.0, color=-z, cmin=-60, cmax=2, colorscale="Viridis", opacity=0.8),
                )
            ],
        )
        fig.update_layout(
            scene=dict(
                xaxis=dict(
                    nticks=4,
                    range=config["xlim_degree"],
                ),
                yaxis=dict(
                    nticks=4,
                    range=config["ylim_degree"],
                ),
                zaxis=dict(
                    nticks=4,
                    # range=[z.max(), z.min()],
                    range=[60, -2],
                ),
                #         aspectratio = dict(x=(xlim[1]-xlim[0])/2, y=(ylim[1]-ylim[0])/2, z=1),
                aspectratio=dict(x=1, y=1, z=0.5),
            ),
            margin=dict(r=0, l=0, b=0, t=0),
        )
        fig.write_html(fig_name)
    hypodd_ct_catalog = pd.read_csv(
        hypodd_catalog_ct,
        sep="\s+",
        names=[
            "ID",
            "LAT",
            "LON",
            "DEPTH",
            "X",
            "Y",
            "Z",
            "EX",
            "EY",
            "EZ",
            "YR",
            "MO",
            "DY",
            "HR",
            "MI",
            "SC",
            "MAG",
            "NCCP",
            "NCCS",
            "NCTP",
            "NCTS",
            "RCC",
            "RCT",
            "CID",
        ],
    )
    hypodd_ct_catalog["time"] = hypodd_ct_catalog.apply(
        lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
        axis=1,
    )
    hypodd_ct_catalog["magnitude"] = hypodd_ct_catalog["MAG"]
    hypodd_ct_catalog["time"] = hypodd_ct_catalog["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
    plot3d(
        hypodd_ct_catalog["LON"],
        hypodd_ct_catalog["LAT"],
        hypodd_ct_catalog["DEPTH"],
        f"{data_path}/hypodd_ct_catalog.html",
    )
    gamma_catalog = pd.read_csv(gamma_catalog, parse_dates=["time"])
    gamma_catalog["depth_km"] = gamma_catalog["depth(m)"] / 1e3
    plot3d(
        gamma_catalog["longitude"],
        gamma_catalog["latitude"],
        gamma_catalog["depth_km"],
        f"{data_path}/gamma_catalog.html",
    )
    standard_catalog = pd.read_csv(standard_catalog, parse_dates=["time"])
    standard_catalog["depth_km"] = standard_catalog["depth(m)"] / 1e3
    plot3d(
        standard_catalog["longitude"],
        standard_catalog["latitude"],
        standard_catalog["depth_km"],
        f"{data_path}/standard_catalog.html",
    )
    ## histogram
    bins = 30
    config["starttime"] = datetime.fromisoformat(config["starttime"])
    config["endtime"] = datetime.fromisoformat(config["endtime"])
    fig, ax = plt.subplots()
    ax.hist(gamma_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=1.0, linewidth=0.5, label=f"GaMMA: {len(gamma_catalog)}")
    ax.hist(hypodd_ct_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.8, linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog)}")
    ax.hist(standard_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.6, linewidth=0.5, label=f"Standard: {len(standard_catalog)}")
    # ax.set_xlabel("Date")
    ax.set_ylabel("Frequency")
    ax.autoscale(enable=True, axis='x', tight=True)
    # ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    fig.autofmt_xdate()
    ax.legend()
    fig.savefig(f"{data_path}/earthquake_frequency_time.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_number.pdf", bbox_inches="tight")
    plt.show()
    fig, ax = plt.subplots()
    xrange = (-1., max(standard_catalog["magnitude"].max(), gamma_catalog["magnitude"].max()))
    ax.hist(gamma_catalog["magnitude"], range=xrange, bins=bins, alpha=1.0,  edgecolor="k", linewidth=0.5, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
    ax.hist(hypodd_ct_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6,  edgecolor="k", linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
    ax.hist(standard_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6,  edgecolor="k", linewidth=0.5, label=f"Standard: {len(standard_catalog['magnitude'])}")
    ax.legend()
    ax.autoscale(enable=True, axis='x', tight=True)
    ax.set_xlabel("Magnitude")
    ax.set_ylabel("Frequency")
    ax.set_yscale('log')
    fig.savefig(f"{data_path}/earthquake_magnitude_frequency.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_magnitude_frequency.pdf", bbox_inches="tight")
    plt.show()
    fig, ax = plt.subplots()
    ax.plot(gamma_catalog["time"], gamma_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
    ax.plot(hypodd_ct_catalog["time"], hypodd_ct_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
    ax.plot(standard_catalog["time"], standard_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"Standard: {len(standard_catalog['magnitude'])}")
    ax.set_xlim(config["starttime"], config["endtime"])
    ax.set_ylabel("Magnitude")
    # ax.set_xlabel("Date")
    ax.set_ylim(bottom=-1)
    ax.legend(markerscale=2)
    ax.grid()
    # ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    fig.autofmt_xdate()
    fig.savefig(f"{data_path}/earthquake_magnitude_time.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_magnitude_time.pdf", bbox_inches="tight", dpi=300)
    plt.show()
    try:
        hypodd_cc_catalog = pd.read_csv(
            hypodd_catalog_cc,
            sep="\s+",
            names=[
                "ID",
                "LAT",
                "LON",
                "DEPTH",
                "X",
                "Y",
                "Z",
                "EX",
                "EY",
                "EZ",
                "YR",
                "MO",
                "DY",
                "HR",
                "MI",
                "SC",
                "MAG",
                "NCCP",
                "NCCS",
                "NCTP",
                "NCTS",
                "RCC",
                "RCT",
                "CID",
            ],
        )
        hypodd_cc_catalog["time"] = hypodd_cc_catalog.apply(
            lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
            axis=1,
        )
        hypodd_cc_catalog["time"] = hypodd_cc_catalog["time"].apply(
            lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f")
        )
        plot3d(
            hypodd_cc_catalog["LON"],
            hypodd_cc_catalog["LAT"],
            hypodd_cc_catalog["DEPTH"],
            f"{data_path}/hypodd_cc_catalog.html",
        )
    except Exception as err:
        print(f"{err}")
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd_ct_catalog.html",
            f"{data_path}/hypodd_ct_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_catalog.html",
            f"{data_path}/gamma_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/standard_catalog.html",
            f"{data_path}/standard_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_frequency_time.png",
            f"{data_path}/earthquake_frequency_time.png",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_magnitude_frequency.png",
            f"{data_path}/earthquake_magnitude_frequency.png",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_magnitude_time.png",
            f"{data_path}/earthquake_magnitude_time.png",
        )
        if os.path.exists(f"{data_path}/hypodd_cc_catalog.html"):
            minioClient.fput_object(
                bucket_name,
                f"{config['region']}/hypodd_cc_catalog.html",
                f"{data_path}/hypodd_cc_catalog.html",
            )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
def visulization(
    config_json: InputPath("json"),
    hypodd_catalog_ct: InputPath(str),
    hypodd_catalog_cc: InputPath(str),
    gamma_catalog: InputPath(str),
    standard_catalog: InputPath(str),
    data_path: str = "./",
    bucket_name: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = True,
):
    import pandas as pd
    import plotly.graph_objects as go
    import numpy as np
    import json
    import os
    from datetime import datetime
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    with open(config_json, "r") as fp:
        config = json.load(fp)
    def plot3d(x, y, z, fig_name):
        fig = go.Figure(
            data=[
                go.Scatter3d(
                    x=x,
                    y=y,
                    z=z,
                    mode="markers",
                    marker=dict(size=3.0, color=-z, cmin=-60, cmax=2, colorscale="Viridis", opacity=0.8),
                )
            ],
        )
        fig.update_layout(
            scene=dict(
                xaxis=dict(
                    nticks=4,
                    range=config["xlim_degree"],
                ),
                yaxis=dict(
                    nticks=4,
                    range=config["ylim_degree"],
                ),
                zaxis=dict(
                    nticks=4,
                    # range=[z.max(), z.min()],
                    range=[60, -2],
                ),
                #         aspectratio = dict(x=(xlim[1]-xlim[0])/2, y=(ylim[1]-ylim[0])/2, z=1),
                aspectratio=dict(x=1, y=1, z=0.5),
            ),
            margin=dict(r=0, l=0, b=0, t=0),
        )
        fig.write_html(fig_name)
    hypodd_ct_catalog = pd.read_csv(
        hypodd_catalog_ct,
        sep="\s+",
        names=[
            "ID",
            "LAT",
            "LON",
            "DEPTH",
            "X",
            "Y",
            "Z",
            "EX",
            "EY",
            "EZ",
            "YR",
            "MO",
            "DY",
            "HR",
            "MI",
            "SC",
            "MAG",
            "NCCP",
            "NCCS",
            "NCTP",
            "NCTS",
            "RCC",
            "RCT",
            "CID",
        ],
    )
    hypodd_ct_catalog["time"] = hypodd_ct_catalog.apply(
        lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
        axis=1,
    )
    hypodd_ct_catalog["magnitude"] = hypodd_ct_catalog["MAG"]
    hypodd_ct_catalog["time"] = hypodd_ct_catalog["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
    plot3d(
        hypodd_ct_catalog["LON"],
        hypodd_ct_catalog["LAT"],
        hypodd_ct_catalog["DEPTH"],
        f"{data_path}/hypodd_ct_catalog.html",
    )
    gamma_catalog = pd.read_csv(gamma_catalog, parse_dates=["time"])
    gamma_catalog["depth_km"] = gamma_catalog["depth(m)"] / 1e3
    plot3d(
        gamma_catalog["longitude"],
        gamma_catalog["latitude"],
        gamma_catalog["depth_km"],
        f"{data_path}/gamma_catalog.html",
    )
    standard_catalog = pd.read_csv(standard_catalog, parse_dates=["time"])
    standard_catalog["depth_km"] = standard_catalog["depth(m)"] / 1e3
    plot3d(
        standard_catalog["longitude"],
        standard_catalog["latitude"],
        standard_catalog["depth_km"],
        f"{data_path}/standard_catalog.html",
    )
    ## histogram
    bins = 30
    config["starttime"] = datetime.fromisoformat(config["starttime"])
    config["endtime"] = datetime.fromisoformat(config["endtime"])
    fig, ax = plt.subplots()
    ax.hist(gamma_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=1.0, linewidth=0.5, label=f"GaMMA: {len(gamma_catalog)}")
    ax.hist(hypodd_ct_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.8, linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog)}")
    ax.hist(standard_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.6, linewidth=0.5, label=f"Standard: {len(standard_catalog)}")
    # ax.set_xlabel("Date")
    ax.set_ylabel("Frequency")
    ax.autoscale(enable=True, axis='x', tight=True)
    # ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    fig.autofmt_xdate()
    ax.legend()
    fig.savefig(f"{data_path}/earthquake_frequency_time.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_number.pdf", bbox_inches="tight")
    plt.show()
    fig, ax = plt.subplots()
    xrange = (-1., max(standard_catalog["magnitude"].max(), gamma_catalog["magnitude"].max()))
    ax.hist(gamma_catalog["magnitude"], range=xrange, bins=bins, alpha=1.0,  edgecolor="k", linewidth=0.5, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
    ax.hist(hypodd_ct_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6,  edgecolor="k", linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
    ax.hist(standard_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6,  edgecolor="k", linewidth=0.5, label=f"Standard: {len(standard_catalog['magnitude'])}")
    ax.legend()
    ax.autoscale(enable=True, axis='x', tight=True)
    ax.set_xlabel("Magnitude")
    ax.set_ylabel("Frequency")
    ax.set_yscale('log')
    fig.savefig(f"{data_path}/earthquake_magnitude_frequency.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_magnitude_frequency.pdf", bbox_inches="tight")
    plt.show()
    fig, ax = plt.subplots()
    ax.plot(gamma_catalog["time"], gamma_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
    ax.plot(hypodd_ct_catalog["time"], hypodd_ct_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
    ax.plot(standard_catalog["time"], standard_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"Standard: {len(standard_catalog['magnitude'])}")
    ax.set_xlim(config["starttime"], config["endtime"])
    ax.set_ylabel("Magnitude")
    # ax.set_xlabel("Date")
    ax.set_ylim(bottom=-1)
    ax.legend(markerscale=2)
    ax.grid()
    # ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    fig.autofmt_xdate()
    fig.savefig(f"{data_path}/earthquake_magnitude_time.png", bbox_inches="tight", dpi=300)
    # fig.savefig(f"{data_path}/earthquake_magnitude_time.pdf", bbox_inches="tight", dpi=300)
    plt.show()
    try:
        hypodd_cc_catalog = pd.read_csv(
            hypodd_catalog_cc,
            sep="\s+",
            names=[
                "ID",
                "LAT",
                "LON",
                "DEPTH",
                "X",
                "Y",
                "Z",
                "EX",
                "EY",
                "EZ",
                "YR",
                "MO",
                "DY",
                "HR",
                "MI",
                "SC",
                "MAG",
                "NCCP",
                "NCCS",
                "NCTP",
                "NCTS",
                "RCC",
                "RCT",
                "CID",
            ],
        )
        hypodd_cc_catalog["time"] = hypodd_cc_catalog.apply(
            lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
            axis=1,
        )
        hypodd_cc_catalog["time"] = hypodd_cc_catalog["time"].apply(
            lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f")
        )
        plot3d(
            hypodd_cc_catalog["LON"],
            hypodd_cc_catalog["LAT"],
            hypodd_cc_catalog["DEPTH"],
            f"{data_path}/hypodd_cc_catalog.html",
        )
    except Exception as err:
        print(f"{err}")
    ## upload to s3 bucket
    try:
        from minio import Minio
        minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
        if not minioClient.bucket_exists(bucket_name):
            minioClient.make_bucket(bucket_name)
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/hypodd_ct_catalog.html",
            f"{data_path}/hypodd_ct_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/gamma_catalog.html",
            f"{data_path}/gamma_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/standard_catalog.html",
            f"{data_path}/standard_catalog.html",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_frequency_time.png",
            f"{data_path}/earthquake_frequency_time.png",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_magnitude_frequency.png",
            f"{data_path}/earthquake_magnitude_frequency.png",
        )
        minioClient.fput_object(
            bucket_name,
            f"{config['region']}/earthquake_magnitude_time.png",
            f"{data_path}/earthquake_magnitude_time.png",
        )
        if os.path.exists(f"{data_path}/hypodd_cc_catalog.html"):
            minioClient.fput_object(
                bucket_name,
                f"{config['region']}/hypodd_cc_catalog.html",
                f"{data_path}/hypodd_cc_catalog.html",
            )
    except Exception as err:
        print(f"ERROR: can not access minio service! \n{err}")
        pass
    
        In [ ]:
                Copied!
                
                
            if run_local:
    visulization(
        root_dir("config.json"),
        root_dir("hypodd_ct_catalog.txt"),
        root_dir("hypodd_cc_catalog.txt"),
        root_dir("gamma_catalog.csv"),
        root_dir("standard_catalog.csv"),
        root_dir(""),
    )
if run_local:
    visulization(
        root_dir("config.json"),
        root_dir("hypodd_ct_catalog.txt"),
        root_dir("hypodd_cc_catalog.txt"),
        root_dir("gamma_catalog.csv"),
        root_dir("standard_catalog.csv"),
        root_dir(""),
    )
    
        In [ ]:
                Copied!
                
                
            visulization_op = comp.func_to_container_op(
    visulization,
    base_image="python:3.8",
    packages_to_install=["matplotlib", "pandas", "plotly", "minio"],
)
visulization_op = comp.func_to_container_op(
    visulization,
    base_image="python:3.8",
    packages_to_install=["matplotlib", "pandas", "plotly", "minio"],
)
    
        9. Parallel process on cloud¶
In [ ]:
                Copied!
                
                
            @dsl.pipeline(name="QuakeFlow", description="")
def quakeflow_pipeline(
    data_path: str = "/tmp/",
    num_parallel=0,
    bucket_catalog: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = False,
):
    config = config_op(
        region_name, num_parallel, bucket_name=f"catalogs", s3_url=s3_url, secure=secure
    ).add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
    events = (
        download_events_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_retry(3)
        .set_memory_request("1G")
        .set_display_name("Download Events")
    )
    stations = (
        download_stations_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_retry(3)
        .set_display_name("Download Stations")
    )
    with kfp.dsl.ParallelFor(config.outputs["output"]) as i:
        vop_ = dsl.VolumeOp(
            name=f"Create volume {region_name}",
            resource_name=f"data-volume-{str(i)}",
            size="100Gi",
            modes=dsl.VOLUME_MODE_RWO,
        )
        # ).set_retry(3)
        download_op_ = (
            download_waveform_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                config.outputs["datetime_json"],
                stations.outputs["station_pkl"],
                data_path=data_path,
                bucket_name=f"waveforms",
                s3_url=s3_url,
                secure=secure,
            )
            .add_pvolumes({data_path: vop_.volume})
            .set_cpu_request("700m")
            # .set_cpu_request("350m")
            .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
            .set_retry(5)
            .set_display_name("Download Waveforms")
        )
        download_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        phasenet_op_ = (
            phasenet_op(
                download_op_.outputs["Output"],
                download_op_.outputs["fname_csv"],
                stations.outputs["station_json"],
            )
            .add_pvolumes({data_path: download_op_.pvolume})
            .set_memory_request("9G")
            .set_retry(5)
            .set_display_name("PhaseNet Picking")
        )
        phasenet_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        phasenet_op_.set_image_pull_policy("Always")
        gamma_op_ = (
            gamma_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                phasenet_op_.outputs["picks"],
                stations.outputs["station_json"],
                bucket_name=f"catalogs",
                s3_url=s3_url,
                secure=secure,
            )
            .set_cpu_request("800m")
            .set_retry(3)
            .set_display_name("GaMMA Association")
        )
        gamma_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    merge_op_ = (
        merge_op(
            config.outputs["index_json"],
            config.outputs["config_json"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        )
        .after(gamma_op_)
        .set_memory_request("12G")
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-64g")
        .set_display_name("Merge Catalog")
    )
    merge_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    convert_station_op_ = (
        convert_station_op(
            config_json=config.outputs["config_json"],
            station_json=stations.outputs["station_json"],
            bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_display_name("Convert Station Format")
    )
    split_hypodd_op_ = (
        split_hypodd_op(
            config.outputs["config_json"],
            catalog_csv=merge_op_.outputs["catalog_csv"],
        )
        .after(merge_op_)
        .set_display_name("Split Catalog")
    )
    split_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    split_hypodd_op_.set_image_pull_policy("Always")
    with kfp.dsl.ParallelFor(split_hypodd_op_.outputs["output"]) as i:
        convert_phase_op_ = convert_phase_op(
            i,
            config_json=config.outputs["config_json"],
            picks_csv=merge_op_.outputs["picks_csv"],
            catalog_csv=merge_op_.outputs["catalog_csv"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("Convert Phase Format")
        convert_phase_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        convert_phase_op_.set_image_pull_policy("Always")
        ph2dt_op_ = ph2dt_op(
            i,
            config_json=config.outputs["config_json"],
            hypodd_phase=convert_phase_op_.outputs["hypodd_phase"],
            station_dat=convert_station_op_.outputs["hypodd_station"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("PH2DT")
        ph2dt_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        ph2dt_op_.set_image_pull_policy("Always")
        hypodd_ct_op_ = hypodd_ct_op(
            i,
            config_json=config.outputs["config_json"],
            ct=ph2dt_op_.outputs["ct"],
            event=ph2dt_op_.outputs["hypodd_event"],
            station=convert_station_op_.outputs["hypodd_station"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("HypoDD")
        hypodd_ct_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        hypodd_ct_op_.set_image_pull_policy("Always")
        # cc_op_ = cc_op(
        #     ct=ph2dt_op_.outputs["ct"],
        #     picks=merge_op_.outputs["picks_csv"],
        #     catalog=merge_op_.outputs["catalog_csv"],
        # ).set_display_name('Cross Correlation')
        # cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        # cc_op_.set_image_pull_policy("Always")
        # hypodd_cc_op_ = hypodd_cc_op(
        #     i,
        #     config_json=config.outputs["config_json"],
        #     ct=ph2dt_op_.outputs["ct"],
        #     cc=cc_op_.outputs["cc"],
        #     event=ph2dt_op_.outputs["hypodd_event"],
        #     station=convert_station_op_.outputs["hypodd_station"],
        #     bucket_name="catalogs",
        #     s3_url=s3_url,
        #     secure=secure,
        # ).set_display_name('HypoDD + CC')
        # hypodd_cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        # hypodd_cc_op_.set_image_pull_policy("Always")
    merge_hypodd_op_ = (
        merge_hypodd_op(
            split_hypodd_op_.outputs["output"],
            config_json=config.outputs["config_json"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        ).after(hypodd_ct_op_)
        # .after(hypodd_cc_op_)
        .set_display_name("Merge Catalog")
    )
    merge_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
    merge_hypodd_op_.set_image_pull_policy("Always")
    visulization_op_ = (
        visulization_op(
            config_json=config.outputs["config_json"],
            hypodd_catalog_ct=merge_hypodd_op_.outputs["catalog_ct"],
            hypodd_catalog_cc=merge_hypodd_op_.outputs["catalog_cc"],
            gamma_catalog=merge_op_.outputs["catalog_csv"],
            standard_catalog=events.outputs["standard_catalog"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        )
        .after(merge_hypodd_op_)
        .set_display_name("Visulization")
    )
    visulization_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
    visulization_op_.set_image_pull_policy("Always")
    # vop_.delete().after(merge_hypodd_op_)
@dsl.pipeline(name="QuakeFlow", description="")
def quakeflow_pipeline(
    data_path: str = "/tmp/",
    num_parallel=0,
    bucket_catalog: str = "catalogs",
    s3_url: str = "minio-service:9000",
    secure: bool = False,
):
    config = config_op(
        region_name, num_parallel, bucket_name=f"catalogs", s3_url=s3_url, secure=secure
    ).add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
    events = (
        download_events_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_retry(3)
        .set_memory_request("1G")
        .set_display_name("Download Events")
    )
    stations = (
        download_stations_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_retry(3)
        .set_display_name("Download Stations")
    )
    with kfp.dsl.ParallelFor(config.outputs["output"]) as i:
        vop_ = dsl.VolumeOp(
            name=f"Create volume {region_name}",
            resource_name=f"data-volume-{str(i)}",
            size="100Gi",
            modes=dsl.VOLUME_MODE_RWO,
        )
        # ).set_retry(3)
        download_op_ = (
            download_waveform_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                config.outputs["datetime_json"],
                stations.outputs["station_pkl"],
                data_path=data_path,
                bucket_name=f"waveforms",
                s3_url=s3_url,
                secure=secure,
            )
            .add_pvolumes({data_path: vop_.volume})
            .set_cpu_request("700m")
            # .set_cpu_request("350m")
            .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
            .set_retry(5)
            .set_display_name("Download Waveforms")
        )
        download_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        phasenet_op_ = (
            phasenet_op(
                download_op_.outputs["Output"],
                download_op_.outputs["fname_csv"],
                stations.outputs["station_json"],
            )
            .add_pvolumes({data_path: download_op_.pvolume})
            .set_memory_request("9G")
            .set_retry(5)
            .set_display_name("PhaseNet Picking")
        )
        phasenet_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        phasenet_op_.set_image_pull_policy("Always")
        gamma_op_ = (
            gamma_op(
                i,
                config.outputs["index_json"],
                config.outputs["config_json"],
                phasenet_op_.outputs["picks"],
                stations.outputs["station_json"],
                bucket_name=f"catalogs",
                s3_url=s3_url,
                secure=secure,
            )
            .set_cpu_request("800m")
            .set_retry(3)
            .set_display_name("GaMMA Association")
        )
        gamma_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    merge_op_ = (
        merge_op(
            config.outputs["index_json"],
            config.outputs["config_json"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        )
        .after(gamma_op_)
        .set_memory_request("12G")
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-64g")
        .set_display_name("Merge Catalog")
    )
    merge_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    convert_station_op_ = (
        convert_station_op(
            config_json=config.outputs["config_json"],
            station_json=stations.outputs["station_json"],
            bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
        .add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
        .set_display_name("Convert Station Format")
    )
    split_hypodd_op_ = (
        split_hypodd_op(
            config.outputs["config_json"],
            catalog_csv=merge_op_.outputs["catalog_csv"],
        )
        .after(merge_op_)
        .set_display_name("Split Catalog")
    )
    split_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
    split_hypodd_op_.set_image_pull_policy("Always")
    with kfp.dsl.ParallelFor(split_hypodd_op_.outputs["output"]) as i:
        convert_phase_op_ = convert_phase_op(
            i,
            config_json=config.outputs["config_json"],
            picks_csv=merge_op_.outputs["picks_csv"],
            catalog_csv=merge_op_.outputs["catalog_csv"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("Convert Phase Format")
        convert_phase_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        convert_phase_op_.set_image_pull_policy("Always")
        ph2dt_op_ = ph2dt_op(
            i,
            config_json=config.outputs["config_json"],
            hypodd_phase=convert_phase_op_.outputs["hypodd_phase"],
            station_dat=convert_station_op_.outputs["hypodd_station"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("PH2DT")
        ph2dt_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        ph2dt_op_.set_image_pull_policy("Always")
        hypodd_ct_op_ = hypodd_ct_op(
            i,
            config_json=config.outputs["config_json"],
            ct=ph2dt_op_.outputs["ct"],
            event=ph2dt_op_.outputs["hypodd_event"],
            station=convert_station_op_.outputs["hypodd_station"],
            bucket_name="catalogs",
            s3_url=s3_url,
            secure=secure,
        ).set_display_name("HypoDD")
        hypodd_ct_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        hypodd_ct_op_.set_image_pull_policy("Always")
        # cc_op_ = cc_op(
        #     ct=ph2dt_op_.outputs["ct"],
        #     picks=merge_op_.outputs["picks_csv"],
        #     catalog=merge_op_.outputs["catalog_csv"],
        # ).set_display_name('Cross Correlation')
        # cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        # cc_op_.set_image_pull_policy("Always")
        # hypodd_cc_op_ = hypodd_cc_op(
        #     i,
        #     config_json=config.outputs["config_json"],
        #     ct=ph2dt_op_.outputs["ct"],
        #     cc=cc_op_.outputs["cc"],
        #     event=ph2dt_op_.outputs["hypodd_event"],
        #     station=convert_station_op_.outputs["hypodd_station"],
        #     bucket_name="catalogs",
        #     s3_url=s3_url,
        #     secure=secure,
        # ).set_display_name('HypoDD + CC')
        # hypodd_cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
        # hypodd_cc_op_.set_image_pull_policy("Always")
    merge_hypodd_op_ = (
        merge_hypodd_op(
            split_hypodd_op_.outputs["output"],
            config_json=config.outputs["config_json"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        ).after(hypodd_ct_op_)
        # .after(hypodd_cc_op_)
        .set_display_name("Merge Catalog")
    )
    merge_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
    merge_hypodd_op_.set_image_pull_policy("Always")
    visulization_op_ = (
        visulization_op(
            config_json=config.outputs["config_json"],
            hypodd_catalog_ct=merge_hypodd_op_.outputs["catalog_ct"],
            hypodd_catalog_cc=merge_hypodd_op_.outputs["catalog_cc"],
            gamma_catalog=merge_op_.outputs["catalog_csv"],
            standard_catalog=events.outputs["standard_catalog"],
            bucket_name=f"catalogs",
            s3_url=s3_url,
            secure=secure,
        )
        .after(merge_hypodd_op_)
        .set_display_name("Visulization")
    )
    visulization_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
    visulization_op_.set_image_pull_policy("Always")
    # vop_.delete().after(merge_hypodd_op_)
    
        In [ ]:
                Copied!
                
                
            import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/weiqiang/.dotbot/cloud/quakeflow_zhuwq.json"
experiment_name = "QuakeFlow"
pipeline_func = quakeflow_pipeline
run_name = f"{pipeline_func.__name__}_{region_name.lower()}"
arguments = {
    "data_path": "/tmp",
    "num_parallel": 0,
    "bucket_catalog": "catalogs",
    "s3_url": "minio-service:9000",
    "secure": False,
}
if not run_local:
    pipeline_conf = kfp.dsl.PipelineConf()
    pipeline_conf.set_image_pull_policy("Always")
    pipeline_conf.ttl_seconds_after_finished = 60 * 10
    client = kfp.Client(host="670838cea8a18858-dot-us-west1.pipelines.googleusercontent.com")
    # client = kfp.Client(host="http://localhost:8080")
    kfp.compiler.Compiler().compile(pipeline_func, "{}.zip".format(experiment_name), pipeline_conf=pipeline_conf)
    results = client.create_run_from_pipeline_func(
        pipeline_func,
        experiment_name=experiment_name,
        run_name=run_name,
        arguments=arguments,
    )
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/weiqiang/.dotbot/cloud/quakeflow_zhuwq.json"
experiment_name = "QuakeFlow"
pipeline_func = quakeflow_pipeline
run_name = f"{pipeline_func.__name__}_{region_name.lower()}"
arguments = {
    "data_path": "/tmp",
    "num_parallel": 0,
    "bucket_catalog": "catalogs",
    "s3_url": "minio-service:9000",
    "secure": False,
}
if not run_local:
    pipeline_conf = kfp.dsl.PipelineConf()
    pipeline_conf.set_image_pull_policy("Always")
    pipeline_conf.ttl_seconds_after_finished = 60 * 10
    client = kfp.Client(host="670838cea8a18858-dot-us-west1.pipelines.googleusercontent.com")
    # client = kfp.Client(host="http://localhost:8080")
    kfp.compiler.Compiler().compile(pipeline_func, "{}.zip".format(experiment_name), pipeline_conf=pipeline_conf)
    results = client.create_run_from_pipeline_func(
        pipeline_func,
        experiment_name=experiment_name,
        run_name=run_name,
        arguments=arguments,
    )
    
        In [ ]:
                Copied!