lyquor/
lyquor.rs

1use std::path::{Path, PathBuf};
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alloy_node_bindings::{Anvil, AnvilInstance};
7use barstrainer_client::SigningIdentity;
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio_util::sync::CancellationToken;
12
13use lyquor_api::{
14    anyhow::{self, Context as _Context},
15    config::{DB, NodeConfig},
16    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
17    profile::{LyquorProfile, NetworkType},
18    subkey_builder,
19};
20use lyquor_db::{MemDB, RocksDB};
21use lyquor_image_store::{DirectoryOciStore, DirectoryRepo, LyquidImageRepo, OciDistributionStore};
22use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
23use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
24use lyquor_lib::db::{DBFlusher, DBFlusherHandle};
25use lyquor_lib::dns_updater::publish_external_host_dns;
26use lyquor_lib::image_resolver::ImageResolver;
27use lyquor_lib::lyquid_log::LyquidLog;
28use lyquor_lib::side_effects::{SideEffectRuntime, build_side_effect_runtime};
29use lyquor_lib::{http, lyquid, utils};
30use lyquor_primitives::{Address, Bytes, LyteLog};
31use lyquor_seq::{SequenceBackend, eth, fco::FCO};
32
33subkey_builder!(RootSubKey(
34    ([0x01])-lyquid() => Key,
35    ([0x02])-fco() => Key,
36    ([0x03])-log() => Key,
37    ([0x04])-side_effect_store() => Key,
38));
39
40fn subkey() -> &'static RootSubKey {
41    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
42    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
43}
44
/// File name of the Anvil state dump kept under the per-profile data directory.
const ANVIL_STATE_FILENAME: &str = "anvil-state.json";

