Earthquake Detection Workflow using QuakeFlow
Outline
Here we show an example of the current modules in QuakeFlow:
Download data using ObsPy
PhaseNet for picking P/S phases
Find more details on the PhaseNet GitHub page
GaMMA for associating picks and estimating approximate location and magnitude
Find more details on the GaMMA GitHub page
Earthquake location, magnitude estimation, etc.
git clone https://github.com/wayneweiqiang/PhaseNet.git
git clone https://github.com/wayneweiqiang/GaMMA.git
First option: install into the base environment
conda env update -f=env.yml -n base
Second option: install into a separate quakeflow environment; remember to switch the Jupyter notebook kernel to quakeflow
conda env create -f=env.yml -n quakeflow
python -m ipykernel install --user --name=quakeflow
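A quick way to confirm the environment is ready before running the notebook is to try importing the main dependencies. This is a minimal check, not part of the original workflow; gamma here refers to the package installed from the GaMMA repository:

import importlib

# Packages used later in this notebook; adjust the list to your setup.
for pkg in ["kfp", "obspy", "numpy", "pandas", "matplotlib", "gamma"]:
    try:
        importlib.import_module(pkg)
        print(f"{pkg}: OK")
    except ImportError as err:
        print(f"{pkg}: MISSING ({err})")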
In [19]:
import warnings
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
warnings.filterwarnings("ignore")
1. Set configurations
In [20]:
import os
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("agg")
%matplotlib inline
region_name = "Demo"
# region_name = "Ridgecrest"
# region_name = "SaltonSea"
# region_name = "Ridgecrest"
# region_name = "SanSimeon"
# region_name = "Italy"
# region_name = "PNSN"
# region_name = "Hawaii2"
# region_name = "SierraNegra"
# region_name = "PuertoRico"
# region_name = "SmithValley"
# region_name = "Antilles"
# region_name = "LongValley"
dir_name = region_name
if not os.path.exists(dir_name):
os.mkdir(dir_name)
root_dir = lambda x: os.path.join(dir_name, x)
run_local = True
In [21]:
def set_config(
region_name,
index_json: OutputPath("json"),
config_json: OutputPath("json"),
datetime_json: OutputPath("json"),
num_parallel: int = 1,
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
) -> list:
import datetime
import json
import os
import pickle
import numpy as np
import obspy
degree2km = np.pi * 6371 / 180
if region_name == "Demo":
center = (-117.504, 35.705)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2019-07-04T17")
endtime = obspy.UTCDateTime("2019-07-04T19")
client = "SCEDC"
network_list = ["CI"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "Ridgecrest":
center = (-117.504, 35.705)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2019-07-04T00")
endtime = obspy.UTCDateTime("2019-07-10T00")
client = "SCEDC"
network_list = ["CI"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "Hawaii":
center = (-155.32, 19.39)
horizontal_degree = 2.0
vertical_degree = 2.0
starttime = obspy.UTCDateTime("2018-01-01T00")
endtime = obspy.UTCDateTime("2022-08-12T00")
client = "IRIS"
network_list = ["HV", "PT"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "Hawaii2":
center = (-155.32, 19.39)
horizontal_degree = 2.0
vertical_degree = 2.0
starttime = obspy.UTCDateTime("2022-08-01T00")
endtime = obspy.UTCDateTime("2022-08-16T00")
client = "IRIS"
network_list = ["HV", "PT"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "PuertoRico":
center = (-66.5, 18)
horizontal_degree = 3.0
vertical_degree = 2.0
starttime = obspy.UTCDateTime("2018-05-01T00")
endtime = obspy.UTCDateTime("2021-11-01T00")
client = "IRIS"
network_list = ["*"]
channel_list = "HH*,BH*,HN*"
if region_name == "SaltonSea":
center = (-115.53, 32.98)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2020-10-01T00")
endtime = obspy.UTCDateTime("2020-10-01T02")
client = "SCEDC"
network_list = ["CI"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "2003SanSimeon":
center = (-121.101, 35.701)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2003-12-22T00")
endtime = obspy.UTCDateTime("2003-12-24T00")
client = "NCEDC"
network_list = ["*"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "Italy":
center = (13.188, 42.723)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2016-08-24T00")
endtime = obspy.UTCDateTime("2016-08-26T00")
client = "INGV"
network_list = ["*"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "SmithValley":
center = (-119.5, 38.51)
horizontal_degree = 1.0
vertical_degree = 1.0
starttime = obspy.UTCDateTime("2021-07-08T00:00")
endtime = obspy.UTCDateTime("2021-07-16T00:00")
client = "NCEDC"
network_list = ["*"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "Antilles":
center = (-61.14867, 14.79683)
horizontal_degree = 0.2
vertical_degree = 0.2
starttime = obspy.UTCDateTime("2021-04-10T00")
endtime = obspy.UTCDateTime("2021-04-15T00")
client = "RESIF"
network_list = ["*"]
channel_list = "HH*,BH*,EH*,HN*"
if region_name == "LongValley":
# center = (-118.8, 37.6+0.3)
# horizontal_degree = 3.0
# vertical_degree = 3.0
center = (-118.8 - 0.1, 37.6)
horizontal_degree = 1.5
vertical_degree = 1.5
starttime = obspy.UTCDateTime("2020-01-01T00")
endtime = obspy.UTCDateTime("2022-08-11T00")
client = "NCEDC"
network_list = ["*"]
channel_list = "HH*,EH*"
if region_name == "SierraNegra":
center = (-91.13, -0.81)
horizontal_degree = 1.5
vertical_degree = 1.5
starttime = obspy.UTCDateTime("2009-07-22T00")
endtime = obspy.UTCDateTime("2011-06-20T00")
client = "IRIS"
network_list = ["*"]
channel_list = "BH*,HH*,EH*"
####### save config ########
config = {}
config["region"] = region_name
config["center"] = center
config["xlim_degree"] = [
center[0] - horizontal_degree / 2,
center[0] + horizontal_degree / 2,
]
config["ylim_degree"] = [
center[1] - vertical_degree / 2,
center[1] + vertical_degree / 2,
]
config["min_longitude"] = center[0] - horizontal_degree / 2
config["max_longitude"] = center[0] + horizontal_degree / 2
config["min_latitude"] = center[1] - vertical_degree / 2
config["max_latitude"] = center[1] + vertical_degree / 2
config["degree2km"] = degree2km
config["starttime"] = starttime.datetime.isoformat(timespec="milliseconds")
config["endtime"] = endtime.datetime.isoformat(timespec="milliseconds")
config["networks"] = network_list
config["channels"] = channel_list
config["client"] = client
## PhaseNet
config["phasenet"] = {}
## GaMMA
config["gamma"] = {}
## HypoDD
config["hypodd"] = {"MAXEVENT": 1e4}
with open(config_json, "w") as fp:
json.dump(config, fp, indent=2)
print(json.dumps(config, indent=4))
####### set parallel for cloud ########
## split data by hours
# one_day = datetime.timedelta(days=1)
one_hour = datetime.timedelta(hours=1)
starttimes = []
tmp_start = starttime
while tmp_start < endtime:
starttimes.append(tmp_start.datetime.isoformat(timespec="milliseconds"))
tmp_start += one_hour
with open(datetime_json, "w") as fp:
json.dump(
{"starttimes": starttimes, "interval": one_hour.total_seconds()},
fp,
indent=2,
)
## split starttimes into N parallel processes
if num_parallel == 0:
num_parallel = min(60, int((len(starttimes) - 1) // 6 + 1))
# num_parallel = min(30, int((len(starttimes)-1)//6+1))
# num_parallel = min(24, len(starttimes))
idx = [x.tolist() for x in np.array_split(np.arange(len(starttimes)), num_parallel)]
with open(index_json, "w") as fp:
json.dump(idx, fp, indent=2)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/config.json",
config_json,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/datetime.json",
datetime_json,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/index.json",
index_json,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
return list(range(num_parallel))
In [22]:
if run_local:
idx = set_config(
region_name,
root_dir("index.json"),
root_dir("config.json"),
root_dir("datetimes.json"),
num_parallel=1,
)
{
    "region": "Demo",
    "center": [
        -117.504,
        35.705
    ],
    "xlim_degree": [
        -118.004,
        -117.004
    ],
    "ylim_degree": [
        35.205,
        36.205
    ],
    "min_longitude": -118.004,
    "max_longitude": -117.004,
    "min_latitude": 35.205,
    "max_latitude": 36.205,
    "degree2km": 111.19492664455873,
    "starttime": "2019-07-04T17:00:00.000",
    "endtime": "2019-07-04T19:00:00.000",
    "networks": [
        "CI"
    ],
    "channels": "HH*,BH*,EH*,HN*",
    "client": "SCEDC",
    "phasenet": {},
    "gamma": {},
    "hypodd": {
        "MAXEVENT": 10000.0
    }
}
ERROR: can not access minio service!
No module named 'minio'
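The saved config.json can be reloaded by any later step. Below is a minimal sketch (not a cell from the original workflow) that reads the Demo config back and reports the approximate region size in kilometers using the stored degree2km factor, matching the convention used by GaMMA later in this notebook:

import json

if run_local:
    with open(root_dir("config.json"), "r") as fp:
        cfg = json.load(fp)
    # Approximate extent in km (same degree-to-km convention as the GaMMA setup below).
    x_km = (cfg["max_longitude"] - cfg["min_longitude"]) * cfg["degree2km"]
    y_km = (cfg["max_latitude"] - cfg["min_latitude"]) * cfg["degree2km"]
    print(f"Region {cfg['region']}: ~{x_km:.0f} km x {y_km:.0f} km, "
          f"{cfg['starttime']} to {cfg['endtime']}")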
In [23]:
config_op = comp.func_to_container_op(
set_config,
base_image="python:3.8",
packages_to_install=["numpy", "obspy", "minio"],
)
2. Download events of the standard catalog
This catalog is not used by QuakeFlow; it is only used for comparing the detection results.
In [24]:
def download_events(
config_json: InputPath("json"),
standard_catalog: OutputPath(str),
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
import pickle
from collections import defaultdict
import matplotlib
import matplotlib.pyplot as plt
import obspy
import pandas as pd
from obspy.clients.fdsn import Client
# matplotlib.use("agg")
# %matplotlib inline
with open(config_json, "r") as fp:
config = json.load(fp)
####### IRIS catalog ########
try:
events = Client(config["client"]).get_events(
starttime=config["starttime"],
endtime=config["endtime"],
minlongitude=config["xlim_degree"][0],
maxlongitude=config["xlim_degree"][1],
minlatitude=config["ylim_degree"][0],
maxlatitude=config["ylim_degree"][1],
# filename='events.xml',
)
except:
events = Client("iris").get_events(
starttime=config["starttime"],
endtime=config["endtime"],
minlongitude=config["xlim_degree"][0],
maxlongitude=config["xlim_degree"][1],
minlatitude=config["ylim_degree"][0],
maxlatitude=config["ylim_degree"][1],
# filename='events.xml',
)
# events = obspy.read_events('events.xml')
print(f"Number of events: {len(events)}")
# events.plot('local', outfile="events.png")
# events.plot('local')
####### Save catalog ########
catalog = defaultdict(list)
for event in events:
if len(event.magnitudes) > 0:
catalog["time"].append(event.origins[0].time.datetime)
catalog["magnitude"].append(event.magnitudes[0].mag)
catalog["longitude"].append(event.origins[0].longitude)
catalog["latitude"].append(event.origins[0].latitude)
catalog["depth(m)"].append(event.origins[0].depth)
catalog = pd.DataFrame.from_dict(catalog).sort_values(["time"])
catalog.to_csv(
standard_catalog,
# sep="\t",
index=False,
float_format="%.3f",
date_format="%Y-%m-%dT%H:%M:%S.%f",
columns=["time", "magnitude", "longitude", "latitude", "depth(m)"],
)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/standard_catalog.csv",
standard_catalog,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
####### Plot catalog ########
plt.figure()
plt.plot(catalog["longitude"], catalog["latitude"], ".", markersize=1)
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.axis("scaled")
plt.xlim(config["xlim_degree"])
plt.ylim(config["ylim_degree"])
# plt.savefig(os.path.join(data_path, "events_loc.png"))
plt.show()
plt.figure()
plt.plot_date(catalog["time"], catalog["magnitude"], ".", markersize=1)
plt.gcf().autofmt_xdate()
plt.ylabel("Magnitude")
plt.title(f"Number of events: {len(events)}")
# plt.savefig(os.path.join(data_path, "events_mag_time.png"))
plt.show()
In [25]:
if run_local:
download_events(root_dir("config.json"), root_dir("standard_catalog.csv"))
Number of events: 252
ERROR: can not access minio service!
No module named 'minio'
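For later comparison with the QuakeFlow catalog, the standard catalog can be reloaded from the CSV written above. A minimal sketch, assuming the Demo run:

import pandas as pd

if run_local:
    # Columns written above: time, magnitude, longitude, latitude, depth(m)
    standard_catalog = pd.read_csv(root_dir("standard_catalog.csv"), parse_dates=["time"])
    print(f"{len(standard_catalog)} events, magnitude "
          f"{standard_catalog['magnitude'].min():.1f} to {standard_catalog['magnitude'].max():.1f}")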
In [26]:
download_events_op = comp.func_to_container_op(
download_events,
# base_image='zhuwq0/quakeflow-env:latest',
base_image="python:3.8",
packages_to_install=[
"obspy",
"pandas",
"matplotlib",
"minio",
],
)
3. Download stations
In [27]:
def download_stations(
config_json: InputPath("json"),
station_json: OutputPath("json"),
station_pkl: OutputPath("pickle"),
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
import pickle
from collections import defaultdict
import matplotlib
import matplotlib.pyplot as plt
import obspy
import pandas as pd
from obspy.clients.fdsn import Client
# matplotlib.use("agg")
# %matplotlib inline
with open(config_json, "r") as fp:
config = json.load(fp)
print("Network:", ",".join(config["networks"]))
####### Download stations ########
stations = Client(config["client"]).get_stations(
network=",".join(config["networks"]),
station="*",
starttime=config["starttime"],
endtime=config["endtime"],
minlongitude=config["xlim_degree"][0],
maxlongitude=config["xlim_degree"][1],
minlatitude=config["ylim_degree"][0],
maxlatitude=config["ylim_degree"][1],
channel=config["channels"],
level="response",
# filename="stations.xml"
)
# stations = obspy.read_inventory("stations.xml")
print("Number of stations: {}".format(sum([len(x) for x in stations])))
# stations.plot('local', outfile="stations.png")
# stations.plot('local')
####### Save stations ########
station_locs = defaultdict(dict)
for network in stations:
for station in network:
for chn in station:
sid = f"{network.code}.{station.code}.{chn.location_code}.{chn.code[:-1]}"
if sid in station_locs:
if chn.code[-1] not in station_locs[sid]["component"]:
station_locs[sid]["component"].append(chn.code[-1])
station_locs[sid]["response"].append(round(chn.response.instrument_sensitivity.value, 2))
else:
tmp_dict = {
"longitude": chn.longitude,
"latitude": chn.latitude,
"elevation(m)": chn.elevation,
"component": [
chn.code[-1],
],
"response": [
round(chn.response.instrument_sensitivity.value, 2),
],
"unit": chn.response.instrument_sensitivity.input_units.lower(),
}
station_locs[sid] = tmp_dict
with open(station_json, "w") as fp:
json.dump(station_locs, fp, indent=2)
with open(station_pkl, "wb") as fp:
pickle.dump(stations, fp)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/stations.json",
station_json,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/stations.pkl",
station_pkl,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
######## Plot stations ########
station_locs = pd.DataFrame.from_dict(station_locs, orient="index")
plt.figure()
plt.plot(station_locs["longitude"], station_locs["latitude"], "^", label="Stations")
plt.xlabel("X (km)")
plt.ylabel("Y (km)")
plt.axis("scaled")
plt.xlim(config["xlim_degree"])
plt.ylim(config["ylim_degree"])
plt.legend()
plt.title(f"Number of stations: {len(station_locs)}")
# plt.savefig(os.path.join(data_path, "stations_loc.png"))
plt.show()
In [28]:
if run_local:
download_stations(root_dir("config.json"), root_dir("stations.json"), root_dir("stations.pkl"))
Network: CI
Number of stations: 17
ERROR: can not access minio service!
No module named 'minio'
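The station metadata in stations.json is keyed by station id (network.station.location.instrument) and records each instrument's location, components, responses, and units. A minimal sketch, assuming the Demo run, that reloads it into a DataFrame:

import json
import pandas as pd

if run_local:
    with open(root_dir("stations.json"), "r") as fp:
        station_locs = json.load(fp)
    stations_df = pd.DataFrame.from_dict(station_locs, orient="index")
    print(f"{len(stations_df)} station/instrument entries")
    print(stations_df[["longitude", "latitude", "component", "unit"]].head())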
In [29]:
download_stations_op = comp.func_to_container_op(
download_stations,
# base_image='zhuwq0/quakeflow-env:latest',
base_image="python:3.8",
packages_to_install=[
"obspy",
"pandas",
"matplotlib",
"minio",
],
)
4. Download waveform data
In [30]:
def download_waveform(
node_i: int,
index_json: InputPath("json"),
config_json: InputPath("json"),
datetime_json: InputPath("json"),
station_pkl: InputPath("pickle"),
fname_csv: OutputPath(str),
data_path: str,
bucket_name: str = "waveforms",
s3_url: str = "minio-service:9000",
secure: bool = True,
) -> str:
import json
import os
import pickle
import random
import threading
import time
import obspy
from obspy.clients.fdsn import Client
lock = threading.Lock()
upload_minio = False
# try:
# from minio import Minio
# minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
# if not minioClient.bucket_exists(bucket_name):
# minioClient.make_bucket(bucket_name)
# upload_minio = True
# except Exception as err:
# # print(f"ERROR: can not access minio service! \n{err}")
# pass
with open(index_json, "r") as fp:
index = json.load(fp)
idx = index[node_i]
with open(config_json, "r") as fp:
config = json.load(fp)
with open(datetime_json, "r") as fp:
tmp = json.load(fp)
starttimes = tmp["starttimes"]
interval = tmp["interval"]
with open(station_pkl, "rb") as fp:
stations = pickle.load(fp)
waveform_dir = os.path.join(data_path, config["region"], "waveforms")
if not os.path.exists(waveform_dir):
os.makedirs(waveform_dir)
####### Download data ########
client = Client(config["client"])
fname_list = ["fname"]
def download(i):
# for i in idx:
starttime = obspy.UTCDateTime(starttimes[i])
endtime = starttime + interval
fname = "{}.mseed".format(starttime.datetime.strftime("%Y-%m-%dT%H:%M:%S"))
if not upload_minio:
if os.path.exists(os.path.join(waveform_dir, fname)):
print(f"{fname} exists")
fname_list.append(fname)
return
else:
try:
minioClient.fget_object(
bucket_name,
os.path.join(config["region"], fname),
os.path.join(waveform_dir, fname),
)
print(
f"{bucket_name}/{os.path.join(config['region'], fname)} download to {os.path.join(waveform_dir, fname)}"
)
fname_list.append(fname)
return
except Exception as err:
print(err)
max_retry = 10
stream = obspy.Stream()
print(f"{fname} download starts")
num_sta = 0
for network in stations:
for station in network:
print(f"********{network.code}.{station.code}********")
retry = 0
while retry < max_retry:
try:
tmp = client.get_waveforms(
network.code,
station.code,
"*",
config["channels"],
starttime,
endtime,
)
# for trace in tmp:
# if trace.stats.sampling_rate != 100:
# print(trace)
# trace = trace.interpolate(100, method="linear")
# trace = trace.detrend("spline", order=2, dspline=5*trace.stats.sampling_rate)
# stream.append(trace)
stream += tmp
num_sta += len(tmp)
break
except Exception as err:
print("Error {}.{}: {}".format(network.code, station.code, err))
message = "No data available for request."
if str(err)[: len(message)] == message:
break
retry += 1
time.sleep(5)
continue
if retry == max_retry:
print(f"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code}")
if len(stream) > 0:
# stream = stream.merge(fill_value=0)
# stream = stream.trim(starttime, endtime, pad=True, fill_value=0)
stream.write(os.path.join(waveform_dir, fname))
print(f"{fname} download succeeds")
# if upload_minio:
# minioClient.fput_object(bucket_name, os.path.join(config['region'], fname), os.path.join(waveform_dir, fname))
# print(f"{fname} upload to minio {os.path.join(config['region'], fname)}")
else:
print(f"{fname} empty data")
lock.acquire()
fname_list.append(fname)
lock.release()
threads = []
MAX_THREADS = 2
# MAX_THREADS = 1
for ii, i in enumerate(idx):
t = threading.Thread(target=download, args=(i,))
t.start()
time.sleep(1)
threads.append(t)
if ii % MAX_THREADS == MAX_THREADS - 1:
for t in threads:
t.join()
threads = []
for t in threads:
t.join()
with open(fname_csv, "w") as fp:
fp.write("\n".join(fname_list))
return waveform_dir
In [31]:
if run_local:
waveform_path = download_waveform(
0,
root_dir("index.json"),
root_dir("config.json"),
root_dir("datetimes.json"),
root_dir("stations.pkl"),
root_dir("fname.csv"),
data_path=root_dir(""),
)
2019-07-04T17:00:00.mseed exists
2019-07-04T18:00:00.mseed exists
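Each downloaded file is a one-hour miniSEED stream that can be inspected directly with ObsPy. A minimal sketch, assuming the first Demo file listed above:

import os
import obspy

if run_local:
    # waveform_path is returned by download_waveform() above.
    st = obspy.read(os.path.join(waveform_path, "2019-07-04T17:00:00.mseed"))
    print(f"{len(st)} traces from {len({tr.stats.station for tr in st})} stations")
    print(st[0])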
In [32]:
download_waveform_op = comp.func_to_container_op(
download_waveform,
# base_image='zhuwq0/quakeflow-env:latest',
base_image="python:3.8",
packages_to_install=["obspy", "minio"],
)
5. Run PhaseNet to pick P/S picks
In [33]:
def phasenet_op(data_path: str, data_list: str, stations: str):
return dsl.ContainerOp(
name="PhaseNet Picking",
image="zhuwq0/phasenet-api:1.0",
command=["python"],
arguments=[
"phasenet/predict.py",
"--model",
"model/190703-214543",
"--data_dir",
data_path,
"--data_list",
dsl.InputArgumentPath(data_list),
"--stations",
dsl.InputArgumentPath(stations),
# '--result_dir', "results",
"--format",
"mseed_array",
"--amplitude",
],
file_outputs={"picks": "/opt/results/picks.csv"},
)
In [37]:
# %%capture
if run_local:
command = f"python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir={root_dir(root_dir('waveforms'))} --data_list={root_dir('fname.csv')} --stations={root_dir('stations.json')} --result_dir={root_dir('phasenet')} --format=mseed_array --amplitude"# --upload_waveform"
print(command)
!{command}
python ../PhaseNet/phasenet/predict.py --model=../PhaseNet/model/190703-214543 --data_dir=Demo/Demo/waveforms --data_list=Demo/fname.csv --stations=Demo/stations.json --result_dir=Demo/phasenet --format=mseed_array --amplitude 2023-03-15 14:40:16.834011: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags. longitude latitude elevation(m) component response unit CI.CCC..BH -117.364530 35.524950 670.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.CCC..HH -117.364530 35.524950 670.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.CCC..HN -117.364530 35.524950 670.0 [E, N, Z] [213979.0, 214322.0, 213808.0] m/s**2 CI.CLC..BH -117.597510 35.815740 775.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.CLC..HH -117.597510 35.815740 775.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.CLC..HN -117.597510 35.815740 775.0 [E, N, Z] [213945.0, 213808.0, 213740.0] m/s**2 CI.DTP..BH -117.845810 35.267420 908.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.DTP..HH -117.845810 35.267420 908.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.DTP..HN -117.845810 35.267420 908.0 [E, N, Z] [214399.0, 213971.0, 214484.0] m/s**2 CI.JRC2..BH -117.808850 35.982490 1469.0 [E, N, Z] [784866000.0, 784866000.0, 790478000.0] m/s CI.JRC2..HH -117.808850 35.982490 1469.0 [E, N, Z] [784866000.0, 784866000.0, 790478000.0] m/s CI.JRC2..HN -117.808850 35.982490 1469.0 [E, N, Z] [213808.0, 213945.0, 214185.0] m/s**2 CI.LRL..BH -117.682120 35.479540 1340.0 [E, N, Z] [628306000.0, 629984000.0, 627467000.0] m/s CI.LRL..HH -117.682120 35.479540 1340.0 [E, N, Z] [628306000.0, 629984000.0, 627467000.0] m/s CI.LRL..HN -117.682120 35.479540 1340.0 [E, N, Z] [213757.0, 213671.0, 213201.0] m/s**2 CI.LRL.2C.HN -117.682120 35.479540 1340.0 [E, N, Z] [213757.0, 213671.0, 213201.0] m/s**2 CI.MPM..BH -117.489010 36.057990 1839.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.MPM..HH -117.489010 36.057990 1839.0 [E, N, Z] [627368000.0, 627368000.0, 627368000.0] m/s CI.MPM..HN -117.489010 36.057990 1839.0 [E, N, Z] [213911.0, 214219.0, 213911.0] m/s**2 CI.Q0072.01.HN -117.666721 35.609617 695.0 [E, N, Z] [256354.0, 256354.0, 256354.0] m/s**2 CI.SLA..BH -117.283320 35.890950 1174.0 [E, N, Z] [622338000.0, 618992000.0, 616482000.0] m/s CI.SLA..HH -117.283320 35.890950 1174.0 [E, N, Z] [622338000.0, 618992000.0, 616482000.0] m/s CI.SLA..HN -117.283320 35.890950 1174.0 [E, N, Z] [214253.0, 213671.0, 213979.0] m/s**2 CI.SRT..BH -117.750510 35.692350 667.0 [E, N, Z] [633341000.0, 633341000.0, 633341000.0] m/s CI.SRT..HH -117.750510 35.692350 667.0 [E, N, Z] [629865000.0, 629865000.0, 629865000.0] m/s CI.SRT..HN -117.750510 35.692350 667.0 [E, N, Z] [214301.0, 213873.0, 214087.0] m/s**2 CI.TOW2..BH -117.764880 35.808560 685.0 [E, N, Z] [626915000.0, 626915000.0, 626843000.0] m/s CI.TOW2..HH -117.764880 35.808560 685.0 [E, N, Z] [626911000.0, 626911000.0, 626839000.0] m/s CI.TOW2..HN -117.764880 35.808560 685.0 [E, N, Z] [213800.0, 214142.0, 214356.0] m/s**2 CI.WBM..BH -117.890490 35.608390 892.0 [E, N, Z] [314573000.0, 314573000.0, 314573000.0] m/s CI.WBM..HH -117.890490 35.608390 892.0 [E, N, Z] [314573000.0, 314573000.0, 314573000.0] m/s CI.WBM..HN -117.890490 35.608390 892.0 [E, N, Z] [213550.0, 214064.0, 
213550.0] m/s**2 CI.WBM.2C.HN -117.890490 35.608390 892.0 [E, N, Z] [213550.0, 214064.0, 213550.0] m/s**2 CI.WCS2..BH -117.765260 36.025210 1143.0 [E, N, Z] [626910000.0, 626910000.0, 626838000.0] m/s CI.WCS2..HH -117.765260 36.025210 1143.0 [E, N, Z] [626910000.0, 626910000.0, 626838000.0] m/s CI.WCS2..HN -117.765260 36.025210 1143.0 [E, N, Z] [213757.0, 213329.0, 213415.0] m/s**2 CI.WMF..BH -117.854860 36.117580 1537.4 [E, N, Z] [629963000.0, 631652000.0, 629963000.0] m/s CI.WMF..HH -117.854860 36.117580 1537.4 [E, N, Z] [626506000.0, 628185000.0, 626506000.0] m/s CI.WMF..HN -117.854860 36.117580 1537.4 [E, N, Z] [214087.0, 214087.0, 214087.0] m/s**2 CI.WMF.2C.HN -117.854860 36.117580 1537.4 [E, N, Z] [213952.0, 213952.0, 213952.0] m/s**2 CI.WNM..EH -117.906160 35.842200 974.3 [Z] [69306200.0] m/s CI.WNM..HN -117.906160 35.842200 974.3 [E, N, Z] [214054.0, 213926.0, 214054.0] m/s**2 CI.WNM.2C.HN -117.906160 35.842200 974.3 [E, N, Z] [214048.0, 213920.0, 214048.0] m/s**2 CI.WRC2..BH -117.650380 35.947900 943.0 [E, N, Z] [629145000.0, 629145000.0, 629145000.0] m/s CI.WRC2..HH -117.650380 35.947900 943.0 [E, N, Z] [629145000.0, 629145000.0, 629145000.0] m/s CI.WRC2..HN -117.650380 35.947900 943.0 [E, N, Z] [214227.0, 213970.0, 214056.0] m/s**2 CI.WRV2..EH -117.890400 36.007740 1070.0 [Z] [71427500.0] m/s CI.WRV2..HN -117.890400 36.007740 1070.0 [E, N, Z] [213883.0, 235224.0, 235139.0] m/s**2 CI.WRV2.2C.HN -117.890400 36.007740 1070.0 [E, N, Z] [213877.0, 235218.0, 235132.0] m/s**2 CI.WVP2..EH -117.817690 35.949390 1465.0 [Z] [68041300.0] m/s CI.WVP2..HN -117.817690 35.949390 1465.0 [E, N, Z] [213764.0, 213550.0, 213721.0] m/s**2 CI.WVP2.2C.HN -117.817690 35.949390 1465.0 [E, N, Z] [213782.0, 213569.0, 213740.0] m/s**2 2023-03-15 14:40:21,519 Resampling CI.CCC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,618 Resampling CI.CCC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,626 Resampling CI.CCC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,633 Resampling CI.CLC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,640 Resampling CI.CLC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,646 Resampling CI.CLC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,653 Resampling CI.DTP..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,660 Resampling CI.DTP..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,667 Resampling CI.DTP..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,674 Resampling CI.JRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,681 Resampling CI.JRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,687 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,694 Resampling CI.LRL..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,699 Resampling CI.LRL..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,705 Resampling CI.LRL..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,711 Resampling CI.MPM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,717 Resampling CI.MPM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,722 Resampling CI.MPM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,728 Resampling CI.SLA..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,735 Resampling CI.SLA..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,741 Resampling CI.SLA..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,748 Resampling CI.SRT..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,755 Resampling CI.SRT..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,762 Resampling CI.SRT..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,768 Resampling CI.TOW2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,774 Resampling CI.TOW2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,780 Resampling CI.TOW2..BHZ from 40.0 to 
100 Hz 2023-03-15 14:40:21,787 Resampling CI.WBM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,793 Resampling CI.WBM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,798 Resampling CI.WBM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,804 Resampling CI.WCS2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,811 Resampling CI.WCS2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,817 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,823 Resampling CI.WMF..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,830 Resampling CI.WMF..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,836 Resampling CI.WMF..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:21,842 Resampling CI.WRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:21,847 Resampling CI.WRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:21,853 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz Empty trace: CI.LRL.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.LRL.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.LRL.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WNM.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WNM.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WNM.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNZ 2019-07-04T16:59:59.994500Z 2023-03-15 14:40:32,078 Pred log: Demo/phasenet 2023-03-15 14:40:32,078 Dataset size: 2 2023-03-15 14:40:32.210711: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags. 2023-03-15 14:40:32.219257: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 80. Tune using inter_op_parallelism_threads for best performance. 2023-03-15 14:40:32,265 Model: depths 5, filters 8, filter size 7x1, pool size: 4x1, dilation rate: 1x1 2023-03-15 14:40:34.157725: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 80. Tune using inter_op_parallelism_threads for best performance. 
2023-03-15 14:40:34.286939: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:357] MLIR V1 optimization pass is not enabled 2023-03-15 14:40:34,616 restoring model ../PhaseNet/model/190703-214543/model_95.ckpt Pred: 0%| | 0/2 [00:00<?, ?it/s]2023-03-15 14:40:35,609 Resampling CI.CCC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,618 Resampling CI.CCC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,624 Resampling CI.CCC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,630 Resampling CI.CLC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,637 Resampling CI.CLC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,643 Resampling CI.CLC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,649 Resampling CI.DTP..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,655 Resampling CI.DTP..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,661 Resampling CI.DTP..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,668 Resampling CI.JRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,674 Resampling CI.JRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,680 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,686 Resampling CI.LRL..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,692 Resampling CI.LRL..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,698 Resampling CI.LRL..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,705 Resampling CI.MPM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,711 Resampling CI.MPM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,717 Resampling CI.MPM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,724 Resampling CI.SLA..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,730 Resampling CI.SLA..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,737 Resampling CI.SLA..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,743 Resampling CI.SRT..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,750 Resampling CI.SRT..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,756 Resampling CI.SRT..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,763 Resampling CI.TOW2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,769 Resampling CI.TOW2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,776 Resampling CI.TOW2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,782 Resampling CI.WBM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,789 Resampling CI.WBM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,795 Resampling CI.WBM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,802 Resampling CI.WCS2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,808 Resampling CI.WCS2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,815 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,822 Resampling CI.WMF..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,829 Resampling CI.WMF..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,836 Resampling CI.WMF..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:35,843 Resampling CI.WRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:35,849 Resampling CI.WRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:35,856 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz Empty trace: CI.LRL.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.LRL.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.LRL.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.Q0072.01.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WBM.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WMF.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: 
CI.WNM.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WNM.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WNM.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WRV2.2C.HNZ 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNE 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNN 2019-07-04T16:59:59.994500Z Empty trace: CI.WVP2.2C.HNZ 2019-07-04T16:59:59.994500Z 2023-03-15 14:40:52,818 Resampling CI.CCC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,828 Resampling CI.CCC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,834 Resampling CI.CCC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,841 Resampling CI.CLC..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,847 Resampling CI.CLC..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,853 Resampling CI.CLC..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,860 Resampling CI.DTP..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,866 Resampling CI.DTP..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,873 Resampling CI.DTP..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,879 Resampling CI.JRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,885 Resampling CI.JRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,892 Resampling CI.JRC2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,898 Resampling CI.LRL..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,904 Resampling CI.LRL..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,910 Resampling CI.LRL..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,916 Resampling CI.MPM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,923 Resampling CI.MPM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,929 Resampling CI.MPM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,935 Resampling CI.SLA..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,942 Resampling CI.SLA..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,948 Resampling CI.SLA..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,954 Resampling CI.SRT..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,961 Resampling CI.SRT..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,967 Resampling CI.SRT..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,974 Resampling CI.TOW2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:52,980 Resampling CI.TOW2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:52,987 Resampling CI.TOW2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:52,993 Resampling CI.WBM..BHE from 40.0 to 100 Hz 2023-03-15 14:40:53,000 Resampling CI.WBM..BHN from 40.0 to 100 Hz 2023-03-15 14:40:53,006 Resampling CI.WBM..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:53,013 Resampling CI.WCS2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:53,019 Resampling CI.WCS2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:53,026 Resampling CI.WCS2..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:53,032 Resampling CI.WMF..BHE from 40.0 to 100 Hz 2023-03-15 14:40:53,038 Resampling CI.WMF..BHN from 40.0 to 100 Hz 2023-03-15 14:40:53,045 Resampling CI.WMF..BHZ from 40.0 to 100 Hz 2023-03-15 14:40:53,051 Resampling CI.WRC2..BHE from 40.0 to 100 Hz 2023-03-15 14:40:53,057 Resampling CI.WRC2..BHN from 40.0 to 100 Hz 2023-03-15 14:40:53,063 Resampling CI.WRC2..BHZ from 40.0 to 100 Hz Empty trace: CI.LRL.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.LRL.2C.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.LRL.2C.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.Q0072.01.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.Q0072.01.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.Q0072.01.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.WBM.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.WBM.2C.HNN 
2019-07-04T17:59:59.994500Z Empty trace: CI.WBM.2C.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.WMF.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.WMF.2C.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.WMF.2C.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.WNM.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.WNM.2C.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.WNM.2C.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.WRV2.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.WRV2.2C.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.WRV2.2C.HNZ 2019-07-04T17:59:59.994500Z Empty trace: CI.WVP2.2C.HNE 2019-07-04T17:59:59.994500Z Empty trace: CI.WVP2.2C.HNN 2019-07-04T17:59:59.994500Z Empty trace: CI.WVP2.2C.HNZ 2019-07-04T17:59:59.994500Z Pred: 100%|███████████████████████████████████████| 2/2 [01:06<00:00, 33.49s/it] Done with 8856 P-picks and 9294 S-picks
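PhaseNet writes its picks under Demo/phasenet (the picks.csv filename is assumed here, matching the container op's output path above). The columns used by GaMMA in the next step include station_id, phase_time, phase_score, phase_type, and phase_amp. A minimal sketch that reloads the picks and counts P and S phases:

import pandas as pd

if run_local:
    # Assumes PhaseNet wrote its results to Demo/phasenet/picks.csv
    picks = pd.read_csv(root_dir("phasenet/picks.csv"), parse_dates=["phase_time"])
    print(f"{len(picks)} picks")
    print(picks["phase_type"].value_counts())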
6. Run GaMMA to associate P/S picks¶
In [38]:
def gamma(
node_i: int,
index_json: InputPath("json"),
config_json: InputPath("json"),
pick_csv: InputPath("csv"),
station_json: InputPath("json"),
gamma_catalog_csv: OutputPath(str),
gamma_pick_csv: OutputPath(str),
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
) -> str:
import json
import os
import pickle
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from gamma.utils import association, convert_picks_csv, from_seconds
from pyproj import Proj
from tqdm import tqdm
catalog_dir = os.path.join("/tmp/", bucket_name)
if not os.path.exists(catalog_dir):
os.makedirs(catalog_dir)
## read config
with open(index_json, "r") as fp:
index = json.load(fp)
idx = index[node_i]
with open(config_json, "r") as fp:
config = json.load(fp)
## read picks
# picks = pd.read_json(pick_json)
picks = pd.read_csv(pick_csv, parse_dates=["phase_time"])
picks["id"] = picks["station_id"]
picks["timestamp"] = picks["phase_time"]
picks["amp"] = picks["phase_amp"]
picks["type"] = picks["phase_type"]
picks["prob"] = picks["phase_score"]
## read stations
# stations = pd.read_csv(station_csv, delimiter="\t")
with open(station_json, "r") as fp:
stations = json.load(fp)
stations = pd.DataFrame.from_dict(stations, orient="index")
# stations = stations.rename(columns={"station": "id"})
stations["id"] = stations.index
proj = Proj(f"+proj=sterea +lon_0={config['center'][0]} +lat_0={config['center'][1]} +units=km")
stations[["x(km)", "y(km)"]] = stations.apply(
lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1
)
stations["z(km)"] = stations["elevation(m)"].apply(lambda x: -x / 1e3)
## setting GMMA configs
config["use_dbscan"] = True
config["use_amplitude"] = True
config["method"] = "BGMM"
if config["method"] == "BGMM": ## BayesianGaussianMixture
config["oversample_factor"] = 4
if config["method"] == "GMM": ## GaussianMixture
config["oversample_factor"] = 1
# Earthquake location
config["dims"] = ["x(km)", "y(km)", "z(km)"]
config["vel"] = {"p": 6.0, "s": 6.0 / 1.73}
config["x(km)"] = (np.array(config["xlim_degree"]) - np.array(config["center"][0])) * config["degree2km"]
config["y(km)"] = (np.array(config["ylim_degree"]) - np.array(config["center"][1])) * config["degree2km"]
config["z(km)"] = (0, 60)
config["bfgs_bounds"] = (
(config["x(km)"][0] - 1, config["x(km)"][1] + 1), # x
(config["y(km)"][0] - 1, config["y(km)"][1] + 1), # y
(0, config["z(km)"][1] + 1), # z
(None, None), # t
)
# DBSCAN
config["dbscan_eps"] = 10 # second
config["dbscan_min_samples"] = 3 ## see DBSCAN
# Filtering
config["min_picks_per_eq"] = min(10, len(stations) // 2)
config["min_p_picks_per_eq"] = 0
config["min_s_picks_per_eq"] = 0
config["max_sigma11"] = 2.0 # s
config["max_sigma22"] = 2.0 # m/s
config["max_sigma12"] = 1.0 # covariance
# if use amplitude
if config["use_amplitude"]:
picks = picks[picks["amp"] != -1]
# print(config)
for k, v in config.items():
print(f"{k}: {v}")
## run GMMA association
event_idx0 = 1
assignments = []
catalogs, assignments = association(picks, stations, config, event_idx0, method=config["method"])
event_idx0 += len(catalogs)
## create catalog
catalogs = pd.DataFrame(
catalogs,
columns=["time"]
+ config["dims"]
+ [
"magnitude",
"sigma_time",
"sigma_amp",
"cov_time_amp",
"event_index",
"gamma_score",
],
)
catalogs[["longitude", "latitude"]] = catalogs.apply(
lambda x: pd.Series(proj(longitude=x["x(km)"], latitude=x["y(km)"], inverse=True)),
axis=1,
)
catalogs["depth(m)"] = catalogs["z(km)"].apply(lambda x: x * 1e3)
catalogs.sort_values(by=["time"], inplace=True)
with open(gamma_catalog_csv, "w") as fp:
catalogs.to_csv(
fp,
# sep="\t",
index=False,
float_format="%.3f",
date_format="%Y-%m-%dT%H:%M:%S.%f",
columns=[
"time",
"magnitude",
"longitude",
"latitude",
"depth(m)",
"sigma_time",
"sigma_amp",
"cov_time_amp",
"gamma_score",
"event_index",
],
)
# catalogs = catalogs[
# ['time', 'magnitude', 'longitude', 'latitude', 'depth(m)', 'sigma_time', 'sigma_amp']
# ]
## add assignment to picks
assignments = pd.DataFrame(assignments, columns=["pick_index", "event_index", "gamma_score"])
picks = picks.join(assignments.set_index("pick_index")).fillna(-1).astype({"event_index": int})
picks.sort_values(by=["timestamp"], inplace=True)
with open(gamma_pick_csv, "w") as fp:
picks.to_csv(
fp,
# sep="\t",
index=False,
date_format="%Y-%m-%dT%H:%M:%S.%f",
columns=[
"station_id",
"phase_time",
"phase_type",
"phase_score",
"phase_amp",
"gamma_score",
"event_index",
],
)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/gamma/catalog_{node_i:03d}.csv",
gamma_catalog_csv,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/gamma/picks_{node_i:03d}.csv",
gamma_pick_csv,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
## upload to mongodb
try:
from pymongo import MongoClient
username = "root"
password = "quakeflow123"
client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
db = client["quakeflow"]
collection = db["waveform"]
for i, p in tqdm(picks.iterrows(), desc="Uploading to mongodb"):
# update_one replaces the deprecated Collection.update (removed in pymongo 4)
collection.update_one(
{"_id": f"{p['station_id']}_{p['timestamp'].isoformat(timespec='milliseconds')}_{p['type']}"},
{"$set": {"event_index": p["event_index"]}},
)
except Exception as err:
print(f"ERROR: can not access mongodb service! \n{err}")
pass
return f"catalog_{node_i:03d}.csv"
In [39]:
if run_local:
catalog = gamma(
0,
root_dir("index.json"),
root_dir("config.json"),
root_dir("phasenet/picks.csv"),
root_dir("stations.json"),
root_dir("gamma_catalog.csv"),
root_dir("gamma_picks.csv"),
bucket_name="catalogs",
s3_url="localhost:9000",
secure=False,
)
region: Demo
center: [-117.504, 35.705]
xlim_degree: [-118.004, -117.004]
ylim_degree: [35.205, 36.205]
min_longitude: -118.004
max_longitude: -117.004
min_latitude: 35.205
max_latitude: 36.205
degree2km: 111.19492664455873
starttime: 2019-07-04T17:00:00.000
endtime: 2019-07-04T19:00:00.000
networks: ['CI']
channels: HH*,BH*,EH*,HN*
client: SCEDC
phasenet: {}
gamma: {}
hypodd: {'MAXEVENT': 10000.0}
use_dbscan: True
use_amplitude: True
method: BGMM
oversample_factor: 4
dims: ['x(km)', 'y(km)', 'z(km)']
vel: {'p': 6.0, 's': 3.468208092485549}
x(km): [-55.59746332 55.59746332]
y(km): [-55.59746332 55.59746332]
z(km): (0, 60)
bfgs_bounds: ((-56.597463322279367, 56.597463322279367), (-56.597463322279367, 56.597463322279367), (0, 61), (None, None))
dbscan_eps: 10
dbscan_min_samples: 3
min_picks_per_eq: 10
min_p_picks_per_eq: 0
min_s_picks_per_eq: 0
max_sigma11: 2.0
max_sigma22: 2.0
max_sigma12: 1.0
Associating 81 clusters with 79 CPUs
.................................................................................
Finish 99 events
Finish 199 events
Finish 299 events
ERROR: can not access minio service! No module named 'minio'
Uploading to mongodb: 0it [00:30, ?it/s]
ERROR: can not access mongodb service! 127.0.0.1:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 64123c7172f5f960d0d7c589, topology_type: Single, servers: [<ServerDescription ('127.0.0.1', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('127.0.0.1:27017: [Errno 111] Connection refused')>]>
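As a quick sanity check on the association results written by the local run, the two CSV files can be loaded with pandas. A minimal sketch, assuming region_name = "Demo" so that the outputs live under Demo/:

import pandas as pd

catalog = pd.read_csv("Demo/gamma_catalog.csv", parse_dates=["time"])
picks = pd.read_csv("Demo/gamma_picks.csv", parse_dates=["phase_time"])

# Basic summary of the associated catalog and the fraction of picks GaMMA kept
print(f"{len(catalog)} associated events between {catalog['time'].min()} and {catalog['time'].max()}")
print(f"{(picks['event_index'] != -1).mean():.0%} of picks assigned to an event (event_index != -1)")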
In [40]:
gamma_op = comp.func_to_container_op(
gamma,
# base_image='zhuwq0/quakeflow-env:latest',
base_image="python:3.8",
packages_to_install=[
"pandas",
"numpy",
"scikit-learn",
"tqdm",
"minio",
"gmma",
"pyproj",
],
)
Plot catalogs¶
In [41]:
if run_local:
%run plot_catalog.ipynb
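If plot_catalog.ipynb is not available, a minimal epicenter map can be drawn directly from the GaMMA catalog. A sketch, again assuming the Demo/ outputs from the local run (the figure filename is arbitrary):

import pandas as pd
import matplotlib.pyplot as plt

catalog = pd.read_csv("Demo/gamma_catalog.csv")

fig, ax = plt.subplots(figsize=(6, 6))
# Marker size scales with magnitude, color with depth in km
sc = ax.scatter(catalog["longitude"], catalog["latitude"], s=2 ** catalog["magnitude"], c=catalog["depth(m)"] / 1e3, cmap="viridis_r", alpha=0.6)
fig.colorbar(sc, ax=ax, label="Depth (km)")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.set_title("GaMMA associated events")
fig.savefig("Demo/gamma_catalog_map.png", dpi=150, bbox_inches="tight")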
Merge results of parallel processing on the cloud¶
In [42]:
def merge_catalog(
index_json: InputPath("json"),
config_json: InputPath("json"),
catalog_csv: OutputPath(str),
picks_csv: OutputPath(str),
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from glob import glob
import pandas as pd
from minio import Minio
minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
with open(index_json, "r") as fp:
index = json.load(fp)
with open(config_json, "r") as fp:
config = json.load(fp)
objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/gamma", recursive=True)
tmp_path = lambda x: os.path.join("/tmp/", x)
for obj in objects:
print(obj._object_name)
minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
# files_catalog = sorted(glob(tmp_path("catalog_*.csv")))
# files_picks = sorted(glob(tmp_path("picks_*.csv")))
files_catalog = [tmp_path(f"catalog_{node_i:03d}.csv") for node_i in range(len(index))]
files_picks = [tmp_path(f"picks_{node_i:03d}.csv") for node_i in range(len(index))]
print(f"Merge catalog: {files_catalog}")
print(f"Merge picks: {files_picks}")
if len(files_catalog) > 0:
catalog_list = []
for f in files_catalog:
tmp = pd.read_csv(f, dtype=str)
tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
catalog_list.append(tmp)
merged_catalog = pd.concat(catalog_list).sort_values(by="time")
merged_catalog["match_id"] = merged_catalog.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
merged_catalog.sort_values(by="time", inplace=True, ignore_index=True)
merged_catalog.drop(columns=["event_index", "file_index"], inplace=True)
merged_catalog["event_index"] = merged_catalog.index.values + 1
mapping = dict(zip(merged_catalog["match_id"], merged_catalog["event_index"]))
merged_catalog.drop(columns=["match_id"], inplace=True)
merged_catalog.to_csv(catalog_csv, index=False)
del merged_catalog
pick_list = []
for f in files_picks:
tmp = pd.read_csv(f, dtype=str)
tmp["file_index"] = f.rstrip(".csv").split("_")[-1]
pick_list.append(tmp)
merged_picks = pd.concat(pick_list).sort_values(by="phase_time")
merged_picks["match_id"] = merged_picks.apply(lambda x: f'{x["event_index"]}_{x["file_index"]}', axis=1)
merged_picks.drop(columns=["event_index", "file_index"], inplace=True)
merged_picks["event_index"] = merged_picks["match_id"].apply(lambda x: mapping[x] if x in mapping else -1)
merged_picks.drop(columns=["match_id"], inplace=True)
merged_picks.to_csv(picks_csv, index=False)
del merged_picks
minioClient.fput_object(
bucket_name,
f"{config['region']}/gamma_catalog.csv",
catalog_csv,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/gamma_picks.csv",
picks_csv,
)
# else:
# with open(catalog_csv, "w") as fout:
# pass
# print("No catalog.csv found!")
# with open(picks_csv, "w") as fout:
# pass
# print("No picks.csv found!")
In [43]:
merge_op = comp.func_to_container_op(
merge_catalog,
# base_image='zhuwq0/quakeflow-env:latest',
base_image="python:3.8",
packages_to_install=["pandas", "minio"],
)
7. HypoDD earthquake relocation¶
Download and compile HypoDD¶
In [44]:
if run_local:
if not os.path.exists("HYPODD"):
os.system("wget -O HYPODD_1.3.tar.gz http://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz")
os.system("tar -xf HYPODD_1.3.tar.gz")
os.system("ln -s $(which gfortran) ./HYPODD/f77")
os.system("ln -s $(which gfortran) ./HYPODD/g77")
os.environ["PATH"] += os.pathsep + os.getcwd() + "/HYPODD/"
os.system("make -C HYPODD/src")
if not os.path.exists(root_dir("hypodd")):
os.mkdir(root_dir("hypodd"))
os.system(f"cp HYPODD/src/ph2dt/ph2dt {root_dir('hypodd')}/")
os.system(f"cp HYPODD/src/hypoDD/hypoDD {root_dir('hypodd')}/")
--2023-03-15 14:46:00--  http://www.ldeo.columbia.edu/~felixw/HYPODD/HYPODD_1.3.tar.gz
HTTP request sent, awaiting response... 200 OK
Length: 1393039 (1.3M) [application/x-gzip]
2023-03-15 14:46:01 (3.36 MB/s) - ‘HYPODD_1.3.tar.gz’ saved [1393039/1393039]
make: Entering directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src'
for d in hypoDD ncsn2pha ph2dt hista2ddsta; do (cd $d; echo $d; make all); done
g77/f77/gcc compile the hypoDD, ncsn2pha, ph2dt, and hista2ddsta sources; the legacy Fortran code emits many "Legacy Extension" and "Fortran 2018 deleted feature" warnings, which do not stop the build.
make: Leaving directory '/net/han/export/ssd-tmp-nobak2/zhuwq/QuakeFlow/kubeflow/HYPODD/src'
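Before running the relocation steps, it may be worth confirming that both executables were built and copied into the working directory. A minimal sketch, assuming region_name = "Demo":

import os

for binary in ["ph2dt", "hypoDD"]:
    path = os.path.join("Demo", "hypodd", binary)
    print(f"{path}: {'OK' if os.path.isfile(path) else 'missing'}")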
7.1. Convert station format¶
In [45]:
def convert_station(
config_json: InputPath("json"),
station_json: InputPath("json"),
hypoinverse_station: OutputPath(str),
hypodd_station: OutputPath(str),
data_path: str = "./",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import os
import json
import pandas as pd
from tqdm import tqdm
with open(config_json, "r") as fp:
config = json.load(fp)
hypodd_path = os.path.join(data_path, "hypodd")
if not os.path.exists(hypodd_path):
os.mkdir(hypodd_path)
hypoinv_path = os.path.join(data_path, "hypoinv")
if not os.path.exists(hypoinv_path):
os.mkdir(hypoinv_path)
stations = pd.read_json(station_json, orient="index")
converted_hypoinverse = []
converted_hypodd = {}
for sta, row in tqdm(stations.iterrows()):
network_code, station_code, comp_code, channel_code = sta.split(".")
station_weight = " "
lat_degree = int(row["latitude"])
lat_minute = (row["latitude"] - lat_degree) * 60
north = "N" if lat_degree >= 0 else "S"
lng_degree = int(row["longitude"])
lng_minute = (row["longitude"] - lng_degree) * 60
west = "W" if lng_degree <= 0 else "E"
elevation = row["elevation(m)"]
line_hypoinverse = f"{station_code:<5} {network_code:<2} {comp_code[:-1]:<1}{channel_code:<3} {station_weight}{abs(lat_degree):2.0f} {abs(lat_minute):7.4f}{north}{abs(lng_degree):3.0f} {abs(lng_minute):7.4f}{west}{elevation:4.0f}\n"
converted_hypoinverse.append(line_hypoinverse)
tmp_code = f"{station_code}{channel_code}"
converted_hypodd[
f"{station_code}{channel_code}"
] = f"{tmp_code:<8s} {row['latitude']:.3f} {row['longitude']:.3f}\n"
with open(hypoinverse_station, "w") as f:
f.writelines(converted_hypoinverse)
with open(hypodd_station, "w") as f:
for k, v in converted_hypodd.items():
f.write(v)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd/stations.dat",
hypodd_station,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
In [46]:
if run_local:
convert_station(
root_dir("config.json"),
root_dir("stations.json"),
root_dir("hypoinv/stations.dat"),
root_dir("hypodd/stations.dat"),
root_dir(""),
)
52it [00:00, 11423.83it/s]
ERROR: can not access minio service! No module named 'minio'
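To check the converted station files, it is enough to look at the first few lines of each format (fixed-width HYPOINVERSE station lines versus the simpler HypoDD station/latitude/longitude list). A minimal sketch, assuming the Demo/ paths used above:

for path in ["Demo/hypoinv/stations.dat", "Demo/hypodd/stations.dat"]:
    print(f"--- {path} ---")
    with open(path) as f:
        for line in f.readlines()[:3]:
            print(line.rstrip())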
In [47]:
convert_station_op = comp.func_to_container_op(
convert_station,
base_image="python:3.8",
packages_to_install=["pandas", "tqdm", "minio"],
)
Split large catalogs to limit memory usage and runtime¶
In [48]:
def split_hypodd(
config_json: InputPath("json"),
catalog_csv: InputPath(str),
) -> list:
import json
import pandas as pd
with open(config_json, "r") as fp:
config = json.load(fp)
events = pd.read_csv(catalog_csv)
if "MAXEVENT" in config["hypodd"]:
MAXEVENT = config["hypodd"]["MAXEVENT"]
else:
MAXEVENT = 1e4 ## segment by time
MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
num_parallel = int((len(events) - 1) // MAXEVENT + 1)
return list(range(num_parallel))
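The chunking arithmetic in split_hypodd spreads the events evenly over the smallest number of chunks whose size does not exceed MAXEVENT. A small worked example with a hypothetical event count (n_events stands in for len(events)):

n_events = 25_000          # hypothetical catalog size
MAXEVENT = 1e4             # default limit per HypoDD run
MAXEVENT = n_events // ((n_events - 1) // MAXEVENT + 1) + 1  # -> 8334.0 events per chunk
num_parallel = int((n_events - 1) // MAXEVENT + 1)           # -> 3 parallel nodes
print(MAXEVENT, num_parallel)  # 8334.0 3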
In [49]:
if run_local:
nodes = split_hypodd(
root_dir("config.json"),
root_dir("gamma_catalog.csv"),
)
In [50]:
split_hypodd_op = comp.func_to_container_op(
split_hypodd,
base_image="python:3.8",
packages_to_install=[
"pandas",
],
)
7.2. Convert phase format¶
In [51]:
def convert_phase(
node_i: int,
config_json: InputPath("json"),
picks_csv: InputPath(str),
catalog_csv: InputPath(str),
hypodd_phase: OutputPath(str),
data_path: str = "./",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from datetime import datetime
import pandas as pd
from tqdm import tqdm
with open(config_json, "r") as fp:
config = json.load(fp)
hypodd_path = os.path.join(data_path, "hypodd")
if not os.path.exists(hypodd_path):
os.mkdir(hypodd_path)
picks = pd.read_csv(picks_csv)
events = pd.read_csv(catalog_csv)
if "MAXEVENT" in config["hypodd"]:
MAXEVENT = config["hypodd"]["MAXEVENT"]
else:
MAXEVENT = 1e4 ## segment by time
MAXEVENT = len(events) // ((len(events) - 1) // MAXEVENT + 1) + 1
num_parallel = int((len(events) - 1) // MAXEVENT + 1)
events.sort_values("time", inplace=True)
events = events.iloc[node_i::num_parallel]
picks = picks.loc[picks["event_index"].isin(events["event_index"])]
# output_lines = []
output_file = open(hypodd_phase, "w")
picks_by_event = picks.groupby("event_index").groups
# for i in tqdm(range(node_i, len(events), num_parallel)):
# event = events.iloc[i]
for i, event in events.iterrows():
event_time = datetime.strptime(event["time"], "%Y-%m-%dT%H:%M:%S.%f")
lat = event["latitude"]
lng = event["longitude"]
dep = event["depth(m)"] / 1e3
mag = event["magnitude"]
EH = 0
EZ = 0
RMS = event["sigma_time"]
year, month, day, hour, min, sec = (
event_time.year,
event_time.month,
event_time.day,
event_time.hour,
event_time.minute,
float(event_time.strftime("%S.%f")),
)
event_line = f"# {year:4d} {month:2d} {day:2d} {hour:2d} {min:2d} {sec:5.2f} {lat:7.4f} {lng:9.4f} {dep:5.2f} {mag:5.2f} {EH:5.2f} {EZ:5.2f} {RMS:5.2f} {event['event_index']:9d}\n"
# output_lines.append(event_line)
output_file.write(event_line)
picks_idx = picks_by_event[event["event_index"]]
for j in picks_idx:
# pick = picks.iloc[j]
pick = picks.loc[j]
network_code, station_code, comp_code, channel_code = pick["station_id"].split(".")
phase_type = pick["phase_type"].upper()
phase_score = pick["phase_score"]
pick_time = (datetime.strptime(pick["phase_time"], "%Y-%m-%dT%H:%M:%S.%f") - event_time).total_seconds()
tmp_code = f"{station_code}{channel_code}"
pick_line = f"{tmp_code:<7s} {pick_time:6.3f} {phase_score:5.4f} {phase_type}\n"
# output_lines.append(pick_line)
output_file.write(pick_line)
# with open(hypodd_phase, "w") as fp:
# fp.writelines(output_lines)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd/phase_{node_i:03d}.pha",
hypodd_phase,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
return hypodd_phase
In [52]:
if run_local:
for node_i in nodes:
convert_phase(
node_i,
root_dir("config.json"),
root_dir("gamma_picks.csv"),
root_dir("gamma_catalog.csv"),
root_dir("hypodd/hypodd_phase.pha"),
root_dir(""),
)
ERROR: can not access minio service! No module named 'minio'
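Each event written by convert_phase starts with a "#" header line (origin time, latitude, longitude, depth, magnitude, the placeholder horizontal/vertical errors, sigma_time as the RMS column, and the event index), followed by one line per associated pick giving the station+channel code, the travel time relative to the origin, the pick score used as a weight, and the phase type. A minimal sketch for inspecting the file written above, assuming the Demo/ paths:

# Print the header line and picks of the first event in the HypoDD phase file
with open("Demo/hypodd/hypodd_phase.pha") as f:
    for line in f.readlines()[:8]:
        print(line.rstrip())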
In [53]:
convert_phase_op = comp.func_to_container_op(
convert_phase,
base_image="python:3.8",
packages_to_install=["pandas", "tqdm", "minio"],
)
7.3. Run ph2dt to calculate differential times between phases¶
In [56]:
def ph2dt(
node_i: int,
config_json: InputPath("json"),
hypodd_phase: InputPath(str),
station_dat: InputPath(str),
ct_file: OutputPath(str),
hypodd_event: OutputPath(str),
data_path: str = "./",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from datetime import datetime
with open(config_json, "r") as fp:
config = json.load(fp)
hypodd_path = os.path.join(data_path, "hypodd")
if not os.path.exists(hypodd_path):
os.mkdir(hypodd_path)
# try:
# minioClient = Minio(s3_url, access_key='minio', secret_key='minio123', secure=secure)
# minioClient.fget_object(bucket_name, f"{config['region']}/hypodd_{node_i:03d}.pha", os.path.join(hypodd_path, f"hypodd_{node_i:03d}.pha"))
# except Exception as err:
# print(f"ERROR: can not access minio service! \n{err}")
# pass
ph2dt = """* ph2dt.inp - input control file for program ph2dt
* Input station file:
stations.dat
* Input phase file:
hypodd.pha
*MINWGHT: min. pick weight allowed [0]
*MAXDIST: max. distance in km between event pair and stations [200]
*MAXSEP: max. hypocentral separation in km [10]
*MAXNGH: max. number of neighbors per event [10]
*MINLNK: min. number of links required to define a neighbor [8]
*MINOBS: min. number of links per pair saved [8]
*MAXOBS: max. number of links per pair saved [20]
*MINWGHT MAXDIST MAXSEP MAXNGH MINLNK MINOBS MAXOBS
0 120 10 50 8 8 100
"""
with open(os.path.join(hypodd_path, "ph2dt.inp"), "w") as fp:
fp.writelines(ph2dt)
def copy_file(fp_from, fp_to):
with open(fp_from, "r") as fp:
lines = fp.readlines()
with open(fp_to, "w") as fp:
fp.writelines(lines)
copy_file(hypodd_phase, os.path.join(hypodd_path, "hypodd.pha"))
copy_file(station_dat, os.path.join(hypodd_path, "stations.dat"))
PH2DT_CMD = f"cd {hypodd_path} && ./ph2dt ph2dt.inp"
print(PH2DT_CMD)
if os.system(PH2DT_CMD) != 0:
raise ("{PH2DT_CMD}" + " failed!")
copy_file(os.path.join(hypodd_path, "dt.ct"), ct_file)
copy_file(os.path.join(hypodd_path, "event.sel"), hypodd_event)
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd/dt_{node_i:03d}.ct",
ct_file,
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd/event_{node_i:03d}.sel",
hypodd_event,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
# try:
# minioClient.fput_object(
# bucket_name,
# f"{config['region']}/dt_{node_i:03d}.ct",
# f"{os.path.join(hypodd_path, 'dt.ct')}",
# )
# minioClient.fput_object(
# bucket_name,
# f"{config['region']}/event_{node_i:03d}.dat",
# f"{os.path.join(hypodd_path, 'event.dat')}",
# )
# minioClient.fput_object(
# bucket_name,
# f"{config['region']}/event_{node_i:03d}.sel",
# f"{os.path.join(hypodd_path, 'event.sel')}",
# )
# except Exception as err:
# print(f"ERROR: can not access minio service! \n{err}")
# pass
return 0
In [57]:
if run_local:
ph2dt(
0,
root_dir("config.json"),
root_dir("hypodd/hypodd_phase.pha"),
root_dir("hypodd/stations.dat"),
root_dir("hypodd/dt.ct"),
root_dir("hypodd/event.sel"),
root_dir(""),
)
cd Demo/hypodd && ./ph2dt ph2dt.inp
starting ph2dt (v1.3 - 08/2010)...
reading data ...
> stations = 46
> events total = 363
> events selected = 363
> phases = 16187
forming dtimes...
> P-phase pairs total = 145432
> S-phase pairs total = 173413
> outliers = 3467 ( 1 %)
> phases at stations not in station list = 0
> phases at distances larger than MAXDIST = 0
> P-phase pairs selected = 138381 ( 95 %)
> S-phase pairs selected = 166806 ( 96 %)
> weakly linked events = 86 ( 23 %)
> linked event pairs = 9869
> average links per pair = 30
> average offset (km) betw. linked events = 4.81116629
> average offset (km) betw. strongly linked events = 4.81116629
> maximum offset (km) betw. strongly linked events = 9.99739456
Done.
Output files: dt.ct; event.dat; event.sel; ph2dt.log
ph2dt parameters were: (minwght,maxdist,maxsep,maxngh,minlnk,minobs,maxobs)
0.00000000 120.000000 10.0000000 50 8 8 100
ERROR: can not access minio service!
No module named 'minio'
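The ph2dt log above already reports the linked pairs and differential times. As a quick sanity check of the dt.ct output, the file can be recounted directly. This is a minimal sketch, not part of the pipeline; it assumes the standard dt.ct layout of "# ID1 ID2" header lines followed by "STA TT1 TT2 WGT PHA" observation lines, the same layout parsed by the cross-correlation cell later in this notebook.
# Minimal sketch: recount event pairs and P/S differential times in dt.ct.
from collections import Counter

def summarize_dt_ct(ct_path):
    num_pairs = 0
    num_phases = Counter()
    with open(ct_path) as f:
        for line in f:
            if line.startswith("#"):
                num_pairs += 1  # each "#" line starts a new event pair
            elif line.strip():
                num_phases[line.split()[-1]] += 1  # last column is the phase type
    print(f"event pairs: {num_pairs}, P dtimes: {num_phases['P']}, S dtimes: {num_phases['S']}")

if run_local:
    summarize_dt_ct(root_dir("hypodd/dt.ct"))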
In [58]:
Copied!
ph2dt_op = comp.func_to_container_op(ph2dt, base_image="zhuwq0/hypodd-api:1.0")
7.4. Run HypoDD re-location¶
In [59]:
Copied!
def hypodd_ct(
node_i: int,
config_json: InputPath("json"),
ct_file: InputPath(str),
event: InputPath(str),
station: InputPath(str),
catalog_txt: OutputPath(str),
data_path: str = "./",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from datetime import datetime
with open(config_json, "r") as fp:
config = json.load(fp)
hypodd_path = os.path.join(data_path, "hypodd")
ct_inp = """* RELOC.INP:
*--- input file selection
* cross correlation diff times:
*
*catalog P diff times:
dt.ct
*
* event file:
event.sel
*
* station file:
stations.dat
*
*--- output file selection
* original locations:
hypodd.loc
* relocations:
hypodd.reloc
* station information:
hypodd.sta
* residual information:
hypodd.res
* source parameter information:
hypodd.src
*
*--- data type selection:
* IDAT: 0 = synthetics; 1= cross corr; 2= catalog; 3= cross & cat
* IPHA: 1= P; 2= S; 3= P&S
* DIST:max dist [km] between cluster centroid and station
* IDAT IPHA DIST
2 3 120
*
*--- event clustering:
* OBSCC: min # of obs/pair for crosstime data (0= no clustering)
* OBSCT: min # of obs/pair for network data (0= no clustering)
* OBSCC OBSCT
0 8
*
*--- solution control:
* ISTART: 1 = from single source; 2 = from network sources
* ISOLV: 1 = SVD, 2=lsqr
* NSET: number of sets of iteration with specifications following
* ISTART ISOLV NSET
2 2 4
*
*--- data weighting and re-weighting:
* NITER: last iteration to used the following weights
* WTCCP, WTCCS: weight cross P, S
* WTCTP, WTCTS: weight catalog P, S
* WRCC, WRCT: residual threshold in sec for cross, catalog data
* WDCC, WDCT: max dist [km] between cross, catalog linked pairs
* DAMP: damping (for lsqr only)
* --- CROSS DATA ----- ----CATALOG DATA ----
* NITER WTCCP WTCCS WRCC WDCC WTCTP WTCTS WRCT WDCT DAMP
4 -9 -9 -9 -9 1 1 8 -9 70
4 -9 -9 -9 -9 1 1 6 4 70
4 -9 -9 -9 -9 1 0.8 4 2 70
4 -9 -9 -9 -9 1 0.8 3 2 70
*
*--- 1D model:
* NLAY: number of model layers
* RATIO: vp/vs ratio
* TOP: depths of top of layer (km)
* VEL: layer velocities (km/s)
* NLAY RATIO
12 1.82
* TOP
0.0 1.0 3.0 5.0 7.0 9.0 11.0 13.0 17.0 21.0 31.00 31.10
* VEL
5.30 5.65 5.93 6.20 6.20 6.20 6.20 6.20 6.20 6.20 7.50 8.11
*
*--- event selection:
* CID: cluster to be relocated (0 = all)
* ID: cuspids of event to be relocated (8 per line)
* CID
0
* ID
"""
with open(os.path.join(hypodd_path, "ct.inp"), "w") as fp:
fp.writelines(ct_inp)
def copy_file(fp_from, fp_to):
with open(fp_from, "r") as fp:
lines = fp.readlines()
with open(fp_to, "w") as fp:
fp.writelines(lines)
copy_file(ct_file, os.path.join(hypodd_path, "dt.ct"))
copy_file(event, os.path.join(hypodd_path, "event.sel"))
copy_file(station, os.path.join(hypodd_path, "stations.dat"))
# os.system(f"cat {ct_file}")
# os.system(f"cat {event}")
# os.system(f"cat {station}")
HYPODD_CMD = f"cd {hypodd_path} && ./hypoDD ct.inp"
print(HYPODD_CMD)
if os.system(HYPODD_CMD) != 0:
raise ("{HYPODD_CMD}" + " failed!")
copy_file(os.path.join(hypodd_path, "hypodd.reloc"), catalog_txt)
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd/hypodd_ct_{node_i:03d}.reloc",
catalog_txt,
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
In [60]:
Copied!
if run_local:
hypodd_ct(
0,
root_dir("config.json"),
root_dir("hypodd/dt.ct"),
root_dir("hypodd/event.sel"),
root_dir("hypodd/stations.dat"),
root_dir("hypodd_ct_catalog.txt"),
root_dir(""),
)
cd Demo/hypodd && ./hypoDD ct.inp
starting hypoDD (v1.3 - 11/2010)... Wed Mar 15 14:47:00 2023
INPUT FILES:
cross dtime data:
catalog dtime data: dt.ct
events: event.sel
stations: stations.dat
OUTPUT FILES:
initial locations: hypodd.loc
relocated events: hypodd.reloc
event pair residuals: hypodd.res
station residuals: hypodd.sta
source parameters: hypodd.src
Relocate all clusters
Relocate all events
Reading data ... Wed Mar 15 14:47:00 2023
# events = 363
# stations < maxdist = 46
# catalog P dtimes = 138381
# catalog S dtimes = 166806
# dtimes total = 305187
# events after dtime match = 351
# stations = 45
clustering ...
Clustered events: 351
Isolated events: 0
# clusters: 1
Cluster 1: 351 events
RELOCATION OF CLUSTER: 1 Wed Mar 15 14:47:01 2023
----------------------
Initial trial sources = 351
IT EV CT RMSCT RMSST DX DY DZ DT OS AQ CND
   %  %  ms  %  ms   m   m   m  ms  m
 1    100 100 230 -21.4   0 798 660 2348 177    0  8 205
 2     98  99 229  -0.4   0 773 631 2302 168    0  1 202
 3  1  97  99 228  -0.4 410 770 629 2301 167 1128  0 202
 4     97  98 187 -18.0 410 407 252 2664  77 1128 13 192
 5     93  97 185  -1.5 410 392 230 1314  71 1128  1 189
 6  2  93  97 184  -0.5 383 389 229 1287  71  958  0 189
 7     93  96 163 -11.1 383 254 135  666  44  958  3 180
 8  3  92  95 162  -1.0 335 248 134  645  43  551  0 177
 9     92  95 152  -6.0 335 188  89  912  28  551  2 175
10  4  91  94 151  -0.5 327 189  90  526  27  697  0 174
11  5  87  49 135 -10.4 269 167 116  379  30  735  0 135
12  6  86  46 115 -14.9 236  92  64  245  17  771  0 131
13  7  85  45 109  -5.8 222  60  42  158  12  750  0 129
14  8  85  44 106  -2.7 219  39  26   99   9  728  0 121
15  9  79  14  76 -27.8 146  43  35   83   7  568  0  66
16 10  78  13  64 -15.9 120  30  24   59   5  573  0  64
17 11  78  13  60  -7.0 112  22  18   47   4  563  0  60
18 12  77  12  57  -4.2 108  16  14   34   3  561  0  60
19 13  77  12  47 -17.6  86  17  15   33   3  558  0  59
20 14  77  11  43  -7.6  78  16  13   28   2  557  0  57
21 15  77  11  41  -5.1  75  14  11   26   2  560  0  54
22 16  77  11  40  -3.5  72  12   9   21   2  557  0  55
writing out results ...
ERROR: can not access minio service!
No module named 'minio'
In [61]:
Copied!
hypodd_ct_op = comp.func_to_container_op(hypodd_ct, base_image="zhuwq0/hypodd-api:1.0")
In [62]:
Copied!
if run_local:
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd
catalog_hypodd = pd.read_csv(
root_dir(f"hypodd/hypodd.reloc"),
sep="\s+",
names=[
"ID",
"LAT",
"LON",
"DEPTH",
"X",
"Y",
"Z",
"EX",
"EY",
"EZ",
"YR",
"MO",
"DY",
"HR",
"MI",
"SC",
"MAG",
"NCCP",
"NCCS",
"NCTP",
"NCTS",
"RCC",
"RCT",
"CID",
],
)
catalog_hypodd["time"] = catalog_hypodd.apply(
lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
axis=1,
)
catalog_hypodd["time"] = catalog_hypodd["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
plt.figure()
plt.plot(catalog_hypodd["LON"], catalog_hypodd["LAT"], ".")
plt.show()
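The map view above can optionally be complemented by a depth section. The sketch below is illustrative (not part of the original workflow); it assumes the catalog_hypodd DataFrame from the previous cell, with DEPTH in km as in the hypodd.reloc format.
# Optional sketch: longitude-depth cross-section of the relocated events.
if run_local:
    fig, ax = plt.subplots()
    ax.scatter(catalog_hypodd["LON"], catalog_hypodd["DEPTH"], s=2)
    ax.invert_yaxis()  # plot depth increasing downward
    ax.set_xlabel("Longitude")
    ax.set_ylabel("Depth (km)")
    plt.show()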
In [ ]:
Copied!
def cross_correlation(
ct_file: InputPath(str),
catalog_file: InputPath(str),
picks_file: InputPath(str),
cc_file: OutputPath(str),
):
import time
from multiprocessing import Manager, Process
import numpy as np
import pandas as pd
from pymongo import MongoClient
from tqdm import tqdm
catalog = pd.read_csv(
catalog_file,
sep="\t",
parse_dates=["time"],
index_col=["event_idx"],
dtype={"event_idx": str},
)
picks = pd.read_csv(picks_file, sep="\t", parse_dates=["timestamp"], dtype={"event_idx": str})
picks["station"] = picks["id"].apply(lambda x: x.split(".")[1] + x.split(".")[3])
picks = picks.set_index(["event_idx", "station", "type"])
picks = picks.sort_index()
pick_index = 100
lo = pick_index - 50
hi = pick_index + 100
dt = 0.01
ct_dict = Manager().dict()
cc_dict = Manager().dict()
with open(ct_file) as fct:
meta = fct.readlines()
for i, line in enumerate(meta):
if line[0] == "#":
if i > 0:
ct_dict[key] = value
key = line
value = []
continue
value.append(line)
ct_dict[key] = value
keys = sorted(list(ct_dict.keys()))
def calc_cross_correlation(keys, ct_dict, cc_dict):
username = "root"
password = "quakeflow123"
# client = MongoClient(f"mongodb://{username}:{password}@127.0.0.1:27017")
client = MongoClient(f"mongodb://{username}:{password}@quakeflow-mongodb.default.svc.cluster.local:27017")
db = client["quakeflow"]
collection = db["waveform"]
# normalize = lambda x: (x - np.mean(x, axis=0, keepdims=True)) / np.std(x, axis=0, keepdims=True)
for key in keys:
tmp = key.split()
ID1, ID2 = tmp[1], tmp[2]
key_cc = f"# {ID1} {ID2} 0.0\n"
lines_cc = []
for line in ct_dict[key]:
tmp = line.split()
STA, TT1, TT2, WGT, PHA = (
tmp[0],
tmp[1],
tmp[2],
tmp[3],
tmp[4],
) ##HypoDD format
for i, row1 in picks.loc[(ID1, STA, PHA)].iterrows():
data1 = collection.find_one(
{"_id": f"{row1['id']}_{row1['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
)
for j, row2 in picks.loc[(ID2, STA, PHA)].iterrows():
data2 = collection.find_one(
{"_id": f"{row2['id']}_{row2['timestamp'].isoformat(timespec='milliseconds')}_{PHA}"}
)
# if PHA == "P": # Z
# waveform1 = np.array(data1["waveform"])[lo:hi, -1:]
# waveform2 = np.array(data2["waveform"])[lo:hi, -1:]
# elif PHA == "S": # E, N
# waveform1 = np.array(data1["waveform"])[lo:hi, :-1]
# waveform2 = np.array(data2["waveform"])[lo:hi, :-1]
# else:
# raise (Exception("PHA must be P or S"))
waveform1 = np.array(data1["waveform"])[lo:hi, :]
waveform2 = np.array(data2["waveform"])[lo:hi, :]
cc = np.zeros(waveform1.shape[0])
for k in range(waveform1.shape[1]):
cc += np.correlate(waveform1[:, k], waveform2[:, k], mode="same")
norm = np.sqrt(np.sum(waveform1**2) * np.sum(waveform2**2))
if norm == 0:
continue
else:
cc /= norm
shift = (np.argmax(np.abs(cc)) - waveform1.shape[0] // 2) * dt + float(TT1) - float(TT2)
coeff = np.max(np.abs(cc))
if not np.isnan(coeff):
lines_cc.append(f"{STA:<7s} {shift:.5f} {coeff:.3f} {PHA}\n")
cc_dict[key_cc] = lines_cc
return 0
t0 = time.time()
processes = []
num_process = 16
for i in range(num_process):
p = Process(target=calc_cross_correlation, args=(keys[i::num_process], ct_dict, cc_dict))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f"{num_process} process: time = {time.time()-t0:.1f}")
with open(cc_file, "w") as fcc:
for key in cc_dict:
fcc.write(key)
for line in cc_dict[key]:
fcc.write(line)
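Before wiring the cell above into the pipeline, the core measurement can be checked in isolation. The sketch below is hypothetical: it uses two synthetic single-channel windows instead of the MongoDB waveforms, but applies the same normalized cross-correlation to estimate a lag and correlation coefficient. Note it reports only the waveform lag; the cell above additionally adds the catalog travel-time difference TT1 - TT2.
# Minimal sketch of the differential-time measurement, on synthetic data.
# Two Gaussian-modulated sinusoids offset by 0.05 s stand in for real windows.
import numpy as np

dt = 0.01                      # sample interval (s), as in the cell above
n = 150                        # window length (samples), as above (lo:hi)
t = np.arange(n) * dt
waveform1 = np.exp(-((t - 0.50) / 0.05) ** 2) * np.sin(2 * np.pi * 10 * t)
waveform2 = np.exp(-((t - 0.55) / 0.05) ** 2) * np.sin(2 * np.pi * 10 * (t - 0.05))

cc = np.correlate(waveform1, waveform2, mode="same")
cc /= np.sqrt(np.sum(waveform1**2) * np.sum(waveform2**2))
shift = (np.argmax(np.abs(cc)) - n // 2) * dt  # waveform lag only
coeff = np.max(np.abs(cc))
print(f"lag = {shift:.3f} s, coefficient = {coeff:.3f}")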
In [ ]:
Copied!
cc_op = comp.func_to_container_op(
cross_correlation,
base_image="python:3.8",
packages_to_install=["pandas", "tqdm", "minio", "pymongo"],
)
In [ ]:
Copied!
def hypodd_cc(
i: int,
config_json: InputPath("json"),
ct_file: InputPath(str),
cc_file: InputPath(str),
event: InputPath(str),
station: InputPath(str),
inp_file: str = "hypodd_cc.inp",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from minio import Minio
with open(config_json, "r") as fp:
config = json.load(fp)
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
os.system(f"cat {ct_file} > dt.ct")
os.system(f"cat {cc_file} > dt.cc")
os.system(f"cat {event} > event.sel")
os.system(f"cat {station} > stations_hypodd.dat ")
HYPODD_CMD = f"HYPODD/src/hypodd/hypodd {inp_file}"
if os.system(HYPODD_CMD) != 0:
raise ("{HYPODD_CMD}" + " failed!")
os.system(f"mv hypodd.reloc hypodd_cc_{i:03d}.reloc")
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd_cc_{i:03d}.reloc",
f"hypodd_cc_{i:03d}.reloc",
)
In [ ]:
Copied!
hypodd_cc_op = comp.func_to_container_op(hypodd_cc, base_image="zhuwq0/hypodd-api:1.0")
Merge HypoDD results¶
In [ ]:
Copied!
def merge_hypodd(
index: list,
config_json: InputPath("json"),
catalog_ct: OutputPath(str),
catalog_cc: OutputPath(str),
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import json
import os
from glob import glob
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
with open(config_json, "r") as fp:
config = json.load(fp)
objects = minioClient.list_objects(bucket_name, prefix=f"{config['region']}/hypodd/hypodd_", recursive=True)
tmp_path = lambda x: os.path.join("/tmp/", x)
for obj in objects:
print(obj._object_name)
minioClient.fget_object(bucket_name, obj._object_name, tmp_path(obj._object_name.split("/")[-1]))
# tmp_ct_catalogs = sorted(glob(tmp_path("hypodd_ct_*.reloc")))
hypodd_ct_catalogs = [tmp_path(f"hypodd_ct_{i:03d}.reloc") for i in index]
print(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
os.system(f"cat {' '.join(hypodd_ct_catalogs)} > {tmp_path('hypodd_ct_catalog.txt')}")
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd_ct_catalog.txt",
tmp_path("hypodd_ct_catalog.txt"),
)
os.system(f"cat {tmp_path('hypodd_ct_catalog.txt')} > {catalog_ct}")
# hypodd_cc_catalogs = sorted(glob(tmp_path("hypodd_cc_*.reloc")))
hypodd_cc_catalogs = [tmp_path(f"hypodd_cc_{i:03d}.reloc") for i in index]
print(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
os.system(f"cat {' '.join(hypodd_cc_catalogs)} > {tmp_path('hypodd_cc_catalog.txt')}")
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd_cc_catalog.txt",
tmp_path("hypodd_cc_catalog.txt"),
)
os.system(f"cat {tmp_path('hypodd_cc_catalog.txt')} > {catalog_cc}")
return 0
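merge_hypodd above pulls the per-node relocation files from the MinIO bucket and concatenates them into a single catalog. When running locally on a single node this reduces to concatenating files on disk; the sketch below is a local equivalent, with a hypothetical hypodd_ct_*.reloc file pattern for illustration.
# Local sketch of the merge step without object storage: concatenate all
# per-node relocation files matching a pattern into one catalog file.
from glob import glob

def merge_local(pattern, out_path):
    with open(out_path, "w") as out:
        for path in sorted(glob(pattern)):
            with open(path) as f:
                out.write(f.read())

# e.g. merge_local(root_dir("hypodd/hypodd_ct_*.reloc"), root_dir("hypodd_ct_catalog.txt"))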
In [ ]:
Copied!
merge_hypodd_op = comp.func_to_container_op(
merge_hypodd,
base_image="python:3.8",
packages_to_install=["pandas", "tqdm", "minio"],
)
8. Visualization¶
In [ ]:
Copied!
def visulization(
config_json: InputPath("json"),
hypodd_catalog_ct: InputPath(str),
hypodd_catalog_cc: InputPath(str),
gamma_catalog: InputPath(str),
standard_catalog: InputPath(str),
data_path: str = "./",
bucket_name: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = True,
):
import pandas as pd
import plotly.graph_objects as go
import numpy as np
import json
import os
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
with open(config_json, "r") as fp:
config = json.load(fp)
def plot3d(x, y, z, fig_name):
fig = go.Figure(
data=[
go.Scatter3d(
x=x,
y=y,
z=z,
mode="markers",
marker=dict(size=3.0, color=-z, cmin=-60, cmax=2, colorscale="Viridis", opacity=0.8),
)
],
)
fig.update_layout(
scene=dict(
xaxis=dict(
nticks=4,
range=config["xlim_degree"],
),
yaxis=dict(
nticks=4,
range=config["ylim_degree"],
),
zaxis=dict(
nticks=4,
# range=[z.max(), z.min()],
range=[60, -2],
),
# aspectratio = dict(x=(xlim[1]-xlim[0])/2, y=(ylim[1]-ylim[0])/2, z=1),
aspectratio=dict(x=1, y=1, z=0.5),
),
margin=dict(r=0, l=0, b=0, t=0),
)
fig.write_html(fig_name)
hypodd_ct_catalog = pd.read_csv(
hypodd_catalog_ct,
sep="\s+",
names=[
"ID",
"LAT",
"LON",
"DEPTH",
"X",
"Y",
"Z",
"EX",
"EY",
"EZ",
"YR",
"MO",
"DY",
"HR",
"MI",
"SC",
"MAG",
"NCCP",
"NCCS",
"NCTP",
"NCTS",
"RCC",
"RCT",
"CID",
],
)
hypodd_ct_catalog["time"] = hypodd_ct_catalog.apply(
lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
axis=1,
)
hypodd_ct_catalog["magnitude"] = hypodd_ct_catalog["MAG"]
hypodd_ct_catalog["time"] = hypodd_ct_catalog["time"].apply(lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"))
plot3d(
hypodd_ct_catalog["LON"],
hypodd_ct_catalog["LAT"],
hypodd_ct_catalog["DEPTH"],
f"{data_path}/hypodd_ct_catalog.html",
)
gamma_catalog = pd.read_csv(gamma_catalog, parse_dates=["time"])
gamma_catalog["depth_km"] = gamma_catalog["depth(m)"] / 1e3
plot3d(
gamma_catalog["longitude"],
gamma_catalog["latitude"],
gamma_catalog["depth_km"],
f"{data_path}/gamma_catalog.html",
)
standard_catalog = pd.read_csv(standard_catalog, parse_dates=["time"])
standard_catalog["depth_km"] = standard_catalog["depth(m)"] / 1e3
plot3d(
standard_catalog["longitude"],
standard_catalog["latitude"],
standard_catalog["depth_km"],
f"{data_path}/standard_catalog.html",
)
## histogram
bins = 30
config["starttime"] = datetime.fromisoformat(config["starttime"])
config["endtime"] = datetime.fromisoformat(config["endtime"])
fig, ax = plt.subplots()
ax.hist(gamma_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=1.0, linewidth=0.5, label=f"GaMMA: {len(gamma_catalog)}")
ax.hist(hypodd_ct_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.8, linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog)}")
ax.hist(standard_catalog["time"], range=(config["starttime"], config["endtime"]), bins=bins, edgecolor="k", alpha=0.6, linewidth=0.5, label=f"Standard: {len(standard_catalog)}")
# ax.set_xlabel("Date")
ax.set_ylabel("Frequency")
ax.autoscale(enable=True, axis='x', tight=True)
# ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
fig.autofmt_xdate()
ax.legend()
fig.savefig(f"{data_path}/earthquake_frequency_time.png", bbox_inches="tight", dpi=300)
# fig.savefig(f"{data_path}/earthquake_number.pdf", bbox_inches="tight")
plt.show()
fig, ax = plt.subplots()
xrange = (-1., max(standard_catalog["magnitude"].max(), gamma_catalog["magnitude"].max()))
ax.hist(gamma_catalog["magnitude"], range=xrange, bins=bins, alpha=1.0, edgecolor="k", linewidth=0.5, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
ax.hist(hypodd_ct_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6, edgecolor="k", linewidth=0.5, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
ax.hist(standard_catalog["magnitude"], range=xrange, bins=bins, alpha=0.6, edgecolor="k", linewidth=0.5, label=f"Standard: {len(standard_catalog['magnitude'])}")
ax.legend()
ax.autoscale(enable=True, axis='x', tight=True)
ax.set_xlabel("Magnitude")
ax.set_ylabel("Frequency")
ax.set_yscale('log')
fig.savefig(f"{data_path}/earthquake_magnitude_frequency.png", bbox_inches="tight", dpi=300)
# fig.savefig(f"{data_path}/earthquake_magnitude_frequency.pdf", bbox_inches="tight")
plt.show()
fig, ax = plt.subplots()
ax.plot(gamma_catalog["time"], gamma_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"GaMMA: {len(gamma_catalog['magnitude'])}")
ax.plot(hypodd_ct_catalog["time"], hypodd_ct_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"HypoDD: {len(hypodd_ct_catalog['magnitude'])}")
ax.plot(standard_catalog["time"], standard_catalog["magnitude"], '.', markersize=5.0, alpha=1.0, rasterized=True, label=f"Standard: {len(standard_catalog['magnitude'])}")
ax.set_xlim(config["starttime"], config["endtime"])
ax.set_ylabel("Magnitude")
# ax.set_xlabel("Date")
ax.set_ylim(bottom=-1)
ax.legend(markerscale=2)
ax.grid()
# ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
fig.autofmt_xdate()
fig.savefig(f"{data_path}/earthquake_magnitude_time.png", bbox_inches="tight", dpi=300)
# fig.savefig(f"{data_path}/earthquake_magnitude_time.pdf", bbox_inches="tight", dpi=300)
plt.show()
try:
hypodd_cc_catalog = pd.read_csv(
hypodd_catalog_cc,
sep="\s+",
names=[
"ID",
"LAT",
"LON",
"DEPTH",
"X",
"Y",
"Z",
"EX",
"EY",
"EZ",
"YR",
"MO",
"DY",
"HR",
"MI",
"SC",
"MAG",
"NCCP",
"NCCS",
"NCTP",
"NCTS",
"RCC",
"RCT",
"CID",
],
)
hypodd_cc_catalog["time"] = hypodd_cc_catalog.apply(
lambda x: f'{x["YR"]:04.0f}-{x["MO"]:02.0f}-{x["DY"]:02.0f}T{x["HR"]:02.0f}:{x["MI"]:02.0f}:{min(x["SC"], 59.999):05.3f}',
axis=1,
)
hypodd_cc_catalog["time"] = hypodd_cc_catalog["time"].apply(
lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f")
)
plot3d(
hypodd_cc_catalog["LON"],
hypodd_cc_catalog["LAT"],
hypodd_cc_catalog["DEPTH"],
f"{data_path}/hypodd_cc_catalog.html",
)
except Exception as err:
print(f"{err}")
## upload to s3 bucket
try:
from minio import Minio
minioClient = Minio(s3_url, access_key="minio", secret_key="minio123", secure=secure)
if not minioClient.bucket_exists(bucket_name):
minioClient.make_bucket(bucket_name)
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd_ct_catalog.html",
f"{data_path}/hypodd_ct_catalog.html",
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/gamma_catalog.html",
f"{data_path}/gamma_catalog.html",
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/standard_catalog.html",
f"{data_path}/standard_catalog.html",
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/earthquake_frequency_time.png",
f"{data_path}/earthquake_frequency_time.png",
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/earthquake_magnitude_frequency.png",
f"{data_path}/earthquake_magnitude_frequency.png",
)
minioClient.fput_object(
bucket_name,
f"{config['region']}/earthquake_magnitude_time.png",
f"{data_path}/earthquake_magnitude_time.png",
)
if os.path.exists(f"{data_path}/hypodd_cc_catalog.html"):
minioClient.fput_object(
bucket_name,
f"{config['region']}/hypodd_cc_catalog.html",
f"{data_path}/hypodd_cc_catalog.html",
)
except Exception as err:
print(f"ERROR: can not access minio service! \n{err}")
pass
In [ ]:
Copied!
if run_local:
visulization(
root_dir("config.json"),
root_dir("hypodd_ct_catalog.txt"),
root_dir("hypodd_cc_catalog.txt"),
root_dir("gamma_catalog.csv"),
root_dir("standard_catalog.csv"),
root_dir(""),
)
In [ ]:
Copied!
visulization_op = comp.func_to_container_op(
visulization,
base_image="python:3.8",
packages_to_install=["matplotlib", "pandas", "plotly", "minio"],
)
9. Parallel processing on the cloud¶
In [ ]:
Copied!
@dsl.pipeline(name="QuakeFlow", description="")
def quakeflow_pipeline(
data_path: str = "/tmp/",
num_parallel=0,
bucket_catalog: str = "catalogs",
s3_url: str = "minio-service:9000",
secure: bool = False,
):
config = config_op(
region_name, num_parallel, bucket_name=f"catalogs", s3_url=s3_url, secure=secure
).add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
events = (
download_events_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
.add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
.set_retry(3)
.set_memory_request("1G")
.set_display_name("Download Events")
)
stations = (
download_stations_op(config.outputs["config_json"], bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
.add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
.set_retry(3)
.set_display_name("Download Stations")
)
with kfp.dsl.ParallelFor(config.outputs["output"]) as i:
vop_ = dsl.VolumeOp(
name=f"Create volume {region_name}",
resource_name=f"data-volume-{str(i)}",
size="100Gi",
modes=dsl.VOLUME_MODE_RWO,
)
# ).set_retry(3)
download_op_ = (
download_waveform_op(
i,
config.outputs["index_json"],
config.outputs["config_json"],
config.outputs["datetime_json"],
stations.outputs["station_pkl"],
data_path=data_path,
bucket_name=f"waveforms",
s3_url=s3_url,
secure=secure,
)
.add_pvolumes({data_path: vop_.volume})
.set_cpu_request("700m")
# .set_cpu_request("350m")
.add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
.set_retry(5)
.set_display_name("Download Waveforms")
)
download_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
phasenet_op_ = (
phasenet_op(
download_op_.outputs["Output"],
download_op_.outputs["fname_csv"],
stations.outputs["station_json"],
)
.add_pvolumes({data_path: download_op_.pvolume})
.set_memory_request("9G")
.set_retry(5)
.set_display_name("PhaseNet Picking")
)
phasenet_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
phasenet_op_.set_image_pull_policy("Always")
gamma_op_ = (
gamma_op(
i,
config.outputs["index_json"],
config.outputs["config_json"],
phasenet_op_.outputs["picks"],
stations.outputs["station_json"],
bucket_name=f"catalogs",
s3_url=s3_url,
secure=secure,
)
.set_cpu_request("800m")
.set_retry(3)
.set_display_name("GaMMA Association")
)
gamma_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
merge_op_ = (
merge_op(
config.outputs["index_json"],
config.outputs["config_json"],
bucket_name=f"catalogs",
s3_url=s3_url,
secure=secure,
)
.after(gamma_op_)
.set_memory_request("12G")
.add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-64g")
.set_display_name("Merge Catalog")
)
merge_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
convert_station_op_ = (
convert_station_op(
config_json=config.outputs["config_json"],
station_json=stations.outputs["station_json"],
bucket_name=f"catalogs", s3_url=s3_url, secure=secure)
.add_node_selector_constraint("cloud.google.com/gke-nodepool", "spot-2g")
.set_display_name("Convert Station Format")
)
split_hypodd_op_ = (
split_hypodd_op(
config.outputs["config_json"],
catalog_csv=merge_op_.outputs["catalog_csv"],
)
.after(merge_op_)
.set_display_name("Split Catalog")
)
split_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
split_hypodd_op_.set_image_pull_policy("Always")
with kfp.dsl.ParallelFor(split_hypodd_op_.outputs["output"]) as i:
convert_phase_op_ = convert_phase_op(
i,
config_json=config.outputs["config_json"],
picks_csv=merge_op_.outputs["picks_csv"],
catalog_csv=merge_op_.outputs["catalog_csv"],
bucket_name="catalogs",
s3_url=s3_url,
secure=secure,
).set_display_name("Convert Phase Format")
convert_phase_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
convert_phase_op_.set_image_pull_policy("Always")
ph2dt_op_ = ph2dt_op(
i,
config_json=config.outputs["config_json"],
hypodd_phase=convert_phase_op_.outputs["hypodd_phase"],
station_dat=convert_station_op_.outputs["hypodd_station"],
bucket_name="catalogs",
s3_url=s3_url,
secure=secure,
).set_display_name("PH2DT")
ph2dt_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
ph2dt_op_.set_image_pull_policy("Always")
hypodd_ct_op_ = hypodd_ct_op(
i,
config_json=config.outputs["config_json"],
ct=ph2dt_op_.outputs["ct"],
event=ph2dt_op_.outputs["hypodd_event"],
station=convert_station_op_.outputs["hypodd_station"],
bucket_name="catalogs",
s3_url=s3_url,
secure=secure,
).set_display_name("HypoDD")
hypodd_ct_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
hypodd_ct_op_.set_image_pull_policy("Always")
# cc_op_ = cc_op(
# ct=ph2dt_op_.outputs["ct"],
# picks=merge_op_.outputs["picks_csv"],
# catalog=merge_op_.outputs["catalog_csv"],
# ).set_display_name('Cross Correlation')
# cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
# cc_op_.set_image_pull_policy("Always")
# hypodd_cc_op_ = hypodd_cc_op(
# i,
# config_json=config.outputs["config_json"],
# ct=ph2dt_op_.outputs["ct"],
# cc=cc_op_.outputs["cc"],
# event=ph2dt_op_.outputs["hypodd_event"],
# station=convert_station_op_.outputs["hypodd_station"],
# bucket_name="catalogs",
# s3_url=s3_url,
# secure=secure,
# ).set_display_name('HypoDD + CC')
# hypodd_cc_op_.execution_options.caching_strategy.max_cache_staleness = "P30D"
# hypodd_cc_op_.set_image_pull_policy("Always")
merge_hypodd_op_ = (
merge_hypodd_op(
split_hypodd_op_.outputs["output"],
config_json=config.outputs["config_json"],
bucket_name=f"catalogs",
s3_url=s3_url,
secure=secure,
).after(hypodd_ct_op_)
# .after(hypodd_cc_op_)
.set_display_name("Merge Catalog")
)
merge_hypodd_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
merge_hypodd_op_.set_image_pull_policy("Always")
visulization_op_ = (
visulization_op(
config_json=config.outputs["config_json"],
hypodd_catalog_ct=merge_hypodd_op_.outputs["catalog_ct"],
hypodd_catalog_cc=merge_hypodd_op_.outputs["catalog_cc"],
gamma_catalog=merge_op_.outputs["catalog_csv"],
standard_catalog=events.outputs["standard_catalog"],
bucket_name=f"catalogs",
s3_url=s3_url,
secure=secure,
)
.after(merge_hypodd_op_)
.set_display_name("Visulization")
)
visulization_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"
visulization_op_.set_image_pull_policy("Always")
# vop_.delete().after(merge_hypodd_op_)
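Before submitting to a cluster, it can be useful to compile the pipeline and inspect the generated workflow spec. A minimal sketch using the same kfp v1 compiler as the submission cell below (output file name is arbitrary):
# Compile the pipeline definition to a YAML workflow spec for inspection,
# without creating a run (uses the kfp SDK already imported above).
kfp.compiler.Compiler().compile(quakeflow_pipeline, "quakeflow_pipeline.yaml")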
In [ ]:
Copied!
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/weiqiang/.dotbot/cloud/quakeflow_zhuwq.json"
experiment_name = "QuakeFlow"
pipeline_func = quakeflow_pipeline
run_name = f"{pipeline_func.__name__}_{region_name.lower()}"
arguments = {
"data_path": "/tmp",
"num_parallel": 0,
"bucket_catalog": "catalogs",
"s3_url": "minio-service:9000",
"secure": False,
}
if not run_local:
pipeline_conf = kfp.dsl.PipelineConf()
pipeline_conf.set_image_pull_policy("Always")
pipeline_conf.ttl_seconds_after_finished = 60 * 10
client = kfp.Client(host="670838cea8a18858-dot-us-west1.pipelines.googleusercontent.com")
# client = kfp.Client(host="http://localhost:8080")
kfp.compiler.Compiler().compile(pipeline_func, "{}.zip".format(experiment_name), pipeline_conf=pipeline_conf)
results = client.create_run_from_pipeline_func(
pipeline_func,
experiment_name=experiment_name,
run_name=run_name,
arguments=arguments,
)
In [ ]:
Copied!