/// PID of the Anvil child process, recorded once it has been spawned so the
/// panic hook can signal it.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
48
49async fn setup_eth_backend(
50    profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
51    shutdown: CancellationToken,
52) -> anyhow::Result<eth::Backend> {
53    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
54        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
55
56    Ok(eth::Backend::new(
57        eth::Config::builder()
58            .finality_tag(finality_tag)
59            .registry(reg)
60            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
61            .build(),
62        jsonrpc_client,
63        shutdown,
64    ))
65}
66
67#[derive(Default)]
68struct Subsystems {
69    eth_backend: Option<lyquor_seq::eth::Backend>,
70    fco: Option<FCO>,
71    log: Option<LyquidLog>,
72    pool: Option<lyquid::LyquidPool>,
73    api: Option<http::HttpServer>,
74    db: Option<DBFlusherHandle>,
75    discovery: Option<lyquor_lib::discovery::LyquidDiscovery>,
76    client: Option<ClientHandle>,
77}
78debug_struct_name!(Subsystems);
79
80impl Subsystems {
81    #[tracing::instrument(level = "trace")]
82    async fn stop(&mut self, shutdown: CancellationToken) {
83        if let Some(api) = self.api.take() {
84            api.stop();
85        }
86        if let Some(eth_backend) = self.eth_backend.take() {
87            eth_backend.shutdown().await;
88        }
89        if let Some(discovery) = self.discovery.take() {
90            discovery.shutdown().await;
91        }
92        if let Some(pool) = self.pool.take() {
93            pool.shutdown().await;
94        }
95        if let Some(db) = self.db.take() {
96            db.stop().await;
97        }
98        shutdown.cancel();
99        if let Some(client) = self.client.take() {
100            client.wait_for_shutdown().await;
101        }
102        if let Some(fco) = self.fco.take() {
103            fco.shutdown().await;
104        }
105        if let Some(log) = self.log.take() {
106            log.shutdown().await;
107        }
108    }
109}
110
111struct Node {
112    sys: Subsystems,
113    config: Arc<NodeConfig>,
114    profile: Arc<LyquorProfile>,
115    seq_eth_url: String,
116    http_tls: Option<http::httptls::HttpTls>,
117    started_anvil: bool,
118    network: Option<Arc<lyquor_net::hub::Hub>>,
119    shutdown: CancellationToken,
120}
121
122lyquor_primitives::debug_struct_name!(Node);
123
124impl Node {
125    #[tracing::instrument(level = "trace", skip_all)]
126    async fn start(mut self) -> anyhow::Result<Self> {
127        self.sys = Self::init(
128            self.config.clone(),
129            self.profile.clone(),
130            self.seq_eth_url.clone(),
131            self.http_tls.clone(),
132            self.started_anvil,
133            self.network.take().expect("Node: Network not found."),
134            self.shutdown.clone(),
135        )
136        .await?;
137        Ok(self)
138    }
139
140    #[tracing::instrument(level = "trace", skip_all)]
141    async fn shutdown(&mut self) {
142        self.sys.stop(self.shutdown.clone()).await;
143    }
144
145    #[tracing::instrument(level = "trace", skip_all)]
146    async fn init(
147        config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String,
148        http_tls: Option<http::httptls::HttpTls>, started_anvil: bool, network: Arc<lyquor_net::hub::Hub>,
149        shutdown: CancellationToken,
150    ) -> anyhow::Result<Subsystems> {
151        let mut sys = Subsystems::default();
152
153        // Set up Ethereum sequence backend.
154        let jsonrpc_client = ClientConfig::builder()
155            .url(seq_eth_url.parse()?)
156            .build()
157            .into_client(shutdown.child_token());
158        sys.client = Some(jsonrpc_client.clone());
159        let sequence_backend_chain_id: u64 = jsonrpc_client
160            .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
161            .await
162            .context("Failed to query sequence backend chain ID.")?
163            .0
164            .to();
165        let sequence_backend_bartender_addr =
166            Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
167        let resolved_db_dir = config.resolved_db_dir();
168        let resolved_image_repo_dir = config.resolved_image_repo_dir();
169
170        //// 1. DB & Storage
171        let (store, lyq_store): (Arc<ShadowKVStore<Box<dyn KVStore>>>, Arc<dyn utils::LVMStoreFactory>) =
172            match &config.storage.db {
173                DB::MemDb => {
174                    tracing::warn!("using MemDB, all data will be lost upon exit");
175                    let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
176                    (store, Arc::new(utils::LVMMemStore))
177                }
178                DB::RocksDb { .. } => {
179                    let base_dir = resolved_db_dir.clone();
180                    let store = Arc::new(ShadowKVStore::new(
181                        Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
182                    ));
183                    let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
184                        store.clone(),
185                        subkey().lyquid(),
186                    ))));
187
188                    if started_anvil {
189                        let state_file = anvil_state_path(&base_dir, profile.id());
190                        if state_file.exists() {
191                            tracing::info!("Using Anvil state file {}.", state_file.display());
192                        } else {
193                            tracing::info!(
194                                "No existing Anvil state file found at {}, starting with fresh state.",
195                                state_file.display()
196                            );
197                        }
198                    }
199
200                    (store, lyq_store)
201                }
202            };
203        ////
204
205        //// 2. Fate-Constrainted Ordering Subsystem.
206        // Start discovering Lyquid and sequencing contracts.
207        let discovery =
208            lyquor_lib::discovery::LyquidDiscovery::new(*profile.bartender_id(), sequence_backend_bartender_addr);
209        let discovery_handle = discovery.handle();
210        sys.discovery = Some(discovery);
211
212        let eth_backend = setup_eth_backend(
213            &profile,
214            discovery_handle.contract_registry_service(),
215            jsonrpc_client.clone(),
216            shutdown.child_token(),
217        )
218        .await?;
219        sys.eth_backend = Some(eth_backend.clone());
220        let SideEffectRuntime {
221            store: side_effect_store,
222            provider: side_effect_provider,
223        } = build_side_effect_runtime(config.as_ref(), store.clone(), subkey().side_effect_store())?;
224        let archive_inbound_store = (!side_effect_provider.is_selective_hosting()).then(|| side_effect_store.clone());
225
226        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
227        let fco = FCO::new(
228            eth_backend.clone(),
229            Arc::new(fco_store),
230            Some(profile.init_chain_position()),
231            side_effect_provider,
232            archive_inbound_store,
233        )?;
234        let fco_handle = fco.handle();
235        sys.fco = Some(fco);
236        ////
237
238        //// 3. Lyquor Virtual Machine Subsystem.
239        // Configure VM log persistence (store) and runtime log hub (delivery).
240        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
241        let vm_log_store: Arc<dyn lyquor_state::SimpleStateStore> =
242            Arc::new(lyquor_state::DBSimpleStateStore::new(log_store));
243        let log = LyquidLog::new(vm_log_store.clone());
244        let log_hub = log.handle();
245        sys.log = Some(log);
246
247        let mut lvm = lyquid::LVM::new(
248            vm_log_store,
249            network.clone(),
250            Some(lyquor_vm::Sequencer {
251                inter: lyquor_api::interface::InterCallService::new(fco_handle.clone()),
252                submit: lyquor_api::interface::SubmitService::new(fco_handle.clone()),
253                fetch_oracle_info: lyquor_api::interface::OracleInfoService::new(fco_handle.clone()),
254            }),
255        );
256        ////
257
258        //// 4. UPC peers
259        for peer in &config.peers {
260            lvm.upc_mut()
261                .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
262                .await
263                .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
264        }
265        ////
266
267        //// 5. Pool of all Lyquids
268        let image_repo = Arc::new(
269            DirectoryRepo::new(&resolved_image_repo_dir)
270                .await
271                .context("Failed to initialize image store.")?,
272        );
273        let image_repo_for_resolver: Arc<dyn LyquidImageRepo> = image_repo.clone();
274        let image_repo_for_http: Arc<dyn OciDistributionStore> = Arc::new(DirectoryOciStore::new(image_repo.clone()));
275        let image_resolver = Arc::new(ImageResolver::new(
276            image_repo_for_resolver,
277            config.image.fallback_repos.clone(),
278        ));
279        let bartender_id = *profile.bartender_id();
280        let pool = lyquid::LyquidPool::new(
281            bartender_id,
282            lvm,
283            lyquid::LyquidPoolSetup {
284                fco: fco_handle.clone(),
285                store_factory: lyq_store,
286                image_resolver,
287                side_effect_store,
288                log_hub: log_hub.clone(),
289                sequence_backend_chain_id,
290                sequence_backend_bartender_addr,
291                config: lyquid::LyquidConfig::builder().build(),
292            },
293        )
294        .await?;
295        let pool_handle = pool.handle();
296        sys.pool = Some(pool);
297        ////
298
299        // Configure bartender.
300        let register_topic = LyteLog::tagged_value_topic("Register");
301        let register_count = log_hub.get_number_of_records(bartender_id, register_topic);
302
303        let node_service_context = http::NodeServiceContext::new(config.clone(), network.clone());
304        // Start the API server - use the same jsonrpc client as sequencer
305        let mut api = lyquor_lib::http::HttpServer::new(
306            &config.network.api_addr,
307            jsonrpc_client.clone(),
308            fco_handle.clone(),
309            pool_handle.clone(),
310            node_service_context,
311            image_repo_for_http,
312            shutdown.child_token(),
313            http_tls,
314        )
315        .await?;
316        api.start();
317        sys.api = Some(api);
318
319        // Wait for bartender to be registered.
320        if register_count == 0 {
321            tracing::info!("Waiting for bartender to be registered.");
322            log_hub
323                .wait_next(bartender_id, register_topic)
324                .await
325                .context("Failed to wait for bartender registration.")?;
326        }
327        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
328        let bartender_ins = pool_handle
329            .get_bartender()
330            .await
331            .context("Failed to obtain bartender.")?;
332        let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
333        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
334        bartender_ins
335            .wait_until(bartender_ln)
336            .await
337            .context("Failed to wait for the initial re-execution of bartender.")?;
338
339        // Finish up LyquidDiscovery.
340        discovery_handle.bind_log_stream(
341            log_hub
342                .subscribe(bartender_id, register_topic)
343                .await
344                .context("Failed to subscribe to log system for LyquidDiscovery.")?,
345        );
346        discovery_handle
347            .set_pool(pool_handle.clone())
348            .await
349            .context("Failed to set pool in LyquidDiscovery.")?;
350
351        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
352
353        // Start a DB writer that persist when node is idle.
354        let db = DBFlusher::new(0, store, fco_handle, Duration::from_secs(config.flush_interval_secs)).start();
355        sys.db = Some(db);
356
357        Ok(sys)
358    }
359}
360
361fn generate_unused_port() -> anyhow::Result<u16> {
362    let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
363    let port = listener.local_addr().context("Failed to get local address.")?.port();
364    Ok(port)
365}
366
367fn anvil_state_path(data_dir: &Path, profile_id: &str) -> PathBuf {
368    data_dir.join(profile_id).join(ANVIL_STATE_FILENAME)
369}
370
371fn start_devnet_anvil(state_file: Option<&Path>, state_interval_secs: u64) -> anyhow::Result<AnvilInstance> {
372    let anvil_port = generate_unused_port()?;
373    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
374    // terminate when dropped)
375    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
376    let mut sigset = SigSet::empty();
377    sigset.add(Signal::SIGINT);
378    let mut old_mask = SigSet::empty();
379    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
380        .context("Failed to mask SIGINT for anvil.")?;
381
382    let mut anvil_builder = Anvil::new().port(anvil_port).keep_stdout();
383    if let Some(state_file) = state_file {
384        if let Some(state_dir) = state_file.parent() {
385            std::fs::create_dir_all(state_dir)?;
386        }
387        anvil_builder = anvil_builder.arg("--state").arg(state_file.as_os_str());
388        if state_interval_secs > 0 {
389            anvil_builder = anvil_builder
390                .arg("--state-interval")
391                .arg(state_interval_secs.to_string());
392        }
393    }
394    let mut anvil = anvil_builder.try_spawn()?;
395
396    let stdout_reader = std::io::BufReader::new(
397        anvil
398            .child_mut()
399            .stdout
400            .take()
401            .context("Failed to read from anvil stdout.")?,
402    );
403
404    tokio::task::spawn_blocking(move || {
405        use std::io::BufRead;
406        let mut stdout_lines = stdout_reader.lines();
407        while let Some(line) = stdout_lines.next() {
408            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
409        }
410    });
411
412    tracing::info!("Anvil started at port {}.", anvil_port);
413    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
414        .context("Failed to restore old signal mask for anvil.")?;
415
416    Ok(anvil)
417}
418
419fn parse_override_value(raw: &str) -> Value {
420    if let Ok(v) = raw.parse::<bool>() {
421        return Value::from(v);
422    }
423    if let Ok(v) = raw.parse::<u64>() {
424        return Value::from(v);
425    }
426    if let Ok(v) = raw.parse::<i64>() {
427        return Value::from(v);
428    }
429    if let Ok(v) = raw.parse::<f64>() {
430        return Value::from(v);
431    }
432    Value::from(raw.to_string())
433}
434
435fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
436    let mut segments = key.split('.').peekable();
437    let mut current = overrides;
438
439    while let Some(segment) = segments.next() {
440        if segments.peek().is_none() {
441            current.insert(segment.to_string(), value);
442            return Ok(());
443        }
444
445        let entry = current
446            .entry(segment.to_string())
447            .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
448
449        match entry {
450            Value::Object(dict) => {
451                current = dict;
452            }
453            _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
454        }
455    }
456
457    Ok(())
458}
459
460fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
461    let mut overrides: Map<String, Value> = Map::new();
462
463    if let Some(entries) = matches.get_many::<String>("config-override") {
464        for entry in entries {
465            let (key, value) = entry
466                .split_once('=')
467                .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
468            insert_override_path(&mut overrides, key, parse_override_value(value))?;
469        }
470    }
471
472    Ok(overrides)
473}
474
475const NODE_RUNTIME_THREAD_NAME: &str = "lyquor-node";
476
477fn build_node_runtime() -> std::io::Result<tokio::runtime::Runtime> {
478    tokio::runtime::Builder::new_multi_thread()
479        .enable_all()
480        .thread_name(NODE_RUNTIME_THREAD_NAME)
481        .build()
482}
483
484fn main() -> anyhow::Result<()> {
485    // Install panic hook to cleanup anvil on panic
486    std::panic::set_hook(Box::new(|_| {
487        if let Some(&pid) = ANVIL_PID.get() {
488            tracing::warn!("Panic detected, killing anvil process {}", pid);
489            use nix::sys::signal::{Signal, kill};
490            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
491        }
492    }));
493
494    lyquor_cli::setup_tracing()?;
495
496    println!("{}", lyquor_cli::format_logo_banner());
497
498    let runtime = build_node_runtime().context("Failed to build node Tokio runtime")?;
499
500    runtime.block_on(run_node())
501}
502
503async fn run_node() -> anyhow::Result<()> {
504    let matches = clap::command!()
505        .version(lyquor_cli::build_version())
506        .propagate_version(true)
507        .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
508        .arg(
509            clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
510                .required(false)
511                .action(clap::ArgAction::Append)
512                .value_parser(clap::builder::NonEmptyStringValueParser::new()),
513        )
514        .get_matches();
515
516    let config_path = matches.get_one::<String>("config").map(PathBuf::from);
517    let overrides = build_config_overrides(&matches)?;
518    let config = NodeConfig::load(config_path, overrides)?;
519
520    let mut _anvil: Option<AnvilInstance> = None;
521    let seq_eth_url = match config.profile.base {
522        NetworkType::Devnet => {
523            match config.profile.sequencer.as_deref() {
524                None => {
525                    let state_file = match &config.storage.db {
526                        DB::RocksDb { .. } => Some(anvil_state_path(
527                            &config.resolved_db_dir(),
528                            config.profile.base.as_str(),
529                        )),
530                        DB::MemDb => None,
531                    };
532                    let anvil = start_devnet_anvil(state_file.as_deref(), config.flush_interval_secs)?;
533                    // Store anvil PID for panic hook
534                    ANVIL_PID.set(anvil.child().id()).ok();
535                    let ep = anvil.ws_endpoint();
536                    _anvil = Some(anvil);
537                    ep
538                }
539                Some(url) => url.to_string(),
540            }
541        }
542        _ => return Err(anyhow::anyhow!("Network is not supported.")),
543    };
544    let profile: Arc<LyquorProfile> = Arc::new(config.profile.resolve(seq_eth_url.clone())?);
545    let node_seed = config.node_key.seed_bytes()?;
546    let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed, profile.dns_suffix());
547
548    if let (Some(external_host), Some(barstrainer_addr)) = (
549        config.network.external_host.as_deref(),
550        config.network.barstrainer_addr.as_deref(),
551    ) {
552        match SigningIdentity::from_tls_config(&tls_config) {
553            Ok(barstrainer_identity) => {
554                let dns_suffix = profile.dns_suffix().to_string();
555                let external_host = external_host.to_string();
556                let barstrainer_addr = barstrainer_addr.to_string();
557                tokio::spawn(async move {
558                    if let Err(err) =
559                        publish_external_host_dns(&barstrainer_identity, &dns_suffix, &external_host, &barstrainer_addr)
560                            .await
561                    {
562                        tracing::warn!(
563                            error = ?err,
564                            external_host = %external_host,
565                            barstrainer_addr = %barstrainer_addr,
566                            "Failed to publish node DNS record"
567                        );
568                    }
569                });
570            }
571            Err(err) => {
572                tracing::warn!(
573                    error = ?err,
574                    "Skipping external DNS publication because signing identity could not be built"
575                );
576            }
577        }
578    } else {
579        if config.network.external_host.is_some() {
580            tracing::warn!("Skipping external DNS publication because network.barstrainer_addr is not configured");
581        }
582    }
583
584    let shutdown = CancellationToken::new();
585
586    // Grab TLS cert via ACME DNS-01 and barstrainer
587    // This still works without barstrainer_addr because user may supply local certs that is valid
588    let http_tls = if let Some(mut acme_config) = config.network.acme.clone() {
589        acme_config.storage_path = config
590            .resolved_acme_storage_path()
591            .expect("resolved ACME storage path should exist when ACME is configured");
592        let acme = lyquor_lib::acme::Acme::new_from_acme_config(
593            acme_config,
594            config.network.barstrainer_addr.clone().unwrap_or_default(),
595            profile.dns_suffix().to_owned(),
596            &tls_config,
597        );
598
599        let http_tls = if let Ok(acme) = acme {
600            let domain = barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), None);
601            let domains = vec![domain.clone()];
602
603            Ok(http::httptls::HttpTls::new(acme, domains).await)
604        } else {
605            Err(anyhow::anyhow!("Failed to construct ACME config"))
606        };
607
608        match http_tls {
609            Ok(t) => Some(t),
610            Err(err) => {
611                tracing::warn!("Failed to initialize HTTP TLS for api server: {:?}", err);
612                None
613            }
614        }
615    } else {
616        None
617    };
618
619    // Start network hub
620    let config = Arc::new(config);
621    let mut network = lyquor_net::hub::HubBuilder::new(tls_config, shutdown.child_token())
622        .listen_addr(config.network.upc_addr.clone())
623        .build()
624        .context("Failed to setup network.")?;
625    network.start().await.context("Failed to start network.")?;
626    let network = Arc::new(network);
627
628    // Signal handlers should be registered before starting any actors, so they'll be stopped
629    // gracefully.
630    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
631    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
632
633    let mut node = Node {
634        sys: Subsystems::default(),
635        config,
636        profile: profile.clone(),
637        seq_eth_url,
638        http_tls,
639        started_anvil: _anvil.is_some(),
640        network: Some(network.clone()),
641        shutdown,
642    }
643    .start()
644    .await?;
645
646    tokio::select! {
647        _ = sigint.recv() => {
648            tracing::info!("received SIGINT");
649        }
650        _ = sigterm.recv() => {
651            tracing::info!("received SIGTERM");
652        }
653    }
654
655    node.shutdown().await;
656    network.wait_for_shutdown().await;
657    Ok(())
658}